# SPDX-License-Identifier: MPL-2.0
# SPDX-FileCopyrightText: Copyright (c) 2026, Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
"""MCAP trace file writer with OSI-compliant metadata support.
Writes OSI trace files in the MCAP container format with multi-channel support,
schema registration via FileDescriptorSet, and OSI metadata validation.
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import IO
import google.protobuf
from google.protobuf.message import EncodeError, Message
from mcap.exceptions import McapError
from mcap.well_known import MessageEncoding
from mcap.writer import CompressionType
from mcap.writer import Writer as McapRawWriter
from osi_utilities.tracefile._config import (
DEFAULT_CHUNK_SIZE,
MAX_CHUNK_SIZE,
MIN_CHUNK_SIZE,
OSI_CHANNEL_RECOMMENDED_METADATA_KEYS,
OSI_CHANNEL_REQUIRED_METADATA_KEYS,
OSI_TRACE_METADATA_NAME,
OSI_TRACE_RECOMMENDED_METADATA_KEYS,
OSI_TRACE_REQUIRED_METADATA_KEYS,
)
from osi_utilities.tracefile._mcap_utils import build_file_descriptor_set
from osi_utilities.tracefile.timestamp import timestamp_to_nanoseconds
from osi_utilities.tracefile.writer import TraceFileWriter
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Accepted ``compression`` names mapped to the mcap ``CompressionType`` members.
# ``MCAPTraceFileWriter.open`` lower-cases the caller's value before looking it
# up here and joins the keys of this mapping into its "invalid compression"
# error message.
_COMPRESSION_MAP: dict[str, CompressionType] = {
    "none": CompressionType.NONE,
    "lz4": CompressionType.LZ4,
    "zstd": CompressionType.ZSTD,
}
def _get_package_version() -> str:
    """Return the installed version of the ``asam-osi-utilities`` distribution.

    The version is looked up through :mod:`importlib.metadata`.

    Returns:
        The distribution version string, or the placeholder ``"0.0.0"`` when
        the lookup fails with an :class:`ImportError` (for example because the
        distribution is not installed).
    """
    try:
        from importlib.metadata import version

        return version("asam-osi-utilities")
    except ImportError:
        # PackageNotFoundError derives from ModuleNotFoundError, which derives
        # from ImportError, so this single clause covers both a missing
        # distribution and an unavailable importlib.metadata module.
        return "0.0.0"
def prepare_required_file_metadata() -> dict[str, str]:
    """Build a default 'net.asam.osi.trace' file metadata dictionary.

    The OSI version bounds are empty-string placeholders. Both protobuf version
    bounds are set to the version of the imported ``google.protobuf`` package,
    and ``creation_time`` is the current UTC time formatted as
    ``YYYY-MM-DDTHH:MM:SSZ``.

    Returns:
        A new dict with the keys ``version``, ``min_osi_version``,
        ``max_osi_version``, ``min_protobuf_version``,
        ``max_protobuf_version`` and ``creation_time``.
    """
    package_version = _get_package_version()
    protobuf_version = google.protobuf.__version__
    created_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    # Keys are inserted in the same order as the metadata is meant to be stored.
    defaults: dict[str, str] = {"version": package_version}
    defaults.update(min_osi_version="", max_osi_version="")
    defaults.update(
        min_protobuf_version=protobuf_version,
        max_protobuf_version=protobuf_version,
    )
    defaults["creation_time"] = created_at
    return defaults
def _validate_file_metadata(metadata: dict[str, str]) -> None:
    """Log which 'net.asam.osi.trace' metadata keys are absent from *metadata*.

    The function only reports; it neither modifies *metadata* nor rejects it.
    Missing required keys are logged as a warning, missing recommended keys at
    info level.

    Args:
        metadata: The file metadata dictionary to check.
    """
    # The differences are sets, whose iteration order is arbitrary; sort them
    # so the log lines are stable from run to run.
    missing_required = sorted(OSI_TRACE_REQUIRED_METADATA_KEYS - metadata.keys())
    missing_recommended = sorted(OSI_TRACE_RECOMMENDED_METADATA_KEYS - metadata.keys())
    if missing_required:
        logger.warning("Missing required 'net.asam.osi.trace' metadata: %s", ", ".join(missing_required))
    if missing_recommended:
        logger.info("Missing recommended 'net.asam.osi.trace' metadata: %s", ", ".join(missing_recommended))
def _validate_channel_metadata(metadata: dict[str, str]) -> None:
    """Log which OSI channel metadata keys are absent from *metadata*.

    The function only reports; it neither modifies *metadata* nor rejects it.
    Missing required keys are logged as a warning, missing recommended keys at
    info level.

    Args:
        metadata: The channel metadata dictionary to check.
    """
    # The differences are sets, whose iteration order is arbitrary; sort them
    # so the log lines are stable from run to run.
    missing_required = sorted(OSI_CHANNEL_REQUIRED_METADATA_KEYS - metadata.keys())
    missing_recommended = sorted(OSI_CHANNEL_RECOMMENDED_METADATA_KEYS - metadata.keys())
    if missing_required:
        logger.warning("Missing required channel metadata: %s", ", ".join(missing_required))
    if missing_recommended:
        logger.info("Missing recommended channel metadata: %s", ", ".join(missing_recommended))
[docs]
class MCAPTraceFileWriter(TraceFileWriter):
    """Writer for MCAP-format OSI trace files (.mcap).

    Supports multi-channel writing with schema registration,
    OSI-compliant file/channel metadata, and FileDescriptorSet-based schemas.
    """

    def __init__(self) -> None:
        """Create a writer in the closed state; call :meth:`open` before writing."""
        self._file: IO[bytes] | None = None
        self._mcap_writer: McapRawWriter | None = None
        self._path: Path | None = None
        self._active_channels: dict[str, int] = {}  # topic -> channel_id
        self._channel_metadata: dict[str, dict[str, str]] = {}  # topic -> channel metadata
        self._schema_cache: dict[str, int] = {}  # schema_name -> schema_id
        self._written_count = 0

    def open(
        self,
        path: Path,
        metadata: dict[str, str] | None = None,
        *,
        compression: str | None = None,
        chunk_size: int | None = None,
    ) -> bool:
        """Open an MCAP file for writing.

        All arguments are validated before the file is created, so a rejected
        call does not touch the file system.

        Args:
            path: Path to the output file. Must have .mcap extension.
            metadata: Optional net.asam.osi.trace file metadata. If None, default metadata is used.
            compression: Compression algorithm — ``"none"``, ``"lz4"``, or ``"zstd"``
                (matched case-insensitively). If *None*, the mcap library default is used.
            chunk_size: Chunk size in bytes. Must be between ``MIN_CHUNK_SIZE`` and ``MAX_CHUNK_SIZE``.
                If *None*, ``DEFAULT_CHUNK_SIZE`` is used.

        Returns:
            True on success. False if a file is already open, the extension,
            compression or chunk size is invalid, or creating/initialising the
            file raised ``OSError`` or ``McapError``.
        """
        if self._file is not None:
            logger.error("Opening file '%s', writer has already a file opened", path)
            return False

        if path.suffix.lower() != ".mcap":
            logger.error("MCAP files must have .mcap extension, got '%s'", path.suffix)
            return False

        if compression is not None:
            compression_lower = compression.lower()
            if compression_lower not in _COMPRESSION_MAP:
                logger.error(
                    "Invalid compression '%s'. Must be one of: %s",
                    compression,
                    ", ".join(_COMPRESSION_MAP),
                )
                return False
            mcap_compression = _COMPRESSION_MAP[compression_lower]
        else:
            mcap_compression = None

        effective_chunk_size = chunk_size if chunk_size is not None else DEFAULT_CHUNK_SIZE
        if effective_chunk_size < MIN_CHUNK_SIZE or effective_chunk_size > MAX_CHUNK_SIZE:
            logger.error(
                "chunk_size %d out of range [%d, %d]",
                effective_chunk_size,
                MIN_CHUNK_SIZE,
                MAX_CHUNK_SIZE,
            )
            return False

        try:
            self._file = open(path, "wb")  # noqa: SIM115
            # Only pass ``compression`` when requested so the mcap library's own
            # default applies otherwise.
            writer_kwargs: dict[str, object] = {"chunk_size": effective_chunk_size}
            if mcap_compression is not None:
                writer_kwargs["compression"] = mcap_compression
            self._mcap_writer = McapRawWriter(self._file, **writer_kwargs)  # type: ignore[arg-type]
            self._mcap_writer.start(library="osi-utilities-python")
            self._path = path

            file_metadata = metadata if metadata is not None else prepare_required_file_metadata()
            _validate_file_metadata(file_metadata)
            self._mcap_writer.add_metadata(name=OSI_TRACE_METADATA_NAME, data=file_metadata)

            self._written_count = 0
            return True
        except (OSError, McapError) as e:
            logger.error("Failed to open MCAP file '%s' for writing: %s", path, e)
            # Discard a partially initialised writer as well as the file handle;
            # otherwise later add_channel/write_message/close calls would treat
            # the writer as open and operate on an already closed stream.
            self._mcap_writer = None
            if self._file is not None:
                self._file.close()
                self._file = None
            return False

    def add_channel(
        self,
        topic: str,
        message_class: type[Message],
        metadata: dict[str, str] | None = None,
    ) -> int:
        """Register an OSI channel with schema.

        The schema is named ``osi3.<message name>`` and is registered only once
        per writer; further channels using the same message class reuse it.

        Args:
            topic: Channel topic name.
            message_class: The protobuf message class for this channel.
            metadata: Optional channel metadata dict. It is copied, so the
                caller's dict is never modified.

        Returns:
            The channel ID.

        Raises:
            RuntimeError: If writer is not open or topic already exists.
        """
        if self._mcap_writer is None:
            raise RuntimeError("Writer is not open")
        if topic in self._active_channels:
            raise RuntimeError(f"Channel with topic '{topic}' already exists")

        # Work on a private copy: the auto-filled key below must not leak into
        # the dict owned by the caller.
        channel_meta = dict(metadata) if metadata is not None else {}
        # Validation deliberately sees the metadata as supplied by the caller,
        # i.e. before the protobuf version is auto-filled.
        _validate_channel_metadata(channel_meta)

        # Auto-fill protobuf version if not set
        if "net.asam.osi.trace.channel.protobuf_version" not in channel_meta:
            channel_meta["net.asam.osi.trace.channel.protobuf_version"] = google.protobuf.__version__

        schema_name = f"osi3.{message_class.DESCRIPTOR.name}"

        # Reuse schema if already registered
        if schema_name not in self._schema_cache:
            fds = build_file_descriptor_set(message_class)
            schema_id = self._mcap_writer.register_schema(
                name=schema_name,
                encoding=MessageEncoding.Protobuf,
                data=fds.SerializeToString(),
            )
            self._schema_cache[schema_name] = schema_id

        channel_id = self._mcap_writer.register_channel(
            topic=topic,
            message_encoding=MessageEncoding.Protobuf,
            schema_id=self._schema_cache[schema_name],
            metadata=channel_meta,
        )
        self._active_channels[topic] = channel_id
        self._channel_metadata[topic] = channel_meta
        return channel_id

    def write_message(self, message: Message, topic: str = "") -> bool:
        """Write a protobuf message to the specified topic channel.

        If no channels are registered, auto-creates one using the message type;
        its topic is *topic* if given, otherwise the message's descriptor name.
        The message's own timestamp (via ``timestamp_to_nanoseconds``) is used
        as both log time and publish time.

        Args:
            message: The protobuf message to write.
            topic: The channel topic. If empty and only one channel exists, uses that channel.

        Returns:
            True on success, False on failure (writer not open, unknown topic,
            or ``OSError``/``EncodeError``/``McapError`` while writing).
        """
        if self._mcap_writer is None:
            logger.error("Writer is not open")
            return False

        # Auto-create channel if none exist.
        # Note: this happens outside the try block below, so an exception raised
        # by add_channel propagates to the caller instead of returning False.
        if not self._active_channels:
            auto_topic = topic or message.DESCRIPTOR.name
            self.add_channel(auto_topic, type(message))
            topic = auto_topic

        # Default to single channel if topic not specified
        if not topic and len(self._active_channels) == 1:
            topic = next(iter(self._active_channels))

        if topic not in self._active_channels:
            logger.error("Topic '%s' not found. Available: %s", topic, list(self._active_channels.keys()))
            return False

        try:
            data = message.SerializeToString()
            log_time = timestamp_to_nanoseconds(message)
            self._mcap_writer.add_message(
                channel_id=self._active_channels[topic],
                log_time=log_time,
                data=data,
                publish_time=log_time,
            )
            self._written_count += 1
            return True
        except (OSError, EncodeError, McapError) as e:
            logger.error("Failed to write message to topic '%s': %s", topic, e)
            return False

    def close(self) -> None:
        """Finalize and close the MCAP file.

        Safe to call on a writer that is not open. If finalizing the MCAP
        stream raises, the exception still propagates, but the file handle is
        closed and the writer is returned to the closed state first.
        """
        try:
            if self._mcap_writer is not None:
                self._mcap_writer.finish()
                logger.info(
                    "Wrote %d messages to channels [%s] in '%s'",
                    self._written_count,
                    ", ".join(self._active_channels.keys()),
                    self._path,
                )
        finally:
            # Reset all state before closing the handle so the writer ends up
            # closed and reusable even if finish() or file.close() raises.
            file_handle, self._file = self._file, None
            self._mcap_writer = None
            self._active_channels.clear()
            self._channel_metadata.clear()
            self._schema_cache.clear()
            if file_handle is not None:
                file_handle.close()

    @property
    def written_count(self) -> int:
        """Number of messages written successfully since the last successful :meth:`open`.

        The counter is not reset by :meth:`close`, so it can still be read after
        the file has been finalized.
        """
        return self._written_count