"""Textual dashboard that polls OBD2 commands and displays live telemetry."""

import argparse
import asyncio
import contextlib
import logging
import re
from typing import Any

import obd
from textual.app import App, ComposeResult
from textual.containers import Container
from textual.widgets import DataTable, Digits, Footer, Header, Input, Label, RichLog, Static

from models import Report
from obd2_interface import OBD2Interface, SimulatedOBD2Interface, format_obd_value

# (metric id, display label, unit text) for each dashboard card, in display order.
METRICS = [
    ("speed", "Speed", "mph"),
    ("rpm", "RPM", ""),
    ("fuel", "Fuel", "%"),
    ("coolant-temp", "Coolant Temp", "F"),
    ("oil-temp", "Oil Temp", "F"),
]

# Banner/log text for every selectable OBD mode.
MODE_LABELS = {
    1: "Mode 01 Current Data",
    2: "Mode 02 Freeze Frame",
    3: "Mode 03 Stored DTCs",
    4: "Mode 04 Clear DTCs",
    6: "Mode 06 Onboard Monitoring",
    7: "Mode 07 Pending DTCs",
    9: "Mode 09 Vehicle Info",
}

# Cycling order used by the previous/next mode actions.
MODE_ORDER = [1, 2, 3, 4, 6, 7, 9]

# metric id -> (Report attribute, format spec) used to render the Digits widgets.
_METRIC_FORMATS = {
    "speed": ("speed_mph", "05.1f"),
    "rpm": ("rpm", "05.0f"),
    "fuel": ("fuel_level_pct", "05.1f"),
    "oil-temp": ("oil_temp_f", "05.1f"),
    "coolant-temp": ("coolant_temp_f", "05.1f"),
}

# Non-negative number with an optional unit suffix; a bare number means milliseconds.
_DURATION_RE = re.compile(r"\s*(\d+(?:\.\d+)?)\s*(ms|s|m|h)?\s*", re.IGNORECASE)

_DURATION_MULTIPLIERS_MS = {
    "ms": 1,
    "s": 1000,
    "m": 60_000,
    "h": 3_600_000,
}


def format_metric_value(report: Report, metric_id: str) -> str:
    """Return the zero-padded display string for one dashboard metric.

    Only the requested metric's attribute is read from ``report``.

    Raises:
        KeyError: if ``metric_id`` is not a known metric id.
    """
    attribute, spec = _METRIC_FORMATS[metric_id]
    return format(getattr(report, attribute), spec)


def parse_duration_ms(value: str) -> int:
    """Parse a duration such as ``"10ms"``, ``"30s"``, ``"1.5m"`` or ``"1h"``.

    A number without a unit is interpreted as milliseconds. Units are
    case-insensitive and surrounding whitespace is ignored.

    Returns:
        The duration in whole milliseconds (rounded).

    Raises:
        ValueError: if ``value`` does not match the expected syntax or is too
            large to be represented as a finite number of milliseconds.
    """
    match = _DURATION_RE.fullmatch(value)
    if match is None:
        raise ValueError(f"Invalid duration: {value}")
    amount = float(match.group(1))
    unit = (match.group(2) or "ms").lower()
    try:
        # The pattern admits only non-negative numbers, so no clamping is needed.
        return round(amount * _DURATION_MULTIPLIERS_MS[unit])
    except OverflowError:
        # An enormous digit string parses to inf; report it like any other bad
        # input so callers that catch ValueError keep working.
        raise ValueError(f"Invalid duration: {value}") from None


class RichLogHandler(logging.Handler):
    """Logging handler that forwards formatted records to the app's log panel."""

    def __init__(self, app: "OBD2App") -> None:
        super().__init__()
        self.app = app

    def emit(self, record: logging.LogRecord) -> None:
        """Write ``record`` to the app via ``write_log``.

        The cross-thread path is tried first; when ``call_from_thread`` raises
        ``RuntimeError`` the message is written directly instead. Any other
        failure is routed to ``handleError`` so that a logging call never
        raises into the code that issued it.
        """
        try:
            message = self.format(record)
            try:
                self.app.call_from_thread(self.app.write_log, message)
            except RuntimeError:
                # NOTE(review): Textual documents RuntimeError for calls made
                # from the app's own thread (or when the app is not running).
                self.app.write_log(message)
        except Exception:
            self.handleError(record)


class OBD2App(App[None]):
    """Dashboard application: metric cards, a per-mode command table and a log."""

    CSS_PATH = "ui.css"
    # Maximum number of queries in flight during one polling cycle.
    POLL_CONCURRENCY = 12
    # Seconds to sleep between successful polling cycles.
    POLL_INTERVAL = 0.01
    # Seconds to wait after a failed polling cycle before trying again.
    RECONNECT_DELAY = 5.0
    BINDINGS = [
        ("q", "quit", "Quit"),
        ("b", "toggle_border", "Toggle border"),
        ("e", "focus_ttl", "Edit TTL"),
        ("escape", "focus_table", "Focus Table"),
        ("left", "previous_mode", "Prev Mode"),
        ("right", "next_mode", "Next Mode"),
        ("shift+up", "jump_table_top", "Top"),
        ("shift+down", "jump_table_bottom", "Bottom"),
        ("1", "select_mode(1)", "Mode 1"),
        ("2", "select_mode(2)", "Mode 2"),
        ("3", "select_mode(3)", "Mode 3"),
        ("4", "select_mode(4)", "Mode 4"),
        ("6", "select_mode(6)", "Mode 6"),
        ("7", "select_mode(7)", "Mode 7"),
        ("9", "select_mode(9)", "Mode 9"),
    ]

    def __init__(
        self,
        report: Report,
        interface_factory: type[OBD2Interface] = OBD2Interface,
        query_rate_limit: float = 10.0,
        ttl_config_path: str | None = None,
    ):
        """Store configuration; the interface itself is created in ``on_mount``.

        Args:
            report: Object holding the current metric values shown on the cards.
            interface_factory: Class used to build the OBD2 interface.
            query_rate_limit: Passed to the interface as ``query_rate_limit``.
            ttl_config_path: Passed to the interface as ``ttl_config_path``.
        """
        super().__init__()
        self.report = report
        self.interface_factory = interface_factory
        self.query_rate_limit = query_rate_limit
        self.ttl_config_path = ttl_config_path
        self.logger = logging.getLogger("obd2")
        self.interface: OBD2Interface | None = None
        self.poll_task: asyncio.Task[None] | None = None
        self.double_border = False
        self.selected_mode = 1
        # Value currently shown per command row of the table that is loaded.
        self.last_command_values: dict[str, str] = {}
        # Last known value per command; survives mode switches.
        self.command_value_cache: dict[str, str] = {}
        self.log_handler: RichLogHandler | None = None
        # Command whose TTL the editor currently targets.
        self.ttl_edit_command: str | None = None

    def compose(self) -> ComposeResult:
        """Build the widget tree: header, dashboard, table pane, banner, log, footer."""
        yield Header()
        yield Container(
            Container(
                *[self.compose_metric(metric_id, label, unit) for metric_id, label, unit in METRICS],
                id="dashboard",
            ),
            Container(
                DataTable(id="commands-table"),
                Container(
                    Label("Selected TTL (ms)", id="ttl-editor-label"),
                    Input(placeholder="e.g. 10ms, 30s, 1m, 1h", id="ttl-editor"),
                    id="ttl-editor-pane",
                ),
                id="table-pane",
            ),
            id="main-pane",
        )
        yield Static("", id="mode-banner")
        yield RichLog(id="log-panel", auto_scroll=True, wrap=True, highlight=True, markup=False)
        yield Footer()

    def compose_metric(self, metric_id: str, label: str, unit: str) -> Container:
        """Return one metric card; an empty ``unit`` is displayed as ``"value"``."""
        unit_text = unit or "value"
        return Container(
            Label(label, classes="metric-label"),
            Digits(
                format_metric_value(self.report, metric_id),
                id=f"{metric_id}-digits",
                classes="metric-digits",
            ),
            Label(unit_text, classes="metric-unit"),
            classes="metric-card",
        )

    async def on_mount(self) -> None:
        """Set up logging, create the interface, fill the table and start polling."""
        self.configure_logging()
        self.interface = self.interface_factory(
            self.logger,
            query_rate_limit=self.query_rate_limit,
            ttl_config_path=self.ttl_config_path,
        )
        await self.interface.reload_ttl_config(force=True)
        self.configure_commands_table()
        self.load_mode_table(self.selected_mode)
        self.poll_task = asyncio.create_task(self.poll_commands())
        self.logger.info("Mounted OBD2 dashboard")

    async def on_unmount(self) -> None:
        """Stop polling, stop the interface and detach the log handler."""
        try:
            # Cancel the poller first: if the interface were stopped while the
            # poller is still alive, its next cycle would call connect() again.
            if self.poll_task is not None:
                self.poll_task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await self.poll_task
            if self.interface is not None:
                await self.interface.stop()
        finally:
            # Always detach the handler, even if shutdown above failed.
            self.teardown_logging()

    def action_toggle_border(self) -> None:
        """Toggle every metric card between a solid white and a double yellow border."""
        self.double_border = not self.double_border
        border = ("double", "yellow") if self.double_border else ("solid", "white")
        for card in self.query(".metric-card"):
            card.styles.border = border

    def action_select_mode(self, mode: int) -> None:
        """Switch the command table to ``mode``; unknown modes are ignored."""
        if mode not in MODE_LABELS:
            return
        self.selected_mode = mode
        self.load_mode_table(mode)
        self.logger.info("Selected %s", MODE_LABELS[mode])

    def action_previous_mode(self) -> None:
        """Select the mode before the current one in ``MODE_ORDER``, wrapping around."""
        index = MODE_ORDER.index(self.selected_mode)
        self.action_select_mode(MODE_ORDER[(index - 1) % len(MODE_ORDER)])

    def action_next_mode(self) -> None:
        """Select the mode after the current one in ``MODE_ORDER``, wrapping around."""
        index = MODE_ORDER.index(self.selected_mode)
        self.action_select_mode(MODE_ORDER[(index + 1) % len(MODE_ORDER)])

    def action_focus_ttl(self) -> None:
        """Focus the TTL editor and move its cursor to the end of the text."""
        editor = self.query_one("#ttl-editor", Input)
        editor.focus()
        editor.action_end()

    def action_focus_table(self) -> None:
        """Move keyboard focus to the commands table."""
        self.query_one("#commands-table", DataTable).focus()

    def action_jump_table_top(self) -> None:
        """Focus the table and move the cursor and viewport to the first row."""
        table = self.query_one("#commands-table", DataTable)
        table.focus()
        if table.row_count == 0:
            return
        table.move_cursor(row=0)
        table.scroll_to(y=0, animate=False)

    def action_jump_table_bottom(self) -> None:
        """Focus the table and move the cursor and viewport to the last row."""
        table = self.query_one("#commands-table", DataTable)
        table.focus()
        if table.row_count == 0:
            return
        table.move_cursor(row=table.row_count - 1)
        table.scroll_to(y=table.max_scroll_y, animate=False)

    def on_click(self, event) -> None:
        """Focus the TTL editor when any part of the editor pane is clicked."""
        if getattr(event.widget, "id", None) in {"ttl-editor", "ttl-editor-pane", "ttl-editor-label"}:
            self.action_focus_ttl()

    def configure_logging(self) -> None:
        """Route all root-logger output into the log panel.

        Existing root handlers are removed and the root level is set to INFO.
        The previous handlers are not restored by ``teardown_logging``.
        """
        self.log_handler = RichLogHandler(self)
        self.log_handler.setFormatter(
            logging.Formatter(
                "%(asctime)s.%(msecs)03d [%(levelname)s] %(message)s",
                datefmt="%H:%M:%S",
            )
        )
        root_logger = logging.getLogger()
        root_logger.handlers.clear()
        root_logger.setLevel(logging.INFO)
        root_logger.addHandler(self.log_handler)
        self.logger.propagate = True

    def teardown_logging(self) -> None:
        """Detach the log-panel handler from the root logger, if it is installed."""
        if self.log_handler is None:
            return
        # removeHandler is a no-op for a handler that is not attached.
        logging.getLogger().removeHandler(self.log_handler)
        self.log_handler = None

    def write_log(self, message: str) -> None:
        """Append ``message`` to the log panel."""
        self.query_one("#log-panel", RichLog).write(message)

    def configure_commands_table(self) -> None:
        """Configure the commands table's appearance and add its five columns."""
        table = self.query_one("#commands-table", DataTable)
        table.cursor_type = "row"
        table.zebra_stripes = True
        table.show_row_labels = False
        table.add_column("Code", key="code", width=8)
        table.add_column("Name", key="name", width=24)
        table.add_column("Description", key="desc", width=34)
        table.add_column("TTL (ms)", key="ttl", width=10)
        table.add_column("Value", key="value", width=22)

    def load_mode_table(self, mode: int) -> None:
        """Replace the table rows with the commands of ``mode``.

        Rows are keyed by command name and start with the cached value, or
        ``"--"`` when none is cached. The banner is updated and the TTL editor
        is pointed at the first command, if there is one.
        """
        table = self.query_one("#commands-table", DataTable)
        table.clear(columns=False)
        self.last_command_values = {}
        commands = self.get_mode_commands(mode)
        for command in commands:
            pid = getattr(command, "pid", None)
            code = f"{mode:02X}.{pid:02X}" if pid is not None else f"{mode:02X}"
            cached_value = self.command_value_cache.get(command.name, "--")
            ttl_value = self.get_command_ttl_display(command.name)
            table.add_row(
                code,
                command.name,
                getattr(command, "desc", ""),
                ttl_value,
                cached_value,
                key=command.name,
            )
            self.last_command_values[command.name] = cached_value
        banner = self.query_one("#mode-banner", Static)
        banner.update(f"{MODE_LABELS[mode]} Press 1,2,3,4,6,7,9 to switch modes")
        if commands:
            self.set_ttl_editor(commands[0].name)

    def get_mode_commands(self, mode: int) -> list[Any]:
        """Return the non-``None`` entries of ``obd.commands[mode]``."""
        return [command for command in obd.commands[mode] if command is not None]

    def get_command_ttl_display(self, command_name: str) -> str:
        """Return the command's TTL in ms as text, or ``"--"`` without an interface."""
        if self.interface is None:
            return "--"
        return str(self.interface.get_command_ttl_ms(command_name))

    def set_ttl_editor(self, command_name: str) -> None:
        """Point the TTL editor at ``command_name`` and show its current TTL."""
        self.ttl_edit_command = command_name
        self.query_one("#ttl-editor-label", Label).update(f"Selected TTL (ms) for {command_name}")
        self.query_one("#ttl-editor", Input).value = self.get_command_ttl_display(command_name)

    def update_metric_from_command(self, metric_id: str, raw_value: object) -> None:
        """Store a new reading in the report and refresh the matching Digits widget.

        The raw value is converted by the interface's ``normalize_metric_value``.
        Does nothing when no interface exists.
        """
        if self.interface is None:
            return
        numeric_value = self.interface.normalize_metric_value(metric_id, raw_value)
        setters = {
            "speed": self.report.set_speed,
            "rpm": self.report.set_rpm,
            "fuel": self.report.set_fuel,
            "oil-temp": self.report.set_oil_temp,
            "coolant-temp": self.report.set_coolant_temp,
        }
        setters[metric_id](numeric_value)
        self.query_one(f"#{metric_id}-digits", Digits).update(format_metric_value(self.report, metric_id))

    def build_query_plan(self, mode: int) -> list[tuple[str | None, Any]]:
        """Return ``(metric_id, command)`` pairs to query in one polling cycle.

        All of the interface's telemetry commands come first, tagged with their
        metric id. They are followed by the commands of ``mode`` that are
        currently visible in the table and not already planned, tagged ``None``.
        """
        if self.interface is None:
            return []
        query_plan: list[tuple[str | None, Any]] = []
        seen_names: set[str] = set()
        for metric_id, command in self.interface.telemetry_commands.items():
            query_plan.append((metric_id, command))
            seen_names.add(getattr(command, "name", str(command)))
        visible_command_names = self.get_visible_command_names()
        for command in self.get_mode_commands(mode):
            command_name = getattr(command, "name", str(command))
            if command_name not in visible_command_names or command_name in seen_names:
                continue
            query_plan.append((None, command))
            seen_names.add(command_name)
        return query_plan

    def get_visible_command_names(self) -> set[str]:
        """Return the names (second column) of the rows in the table's viewport.

        ``scroll_y`` is used as the index of the first visible row and the
        height of the scrollable content region as the number of rows shown.
        """
        # NOTE(review): assumes one table row per terminal line and that the
        # header does not offset the row index - confirm against DataTable.
        table = self.query_one("#commands-table", DataTable)
        if table.row_count == 0:
            return set()
        first_visible_row = max(0, int(table.scroll_y))
        visible_height = max(1, table.scrollable_content_region.height)
        last_visible_row = min(table.row_count, first_visible_row + visible_height)
        visible_names: set[str] = set()
        for row_index in range(first_visible_row, last_visible_row):
            row = table.get_row_at(row_index)
            if len(row) >= 2:
                visible_names.add(str(row[1]))
        return visible_names

    async def query_commands_concurrently(
        self, query_plan: list[tuple[str | None, Any]]
    ) -> list[tuple[str | None, Any, Any | None]]:
        """Run every planned query with at most ``POLL_CONCURRENCY`` in flight.

        Returns:
            ``(metric_id, command, raw_value)`` triples in plan order.
        """
        interface = self.interface
        if interface is None:
            return []
        semaphore = asyncio.Semaphore(self.POLL_CONCURRENCY)

        async def run_query(metric_id: str | None, command: Any) -> tuple[str | None, Any, Any | None]:
            async with semaphore:
                raw_value = await interface.query_command(command)
            return (metric_id, command, raw_value)

        return await asyncio.gather(*(run_query(metric_id, command) for metric_id, command in query_plan))

    def apply_query_results(self, results: list[tuple[str | None, Any, Any | None]]) -> None:
        """Push query results into the metric cards, the value cache and the table.

        A ``None`` raw value is displayed as ``"--"`` and does not touch the
        metric cards. Table cells are rewritten only when the text changed.
        """
        table = self.query_one("#commands-table", DataTable)
        for metric_id, command, raw_value in results:
            command_name = getattr(command, "name", str(command))
            value = "--" if raw_value is None else format_obd_value(raw_value)
            if metric_id is not None and raw_value is not None:
                self.update_metric_from_command(metric_id, raw_value)
            # Remember the reading even when its row is not displayed, so that a
            # later mode switch starts from the latest value.
            self.command_value_cache[command_name] = value
            # last_command_values holds exactly the rows of the loaded table;
            # telemetry commands of other modes have no cell to update.
            if command_name not in self.last_command_values:
                continue
            if self.last_command_values[command_name] == value:
                continue
            table.update_cell(command_name, "value", value)
            self.last_command_values[command_name] = value

    def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None:
        """Point the TTL editor at the command of the newly highlighted row."""
        row = self.query_one("#commands-table", DataTable).get_row(event.row_key)
        if len(row) >= 2:
            self.set_ttl_editor(str(row[1]))

    async def on_input_submitted(self, event: Input.Submitted) -> None:
        """Apply a TTL entered in the editor to the selected command.

        Invalid input is logged and the editor is reset to the current TTL.
        On success the override is stored through the interface, the table
        cell is updated and focus returns to the table.
        """
        command_name = self.ttl_edit_command
        if event.input.id != "ttl-editor" or self.interface is None or command_name is None:
            return
        raw_value = event.value.strip()
        try:
            ttl_ms = parse_duration_ms(raw_value)
        except ValueError:
            self.logger.error("Invalid TTL value: %s", raw_value)
            self.set_ttl_editor(command_name)
            return
        await self.interface.update_ttl_override(command_name, ttl_ms)
        self.query_one("#commands-table", DataTable).update_cell(command_name, "ttl", str(ttl_ms))
        self.logger.info("Updated TTL for %s to %d ms", command_name, ttl_ms)
        self.set_ttl_editor(command_name)
        self.action_focus_table()

    async def poll_commands(self) -> None:
        """Poll the interface forever; runs as a background task until cancelled.

        Each cycle connects, queries the current plan and applies the results
        unless the selected mode changed while the queries were running. A
        failed cycle is logged and retried after ``RECONNECT_DELAY`` seconds.
        """
        while True:
            if self.interface is None:
                await asyncio.sleep(0.1)
                continue
            try:
                await self.interface.connect()
                mode = self.selected_mode
                query_plan = self.build_query_plan(mode)
                results = await self.query_commands_concurrently(query_plan)
                if mode == self.selected_mode:
                    self.apply_query_results(results)
            except ConnectionError as exc:
                self.logger.error("%s", exc)
                await asyncio.sleep(self.RECONNECT_DELAY)
            except Exception:
                # Keep the dashboard alive: an unexpected error would otherwise
                # end this task silently and freeze every displayed value.
                # Cancellation is a BaseException and still propagates.
                self.logger.exception("Polling cycle failed")
                await asyncio.sleep(self.RECONNECT_DELAY)
            else:
                await asyncio.sleep(self.POLL_INTERVAL)


def build_parser() -> argparse.ArgumentParser:
    """Return the command-line parser (``--simulated``, ``--qps``, ``--ttl-config``)."""
    parser = argparse.ArgumentParser(description="OBD2 telemetry dashboard")
    parser.add_argument(
        "--simulated",
        action="store_true",
        help="Run the dashboard with continuously generated telemetry data",
    )
    parser.add_argument(
        "--qps",
        type=float,
        default=10.0,
        help="Maximum OBD queries per second across the app",
    )
    parser.add_argument(
        "--ttl-config",
        default="command_ttl.conf",
        help="Path to COMMAND,ttl_ms cache TTL overrides file",
    )
    return parser


def main() -> None:
    """Parse the command line, configure logging and run the dashboard."""
    args = build_parser().parse_args()
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s.%(msecs)03d - [%(levelname)s] - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    interface_factory = SimulatedOBD2Interface if args.simulated else OBD2Interface
    app = OBD2App(
        Report(),
        interface_factory=interface_factory,
        query_rate_limit=args.qps,
        ttl_config_path=args.ttl_config,
    )
    app.run()


if __name__ == "__main__":
    main()