# Source code for osi_utilities.tracefile.txth_reader
# SPDX-License-Identifier: MPL-2.0
# SPDX-FileCopyrightText: Copyright (c) 2026, Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
"""Text human-readable (.txth) trace file reader.
Reads OSI trace files in Google protobuf TextFormat where messages
are separated by a known top-level field boundary.
"""
from __future__ import annotations
import logging
from pathlib import Path
from google.protobuf import text_format
from osi_utilities.tracefile._types import (
MessageType,
ReadResult,
_get_message_class,
infer_message_type_from_filename,
)
from osi_utilities.tracefile.reader import TraceFileReader
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
# [docs]  (link marker, presumably left over from the rendered HTML source page)
class TXTHTraceFileReader(TraceFileReader):
    """Reader for text human-readable OSI trace files (.txth).

    Messages are stored in Google protobuf TextFormat. The whole file is
    loaded into memory by :meth:`open`. :meth:`read_message` first tries to
    parse the remaining text as a single message; if that fails, the text
    is split where the first top-level field name of the message appears
    again at the start of an unindented line, and only the leading part is
    parsed.

    NOTE(review): this splitting strategy presumably relies on every message
    starting with the same singular top-level field (e.g. a version or
    timestamp field) and on nested fields being indented -- confirm against
    the writer side.
    """

    def __init__(self, message_type: MessageType = MessageType.UNKNOWN) -> None:
        """Create a reader.

        Args:
            message_type: Type of the messages stored in the file. When left
                as ``MessageType.UNKNOWN`` it is inferred from the file name
                in :meth:`open`.
        """
        self._message_type = message_type
        self._message_class: type | None = None
        self._has_next = False
        # Text of the file that has not been consumed by read_message() yet.
        self._buffer = ""

    def open(self, path: Path) -> bool:
        """Open a .txth trace file.

        Args:
            path: Path to the .txth file. A plain string is accepted as well.

        Returns:
            True on success, False on failure.
        """
        # Normalize once so that both the name-based type inference and the
        # file read work for str arguments as well as Path objects.
        path = Path(path)

        if self._message_type == MessageType.UNKNOWN:
            self._message_type = infer_message_type_from_filename(path.name)
        if self._message_type == MessageType.UNKNOWN:
            logger.error("Cannot determine message type for '%s'. Specify it explicitly.", path)
            return False

        try:
            self._message_class = _get_message_class(self._message_type)
        except ValueError as e:
            logger.error("Failed to get message class: %s", e)
            return False

        try:
            self._buffer = path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as e:
            # A file that is not valid UTF-8 is reported as a failure rather
            # than letting the decode error escape from open().
            logger.error("Failed to open file '%s': %s", path, e)
            return False

        self._has_next = bool(self._buffer.strip())
        return True

    @staticmethod
    def _leading_field_name(line: str) -> str | None:
        """Return the field name a TextFormat line starts with.

        The name is the text in front of the first ``:`` or ``{``, whichever
        comes first, so ``version { version_major: 3 }`` yields ``version``.

        Returns:
            The field name, or None for blank lines, comment lines and lines
            without a ``:`` / ``{`` delimiter or without a name in front of it.
        """
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            return None
        delimiters = [pos for pos in (stripped.find(":"), stripped.find("{")) if pos != -1]
        if not delimiters:
            return None
        name = stripped[: min(delimiters)].strip()
        return name or None

    def read_message(self) -> ReadResult | None:
        """Read the next message from the text trace file.

        Returns:
            ReadResult on success, None if no more messages.

        Raises:
            RuntimeError: If parsing fails.
        """
        if self._message_class is None or not self._buffer.strip():
            self._has_next = False
            return None

        # Fast path: the remaining text is exactly one message.
        message = self._message_class()
        try:
            text_format.Parse(self._buffer, message)
        except text_format.ParseError:
            # The file may hold several concatenated messages; fall through
            # and split at the next occurrence of the first top-level field.
            logger.debug("Buffer contains multiple messages, splitting at field boundary.")
        else:
            self._buffer = ""
            self._has_next = False
            return ReadResult(message=message, message_type=self._message_type)

        lines = self._buffer.split("\n")

        # Locate the first content line (skipping blank and comment lines)
        # and take its field name as the marker that starts every message.
        first_idx = None
        first_field = None
        for idx, line in enumerate(lines):
            stripped = line.strip()
            if stripped and not stripped.startswith("#"):
                first_idx = idx
                first_field = self._leading_field_name(stripped)
                break

        if first_idx is None or first_field is None:
            logger.warning("Cannot identify a top-level field in the remaining text; stopping.")
            self._has_next = False
            return None

        # The next message starts at the first *unindented* line after the
        # marker line whose field name equals the marker exactly. Searching
        # only after first_idx keeps leading comments/blank lines attached to
        # the first message instead of yielding them as an empty message.
        split_idx = None
        for idx in range(first_idx + 1, len(lines)):
            line = lines[idx]
            if line and not line[0].isspace() and self._leading_field_name(line) == first_field:
                split_idx = idx
                break

        if split_idx is None:
            msg_text = self._buffer
            remainder = ""
        else:
            msg_text = "\n".join(lines[:split_idx])
            remainder = "\n".join(lines[split_idx:])

        # Advance the reader state before parsing so that has_next() is
        # accurate even when the parse below raises.
        self._buffer = remainder
        self._has_next = bool(remainder.strip())

        message = self._message_class()
        try:
            text_format.Parse(msg_text, message)
        except text_format.ParseError as e:
            raise RuntimeError(f"Failed to parse text format message: {e}") from e
        return ReadResult(message=message, message_type=self._message_type)

    def has_next(self) -> bool:
        """Return True while unread, non-blank text remains in the buffer."""
        return self._has_next

    def close(self) -> None:
        """Drop the buffered file content and mark the reader as exhausted."""
        self._buffer = ""
        self._has_next = False

    @property
    def message_type(self) -> MessageType:
        """Message type given at construction or inferred by :meth:`open`."""
        return self._message_type