diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c01995a --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +__pycache__ +venv +.idea diff --git a/command_ttl.conf b/command_ttl.conf index 8f8c662..f530277 100644 --- a/command_ttl.conf +++ b/command_ttl.conf @@ -1,7 +1,7 @@ # Format: COMMAND_NAME,ttl_ms PIDS_A,5000 STATUS,5000 -FREEZE_DTC,5000 +FREEZE_DTC,7500 FUEL_STATUS,5000 ENGINE_LOAD,5000 COOLANT_TEMP,5000 @@ -191,8 +191,8 @@ DTC_OIL_TEMP,5000 DTC_FUEL_INJECT_TIMING,5000 DTC_FUEL_RATE,5000 DTC_EMISSION_REQ,5000 -GET_DTC,5000 -CLEAR_DTC,5000 +GET_DTC,86400000 +CLEAR_DTC,86400000 MIDS_A,5000 MONITOR_O2_B1S1,5000 MONITOR_O2_B1S2,5000 @@ -281,10 +281,10 @@ MONITOR_MISFIRE_CYLINDER_11,5000 MONITOR_MISFIRE_CYLINDER_12,5000 MONITOR_PM_FILTER_B1,5000 MONITOR_PM_FILTER_B2,5000 -GET_CURRENT_DTC,5000 +GET_CURRENT_DTC,86400000 PIDS_9A,5000 VIN_MESSAGE_COUNT,5000 -VIN,5000 +VIN,3600000 CALIBRATION_ID_MESSAGE_COUNT,5000 CALIBRATION_ID,5000 CVN_MESSAGE_COUNT,5000 diff --git a/obd2_tui.py b/obd2_tui.py index 7b63c7e..aaff856 100644 --- a/obd2_tui.py +++ b/obd2_tui.py @@ -2,6 +2,7 @@ import asyncio import argparse import contextlib import logging +import re from typing import Any import obd @@ -31,6 +32,8 @@ MODE_LABELS = { 9: "Mode 09 Vehicle Info", } +MODE_ORDER = [1, 2, 3, 4, 6, 7, 9] + def format_metric_value(report: Report, metric_id: str) -> str: values = { @@ -43,6 +46,22 @@ def format_metric_value(report: Report, metric_id: str) -> str: return values[metric_id] +def parse_duration_ms(value: str) -> int: + match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*(ms|s|m|h)?\s*", value, re.IGNORECASE) + if match is None: + raise ValueError(f"Invalid duration: {value}") + + amount = float(match.group(1)) + unit = (match.group(2) or "ms").lower() + multipliers = { + "ms": 1, + "s": 1000, + "m": 60_000, + "h": 3_600_000, + } + return max(0, int(round(amount * multipliers[unit]))) + + class RichLogHandler(logging.Handler): def __init__(self, app: "OBD2App") -> None: super().__init__() @@ -64,6 +83,12 @@ class OBD2App(App[None]): BINDINGS = [ ("q", "quit", "Quit"), ("b", "toggle_border", "Toggle border"), + ("e", "focus_ttl", "Edit TTL"), + ("escape", "focus_table", "Focus Table"), + ("left", "previous_mode", "Prev Mode"), + ("right", "next_mode", "Next Mode"), + ("shift+up", "jump_table_top", "Top"), + ("shift+down", "jump_table_bottom", "Bottom"), ("1", "select_mode(1)", "Mode 1"), ("2", "select_mode(2)", "Mode 2"), ("3", "select_mode(3)", "Mode 3"), @@ -103,7 +128,7 @@ class OBD2App(App[None]): DataTable(id="commands-table"), Container( Label("Selected TTL (ms)", id="ttl-editor-label"), - Input(placeholder="TTL in ms", id="ttl-editor"), + Input(placeholder="e.g. 10ms, 30s, 1m, 1h", id="ttl-editor"), id="ttl-editor-pane", ), id="table-pane", @@ -123,13 +148,14 @@ class OBD2App(App[None]): classes="metric-card", ) - def on_mount(self) -> None: + async def on_mount(self) -> None: self.configure_logging() self.interface = self.interface_factory( self.logger, query_rate_limit=self.query_rate_limit, ttl_config_path=self.ttl_config_path, ) + await self.interface.reload_ttl_config(force=True) self.configure_commands_table() self.load_mode_table(self.selected_mode) self.poll_task = asyncio.create_task(self.poll_commands()) @@ -157,6 +183,41 @@ class OBD2App(App[None]): self.load_mode_table(mode) self.logger.info("Selected %s", MODE_LABELS[mode]) + def action_previous_mode(self) -> None: + index = MODE_ORDER.index(self.selected_mode) + self.action_select_mode(MODE_ORDER[(index - 1) % len(MODE_ORDER)]) + + def action_next_mode(self) -> None: + index = MODE_ORDER.index(self.selected_mode) + self.action_select_mode(MODE_ORDER[(index + 1) % len(MODE_ORDER)]) + + def action_focus_ttl(self) -> None: + editor = self.query_one("#ttl-editor", Input) + editor.focus() + editor.action_end() + + def action_focus_table(self) -> None: + self.query_one("#commands-table", DataTable).focus() + + def action_jump_table_top(self) -> None: + table = self.query_one("#commands-table", DataTable) + table.focus() + if table.row_count > 0: + table.move_cursor(row=0) + table.scroll_to(y=0, animate=False) + + def action_jump_table_bottom(self) -> None: + table = self.query_one("#commands-table", DataTable) + table.focus() + if table.row_count > 0: + last_row = table.row_count - 1 + table.move_cursor(row=last_row) + table.scroll_to(y=table.max_scroll_y, animate=False) + + def on_click(self, event) -> None: + if getattr(event.widget, "id", None) in {"ttl-editor", "ttl-editor-pane", "ttl-editor-label"}: + self.action_focus_ttl() + def configure_logging(self) -> None: self.log_handler = RichLogHandler(self) self.log_handler.setFormatter( @@ -186,11 +247,12 @@ class OBD2App(App[None]): table = self.query_one("#commands-table", DataTable) table.cursor_type = "row" table.zebra_stripes = True - table.add_column("Code", key="code", width=10) - table.add_column("Name", key="name", width=28) - table.add_column("Description", key="desc", width=44) - table.add_column("TTL (ms)", key="ttl", width=12) - table.add_column("Value", key="value", width=24) + table.show_row_labels = False + table.add_column("Code", key="code", width=8) + table.add_column("Name", key="name", width=24) + table.add_column("Description", key="desc", width=34) + table.add_column("TTL (ms)", key="ttl", width=10) + table.add_column("Value", key="value", width=22) def load_mode_table(self, mode: int) -> None: table = self.query_one("#commands-table", DataTable) @@ -312,7 +374,9 @@ class OBD2App(App[None]): continue def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None: - self.set_ttl_editor(str(event.row_key)) + row = self.query_one("#commands-table", DataTable).get_row(event.row_key) + if len(row) >= 2: + self.set_ttl_editor(str(row[1])) async def on_input_submitted(self, event: Input.Submitted) -> None: if event.input.id != "ttl-editor" or self.interface is None or self.ttl_edit_command is None: @@ -320,7 +384,7 @@ class OBD2App(App[None]): raw_value = event.value.strip() try: - ttl_ms = int(raw_value) + ttl_ms = parse_duration_ms(raw_value) except ValueError: self.logger.error("Invalid TTL value: %s", raw_value) self.set_ttl_editor(self.ttl_edit_command) @@ -330,6 +394,7 @@ class OBD2App(App[None]): self.query_one("#commands-table", DataTable).update_cell(self.ttl_edit_command, "ttl", str(ttl_ms)) self.logger.info("Updated TTL for %s to %d ms", self.ttl_edit_command, ttl_ms) self.set_ttl_editor(self.ttl_edit_command) + self.action_focus_table() async def poll_commands(self) -> None: while True: diff --git a/test.py b/test.py new file mode 100644 index 0000000..f91bc43 --- /dev/null +++ b/test.py @@ -0,0 +1,269 @@ +import asyncio +import logging +import tempfile +import unittest +from pathlib import Path + +from models import Report +from obd2_interface import OBD2Interface, SimulatedOBD2Interface, SimulatedOBDConnection, format_obd_value +from obd2_tui import OBD2App, format_metric_value, parse_duration_ms + + +class FakeQuantity: + def __init__(self, magnitude: float): + self.magnitude = magnitude + + def to(self, _unit: str) -> "FakeQuantity": + return self + + +class FakeResponse: + def __init__(self, value): + self.value = value + + def is_null(self) -> bool: + return self.value is None + + +class FakeConnection: + def __init__(self, values): + self.values = list(values) + self.closed = False + self.query_count = 0 + + def is_connected(self) -> bool: + return True + + def query(self, _cmd): + self.query_count += 1 + if self.values: + return self.values.pop(0) + return FakeResponse(None) + + def close(self) -> None: + self.closed = True + + +class ReportTests(unittest.TestCase): + def test_setters_normalize_quantities(self) -> None: + report = Report() + report.set_speed(FakeQuantity(32.5)) + report.set_rpm(FakeQuantity(1800)) + report.set_fuel(FakeQuantity(67.0)) + report.set_oil_temp(FakeQuantity(194)) + report.set_coolant_temp(FakeQuantity(201)) + + self.assertEqual(report.speed_mph, 32.5) + self.assertEqual(report.rpm, 1800.0) + self.assertEqual(report.fuel_level_pct, 67.0) + self.assertEqual(report.oil_temp_f, 194.0) + self.assertEqual(report.coolant_temp_f, 201.0) + + +class InterfaceTests(unittest.IsolatedAsyncioTestCase): + async def test_query_value_returns_none_for_empty_response(self) -> None: + interface = OBD2Interface(logging.getLogger("test"), connection=FakeConnection([FakeResponse(None)])) + value = await interface.query_value(interface.scans[0]) + self.assertIsNone(value) + + async def test_query_rate_limit_defaults_to_ten_per_second(self) -> None: + interface = OBD2Interface( + logging.getLogger("test"), + connection=FakeConnection([FakeResponse(1), FakeResponse(2)]), + ) + + start = asyncio.get_running_loop().time() + await asyncio.gather( + interface.query_command(interface.telemetry_commands["speed"]), + interface.query_command(interface.telemetry_commands["rpm"]), + ) + elapsed = asyncio.get_running_loop().time() - start + + self.assertGreaterEqual(elapsed, 0.09) + + async def test_query_command_uses_ttl_cache(self) -> None: + connection = FakeConnection([FakeResponse(12.3)]) + interface = OBD2Interface( + logging.getLogger("test"), + connection=connection, + cache_ttl=0.25, + ) + + command = interface.telemetry_commands["speed"] + first = await interface.query_command(command) + second = await interface.query_command(command) + + self.assertEqual(first, 12.3) + self.assertEqual(second, 12.3) + self.assertEqual(connection.query_count, 1) + + async def test_query_command_cache_expires(self) -> None: + connection = FakeConnection([FakeResponse(12.3), FakeResponse(45.6)]) + interface = OBD2Interface( + logging.getLogger("test"), + connection=connection, + cache_ttl=0.01, + ) + + command = interface.telemetry_commands["speed"] + first = await interface.query_command(command) + await asyncio.sleep(0.02) + second = await interface.query_command(command) + + self.assertEqual(first, 12.3) + self.assertEqual(second, 45.6) + self.assertEqual(connection.query_count, 2) + + async def test_ttl_config_overrides_cache_ttl(self) -> None: + with tempfile.TemporaryDirectory() as tmp_dir: + config_path = Path(tmp_dir) / "command_ttl.conf" + config_path.write_text("SPEED,10\n", encoding="utf-8") + connection = FakeConnection([FakeResponse(12.3), FakeResponse(45.6)]) + interface = OBD2Interface( + logging.getLogger("test"), + connection=connection, + cache_ttl=10.0, + ttl_config_path=str(config_path), + ) + + await interface.reload_ttl_config(force=True) + command = interface.telemetry_commands["speed"] + first = await interface.query_command(command) + await asyncio.sleep(0.02) + second = await interface.query_command(command) + + self.assertEqual(first, 12.3) + self.assertEqual(second, 45.6) + self.assertEqual(connection.query_count, 2) + + async def test_ttl_config_reload_updates_overrides(self) -> None: + with tempfile.TemporaryDirectory() as tmp_dir: + config_path = Path(tmp_dir) / "command_ttl.conf" + config_path.write_text("SPEED,250\n", encoding="utf-8") + interface = OBD2Interface( + logging.getLogger("test"), + connection=FakeConnection([FakeResponse(12.3)]), + ttl_config_path=str(config_path), + ) + + await interface.reload_ttl_config(force=True) + self.assertEqual(interface._ttl_for_command("SPEED"), 0.25) + + await asyncio.sleep(0.02) + config_path.write_text("SPEED,1000\nRPM,100\n", encoding="utf-8") + await interface.reload_ttl_config(force=True) + + self.assertEqual(interface._ttl_for_command("SPEED"), 1.0) + self.assertEqual(interface._ttl_for_command("RPM"), 0.1) + + async def test_update_ttl_override_writes_config_and_reloads(self) -> None: + with tempfile.TemporaryDirectory() as tmp_dir: + config_path = Path(tmp_dir) / "command_ttl.conf" + config_path.write_text("SPEED,250\nRPM,500\n", encoding="utf-8") + interface = OBD2Interface( + logging.getLogger("test"), + connection=FakeConnection([FakeResponse(1)]), + ttl_config_path=str(config_path), + ) + + await interface.reload_ttl_config(force=True) + await interface.update_ttl_override("SPEED", 750) + await interface.update_ttl_override("FUEL_LEVEL", 1000) + + self.assertEqual(interface.get_command_ttl_ms("SPEED"), 750) + self.assertEqual(interface.get_command_ttl_ms("FUEL_LEVEL"), 1000) + contents = config_path.read_text(encoding="utf-8") + self.assertIn("SPEED,750", contents) + self.assertIn("FUEL_LEVEL,1000", contents) + + async def test_sensor_loop_updates_report_without_fake_values(self) -> None: + connection = FakeConnection([FakeResponse(FakeQuantity(41.2))]) + interface = OBD2Interface(logging.getLogger("test"), connection=connection) + report = Report() + scan = interface.scans[0].model_copy(update={"callback": report.set_speed, "interval": 0.01}) + + task = asyncio.create_task(interface.sensor_data_loop(scan)) + await asyncio.sleep(0.03) + await interface.stop() + await task + + self.assertEqual(report.speed_mph, 41.2) + self.assertTrue(connection.closed) + + async def test_simulated_connection_generates_all_telemetry(self) -> None: + interface = SimulatedOBD2Interface(logging.getLogger("test"), query_rate_limit=200.0) + report = Report() + + scans = interface.scans + scans[0].callback = report.set_speed + scans[1].callback = report.set_fuel + scans[2].callback = report.set_oil_temp + scans[3].callback = report.set_rpm + scans[4].callback = report.set_coolant_temp + + tasks = [asyncio.create_task(interface.sensor_data_loop(scan.model_copy(update={"interval": 0.01}))) for scan in scans] + await asyncio.sleep(0.05) + await interface.stop() + await asyncio.gather(*tasks) + + self.assertGreater(report.speed_mph, 0.0) + self.assertGreater(report.rpm, 0.0) + self.assertGreater(report.fuel_level_pct, 0.0) + self.assertGreater(report.oil_temp_f, 0.0) + self.assertGreater(report.coolant_temp_f, 0.0) + + def test_simulated_connection_reports_connected_until_closed(self) -> None: + connection = SimulatedOBDConnection() + self.assertTrue(connection.is_connected()) + connection.close() + self.assertFalse(connection.is_connected()) + + async def test_query_display_value_formats_non_numeric_commands(self) -> None: + interface = SimulatedOBD2Interface(logging.getLogger("test")) + import obd + + vin = await interface.query_display_value(obd.commands.VIN) + dtc = await interface.query_display_value(obd.commands.GET_DTC) + + self.assertEqual(vin, "1HGBH41JXMN109186") + self.assertIn("P0301", dtc) + + +class DashboardTests(unittest.TestCase): + def test_format_metric_value_uses_report_fields(self) -> None: + report = Report( + speed_mph=55.4, + rpm=2100, + fuel_level_pct=75.0, + oil_temp_f=195.2, + coolant_temp_f=201.8, + ) + + self.assertEqual(format_metric_value(report, "speed"), "055.4") + self.assertEqual(format_metric_value(report, "rpm"), "02100") + self.assertEqual(format_metric_value(report, "fuel"), "075.0") + self.assertEqual(format_metric_value(report, "oil-temp"), "195.2") + self.assertEqual(format_metric_value(report, "coolant-temp"), "201.8") + + def test_format_obd_value_formats_numeric_and_none(self) -> None: + self.assertEqual(format_obd_value(None), "--") + self.assertEqual(format_obd_value(42.0), "42") + self.assertEqual(format_obd_value(42.5), "42.5") + + def test_mode_6_command_list_excludes_none_entries(self) -> None: + app = OBD2App(Report()) + commands = app.get_mode_commands(6) + + self.assertTrue(commands) + self.assertTrue(all(command is not None for command in commands)) + + def test_parse_duration_ms_supports_duration_suffixes(self) -> None: + self.assertEqual(parse_duration_ms("10ms"), 10) + self.assertEqual(parse_duration_ms("30s"), 30000) + self.assertEqual(parse_duration_ms("1m"), 60000) + self.assertEqual(parse_duration_ms("1h"), 3600000) + self.assertEqual(parse_duration_ms("250"), 250) + + +if __name__ == "__main__": + unittest.main()