Update TUI controls and TTL editing
This commit is contained in:
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
__pycache__
|
||||
venv
|
||||
.idea
|
||||
@ -1,7 +1,7 @@
|
||||
# Format: COMMAND_NAME,ttl_ms
|
||||
PIDS_A,5000
|
||||
STATUS,5000
|
||||
FREEZE_DTC,5000
|
||||
FREEZE_DTC,7500
|
||||
FUEL_STATUS,5000
|
||||
ENGINE_LOAD,5000
|
||||
COOLANT_TEMP,5000
|
||||
@ -191,8 +191,8 @@ DTC_OIL_TEMP,5000
|
||||
DTC_FUEL_INJECT_TIMING,5000
|
||||
DTC_FUEL_RATE,5000
|
||||
DTC_EMISSION_REQ,5000
|
||||
GET_DTC,5000
|
||||
CLEAR_DTC,5000
|
||||
GET_DTC,86400000
|
||||
CLEAR_DTC,86400000
|
||||
MIDS_A,5000
|
||||
MONITOR_O2_B1S1,5000
|
||||
MONITOR_O2_B1S2,5000
|
||||
@ -281,10 +281,10 @@ MONITOR_MISFIRE_CYLINDER_11,5000
|
||||
MONITOR_MISFIRE_CYLINDER_12,5000
|
||||
MONITOR_PM_FILTER_B1,5000
|
||||
MONITOR_PM_FILTER_B2,5000
|
||||
GET_CURRENT_DTC,5000
|
||||
GET_CURRENT_DTC,86400000
|
||||
PIDS_9A,5000
|
||||
VIN_MESSAGE_COUNT,5000
|
||||
VIN,5000
|
||||
VIN,3600000
|
||||
CALIBRATION_ID_MESSAGE_COUNT,5000
|
||||
CALIBRATION_ID,5000
|
||||
CVN_MESSAGE_COUNT,5000
|
||||
|
||||
83
obd2_tui.py
83
obd2_tui.py
@ -2,6 +2,7 @@ import asyncio
|
||||
import argparse
|
||||
import contextlib
|
||||
import logging
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
import obd
|
||||
@ -31,6 +32,8 @@ MODE_LABELS = {
|
||||
9: "Mode 09 Vehicle Info",
|
||||
}
|
||||
|
||||
MODE_ORDER = [1, 2, 3, 4, 6, 7, 9]
|
||||
|
||||
|
||||
def format_metric_value(report: Report, metric_id: str) -> str:
|
||||
values = {
|
||||
@ -43,6 +46,22 @@ def format_metric_value(report: Report, metric_id: str) -> str:
|
||||
return values[metric_id]
|
||||
|
||||
|
||||
def parse_duration_ms(value: str) -> int:
|
||||
match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*(ms|s|m|h)?\s*", value, re.IGNORECASE)
|
||||
if match is None:
|
||||
raise ValueError(f"Invalid duration: {value}")
|
||||
|
||||
amount = float(match.group(1))
|
||||
unit = (match.group(2) or "ms").lower()
|
||||
multipliers = {
|
||||
"ms": 1,
|
||||
"s": 1000,
|
||||
"m": 60_000,
|
||||
"h": 3_600_000,
|
||||
}
|
||||
return max(0, int(round(amount * multipliers[unit])))
|
||||
|
||||
|
||||
class RichLogHandler(logging.Handler):
|
||||
def __init__(self, app: "OBD2App") -> None:
|
||||
super().__init__()
|
||||
@ -64,6 +83,12 @@ class OBD2App(App[None]):
|
||||
BINDINGS = [
|
||||
("q", "quit", "Quit"),
|
||||
("b", "toggle_border", "Toggle border"),
|
||||
("e", "focus_ttl", "Edit TTL"),
|
||||
("escape", "focus_table", "Focus Table"),
|
||||
("left", "previous_mode", "Prev Mode"),
|
||||
("right", "next_mode", "Next Mode"),
|
||||
("shift+up", "jump_table_top", "Top"),
|
||||
("shift+down", "jump_table_bottom", "Bottom"),
|
||||
("1", "select_mode(1)", "Mode 1"),
|
||||
("2", "select_mode(2)", "Mode 2"),
|
||||
("3", "select_mode(3)", "Mode 3"),
|
||||
@ -103,7 +128,7 @@ class OBD2App(App[None]):
|
||||
DataTable(id="commands-table"),
|
||||
Container(
|
||||
Label("Selected TTL (ms)", id="ttl-editor-label"),
|
||||
Input(placeholder="TTL in ms", id="ttl-editor"),
|
||||
Input(placeholder="e.g. 10ms, 30s, 1m, 1h", id="ttl-editor"),
|
||||
id="ttl-editor-pane",
|
||||
),
|
||||
id="table-pane",
|
||||
@ -123,13 +148,14 @@ class OBD2App(App[None]):
|
||||
classes="metric-card",
|
||||
)
|
||||
|
||||
def on_mount(self) -> None:
|
||||
async def on_mount(self) -> None:
|
||||
self.configure_logging()
|
||||
self.interface = self.interface_factory(
|
||||
self.logger,
|
||||
query_rate_limit=self.query_rate_limit,
|
||||
ttl_config_path=self.ttl_config_path,
|
||||
)
|
||||
await self.interface.reload_ttl_config(force=True)
|
||||
self.configure_commands_table()
|
||||
self.load_mode_table(self.selected_mode)
|
||||
self.poll_task = asyncio.create_task(self.poll_commands())
|
||||
@ -157,6 +183,41 @@ class OBD2App(App[None]):
|
||||
self.load_mode_table(mode)
|
||||
self.logger.info("Selected %s", MODE_LABELS[mode])
|
||||
|
||||
def action_previous_mode(self) -> None:
|
||||
index = MODE_ORDER.index(self.selected_mode)
|
||||
self.action_select_mode(MODE_ORDER[(index - 1) % len(MODE_ORDER)])
|
||||
|
||||
def action_next_mode(self) -> None:
|
||||
index = MODE_ORDER.index(self.selected_mode)
|
||||
self.action_select_mode(MODE_ORDER[(index + 1) % len(MODE_ORDER)])
|
||||
|
||||
def action_focus_ttl(self) -> None:
|
||||
editor = self.query_one("#ttl-editor", Input)
|
||||
editor.focus()
|
||||
editor.action_end()
|
||||
|
||||
def action_focus_table(self) -> None:
|
||||
self.query_one("#commands-table", DataTable).focus()
|
||||
|
||||
def action_jump_table_top(self) -> None:
|
||||
table = self.query_one("#commands-table", DataTable)
|
||||
table.focus()
|
||||
if table.row_count > 0:
|
||||
table.move_cursor(row=0)
|
||||
table.scroll_to(y=0, animate=False)
|
||||
|
||||
def action_jump_table_bottom(self) -> None:
|
||||
table = self.query_one("#commands-table", DataTable)
|
||||
table.focus()
|
||||
if table.row_count > 0:
|
||||
last_row = table.row_count - 1
|
||||
table.move_cursor(row=last_row)
|
||||
table.scroll_to(y=table.max_scroll_y, animate=False)
|
||||
|
||||
def on_click(self, event) -> None:
|
||||
if getattr(event.widget, "id", None) in {"ttl-editor", "ttl-editor-pane", "ttl-editor-label"}:
|
||||
self.action_focus_ttl()
|
||||
|
||||
def configure_logging(self) -> None:
|
||||
self.log_handler = RichLogHandler(self)
|
||||
self.log_handler.setFormatter(
|
||||
@ -186,11 +247,12 @@ class OBD2App(App[None]):
|
||||
table = self.query_one("#commands-table", DataTable)
|
||||
table.cursor_type = "row"
|
||||
table.zebra_stripes = True
|
||||
table.add_column("Code", key="code", width=10)
|
||||
table.add_column("Name", key="name", width=28)
|
||||
table.add_column("Description", key="desc", width=44)
|
||||
table.add_column("TTL (ms)", key="ttl", width=12)
|
||||
table.add_column("Value", key="value", width=24)
|
||||
table.show_row_labels = False
|
||||
table.add_column("Code", key="code", width=8)
|
||||
table.add_column("Name", key="name", width=24)
|
||||
table.add_column("Description", key="desc", width=34)
|
||||
table.add_column("TTL (ms)", key="ttl", width=10)
|
||||
table.add_column("Value", key="value", width=22)
|
||||
|
||||
def load_mode_table(self, mode: int) -> None:
|
||||
table = self.query_one("#commands-table", DataTable)
|
||||
@ -312,7 +374,9 @@ class OBD2App(App[None]):
|
||||
continue
|
||||
|
||||
def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None:
|
||||
self.set_ttl_editor(str(event.row_key))
|
||||
row = self.query_one("#commands-table", DataTable).get_row(event.row_key)
|
||||
if len(row) >= 2:
|
||||
self.set_ttl_editor(str(row[1]))
|
||||
|
||||
async def on_input_submitted(self, event: Input.Submitted) -> None:
|
||||
if event.input.id != "ttl-editor" or self.interface is None or self.ttl_edit_command is None:
|
||||
@ -320,7 +384,7 @@ class OBD2App(App[None]):
|
||||
|
||||
raw_value = event.value.strip()
|
||||
try:
|
||||
ttl_ms = int(raw_value)
|
||||
ttl_ms = parse_duration_ms(raw_value)
|
||||
except ValueError:
|
||||
self.logger.error("Invalid TTL value: %s", raw_value)
|
||||
self.set_ttl_editor(self.ttl_edit_command)
|
||||
@ -330,6 +394,7 @@ class OBD2App(App[None]):
|
||||
self.query_one("#commands-table", DataTable).update_cell(self.ttl_edit_command, "ttl", str(ttl_ms))
|
||||
self.logger.info("Updated TTL for %s to %d ms", self.ttl_edit_command, ttl_ms)
|
||||
self.set_ttl_editor(self.ttl_edit_command)
|
||||
self.action_focus_table()
|
||||
|
||||
async def poll_commands(self) -> None:
|
||||
while True:
|
||||
|
||||
269
test.py
Normal file
269
test.py
Normal file
@ -0,0 +1,269 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
from models import Report
|
||||
from obd2_interface import OBD2Interface, SimulatedOBD2Interface, SimulatedOBDConnection, format_obd_value
|
||||
from obd2_tui import OBD2App, format_metric_value, parse_duration_ms
|
||||
|
||||
|
||||
class FakeQuantity:
|
||||
def __init__(self, magnitude: float):
|
||||
self.magnitude = magnitude
|
||||
|
||||
def to(self, _unit: str) -> "FakeQuantity":
|
||||
return self
|
||||
|
||||
|
||||
class FakeResponse:
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
def is_null(self) -> bool:
|
||||
return self.value is None
|
||||
|
||||
|
||||
class FakeConnection:
|
||||
def __init__(self, values):
|
||||
self.values = list(values)
|
||||
self.closed = False
|
||||
self.query_count = 0
|
||||
|
||||
def is_connected(self) -> bool:
|
||||
return True
|
||||
|
||||
def query(self, _cmd):
|
||||
self.query_count += 1
|
||||
if self.values:
|
||||
return self.values.pop(0)
|
||||
return FakeResponse(None)
|
||||
|
||||
def close(self) -> None:
|
||||
self.closed = True
|
||||
|
||||
|
||||
class ReportTests(unittest.TestCase):
|
||||
def test_setters_normalize_quantities(self) -> None:
|
||||
report = Report()
|
||||
report.set_speed(FakeQuantity(32.5))
|
||||
report.set_rpm(FakeQuantity(1800))
|
||||
report.set_fuel(FakeQuantity(67.0))
|
||||
report.set_oil_temp(FakeQuantity(194))
|
||||
report.set_coolant_temp(FakeQuantity(201))
|
||||
|
||||
self.assertEqual(report.speed_mph, 32.5)
|
||||
self.assertEqual(report.rpm, 1800.0)
|
||||
self.assertEqual(report.fuel_level_pct, 67.0)
|
||||
self.assertEqual(report.oil_temp_f, 194.0)
|
||||
self.assertEqual(report.coolant_temp_f, 201.0)
|
||||
|
||||
|
||||
class InterfaceTests(unittest.IsolatedAsyncioTestCase):
|
||||
async def test_query_value_returns_none_for_empty_response(self) -> None:
|
||||
interface = OBD2Interface(logging.getLogger("test"), connection=FakeConnection([FakeResponse(None)]))
|
||||
value = await interface.query_value(interface.scans[0])
|
||||
self.assertIsNone(value)
|
||||
|
||||
async def test_query_rate_limit_defaults_to_ten_per_second(self) -> None:
|
||||
interface = OBD2Interface(
|
||||
logging.getLogger("test"),
|
||||
connection=FakeConnection([FakeResponse(1), FakeResponse(2)]),
|
||||
)
|
||||
|
||||
start = asyncio.get_running_loop().time()
|
||||
await asyncio.gather(
|
||||
interface.query_command(interface.telemetry_commands["speed"]),
|
||||
interface.query_command(interface.telemetry_commands["rpm"]),
|
||||
)
|
||||
elapsed = asyncio.get_running_loop().time() - start
|
||||
|
||||
self.assertGreaterEqual(elapsed, 0.09)
|
||||
|
||||
async def test_query_command_uses_ttl_cache(self) -> None:
|
||||
connection = FakeConnection([FakeResponse(12.3)])
|
||||
interface = OBD2Interface(
|
||||
logging.getLogger("test"),
|
||||
connection=connection,
|
||||
cache_ttl=0.25,
|
||||
)
|
||||
|
||||
command = interface.telemetry_commands["speed"]
|
||||
first = await interface.query_command(command)
|
||||
second = await interface.query_command(command)
|
||||
|
||||
self.assertEqual(first, 12.3)
|
||||
self.assertEqual(second, 12.3)
|
||||
self.assertEqual(connection.query_count, 1)
|
||||
|
||||
async def test_query_command_cache_expires(self) -> None:
|
||||
connection = FakeConnection([FakeResponse(12.3), FakeResponse(45.6)])
|
||||
interface = OBD2Interface(
|
||||
logging.getLogger("test"),
|
||||
connection=connection,
|
||||
cache_ttl=0.01,
|
||||
)
|
||||
|
||||
command = interface.telemetry_commands["speed"]
|
||||
first = await interface.query_command(command)
|
||||
await asyncio.sleep(0.02)
|
||||
second = await interface.query_command(command)
|
||||
|
||||
self.assertEqual(first, 12.3)
|
||||
self.assertEqual(second, 45.6)
|
||||
self.assertEqual(connection.query_count, 2)
|
||||
|
||||
async def test_ttl_config_overrides_cache_ttl(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||||
config_path = Path(tmp_dir) / "command_ttl.conf"
|
||||
config_path.write_text("SPEED,10\n", encoding="utf-8")
|
||||
connection = FakeConnection([FakeResponse(12.3), FakeResponse(45.6)])
|
||||
interface = OBD2Interface(
|
||||
logging.getLogger("test"),
|
||||
connection=connection,
|
||||
cache_ttl=10.0,
|
||||
ttl_config_path=str(config_path),
|
||||
)
|
||||
|
||||
await interface.reload_ttl_config(force=True)
|
||||
command = interface.telemetry_commands["speed"]
|
||||
first = await interface.query_command(command)
|
||||
await asyncio.sleep(0.02)
|
||||
second = await interface.query_command(command)
|
||||
|
||||
self.assertEqual(first, 12.3)
|
||||
self.assertEqual(second, 45.6)
|
||||
self.assertEqual(connection.query_count, 2)
|
||||
|
||||
async def test_ttl_config_reload_updates_overrides(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||||
config_path = Path(tmp_dir) / "command_ttl.conf"
|
||||
config_path.write_text("SPEED,250\n", encoding="utf-8")
|
||||
interface = OBD2Interface(
|
||||
logging.getLogger("test"),
|
||||
connection=FakeConnection([FakeResponse(12.3)]),
|
||||
ttl_config_path=str(config_path),
|
||||
)
|
||||
|
||||
await interface.reload_ttl_config(force=True)
|
||||
self.assertEqual(interface._ttl_for_command("SPEED"), 0.25)
|
||||
|
||||
await asyncio.sleep(0.02)
|
||||
config_path.write_text("SPEED,1000\nRPM,100\n", encoding="utf-8")
|
||||
await interface.reload_ttl_config(force=True)
|
||||
|
||||
self.assertEqual(interface._ttl_for_command("SPEED"), 1.0)
|
||||
self.assertEqual(interface._ttl_for_command("RPM"), 0.1)
|
||||
|
||||
async def test_update_ttl_override_writes_config_and_reloads(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||||
config_path = Path(tmp_dir) / "command_ttl.conf"
|
||||
config_path.write_text("SPEED,250\nRPM,500\n", encoding="utf-8")
|
||||
interface = OBD2Interface(
|
||||
logging.getLogger("test"),
|
||||
connection=FakeConnection([FakeResponse(1)]),
|
||||
ttl_config_path=str(config_path),
|
||||
)
|
||||
|
||||
await interface.reload_ttl_config(force=True)
|
||||
await interface.update_ttl_override("SPEED", 750)
|
||||
await interface.update_ttl_override("FUEL_LEVEL", 1000)
|
||||
|
||||
self.assertEqual(interface.get_command_ttl_ms("SPEED"), 750)
|
||||
self.assertEqual(interface.get_command_ttl_ms("FUEL_LEVEL"), 1000)
|
||||
contents = config_path.read_text(encoding="utf-8")
|
||||
self.assertIn("SPEED,750", contents)
|
||||
self.assertIn("FUEL_LEVEL,1000", contents)
|
||||
|
||||
async def test_sensor_loop_updates_report_without_fake_values(self) -> None:
|
||||
connection = FakeConnection([FakeResponse(FakeQuantity(41.2))])
|
||||
interface = OBD2Interface(logging.getLogger("test"), connection=connection)
|
||||
report = Report()
|
||||
scan = interface.scans[0].model_copy(update={"callback": report.set_speed, "interval": 0.01})
|
||||
|
||||
task = asyncio.create_task(interface.sensor_data_loop(scan))
|
||||
await asyncio.sleep(0.03)
|
||||
await interface.stop()
|
||||
await task
|
||||
|
||||
self.assertEqual(report.speed_mph, 41.2)
|
||||
self.assertTrue(connection.closed)
|
||||
|
||||
async def test_simulated_connection_generates_all_telemetry(self) -> None:
|
||||
interface = SimulatedOBD2Interface(logging.getLogger("test"), query_rate_limit=200.0)
|
||||
report = Report()
|
||||
|
||||
scans = interface.scans
|
||||
scans[0].callback = report.set_speed
|
||||
scans[1].callback = report.set_fuel
|
||||
scans[2].callback = report.set_oil_temp
|
||||
scans[3].callback = report.set_rpm
|
||||
scans[4].callback = report.set_coolant_temp
|
||||
|
||||
tasks = [asyncio.create_task(interface.sensor_data_loop(scan.model_copy(update={"interval": 0.01}))) for scan in scans]
|
||||
await asyncio.sleep(0.05)
|
||||
await interface.stop()
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
self.assertGreater(report.speed_mph, 0.0)
|
||||
self.assertGreater(report.rpm, 0.0)
|
||||
self.assertGreater(report.fuel_level_pct, 0.0)
|
||||
self.assertGreater(report.oil_temp_f, 0.0)
|
||||
self.assertGreater(report.coolant_temp_f, 0.0)
|
||||
|
||||
def test_simulated_connection_reports_connected_until_closed(self) -> None:
|
||||
connection = SimulatedOBDConnection()
|
||||
self.assertTrue(connection.is_connected())
|
||||
connection.close()
|
||||
self.assertFalse(connection.is_connected())
|
||||
|
||||
async def test_query_display_value_formats_non_numeric_commands(self) -> None:
|
||||
interface = SimulatedOBD2Interface(logging.getLogger("test"))
|
||||
import obd
|
||||
|
||||
vin = await interface.query_display_value(obd.commands.VIN)
|
||||
dtc = await interface.query_display_value(obd.commands.GET_DTC)
|
||||
|
||||
self.assertEqual(vin, "1HGBH41JXMN109186")
|
||||
self.assertIn("P0301", dtc)
|
||||
|
||||
|
||||
class DashboardTests(unittest.TestCase):
|
||||
def test_format_metric_value_uses_report_fields(self) -> None:
|
||||
report = Report(
|
||||
speed_mph=55.4,
|
||||
rpm=2100,
|
||||
fuel_level_pct=75.0,
|
||||
oil_temp_f=195.2,
|
||||
coolant_temp_f=201.8,
|
||||
)
|
||||
|
||||
self.assertEqual(format_metric_value(report, "speed"), "055.4")
|
||||
self.assertEqual(format_metric_value(report, "rpm"), "02100")
|
||||
self.assertEqual(format_metric_value(report, "fuel"), "075.0")
|
||||
self.assertEqual(format_metric_value(report, "oil-temp"), "195.2")
|
||||
self.assertEqual(format_metric_value(report, "coolant-temp"), "201.8")
|
||||
|
||||
def test_format_obd_value_formats_numeric_and_none(self) -> None:
|
||||
self.assertEqual(format_obd_value(None), "--")
|
||||
self.assertEqual(format_obd_value(42.0), "42")
|
||||
self.assertEqual(format_obd_value(42.5), "42.5")
|
||||
|
||||
def test_mode_6_command_list_excludes_none_entries(self) -> None:
|
||||
app = OBD2App(Report())
|
||||
commands = app.get_mode_commands(6)
|
||||
|
||||
self.assertTrue(commands)
|
||||
self.assertTrue(all(command is not None for command in commands))
|
||||
|
||||
def test_parse_duration_ms_supports_duration_suffixes(self) -> None:
|
||||
self.assertEqual(parse_duration_ms("10ms"), 10)
|
||||
self.assertEqual(parse_duration_ms("30s"), 30000)
|
||||
self.assertEqual(parse_duration_ms("1m"), 60000)
|
||||
self.assertEqual(parse_duration_ms("1h"), 3600000)
|
||||
self.assertEqual(parse_duration_ms("250"), 250)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user