Files
odbc-dashboard/obd2_tui.py
Jimmy Allen a994900a6a initial
2026-04-01 19:34:26 -04:00

391 lines
14 KiB
Python

import asyncio
import argparse
import contextlib
import logging
from typing import Any
import obd
from textual.app import App, ComposeResult
from textual.containers import Container
from textual.widgets import DataTable, Digits, Footer, Header, Input, Label, RichLog, Static
from models import Report
from obd2_interface import OBD2Interface, SimulatedOBD2Interface, format_obd_value
METRICS = [
("speed", "Speed", "mph"),
("rpm", "RPM", ""),
("fuel", "Fuel", "%"),
("coolant-temp", "Coolant Temp", "F"),
("oil-temp", "Oil Temp", "F"),
]
MODE_LABELS = {
1: "Mode 01 Current Data",
2: "Mode 02 Freeze Frame",
3: "Mode 03 Stored DTCs",
4: "Mode 04 Clear DTCs",
6: "Mode 06 Onboard Monitoring",
7: "Mode 07 Pending DTCs",
9: "Mode 09 Vehicle Info",
}
def format_metric_value(report: Report, metric_id: str) -> str:
values = {
"speed": f"{report.speed_mph:05.1f}",
"rpm": f"{report.rpm:05.0f}",
"fuel": f"{report.fuel_level_pct:05.1f}",
"oil-temp": f"{report.oil_temp_f:05.1f}",
"coolant-temp": f"{report.coolant_temp_f:05.1f}",
}
return values[metric_id]
class RichLogHandler(logging.Handler):
def __init__(self, app: "OBD2App") -> None:
super().__init__()
self.app = app
def emit(self, record: logging.LogRecord) -> None:
message = self.format(record)
try:
self.app.call_from_thread(self.app.write_log, message)
except RuntimeError:
self.app.write_log(message)
class OBD2App(App[None]):
CSS_PATH = "ui.css"
POLL_CONCURRENCY = 12
POLL_INTERVAL = 0.01
RECONNECT_DELAY = 5.0
BINDINGS = [
("q", "quit", "Quit"),
("b", "toggle_border", "Toggle border"),
("1", "select_mode(1)", "Mode 1"),
("2", "select_mode(2)", "Mode 2"),
("3", "select_mode(3)", "Mode 3"),
("4", "select_mode(4)", "Mode 4"),
("6", "select_mode(6)", "Mode 6"),
("7", "select_mode(7)", "Mode 7"),
("9", "select_mode(9)", "Mode 9"),
]
def __init__(
self,
report: Report,
interface_factory: type[OBD2Interface] = OBD2Interface,
query_rate_limit: float = 10.0,
ttl_config_path: str | None = None,
):
super().__init__()
self.report = report
self.interface_factory = interface_factory
self.query_rate_limit = query_rate_limit
self.ttl_config_path = ttl_config_path
self.logger = logging.getLogger("obd2")
self.interface: OBD2Interface | None = None
self.poll_task: asyncio.Task[None] | None = None
self.double_border = False
self.selected_mode = 1
self.last_command_values: dict[str, str] = {}
self.command_value_cache: dict[str, str] = {}
self.log_handler: RichLogHandler | None = None
self.ttl_edit_command: str | None = None
def compose(self) -> ComposeResult:
yield Header()
yield Container(
Container(*[self.compose_metric(metric_id, label, unit) for metric_id, label, unit in METRICS], id="dashboard"),
Container(
DataTable(id="commands-table"),
Container(
Label("Selected TTL (ms)", id="ttl-editor-label"),
Input(placeholder="TTL in ms", id="ttl-editor"),
id="ttl-editor-pane",
),
id="table-pane",
),
id="main-pane",
)
yield Static("", id="mode-banner")
yield RichLog(id="log-panel", auto_scroll=True, wrap=True, highlight=True, markup=False)
yield Footer()
def compose_metric(self, metric_id: str, label: str, unit: str) -> Container:
unit_text = unit or "value"
return Container(
Label(label, classes="metric-label"),
Digits(format_metric_value(self.report, metric_id), id=f"{metric_id}-digits", classes="metric-digits"),
Label(unit_text, classes="metric-unit"),
classes="metric-card",
)
def on_mount(self) -> None:
self.configure_logging()
self.interface = self.interface_factory(
self.logger,
query_rate_limit=self.query_rate_limit,
ttl_config_path=self.ttl_config_path,
)
self.configure_commands_table()
self.load_mode_table(self.selected_mode)
self.poll_task = asyncio.create_task(self.poll_commands())
self.logger.info("Mounted OBD2 dashboard")
async def on_unmount(self) -> None:
if self.interface is not None:
await self.interface.stop()
if self.poll_task is not None:
self.poll_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self.poll_task
self.teardown_logging()
def action_toggle_border(self) -> None:
self.double_border = not self.double_border
border = ("double", "yellow") if self.double_border else ("solid", "white")
for card in self.query(".metric-card"):
card.styles.border = border
def action_select_mode(self, mode: int) -> None:
if mode not in MODE_LABELS:
return
self.selected_mode = mode
self.load_mode_table(mode)
self.logger.info("Selected %s", MODE_LABELS[mode])
def configure_logging(self) -> None:
self.log_handler = RichLogHandler(self)
self.log_handler.setFormatter(
logging.Formatter(
"%(asctime)s.%(msecs)03d [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
)
root_logger = logging.getLogger()
root_logger.handlers.clear()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(self.log_handler)
self.logger.propagate = True
def teardown_logging(self) -> None:
if self.log_handler is None:
return
root_logger = logging.getLogger()
with contextlib.suppress(ValueError):
root_logger.removeHandler(self.log_handler)
self.log_handler = None
def write_log(self, message: str) -> None:
self.query_one("#log-panel", RichLog).write(message)
def configure_commands_table(self) -> None:
table = self.query_one("#commands-table", DataTable)
table.cursor_type = "row"
table.zebra_stripes = True
table.add_column("Code", key="code", width=10)
table.add_column("Name", key="name", width=28)
table.add_column("Description", key="desc", width=44)
table.add_column("TTL (ms)", key="ttl", width=12)
table.add_column("Value", key="value", width=24)
def load_mode_table(self, mode: int) -> None:
table = self.query_one("#commands-table", DataTable)
table.clear(columns=False)
self.last_command_values = {}
commands = self.get_mode_commands(mode)
for command in commands:
pid = getattr(command, "pid", None)
code = f"{mode:02X}.{pid:02X}" if pid is not None else f"{mode:02X}"
cached_value = self.command_value_cache.get(command.name, "--")
ttl_value = self.get_command_ttl_display(command.name)
table.add_row(code, command.name, getattr(command, "desc", ""), ttl_value, cached_value, key=command.name)
self.last_command_values[command.name] = cached_value
banner = self.query_one("#mode-banner", Static)
banner.update(f"{MODE_LABELS[mode]} Press 1,2,3,4,6,7,9 to switch modes")
if commands:
self.set_ttl_editor(commands[0].name)
def get_mode_commands(self, mode: int) -> list[object]:
return [command for command in obd.commands[mode] if command is not None]
def get_command_ttl_display(self, command_name: str) -> str:
if self.interface is None:
return "--"
return str(self.interface.get_command_ttl_ms(command_name))
def set_ttl_editor(self, command_name: str) -> None:
self.ttl_edit_command = command_name
self.query_one("#ttl-editor-label", Label).update(f"Selected TTL (ms) for {command_name}")
self.query_one("#ttl-editor", Input).value = self.get_command_ttl_display(command_name)
def update_metric_from_command(self, metric_id: str, raw_value: object) -> None:
if self.interface is None:
return
numeric_value = self.interface.normalize_metric_value(metric_id, raw_value)
setters = {
"speed": self.report.set_speed,
"rpm": self.report.set_rpm,
"fuel": self.report.set_fuel,
"oil-temp": self.report.set_oil_temp,
"coolant-temp": self.report.set_coolant_temp,
}
setters[metric_id](numeric_value)
self.query_one(f"#{metric_id}-digits", Digits).update(format_metric_value(self.report, metric_id))
def build_query_plan(self, mode: int) -> list[tuple[str | None, Any]]:
if self.interface is None:
return []
query_plan: list[tuple[str | None, Any]] = []
seen_names: set[str] = set()
for metric_id, command in self.interface.telemetry_commands.items():
command_name = getattr(command, "name", str(command))
query_plan.append((metric_id, command))
seen_names.add(command_name)
visible_command_names = self.get_visible_command_names()
for command in self.get_mode_commands(mode):
command_name = getattr(command, "name", str(command))
if command_name not in visible_command_names:
continue
if command_name in seen_names:
continue
query_plan.append((None, command))
seen_names.add(command_name)
return query_plan
def get_visible_command_names(self) -> set[str]:
table = self.query_one("#commands-table", DataTable)
if table.row_count == 0:
return set()
first_visible_row = max(0, int(table.scroll_y))
visible_height = max(1, table.scrollable_content_region.height)
last_visible_row = min(table.row_count, first_visible_row + visible_height)
visible_names: set[str] = set()
for row_index in range(first_visible_row, last_visible_row):
row = table.get_row_at(row_index)
if len(row) < 2:
continue
visible_names.add(str(row[1]))
return visible_names
async def query_commands_concurrently(self, query_plan: list[tuple[str | None, Any]]) -> list[tuple[str | None, Any, Any | None]]:
if self.interface is None:
return []
semaphore = asyncio.Semaphore(self.POLL_CONCURRENCY)
async def query_one(metric_id: str | None, command: Any) -> tuple[str | None, Any, Any | None]:
async with semaphore:
raw_value = await self.interface.query_command(command)
return (metric_id, command, raw_value)
tasks = [query_one(metric_id, command) for metric_id, command in query_plan]
return await asyncio.gather(*tasks)
def apply_query_results(self, results: list[tuple[str | None, Any, Any | None]]) -> None:
table = self.query_one("#commands-table", DataTable)
for metric_id, command, raw_value in results:
command_name = getattr(command, "name", str(command))
value = "--" if raw_value is None else format_obd_value(raw_value)
if metric_id is not None and raw_value is not None:
self.update_metric_from_command(metric_id, raw_value)
if self.last_command_values.get(command_name) == value:
continue
try:
table.update_cell(command_name, "value", value)
self.last_command_values[command_name] = value
self.command_value_cache[command_name] = value
except Exception:
continue
def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None:
self.set_ttl_editor(str(event.row_key))
async def on_input_submitted(self, event: Input.Submitted) -> None:
if event.input.id != "ttl-editor" or self.interface is None or self.ttl_edit_command is None:
return
raw_value = event.value.strip()
try:
ttl_ms = int(raw_value)
except ValueError:
self.logger.error("Invalid TTL value: %s", raw_value)
self.set_ttl_editor(self.ttl_edit_command)
return
await self.interface.update_ttl_override(self.ttl_edit_command, ttl_ms)
self.query_one("#commands-table", DataTable).update_cell(self.ttl_edit_command, "ttl", str(ttl_ms))
self.logger.info("Updated TTL for %s to %d ms", self.ttl_edit_command, ttl_ms)
self.set_ttl_editor(self.ttl_edit_command)
async def poll_commands(self) -> None:
while True:
if self.interface is None:
await asyncio.sleep(0.1)
continue
try:
await self.interface.connect()
mode = self.selected_mode
query_plan = self.build_query_plan(mode)
results = await self.query_commands_concurrently(query_plan)
if mode == self.selected_mode:
self.apply_query_results(results)
await asyncio.sleep(self.POLL_INTERVAL)
except ConnectionError as exc:
self.logger.error("%s", exc)
await asyncio.sleep(self.RECONNECT_DELAY)
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="OBD2 telemetry dashboard")
parser.add_argument(
"--simulated",
action="store_true",
help="Run the dashboard with continuously generated telemetry data",
)
parser.add_argument(
"--qps",
type=float,
default=10.0,
help="Maximum OBD queries per second across the app",
)
parser.add_argument(
"--ttl-config",
default="command_ttl.conf",
help="Path to COMMAND,ttl_ms cache TTL overrides file",
)
return parser
if __name__ == "__main__":
args = build_parser().parse_args()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s.%(msecs)03d - [%(levelname)s] - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
interface_factory = SimulatedOBD2Interface if args.simulated else OBD2Interface
app = OBD2App(
Report(),
interface_factory=interface_factory,
query_rate_limit=args.qps,
ttl_config_path=args.ttl_config,
)
app.run()