Source code for osi_utilities.tracefile.mcap_reader
# SPDX-License-Identifier: MPL-2.0
# SPDX-FileCopyrightText: Copyright (c) 2026, Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
"""MCAP trace file reader.
Reads OSI trace files in the MCAP container format with multi-channel support.
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import IO
from mcap.exceptions import McapError
from mcap.reader import make_reader
from mcap.well_known import MessageEncoding
from mcap_protobuf.decoder import DecoderFactory
from osi_utilities.tracefile._types import (
SCHEMA_NAME_TO_MESSAGE_TYPE,
MessageType,
ReadResult,
)
from osi_utilities.tracefile.reader import TraceFileReader
logger = logging.getLogger(__name__)
[docs]
class MCAPTraceFileReader(TraceFileReader):
"""Reader for MCAP-format OSI trace files (.mcap).
Supports multi-channel reading with topic-based filtering,
schema-based message type detection, and metadata access.
"""
def __init__(self) -> None:
self._file: IO[bytes] | None = None
self._reader = None
self._message_iterator = None
self._summary = None
self._skip_non_osi_msgs = False
self._next_result: ReadResult | None = None
self._topics: list[str] | None = None
self._decoder_factory = DecoderFactory()
self._decoders: dict[int, object] = {}
[docs]
def open(self, path: Path) -> bool:
"""Open an MCAP trace file.
Args:
path: Path to the .mcap file.
Returns:
True on success, False on failure.
"""
try:
self._file = open(path, "rb") # noqa: SIM115
self._reader = make_reader(self._file, decoder_factories=[self._decoder_factory])
self._summary = self._reader.get_summary()
self._decoders.clear()
self._start_iteration()
return True
except (OSError, McapError) as e:
logger.error("Failed to open MCAP file '%s': %s", path, e)
if self._file is not None:
self._file.close()
self._file = None
return False
[docs]
def set_topics(self, topics: list[str]) -> None:
"""Filter reading to specific topics.
Args:
topics: List of topic names to read. If empty, reads all topics.
"""
self._topics = topics if topics else None
self._start_iteration()
[docs]
def set_skip_non_osi_msgs(self, skip: bool) -> None:
"""Configure whether to skip non-OSI messages.
Args:
skip: If True, silently skip messages with unrecognized schemas.
If False (default), they are skipped with a warning.
"""
self._skip_non_osi_msgs = skip
[docs]
def read_message(self) -> ReadResult | None:
"""Read the next message from the MCAP file.
Returns:
ReadResult on success, None if no more messages.
"""
if self._message_iterator is None:
return None
while True:
try:
schema, channel, raw_message = next(self._message_iterator)
except StopIteration:
return None
# Skip non-protobuf messages (e.g. JSON channels in mixed files)
if channel.message_encoding != MessageEncoding.Protobuf:
if not self._skip_non_osi_msgs:
logger.warning(
"Skipping non-protobuf message on channel '%s' (encoding: %s)",
channel.topic,
channel.message_encoding,
)
continue
# Decode protobuf message
decoder = self._decoders.get(raw_message.channel_id)
if decoder is None:
decoder = self._decoder_factory.decoder_for(channel.message_encoding, schema)
if decoder is None:
if not self._skip_non_osi_msgs:
logger.warning(
"Skipping message with no decoder for schema '%s'",
schema.name if schema else "None",
)
continue
self._decoders[raw_message.channel_id] = decoder
try:
proto_msg = decoder(raw_message.data)
except Exception as e:
logger.warning("Failed to decode message on channel '%s': %s", channel.topic, e)
continue
msg_type = SCHEMA_NAME_TO_MESSAGE_TYPE.get(schema.name, MessageType.UNKNOWN)
if msg_type == MessageType.UNKNOWN:
if not self._skip_non_osi_msgs:
logger.warning("Skipping non-OSI message with schema '%s'", schema.name)
continue
return ReadResult(
message=proto_msg,
message_type=msg_type,
channel_name=channel.topic,
)
[docs]
def has_next(self) -> bool:
"""Check if there are more messages.
Note: May return True even if only non-OSI messages remain.
"""
return self._message_iterator is not None
[docs]
def close(self) -> None:
"""Close the MCAP file."""
self._message_iterator = None
self._summary = None
self._reader = None
if self._file is not None:
self._file.close()
self._file = None
[docs]
def get_available_topics(self) -> list[str]:
"""Return list of available topic names in the file."""
if self._summary is None:
return []
return [channel.topic for channel in self._summary.channels.values()]
[docs]
def get_message_type_for_topic(self, topic: str) -> MessageType | None:
"""Return the MessageType for a given topic based on its schema."""
if self._summary is None:
return None
for channel in self._summary.channels.values():
if channel.topic == topic:
schema = self._summary.schemas.get(channel.schema_id)
if schema is not None:
return SCHEMA_NAME_TO_MESSAGE_TYPE.get(schema.name)
return None
def _start_iteration(self) -> None:
"""(Re)start the raw message iterator."""
if self._reader is not None:
self._message_iterator = iter(self._reader.iter_messages(topics=self._topics))