Source code for osi_utilities.tracefile.binary_reader
# SPDX-License-Identifier: MPL-2.0
# SPDX-FileCopyrightText: Copyright (c) 2026, Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
"""Binary (.osi) trace file reader.
Reads OSI trace files in the single-channel binary format where each message
is prefixed with a 4-byte little-endian uint32 length.
"""
from __future__ import annotations
import logging
import struct
from pathlib import Path
from typing import IO
from osi_utilities.tracefile._config import BINARY_MESSAGE_LENGTH_PREFIX_SIZE, MAX_EXPECTED_MESSAGE_SIZE
from osi_utilities.tracefile._types import (
MessageType,
ReadResult,
_get_message_class,
infer_message_type_from_filename,
)
from osi_utilities.tracefile.reader import TraceFileReader
logger = logging.getLogger(__name__)
[docs]
class BinaryTraceFileReader(TraceFileReader):
"""Reader for single-channel binary OSI trace files (.osi).
Each message is stored as: [4-byte LE length][serialized protobuf bytes]
The message type can be specified explicitly or inferred from the filename.
"""
def __init__(self, message_type: MessageType = MessageType.UNKNOWN) -> None:
"""Initialize the binary trace file reader.
Args:
message_type: The expected message type. If UNKNOWN, will be inferred from filename.
"""
self._message_type = message_type
self._file: IO[bytes] | None = None
self._message_class: type | None = None
self._has_next = False
[docs]
def open(self, path: Path) -> bool:
"""Open a binary .osi trace file.
Args:
path: Path to the .osi file.
Returns:
True on success, False on failure.
"""
if self._message_type == MessageType.UNKNOWN:
self._message_type = infer_message_type_from_filename(path.name)
if self._message_type == MessageType.UNKNOWN:
logger.error("Cannot determine message type for '%s'. Specify it explicitly.", path)
return False
try:
self._message_class = _get_message_class(self._message_type)
except ValueError as e:
logger.error("Failed to get message class: %s", e)
return False
try:
self._file = open(path, "rb") # noqa: SIM115
except OSError as e:
logger.error("Failed to open file '%s': %s", path, e)
return False
self._has_next = self._peek_has_data()
return True
[docs]
def open_with_type(self, path: Path, message_type: MessageType) -> bool:
"""Open a binary .osi trace file with an explicit message type.
Args:
path: Path to the .osi file.
message_type: The message type to use.
Returns:
True on success, False on failure.
"""
self._message_type = message_type
return self.open(path)
[docs]
def read_message(self) -> ReadResult | None:
"""Read the next message from the binary trace file.
Returns:
ReadResult on success, None if no more messages.
Raises:
RuntimeError: If the message is truncated or deserialization fails.
"""
if self._file is None or self._message_class is None:
return None
length_bytes = self._file.read(BINARY_MESSAGE_LENGTH_PREFIX_SIZE)
if not length_bytes:
self._has_next = False
return None
if len(length_bytes) < BINARY_MESSAGE_LENGTH_PREFIX_SIZE:
raise RuntimeError("Truncated length header in binary trace file")
(msg_len,) = struct.unpack("<I", length_bytes)
if msg_len > MAX_EXPECTED_MESSAGE_SIZE:
raise RuntimeError(f"Message size {msg_len} exceeds maximum expected size {MAX_EXPECTED_MESSAGE_SIZE}")
data = self._file.read(msg_len)
if len(data) < msg_len:
raise RuntimeError(f"Truncated message body: expected {msg_len} bytes, got {len(data)}")
message = self._message_class()
try:
message.ParseFromString(data)
except Exception as e:
raise RuntimeError(f"Failed to deserialize protobuf message ({msg_len} bytes): {e}") from e
self._has_next = self._peek_has_data()
return ReadResult(message=message, message_type=self._message_type)
[docs]
def has_next(self) -> bool:
"""Check if there are more messages to read."""
return self._has_next
[docs]
def close(self) -> None:
"""Close the trace file."""
if self._file is not None:
self._file.close()
self._file = None
self._has_next = False
@property
def message_type(self) -> MessageType:
"""The message type being read."""
return self._message_type
def _peek_has_data(self) -> bool:
"""Check if there is more data without consuming it."""
if self._file is None:
return False
pos = self._file.tell()
data = self._file.read(1)
if data:
self._file.seek(pos)
return True
return False