Source code for osi_utilities.tracefile.binary_writer

# SPDX-License-Identifier: MPL-2.0
# SPDX-FileCopyrightText: Copyright (c) 2026, Bayerische Motoren Werke Aktiengesellschaft (BMW AG)

"""Binary (.osi) trace file writer.

Writes OSI trace files in the single-channel binary format where each message
is prefixed with a 4-byte little-endian uint32 length.
"""

from __future__ import annotations

import logging
import struct
from pathlib import Path
from typing import IO

from google.protobuf.message import EncodeError, Message

from osi_utilities.tracefile.writer import TraceFileWriter

logger = logging.getLogger(__name__)


[docs] class BinaryTraceFileWriter(TraceFileWriter): """Writer for single-channel binary OSI trace files (.osi). Each message is stored as: [4-byte LE length][serialized protobuf bytes] """ def __init__(self) -> None: self._file: IO[bytes] | None = None self._path: Path | None = None self._written_count = 0
[docs] def open(self, path: Path) -> bool: """Open a binary .osi trace file for writing. Args: path: Path to the output file. Must have .osi extension. Returns: True on success, False on failure. """ if self._file is not None: logger.error("Opening file '%s', writer has already a file opened", path) return False if path.suffix.lower() != ".osi": logger.error("Binary trace files must have .osi extension, got '%s'", path.suffix) return False try: self._file = open(path, "wb") # noqa: SIM115 self._path = path self._written_count = 0 return True except OSError as e: logger.error("Failed to open file '%s' for writing: %s", path, e) return False
[docs] def write_message(self, message: Message, topic: str = "") -> bool: """Write a protobuf message to the binary trace file. Args: message: The protobuf message to serialize and write. topic: Ignored for single-channel binary files. Returns: True on success, False on failure. """ if self._file is None: logger.error("Writer is not open") return False try: data = message.SerializeToString() self._file.write(struct.pack("<I", len(data))) self._file.write(data) self._written_count += 1 return True except (OSError, EncodeError) as e: logger.error("Failed to write message: %s", e) return False
[docs] def close(self) -> None: """Close the trace file.""" if self._file is not None: self._file.close() logger.info("Wrote %d messages to '%s'", self._written_count, self._path) self._file = None
@property def written_count(self) -> int: """Number of messages written so far.""" return self._written_count