From a994900a6a42f081106a113aeafc7d6b243e2dfa Mon Sep 17 00:00:00 2001 From: Jimmy Allen Date: Wed, 1 Apr 2026 19:34:26 -0400 Subject: [PATCH] initial --- command_ttl.conf | 291 ++++++++++++++++++++++ models.py | 43 ++++ obd2_interface.py | 605 ++++++++++++++++++++++++++++++++++++++++++++++ obd2_tui.py | 390 ++++++++++++++++++++++++++++++ requirements.txt | 5 + ui.css | 80 ++++++ 6 files changed, 1414 insertions(+) create mode 100644 command_ttl.conf create mode 100644 models.py create mode 100644 obd2_interface.py create mode 100644 obd2_tui.py create mode 100644 requirements.txt create mode 100644 ui.css diff --git a/command_ttl.conf b/command_ttl.conf new file mode 100644 index 0000000..8f8c662 --- /dev/null +++ b/command_ttl.conf @@ -0,0 +1,291 @@ +# Format: COMMAND_NAME,ttl_ms +PIDS_A,5000 +STATUS,5000 +FREEZE_DTC,5000 +FUEL_STATUS,5000 +ENGINE_LOAD,5000 +COOLANT_TEMP,5000 +SHORT_FUEL_TRIM_1,5000 +LONG_FUEL_TRIM_1,5000 +SHORT_FUEL_TRIM_2,5000 +LONG_FUEL_TRIM_2,5000 +FUEL_PRESSURE,5000 +INTAKE_PRESSURE,5000 +RPM,10 +SPEED,10 +TIMING_ADVANCE,5000 +INTAKE_TEMP,5000 +MAF,5000 +THROTTLE_POS,5000 +AIR_STATUS,5000 +O2_SENSORS,5000 +O2_B1S1,5000 +O2_B1S2,5000 +O2_B1S3,5000 +O2_B1S4,5000 +O2_B2S1,5000 +O2_B2S2,5000 +O2_B2S3,5000 +O2_B2S4,5000 +OBD_COMPLIANCE,5000 +O2_SENSORS_ALT,5000 +AUX_INPUT_STATUS,5000 +RUN_TIME,5000 +PIDS_B,5000 +DISTANCE_W_MIL,5000 +FUEL_RAIL_PRESSURE_VAC,5000 +FUEL_RAIL_PRESSURE_DIRECT,5000 +O2_S1_WR_VOLTAGE,5000 +O2_S2_WR_VOLTAGE,5000 +O2_S3_WR_VOLTAGE,5000 +O2_S4_WR_VOLTAGE,5000 +O2_S5_WR_VOLTAGE,5000 +O2_S6_WR_VOLTAGE,5000 +O2_S7_WR_VOLTAGE,5000 +O2_S8_WR_VOLTAGE,5000 +COMMANDED_EGR,5000 +EGR_ERROR,5000 +EVAPORATIVE_PURGE,5000 +FUEL_LEVEL,5000 +WARMUPS_SINCE_DTC_CLEAR,5000 +DISTANCE_SINCE_DTC_CLEAR,5000 +EVAP_VAPOR_PRESSURE,5000 +BAROMETRIC_PRESSURE,5000 +O2_S1_WR_CURRENT,5000 +O2_S2_WR_CURRENT,5000 +O2_S3_WR_CURRENT,5000 +O2_S4_WR_CURRENT,5000 +O2_S5_WR_CURRENT,5000 +O2_S6_WR_CURRENT,5000 +O2_S7_WR_CURRENT,5000 +O2_S8_WR_CURRENT,5000 +CATALYST_TEMP_B1S1,5000 +CATALYST_TEMP_B2S1,5000 +CATALYST_TEMP_B1S2,5000 +CATALYST_TEMP_B2S2,5000 +PIDS_C,5000 +STATUS_DRIVE_CYCLE,5000 +CONTROL_MODULE_VOLTAGE,5000 +ABSOLUTE_LOAD,5000 +COMMANDED_EQUIV_RATIO,5000 +RELATIVE_THROTTLE_POS,5000 +AMBIANT_AIR_TEMP,5000 +THROTTLE_POS_B,5000 +THROTTLE_POS_C,5000 +ACCELERATOR_POS_D,5000 +ACCELERATOR_POS_E,5000 +ACCELERATOR_POS_F,5000 +THROTTLE_ACTUATOR,5000 +RUN_TIME_MIL,5000 +TIME_SINCE_DTC_CLEARED,5000 +MAX_VALUES,5000 +MAX_MAF,5000 +FUEL_TYPE,5000 +ETHANOL_PERCENT,5000 +EVAP_VAPOR_PRESSURE_ABS,5000 +EVAP_VAPOR_PRESSURE_ALT,5000 +SHORT_O2_TRIM_B1,5000 +LONG_O2_TRIM_B1,5000 +SHORT_O2_TRIM_B2,5000 +LONG_O2_TRIM_B2,5000 +FUEL_RAIL_PRESSURE_ABS,5000 +RELATIVE_ACCEL_POS,5000 +HYBRID_BATTERY_REMAINING,5000 +OIL_TEMP,5000 +FUEL_INJECT_TIMING,5000 +FUEL_RATE,5000 +EMISSION_REQ,5000 +DTC_PIDS_A,5000 +DTC_STATUS,5000 +DTC_FREEZE_DTC,5000 +DTC_FUEL_STATUS,5000 +DTC_ENGINE_LOAD,5000 +DTC_COOLANT_TEMP,5000 +DTC_SHORT_FUEL_TRIM_1,5000 +DTC_LONG_FUEL_TRIM_1,5000 +DTC_SHORT_FUEL_TRIM_2,5000 +DTC_LONG_FUEL_TRIM_2,5000 +DTC_FUEL_PRESSURE,5000 +DTC_INTAKE_PRESSURE,5000 +DTC_RPM,5000 +DTC_SPEED,5000 +DTC_TIMING_ADVANCE,5000 +DTC_INTAKE_TEMP,5000 +DTC_MAF,5000 +DTC_THROTTLE_POS,5000 +DTC_AIR_STATUS,5000 +DTC_O2_SENSORS,5000 +DTC_O2_B1S1,5000 +DTC_O2_B1S2,5000 +DTC_O2_B1S3,5000 +DTC_O2_B1S4,5000 +DTC_O2_B2S1,5000 +DTC_O2_B2S2,5000 +DTC_O2_B2S3,5000 +DTC_O2_B2S4,5000 +DTC_OBD_COMPLIANCE,5000 +DTC_O2_SENSORS_ALT,5000 +DTC_AUX_INPUT_STATUS,5000 +DTC_RUN_TIME,5000 +DTC_PIDS_B,5000 +DTC_DISTANCE_W_MIL,5000 +DTC_FUEL_RAIL_PRESSURE_VAC,5000 +DTC_FUEL_RAIL_PRESSURE_DIRECT,5000 +DTC_O2_S1_WR_VOLTAGE,5000 +DTC_O2_S2_WR_VOLTAGE,5000 +DTC_O2_S3_WR_VOLTAGE,5000 +DTC_O2_S4_WR_VOLTAGE,5000 +DTC_O2_S5_WR_VOLTAGE,5000 +DTC_O2_S6_WR_VOLTAGE,5000 +DTC_O2_S7_WR_VOLTAGE,5000 +DTC_O2_S8_WR_VOLTAGE,5000 +DTC_COMMANDED_EGR,5000 +DTC_EGR_ERROR,5000 +DTC_EVAPORATIVE_PURGE,5000 +DTC_FUEL_LEVEL,5000 +DTC_WARMUPS_SINCE_DTC_CLEAR,5000 +DTC_DISTANCE_SINCE_DTC_CLEAR,5000 +DTC_EVAP_VAPOR_PRESSURE,5000 +DTC_BAROMETRIC_PRESSURE,5000 +DTC_O2_S1_WR_CURRENT,5000 +DTC_O2_S2_WR_CURRENT,5000 +DTC_O2_S3_WR_CURRENT,5000 +DTC_O2_S4_WR_CURRENT,5000 +DTC_O2_S5_WR_CURRENT,5000 +DTC_O2_S6_WR_CURRENT,5000 +DTC_O2_S7_WR_CURRENT,5000 +DTC_O2_S8_WR_CURRENT,5000 +DTC_CATALYST_TEMP_B1S1,5000 +DTC_CATALYST_TEMP_B2S1,5000 +DTC_CATALYST_TEMP_B1S2,5000 +DTC_CATALYST_TEMP_B2S2,5000 +DTC_PIDS_C,5000 +DTC_STATUS_DRIVE_CYCLE,5000 +DTC_CONTROL_MODULE_VOLTAGE,5000 +DTC_ABSOLUTE_LOAD,5000 +DTC_COMMANDED_EQUIV_RATIO,5000 +DTC_RELATIVE_THROTTLE_POS,5000 +DTC_AMBIANT_AIR_TEMP,5000 +DTC_THROTTLE_POS_B,5000 +DTC_THROTTLE_POS_C,5000 +DTC_ACCELERATOR_POS_D,5000 +DTC_ACCELERATOR_POS_E,5000 +DTC_ACCELERATOR_POS_F,5000 +DTC_THROTTLE_ACTUATOR,5000 +DTC_RUN_TIME_MIL,5000 +DTC_TIME_SINCE_DTC_CLEARED,5000 +DTC_MAX_VALUES,5000 +DTC_MAX_MAF,5000 +DTC_FUEL_TYPE,5000 +DTC_ETHANOL_PERCENT,5000 +DTC_EVAP_VAPOR_PRESSURE_ABS,5000 +DTC_EVAP_VAPOR_PRESSURE_ALT,5000 +DTC_SHORT_O2_TRIM_B1,5000 +DTC_LONG_O2_TRIM_B1,5000 +DTC_SHORT_O2_TRIM_B2,5000 +DTC_LONG_O2_TRIM_B2,5000 +DTC_FUEL_RAIL_PRESSURE_ABS,5000 +DTC_RELATIVE_ACCEL_POS,5000 +DTC_HYBRID_BATTERY_REMAINING,5000 +DTC_OIL_TEMP,5000 +DTC_FUEL_INJECT_TIMING,5000 +DTC_FUEL_RATE,5000 +DTC_EMISSION_REQ,5000 +GET_DTC,5000 +CLEAR_DTC,5000 +MIDS_A,5000 +MONITOR_O2_B1S1,5000 +MONITOR_O2_B1S2,5000 +MONITOR_O2_B1S3,5000 +MONITOR_O2_B1S4,5000 +MONITOR_O2_B2S1,5000 +MONITOR_O2_B2S2,5000 +MONITOR_O2_B2S3,5000 +MONITOR_O2_B2S4,5000 +MONITOR_O2_B3S1,5000 +MONITOR_O2_B3S2,5000 +MONITOR_O2_B3S3,5000 +MONITOR_O2_B3S4,5000 +MONITOR_O2_B4S1,5000 +MONITOR_O2_B4S2,5000 +MONITOR_O2_B4S3,5000 +MONITOR_O2_B4S4,5000 +MIDS_B,5000 +MONITOR_CATALYST_B1,5000 +MONITOR_CATALYST_B2,5000 +MONITOR_CATALYST_B3,5000 +MONITOR_CATALYST_B4,5000 +MONITOR_EGR_B1,5000 +MONITOR_EGR_B2,5000 +MONITOR_EGR_B3,5000 +MONITOR_EGR_B4,5000 +MONITOR_VVT_B1,5000 +MONITOR_VVT_B2,5000 +MONITOR_VVT_B3,5000 +MONITOR_VVT_B4,5000 +MONITOR_EVAP_150,5000 +MONITOR_EVAP_090,5000 +MONITOR_EVAP_040,5000 +MONITOR_EVAP_020,5000 +MONITOR_PURGE_FLOW,5000 +MIDS_C,5000 +MONITOR_O2_HEATER_B1S1,5000 +MONITOR_O2_HEATER_B1S2,5000 +MONITOR_O2_HEATER_B1S3,5000 +MONITOR_O2_HEATER_B1S4,5000 +MONITOR_O2_HEATER_B2S1,5000 +MONITOR_O2_HEATER_B2S2,5000 +MONITOR_O2_HEATER_B2S3,5000 +MONITOR_O2_HEATER_B2S4,5000 +MONITOR_O2_HEATER_B3S1,5000 +MONITOR_O2_HEATER_B3S2,5000 +MONITOR_O2_HEATER_B3S3,5000 +MONITOR_O2_HEATER_B3S4,5000 +MONITOR_O2_HEATER_B4S1,5000 +MONITOR_O2_HEATER_B4S2,5000 +MONITOR_O2_HEATER_B4S3,5000 +MONITOR_O2_HEATER_B4S4,5000 +MIDS_D,5000 +MONITOR_HEATED_CATALYST_B1,5000 +MONITOR_HEATED_CATALYST_B2,5000 +MONITOR_HEATED_CATALYST_B3,5000 +MONITOR_HEATED_CATALYST_B4,5000 +MONITOR_SECONDARY_AIR_1,5000 +MONITOR_SECONDARY_AIR_2,5000 +MONITOR_SECONDARY_AIR_3,5000 +MONITOR_SECONDARY_AIR_4,5000 +MIDS_E,5000 +MONITOR_FUEL_SYSTEM_B1,5000 +MONITOR_FUEL_SYSTEM_B2,5000 +MONITOR_FUEL_SYSTEM_B3,5000 +MONITOR_FUEL_SYSTEM_B4,5000 +MONITOR_BOOST_PRESSURE_B1,5000 +MONITOR_BOOST_PRESSURE_B2,5000 +MONITOR_NOX_ABSORBER_B1,5000 +MONITOR_NOX_ABSORBER_B2,5000 +MONITOR_NOX_CATALYST_B1,5000 +MONITOR_NOX_CATALYST_B2,5000 +MIDS_F,5000 +MONITOR_MISFIRE_GENERAL,5000 +MONITOR_MISFIRE_CYLINDER_1,5000 +MONITOR_MISFIRE_CYLINDER_2,5000 +MONITOR_MISFIRE_CYLINDER_3,5000 +MONITOR_MISFIRE_CYLINDER_4,5000 +MONITOR_MISFIRE_CYLINDER_5,5000 +MONITOR_MISFIRE_CYLINDER_6,5000 +MONITOR_MISFIRE_CYLINDER_7,5000 +MONITOR_MISFIRE_CYLINDER_8,5000 +MONITOR_MISFIRE_CYLINDER_9,5000 +MONITOR_MISFIRE_CYLINDER_10,5000 +MONITOR_MISFIRE_CYLINDER_11,5000 +MONITOR_MISFIRE_CYLINDER_12,5000 +MONITOR_PM_FILTER_B1,5000 +MONITOR_PM_FILTER_B2,5000 +GET_CURRENT_DTC,5000 +PIDS_9A,5000 +VIN_MESSAGE_COUNT,5000 +VIN,5000 +CALIBRATION_ID_MESSAGE_COUNT,5000 +CALIBRATION_ID,5000 +CVN_MESSAGE_COUNT,5000 +CVN,5000 diff --git a/models.py b/models.py new file mode 100644 index 0000000..21d2d2b --- /dev/null +++ b/models.py @@ -0,0 +1,43 @@ +from typing import Any, Callable + +from pydantic import BaseModel, ConfigDict + + +class Report(BaseModel): + model_config = ConfigDict(validate_assignment=True) + + speed_mph: float = 0.0 + rpm: float = 0.0 + fuel_level_pct: float = 0.0 + oil_temp_f: float = 0.0 + coolant_temp_f: float = 0.0 + + @staticmethod + def _coerce_numeric(value: Any) -> float: + if value is None: + return 0.0 + if hasattr(value, "magnitude"): + value = value.magnitude + return float(value) + + def set_speed(self, value: Any) -> None: + self.speed_mph = self._coerce_numeric(value) + + def set_rpm(self, value: Any) -> None: + self.rpm = self._coerce_numeric(value) + + def set_fuel(self, value: Any) -> None: + self.fuel_level_pct = self._coerce_numeric(value) + + def set_oil_temp(self, value: Any) -> None: + self.oil_temp_f = self._coerce_numeric(value) + + def set_coolant_temp(self, value: Any) -> None: + self.coolant_temp_f = self._coerce_numeric(value) + + +class Scan(BaseModel): + cmd: Any = None + interval: float = 10.0 + callback: Callable[[float], None] | None = None + transform: Callable[[Any], float] | None = None diff --git a/obd2_interface.py b/obd2_interface.py new file mode 100644 index 0000000..c54d063 --- /dev/null +++ b/obd2_interface.py @@ -0,0 +1,605 @@ +import asyncio as aio +import contextlib +import logging +import math +from pathlib import Path +import time +from dataclasses import dataclass +from numbers import Number +from typing import Any + +import obd + +from models import Report, Scan + + +def _quantity_to_float(value: Any, unit: str | None = None) -> float: + if value is None: + raise ValueError("Cannot normalize an empty OBD value") + if unit and hasattr(value, "to"): + value = value.to(unit) + if hasattr(value, "magnitude"): + value = value.magnitude + return float(value) + + +def format_obd_value(value: Any) -> str: + if value is None: + return "--" + if isinstance(value, str): + return value + if hasattr(value, "magnitude"): + return str(value) + if isinstance(value, Number): + numeric = float(value) + if numeric.is_integer(): + return str(int(numeric)) + return f"{numeric:.1f}" + if isinstance(value, (list, tuple, set)): + return ", ".join(str(item) for item in value) or "--" + return str(value) + + +@dataclass +class CacheEntry: + value: Any + expires_at: float + + +class SimulatedOBDConnection: + def __init__(self) -> None: + self._start = time.monotonic() + self._closed = False + + def is_connected(self) -> bool: + return not self._closed + + def close(self) -> None: + self._closed = True + + def query(self, cmd: Any) -> Any: + value = self._value_for_command(cmd) + return type( + "SimulatedResponse", + (), + { + "value": value, + "is_null": staticmethod(lambda: False), + }, + )() + + def _value_for_command(self, cmd: Any) -> float: + elapsed = time.monotonic() - self._start + name = getattr(cmd, "name", str(cmd)) + mode = getattr(cmd, "mode", None) + + if name == "SPEED": + return 45.0 + 20.0 * math.sin(elapsed / 2.5) + if name == "RPM": + return 1800.0 + 900.0 * (1.0 + math.sin(elapsed * 1.7)) + if name == "FUEL_LEVEL": + return max(0.0, 85.0 - elapsed * 0.03) + if name == "OIL_TEMP": + return 185.0 + 12.0 * math.sin(elapsed / 8.0) + if name == "COOLANT_TEMP": + return 195.0 + 8.0 * math.sin(elapsed / 10.0) + if name == "VIN": + return "1HGBH41JXMN109186" + if "CALIBRATION_ID" in name: + return "CALID-TEST-001" + if name == "CVN": + return "A1B2C3D4" + if "MESSAGE_COUNT" in name: + return 1 + if name in {"GET_DTC", "GET_CURRENT_DTC"}: + return "P0301, P0420" + if name == "CLEAR_DTC": + return "READY" + if name.startswith(("PIDS_", "DTC_PIDS_", "MIDS_")): + return "FFFF" + if mode == 6: + base = 40.0 + (sum(ord(char) for char in name) % 30) + return base + 10.0 * math.sin(elapsed / 3.0) + if mode in {1, 2, 9}: + base = 10.0 + (sum(ord(char) for char in name) % 80) + return base + 5.0 * math.sin(elapsed / 4.0) + return 0.0 + + +class OBD2Interface: + def __init__( + self, + logger: logging.Logger, + connection: Any | None = None, + query_rate_limit: float = 10.0, + cache_ttl: float = 0.25, + ttl_config_path: str | None = None, + ttl_config_scan_interval: float = 2.0, + ): + self.logger = logger + self.connection = connection + self._owns_connection = connection is None + self.query_rate_limit = query_rate_limit + self.cache_ttl = cache_ttl + self.ttl_config_path = Path(ttl_config_path) if ttl_config_path else None + self.ttl_config_scan_interval = ttl_config_scan_interval + self.sensor_connected = self._is_connected() if connection is not None else False + self._stop_event = aio.Event() + self._tasks: list[aio.Task[Any]] = [] + self._rate_limit_lock = aio.Lock() + self._cache_lock = aio.Lock() + self._next_query_time = 0.0 + self._query_cache: dict[str, CacheEntry] = {} + self._command_ttls: dict[str, float] = {} + self._ttl_config_mtime: float | None = None + self._query_queue: aio.Queue[tuple[Any, str, aio.Future[Any | None]]] = aio.Queue() + self._pending_queries: dict[str, aio.Future[Any | None]] = {} + self._pending_lock = aio.Lock() + self._query_worker_task: aio.Task[None] | None = None + self._ttl_config_task: aio.Task[None] | None = None + + def _is_connected(self) -> bool: + if self.connection is None: + return False + is_connected = getattr(self.connection, "is_connected", None) + if callable(is_connected): + return bool(is_connected()) + return self.connection is not None + + async def connect(self) -> None: + if self.sensor_connected and self.connection is not None: + self._ensure_query_worker() + self._ensure_ttl_config_task() + return + if not self._owns_connection and self.connection is not None: + self.sensor_connected = self._is_connected() + if not self.sensor_connected: + raise ConnectionError("OBD adapter is not connected") + self._ensure_query_worker() + self._ensure_ttl_config_task() + return + + try: + self.logger.info("Attempting to connect to OBD adapter") + self.connection = await aio.to_thread(obd.OBD) + self.sensor_connected = self._is_connected() + if not self.sensor_connected: + raise ConnectionError("OBD adapter not found") + self._ensure_query_worker() + self._ensure_ttl_config_task() + self.logger.info("Connected to OBD adapter") + except Exception as exc: + self.sensor_connected = False + self.connection = None + raise ConnectionError(f"Unable to connect to OBD adapter: {exc}") from exc + + async def disconnect(self) -> None: + self.sensor_connected = False + close = getattr(self.connection, "close", None) + self.connection = None + async with self._cache_lock: + self._query_cache.clear() + await self._fail_pending_queries(ConnectionError("OBD adapter is not connected")) + if callable(close): + await aio.to_thread(close) + + @property + def scans(self) -> list[Scan]: + return [ + Scan( + cmd=obd.commands.SPEED, + interval=0.25, + callback=lambda value: None, + transform=lambda value: _quantity_to_float(value, "mph"), + ), + Scan( + cmd=obd.commands.FUEL_LEVEL, + interval=10.0, + callback=lambda value: None, + transform=lambda value: _quantity_to_float(value), + ), + Scan( + cmd=obd.commands.OIL_TEMP, + interval=5.0, + callback=lambda value: None, + transform=lambda value: _quantity_to_float(value, "degF"), + ), + Scan( + cmd=obd.commands.RPM, + interval=0.25, + callback=lambda value: None, + transform=lambda value: _quantity_to_float(value), + ), + Scan( + cmd=obd.commands.COOLANT_TEMP, + interval=5.0, + callback=lambda value: None, + transform=lambda value: _quantity_to_float(value, "degF"), + ), + ] + + @property + def telemetry_commands(self) -> dict[str, Any]: + return { + "speed": obd.commands.SPEED, + "rpm": obd.commands.RPM, + "fuel": obd.commands.FUEL_LEVEL, + "oil-temp": obd.commands.OIL_TEMP, + "coolant-temp": obd.commands.COOLANT_TEMP, + } + + def normalize_metric_value(self, metric_id: str, value: Any) -> float: + transforms = { + "speed": lambda raw: _quantity_to_float(raw, "mph"), + "rpm": lambda raw: _quantity_to_float(raw), + "fuel": lambda raw: _quantity_to_float(raw), + "oil-temp": lambda raw: _quantity_to_float(raw, "degF"), + "coolant-temp": lambda raw: _quantity_to_float(raw, "degF"), + } + return transforms[metric_id](value) + + async def query_command(self, cmd: Any) -> Any | None: + if not self.sensor_connected or self.connection is None: + raise ConnectionError("OBD adapter is not connected") + cache_key = self._cache_key(cmd) + cached_value = await self._get_cached_value(cache_key) + if cached_value is not None: + return cached_value + self._ensure_query_worker() + return await self._enqueue_query(cmd, cache_key) + + def _cache_key(self, cmd: Any) -> str: + return getattr(cmd, "name", str(cmd)) + + async def _get_cached_value(self, cache_key: str) -> Any | None: + ttl = self._ttl_for_command(cache_key) + if ttl <= 0: + return None + + now = aio.get_running_loop().time() + async with self._cache_lock: + entry = self._query_cache.get(cache_key) + if entry is None: + return None + if entry.expires_at <= now: + self._query_cache.pop(cache_key, None) + return None + return entry.value + + async def _set_cached_value(self, cache_key: str, value: Any) -> None: + ttl = self._ttl_for_command(cache_key) + if ttl <= 0: + return + + expires_at = aio.get_running_loop().time() + ttl + async with self._cache_lock: + self._query_cache[cache_key] = CacheEntry(value=value, expires_at=expires_at) + + def _ttl_for_command(self, cache_key: str) -> float: + return self._command_ttls.get(cache_key, self.cache_ttl) + + def get_command_ttl_ms(self, command_name: str) -> int: + return int(round(self._ttl_for_command(command_name) * 1000.0)) + + def _ensure_ttl_config_task(self) -> None: + if self.ttl_config_path is None: + return + if self._ttl_config_task is None or self._ttl_config_task.done(): + self._ttl_config_task = aio.create_task(self._watch_ttl_config()) + + async def _watch_ttl_config(self) -> None: + await self.reload_ttl_config(force=True) + while not self._stop_event.is_set(): + await aio.sleep(self.ttl_config_scan_interval) + await self.reload_ttl_config() + + async def reload_ttl_config(self, force: bool = False) -> None: + if self.ttl_config_path is None: + return + + try: + stat_result = await aio.to_thread(self.ttl_config_path.stat) + except FileNotFoundError: + if self._command_ttls: + self.logger.warning("TTL config file not found: %s", self.ttl_config_path) + self._command_ttls = {} + self._ttl_config_mtime = None + return + + mtime = stat_result.st_mtime + if not force and self._ttl_config_mtime == mtime: + return + + contents = await aio.to_thread(self.ttl_config_path.read_text, "utf-8") + overrides = self._parse_ttl_config(contents) + self._command_ttls = overrides + self._ttl_config_mtime = mtime + self.logger.info("Loaded %d TTL overrides from %s", len(overrides), self.ttl_config_path.name) + + def _parse_ttl_config(self, contents: str) -> dict[str, float]: + overrides: dict[str, float] = {} + for line_number, raw_line in enumerate(contents.splitlines(), start=1): + line = raw_line.strip() + if not line or line.startswith("#"): + continue + try: + command_name, ttl_ms = [part.strip() for part in line.split(",", 1)] + except ValueError: + self.logger.warning("Invalid TTL config line %d: %s", line_number, raw_line) + continue + if not command_name: + self.logger.warning("Missing command name on TTL config line %d", line_number) + continue + try: + overrides[command_name] = max(0.0, float(ttl_ms) / 1000.0) + except ValueError: + self.logger.warning("Invalid TTL value on line %d: %s", line_number, ttl_ms) + return overrides + + async def update_ttl_override(self, command_name: str, ttl_ms: int) -> None: + if self.ttl_config_path is None: + raise ValueError("TTL config path is not configured") + + ttl_ms = max(0, int(ttl_ms)) + lines: list[str] = [] + found = False + + if self.ttl_config_path.exists(): + contents = await aio.to_thread(self.ttl_config_path.read_text, "utf-8") + for raw_line in contents.splitlines(): + line = raw_line.strip() + if not line or line.startswith("#"): + lines.append(raw_line) + continue + try: + existing_command, _existing_ttl = [part.strip() for part in raw_line.split(",", 1)] + except ValueError: + lines.append(raw_line) + continue + if existing_command == command_name: + lines.append(f"{command_name},{ttl_ms}") + found = True + else: + lines.append(raw_line) + + if not found: + if lines and lines[-1].strip(): + lines.append("") + lines.append(f"{command_name},{ttl_ms}") + + text = "\n".join(lines).rstrip() + "\n" + await aio.to_thread(self.ttl_config_path.write_text, text, "utf-8") + await self.reload_ttl_config(force=True) + + def _ensure_query_worker(self) -> None: + if self._query_worker_task is None or self._query_worker_task.done(): + self._query_worker_task = aio.create_task(self._query_worker()) + + async def _enqueue_query(self, cmd: Any, cache_key: str) -> Any | None: + async with self._pending_lock: + existing = self._pending_queries.get(cache_key) + if existing is not None: + return await existing + + future: aio.Future[Any | None] = aio.get_running_loop().create_future() + self._pending_queries[cache_key] = future + await self._query_queue.put((cmd, cache_key, future)) + + return await future + + async def _query_worker(self) -> None: + while not self._stop_event.is_set(): + cmd: Any | None = None + cache_key = "" + future: aio.Future[Any | None] | None = None + try: + cmd, cache_key, future = await self._query_queue.get() + if future.cancelled(): + continue + result = await self._execute_query(cmd, cache_key) + if not future.done(): + future.set_result(result) + except aio.CancelledError: + raise + except Exception as exc: + if future is not None and not future.done(): + future.set_exception(exc) + finally: + if future is not None: + async with self._pending_lock: + current = self._pending_queries.get(cache_key) + if current is future: + self._pending_queries.pop(cache_key, None) + if cmd is not None: + self._query_queue.task_done() + + async def _execute_query(self, cmd: Any, cache_key: str) -> Any | None: + await self._acquire_query_slot() + connection = self.connection + if connection is None or not self.sensor_connected: + raise ConnectionError("OBD adapter is not connected") + try: + response = await aio.to_thread(connection.query, cmd) + except Exception as exc: + await self.disconnect() + raise ConnectionError(f"OBD query failed for {cmd}: {exc}") from exc + if response is None or getattr(response, "is_null", lambda: False)(): + self.logger.debug("No response for %s", cmd) + return None + value = getattr(response, "value", None) + if value is None: + self.logger.debug("Empty response for %s", cmd) + return None + await self._set_cached_value(cache_key, value) + return value + + async def _fail_pending_queries(self, exc: Exception) -> None: + async with self._pending_lock: + pending = list(self._pending_queries.values()) + self._pending_queries.clear() + + for future in pending: + if not future.done(): + future.set_exception(exc) + + while not self._query_queue.empty(): + try: + _cmd, _cache_key, future = self._query_queue.get_nowait() + except aio.QueueEmpty: + break + if not future.done(): + future.set_exception(exc) + self._query_queue.task_done() + + async def _acquire_query_slot(self) -> None: + if self.query_rate_limit <= 0: + return + + interval = 1.0 / self.query_rate_limit + loop = aio.get_running_loop() + + async with self._rate_limit_lock: + now = loop.time() + wait_time = max(0.0, self._next_query_time - now) + scheduled_time = max(now, self._next_query_time) + self._next_query_time = scheduled_time + interval + + if wait_time > 0: + await aio.sleep(wait_time) + + async def query_value(self, scan: Scan) -> float | None: + value = await self.query_command(scan.cmd) + if value is None: + return None + if scan.transform is None: + return _quantity_to_float(value) + return scan.transform(value) + + async def query_display_value(self, cmd: Any) -> str: + value = await self.query_command(cmd) + if value is None: + return "--" + return format_obd_value(value) + + async def sensor_data_loop(self, scan: Scan) -> None: + if scan.callback is None: + return + + self.logger.info( + "Starting sensor acquisition for %s at %.2fs interval", + scan.cmd, + scan.interval, + ) + try: + while self.sensor_connected and not self._stop_event.is_set(): + value = await self.query_value(scan) + if value is not None: + scan.callback(value) + await aio.sleep(scan.interval) + except ConnectionError as exc: + if self._stop_event.is_set(): + self.logger.info("Sensor loop stopped for %s during shutdown", scan.cmd) + return + self.logger.error("Sensor loop lost connection for %s: %s", scan.cmd, exc) + self.sensor_connected = False + self._stop_event.set() + raise + except aio.CancelledError: + self.logger.info("Sensor task for %s cancelled", scan.cmd) + raise + except Exception: + self.logger.exception("Sensor loop failed for %s", scan.cmd) + self.sensor_connected = False + self._stop_event.set() + raise + finally: + self.logger.info("Sensor loop terminated for %s", scan.cmd) + + async def print_report(self, report: Report, interval: float = 1.0) -> None: + try: + while not self._stop_event.is_set(): + self.logger.info(report.model_dump()) + await aio.sleep(interval) + except aio.CancelledError: + self.logger.info("Report logger cancelled") + raise + + async def start(self, report: Report) -> None: + if not self.sensor_connected: + self.logger.warning("No OBD adapter connected; telemetry polling not started") + return + + scans = self.scans + scans[0].callback = report.set_speed + scans[1].callback = report.set_fuel + scans[2].callback = report.set_oil_temp + scans[3].callback = report.set_rpm + scans[4].callback = report.set_coolant_temp + + self._tasks = [aio.create_task(self.print_report(report))] + self._tasks.extend(aio.create_task(self.sensor_data_loop(scan)) for scan in scans) + + try: + await aio.gather(*self._tasks) + finally: + await self.stop() + + async def stop(self) -> None: + self._stop_event.set() + + current = aio.current_task() + for task in self._tasks: + if task is not current and not task.done(): + task.cancel() + + for task in self._tasks: + if task is current: + continue + with contextlib.suppress(aio.CancelledError): + await task + + self._tasks.clear() + + if self._query_worker_task is not None: + self._query_worker_task.cancel() + with contextlib.suppress(aio.CancelledError): + await self._query_worker_task + self._query_worker_task = None + + if self._ttl_config_task is not None: + self._ttl_config_task.cancel() + with contextlib.suppress(aio.CancelledError): + await self._ttl_config_task + self._ttl_config_task = None + + await self.disconnect() + + +class SimulatedOBD2Interface(OBD2Interface): + def __init__( + self, + logger: logging.Logger, + query_rate_limit: float = 10.0, + ttl_config_path: str | None = None, + ttl_config_scan_interval: float = 2.0, + ): + super().__init__( + logger, + connection=SimulatedOBDConnection(), + query_rate_limit=query_rate_limit, + ttl_config_path=ttl_config_path, + ttl_config_scan_interval=ttl_config_scan_interval, + ) + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s.%(msecs)03d - [%(levelname)s] - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + try: + report = Report() + aio.run(OBD2Interface(logging.getLogger()).start(report)) + except KeyboardInterrupt: + logging.info("Program interrupted by user (Ctrl+C).") diff --git a/obd2_tui.py b/obd2_tui.py new file mode 100644 index 0000000..7b63c7e --- /dev/null +++ b/obd2_tui.py @@ -0,0 +1,390 @@ +import asyncio +import argparse +import contextlib +import logging +from typing import Any + +import obd +from textual.app import App, ComposeResult +from textual.containers import Container +from textual.widgets import DataTable, Digits, Footer, Header, Input, Label, RichLog, Static + +from models import Report +from obd2_interface import OBD2Interface, SimulatedOBD2Interface, format_obd_value + + +METRICS = [ + ("speed", "Speed", "mph"), + ("rpm", "RPM", ""), + ("fuel", "Fuel", "%"), + ("coolant-temp", "Coolant Temp", "F"), + ("oil-temp", "Oil Temp", "F"), +] + +MODE_LABELS = { + 1: "Mode 01 Current Data", + 2: "Mode 02 Freeze Frame", + 3: "Mode 03 Stored DTCs", + 4: "Mode 04 Clear DTCs", + 6: "Mode 06 Onboard Monitoring", + 7: "Mode 07 Pending DTCs", + 9: "Mode 09 Vehicle Info", +} + + +def format_metric_value(report: Report, metric_id: str) -> str: + values = { + "speed": f"{report.speed_mph:05.1f}", + "rpm": f"{report.rpm:05.0f}", + "fuel": f"{report.fuel_level_pct:05.1f}", + "oil-temp": f"{report.oil_temp_f:05.1f}", + "coolant-temp": f"{report.coolant_temp_f:05.1f}", + } + return values[metric_id] + + +class RichLogHandler(logging.Handler): + def __init__(self, app: "OBD2App") -> None: + super().__init__() + self.app = app + + def emit(self, record: logging.LogRecord) -> None: + message = self.format(record) + try: + self.app.call_from_thread(self.app.write_log, message) + except RuntimeError: + self.app.write_log(message) + + +class OBD2App(App[None]): + CSS_PATH = "ui.css" + POLL_CONCURRENCY = 12 + POLL_INTERVAL = 0.01 + RECONNECT_DELAY = 5.0 + BINDINGS = [ + ("q", "quit", "Quit"), + ("b", "toggle_border", "Toggle border"), + ("1", "select_mode(1)", "Mode 1"), + ("2", "select_mode(2)", "Mode 2"), + ("3", "select_mode(3)", "Mode 3"), + ("4", "select_mode(4)", "Mode 4"), + ("6", "select_mode(6)", "Mode 6"), + ("7", "select_mode(7)", "Mode 7"), + ("9", "select_mode(9)", "Mode 9"), + ] + + def __init__( + self, + report: Report, + interface_factory: type[OBD2Interface] = OBD2Interface, + query_rate_limit: float = 10.0, + ttl_config_path: str | None = None, + ): + super().__init__() + self.report = report + self.interface_factory = interface_factory + self.query_rate_limit = query_rate_limit + self.ttl_config_path = ttl_config_path + self.logger = logging.getLogger("obd2") + self.interface: OBD2Interface | None = None + self.poll_task: asyncio.Task[None] | None = None + self.double_border = False + self.selected_mode = 1 + self.last_command_values: dict[str, str] = {} + self.command_value_cache: dict[str, str] = {} + self.log_handler: RichLogHandler | None = None + self.ttl_edit_command: str | None = None + + def compose(self) -> ComposeResult: + yield Header() + yield Container( + Container(*[self.compose_metric(metric_id, label, unit) for metric_id, label, unit in METRICS], id="dashboard"), + Container( + DataTable(id="commands-table"), + Container( + Label("Selected TTL (ms)", id="ttl-editor-label"), + Input(placeholder="TTL in ms", id="ttl-editor"), + id="ttl-editor-pane", + ), + id="table-pane", + ), + id="main-pane", + ) + yield Static("", id="mode-banner") + yield RichLog(id="log-panel", auto_scroll=True, wrap=True, highlight=True, markup=False) + yield Footer() + + def compose_metric(self, metric_id: str, label: str, unit: str) -> Container: + unit_text = unit or "value" + return Container( + Label(label, classes="metric-label"), + Digits(format_metric_value(self.report, metric_id), id=f"{metric_id}-digits", classes="metric-digits"), + Label(unit_text, classes="metric-unit"), + classes="metric-card", + ) + + def on_mount(self) -> None: + self.configure_logging() + self.interface = self.interface_factory( + self.logger, + query_rate_limit=self.query_rate_limit, + ttl_config_path=self.ttl_config_path, + ) + self.configure_commands_table() + self.load_mode_table(self.selected_mode) + self.poll_task = asyncio.create_task(self.poll_commands()) + self.logger.info("Mounted OBD2 dashboard") + + async def on_unmount(self) -> None: + if self.interface is not None: + await self.interface.stop() + if self.poll_task is not None: + self.poll_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self.poll_task + self.teardown_logging() + + def action_toggle_border(self) -> None: + self.double_border = not self.double_border + border = ("double", "yellow") if self.double_border else ("solid", "white") + for card in self.query(".metric-card"): + card.styles.border = border + + def action_select_mode(self, mode: int) -> None: + if mode not in MODE_LABELS: + return + self.selected_mode = mode + self.load_mode_table(mode) + self.logger.info("Selected %s", MODE_LABELS[mode]) + + def configure_logging(self) -> None: + self.log_handler = RichLogHandler(self) + self.log_handler.setFormatter( + logging.Formatter( + "%(asctime)s.%(msecs)03d [%(levelname)s] %(message)s", + datefmt="%H:%M:%S", + ) + ) + root_logger = logging.getLogger() + root_logger.handlers.clear() + root_logger.setLevel(logging.INFO) + root_logger.addHandler(self.log_handler) + self.logger.propagate = True + + def teardown_logging(self) -> None: + if self.log_handler is None: + return + root_logger = logging.getLogger() + with contextlib.suppress(ValueError): + root_logger.removeHandler(self.log_handler) + self.log_handler = None + + def write_log(self, message: str) -> None: + self.query_one("#log-panel", RichLog).write(message) + + def configure_commands_table(self) -> None: + table = self.query_one("#commands-table", DataTable) + table.cursor_type = "row" + table.zebra_stripes = True + table.add_column("Code", key="code", width=10) + table.add_column("Name", key="name", width=28) + table.add_column("Description", key="desc", width=44) + table.add_column("TTL (ms)", key="ttl", width=12) + table.add_column("Value", key="value", width=24) + + def load_mode_table(self, mode: int) -> None: + table = self.query_one("#commands-table", DataTable) + table.clear(columns=False) + self.last_command_values = {} + commands = self.get_mode_commands(mode) + for command in commands: + pid = getattr(command, "pid", None) + code = f"{mode:02X}.{pid:02X}" if pid is not None else f"{mode:02X}" + cached_value = self.command_value_cache.get(command.name, "--") + ttl_value = self.get_command_ttl_display(command.name) + table.add_row(code, command.name, getattr(command, "desc", ""), ttl_value, cached_value, key=command.name) + self.last_command_values[command.name] = cached_value + banner = self.query_one("#mode-banner", Static) + banner.update(f"{MODE_LABELS[mode]} Press 1,2,3,4,6,7,9 to switch modes") + if commands: + self.set_ttl_editor(commands[0].name) + + def get_mode_commands(self, mode: int) -> list[object]: + return [command for command in obd.commands[mode] if command is not None] + + def get_command_ttl_display(self, command_name: str) -> str: + if self.interface is None: + return "--" + return str(self.interface.get_command_ttl_ms(command_name)) + + def set_ttl_editor(self, command_name: str) -> None: + self.ttl_edit_command = command_name + self.query_one("#ttl-editor-label", Label).update(f"Selected TTL (ms) for {command_name}") + self.query_one("#ttl-editor", Input).value = self.get_command_ttl_display(command_name) + + def update_metric_from_command(self, metric_id: str, raw_value: object) -> None: + if self.interface is None: + return + numeric_value = self.interface.normalize_metric_value(metric_id, raw_value) + setters = { + "speed": self.report.set_speed, + "rpm": self.report.set_rpm, + "fuel": self.report.set_fuel, + "oil-temp": self.report.set_oil_temp, + "coolant-temp": self.report.set_coolant_temp, + } + setters[metric_id](numeric_value) + self.query_one(f"#{metric_id}-digits", Digits).update(format_metric_value(self.report, metric_id)) + + def build_query_plan(self, mode: int) -> list[tuple[str | None, Any]]: + if self.interface is None: + return [] + + query_plan: list[tuple[str | None, Any]] = [] + seen_names: set[str] = set() + + for metric_id, command in self.interface.telemetry_commands.items(): + command_name = getattr(command, "name", str(command)) + query_plan.append((metric_id, command)) + seen_names.add(command_name) + + visible_command_names = self.get_visible_command_names() + for command in self.get_mode_commands(mode): + command_name = getattr(command, "name", str(command)) + if command_name not in visible_command_names: + continue + if command_name in seen_names: + continue + query_plan.append((None, command)) + seen_names.add(command_name) + + return query_plan + + def get_visible_command_names(self) -> set[str]: + table = self.query_one("#commands-table", DataTable) + if table.row_count == 0: + return set() + + first_visible_row = max(0, int(table.scroll_y)) + visible_height = max(1, table.scrollable_content_region.height) + last_visible_row = min(table.row_count, first_visible_row + visible_height) + + visible_names: set[str] = set() + for row_index in range(first_visible_row, last_visible_row): + row = table.get_row_at(row_index) + if len(row) < 2: + continue + visible_names.add(str(row[1])) + return visible_names + + async def query_commands_concurrently(self, query_plan: list[tuple[str | None, Any]]) -> list[tuple[str | None, Any, Any | None]]: + if self.interface is None: + return [] + + semaphore = asyncio.Semaphore(self.POLL_CONCURRENCY) + + async def query_one(metric_id: str | None, command: Any) -> tuple[str | None, Any, Any | None]: + async with semaphore: + raw_value = await self.interface.query_command(command) + return (metric_id, command, raw_value) + + tasks = [query_one(metric_id, command) for metric_id, command in query_plan] + return await asyncio.gather(*tasks) + + def apply_query_results(self, results: list[tuple[str | None, Any, Any | None]]) -> None: + table = self.query_one("#commands-table", DataTable) + + for metric_id, command, raw_value in results: + command_name = getattr(command, "name", str(command)) + value = "--" if raw_value is None else format_obd_value(raw_value) + + if metric_id is not None and raw_value is not None: + self.update_metric_from_command(metric_id, raw_value) + + if self.last_command_values.get(command_name) == value: + continue + + try: + table.update_cell(command_name, "value", value) + self.last_command_values[command_name] = value + self.command_value_cache[command_name] = value + except Exception: + continue + + def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None: + self.set_ttl_editor(str(event.row_key)) + + async def on_input_submitted(self, event: Input.Submitted) -> None: + if event.input.id != "ttl-editor" or self.interface is None or self.ttl_edit_command is None: + return + + raw_value = event.value.strip() + try: + ttl_ms = int(raw_value) + except ValueError: + self.logger.error("Invalid TTL value: %s", raw_value) + self.set_ttl_editor(self.ttl_edit_command) + return + + await self.interface.update_ttl_override(self.ttl_edit_command, ttl_ms) + self.query_one("#commands-table", DataTable).update_cell(self.ttl_edit_command, "ttl", str(ttl_ms)) + self.logger.info("Updated TTL for %s to %d ms", self.ttl_edit_command, ttl_ms) + self.set_ttl_editor(self.ttl_edit_command) + + async def poll_commands(self) -> None: + while True: + if self.interface is None: + await asyncio.sleep(0.1) + continue + + try: + await self.interface.connect() + mode = self.selected_mode + query_plan = self.build_query_plan(mode) + results = await self.query_commands_concurrently(query_plan) + + if mode == self.selected_mode: + self.apply_query_results(results) + + await asyncio.sleep(self.POLL_INTERVAL) + except ConnectionError as exc: + self.logger.error("%s", exc) + await asyncio.sleep(self.RECONNECT_DELAY) + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="OBD2 telemetry dashboard") + parser.add_argument( + "--simulated", + action="store_true", + help="Run the dashboard with continuously generated telemetry data", + ) + parser.add_argument( + "--qps", + type=float, + default=10.0, + help="Maximum OBD queries per second across the app", + ) + parser.add_argument( + "--ttl-config", + default="command_ttl.conf", + help="Path to COMMAND,ttl_ms cache TTL overrides file", + ) + return parser + + +if __name__ == "__main__": + args = build_parser().parse_args() + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s.%(msecs)03d - [%(levelname)s] - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + interface_factory = SimulatedOBD2Interface if args.simulated else OBD2Interface + app = OBD2App( + Report(), + interface_factory=interface_factory, + query_rate_limit=args.qps, + ttl_config_path=args.ttl_config, + ) + app.run() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..390f51a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +python-can[serial]==4.5.0 +obd==0.7.3 +pydantic==2.12.5 +textual==6.8.0 +textual-dev==1.8.0 diff --git a/ui.css b/ui.css new file mode 100644 index 0000000..71252f5 --- /dev/null +++ b/ui.css @@ -0,0 +1,80 @@ +Screen { + layout: vertical; +} + +#dashboard { + layout: grid; + grid-size: 3 2; + grid-gutter: 1 2; + padding: 1 0; + height: auto; + width: 72; +} + +#main-pane { + layout: horizontal; + height: 1fr; + margin: 0 2; +} + +#table-pane { + layout: vertical; + width: 1fr; +} + +#mode-banner { + padding: 0 2; + color: $text; + text-style: bold; +} + +.metric-card { + border: solid white; + padding: 1; + height: 10; +} + +.metric-label { + text-style: bold; + color: $text; +} + +.metric-digits { + color: ansi_bright_green; + text-style: bold; + margin: 1 0; + height: 3; +} + +.metric-unit { + color: $text-muted; +} + +#commands-table { + height: 1fr; + width: 1fr; + border: heavy $primary; + margin: 1 0 0 2; +} + +#ttl-editor-pane { + layout: horizontal; + height: 3; + margin: 1 0 0 2; + align: left middle; +} + +#ttl-editor-label { + width: 28; + content-align: left middle; +} + +#ttl-editor { + width: 16; +} + +#log-panel { + height: 12; + border: heavy $accent; + margin: 0 2 1 2; +}