Files
odbc-dashboard/obd2_interface.py
Jimmy Allen a994900a6a initial
2026-04-01 19:34:26 -04:00

606 lines
22 KiB
Python

import asyncio as aio
import contextlib
import logging
import math
from pathlib import Path
import time
from dataclasses import dataclass
from numbers import Number
from typing import Any
import obd
from models import Report, Scan
def _quantity_to_float(value: Any, unit: str | None = None) -> float:
if value is None:
raise ValueError("Cannot normalize an empty OBD value")
if unit and hasattr(value, "to"):
value = value.to(unit)
if hasattr(value, "magnitude"):
value = value.magnitude
return float(value)
def format_obd_value(value: Any) -> str:
if value is None:
return "--"
if isinstance(value, str):
return value
if hasattr(value, "magnitude"):
return str(value)
if isinstance(value, Number):
numeric = float(value)
if numeric.is_integer():
return str(int(numeric))
return f"{numeric:.1f}"
if isinstance(value, (list, tuple, set)):
return ", ".join(str(item) for item in value) or "--"
return str(value)
@dataclass
class CacheEntry:
value: Any
expires_at: float
class SimulatedOBDConnection:
def __init__(self) -> None:
self._start = time.monotonic()
self._closed = False
def is_connected(self) -> bool:
return not self._closed
def close(self) -> None:
self._closed = True
def query(self, cmd: Any) -> Any:
value = self._value_for_command(cmd)
return type(
"SimulatedResponse",
(),
{
"value": value,
"is_null": staticmethod(lambda: False),
},
)()
def _value_for_command(self, cmd: Any) -> float:
elapsed = time.monotonic() - self._start
name = getattr(cmd, "name", str(cmd))
mode = getattr(cmd, "mode", None)
if name == "SPEED":
return 45.0 + 20.0 * math.sin(elapsed / 2.5)
if name == "RPM":
return 1800.0 + 900.0 * (1.0 + math.sin(elapsed * 1.7))
if name == "FUEL_LEVEL":
return max(0.0, 85.0 - elapsed * 0.03)
if name == "OIL_TEMP":
return 185.0 + 12.0 * math.sin(elapsed / 8.0)
if name == "COOLANT_TEMP":
return 195.0 + 8.0 * math.sin(elapsed / 10.0)
if name == "VIN":
return "1HGBH41JXMN109186"
if "CALIBRATION_ID" in name:
return "CALID-TEST-001"
if name == "CVN":
return "A1B2C3D4"
if "MESSAGE_COUNT" in name:
return 1
if name in {"GET_DTC", "GET_CURRENT_DTC"}:
return "P0301, P0420"
if name == "CLEAR_DTC":
return "READY"
if name.startswith(("PIDS_", "DTC_PIDS_", "MIDS_")):
return "FFFF"
if mode == 6:
base = 40.0 + (sum(ord(char) for char in name) % 30)
return base + 10.0 * math.sin(elapsed / 3.0)
if mode in {1, 2, 9}:
base = 10.0 + (sum(ord(char) for char in name) % 80)
return base + 5.0 * math.sin(elapsed / 4.0)
return 0.0
class OBD2Interface:
def __init__(
self,
logger: logging.Logger,
connection: Any | None = None,
query_rate_limit: float = 10.0,
cache_ttl: float = 0.25,
ttl_config_path: str | None = None,
ttl_config_scan_interval: float = 2.0,
):
self.logger = logger
self.connection = connection
self._owns_connection = connection is None
self.query_rate_limit = query_rate_limit
self.cache_ttl = cache_ttl
self.ttl_config_path = Path(ttl_config_path) if ttl_config_path else None
self.ttl_config_scan_interval = ttl_config_scan_interval
self.sensor_connected = self._is_connected() if connection is not None else False
self._stop_event = aio.Event()
self._tasks: list[aio.Task[Any]] = []
self._rate_limit_lock = aio.Lock()
self._cache_lock = aio.Lock()
self._next_query_time = 0.0
self._query_cache: dict[str, CacheEntry] = {}
self._command_ttls: dict[str, float] = {}
self._ttl_config_mtime: float | None = None
self._query_queue: aio.Queue[tuple[Any, str, aio.Future[Any | None]]] = aio.Queue()
self._pending_queries: dict[str, aio.Future[Any | None]] = {}
self._pending_lock = aio.Lock()
self._query_worker_task: aio.Task[None] | None = None
self._ttl_config_task: aio.Task[None] | None = None
def _is_connected(self) -> bool:
if self.connection is None:
return False
is_connected = getattr(self.connection, "is_connected", None)
if callable(is_connected):
return bool(is_connected())
return self.connection is not None
async def connect(self) -> None:
if self.sensor_connected and self.connection is not None:
self._ensure_query_worker()
self._ensure_ttl_config_task()
return
if not self._owns_connection and self.connection is not None:
self.sensor_connected = self._is_connected()
if not self.sensor_connected:
raise ConnectionError("OBD adapter is not connected")
self._ensure_query_worker()
self._ensure_ttl_config_task()
return
try:
self.logger.info("Attempting to connect to OBD adapter")
self.connection = await aio.to_thread(obd.OBD)
self.sensor_connected = self._is_connected()
if not self.sensor_connected:
raise ConnectionError("OBD adapter not found")
self._ensure_query_worker()
self._ensure_ttl_config_task()
self.logger.info("Connected to OBD adapter")
except Exception as exc:
self.sensor_connected = False
self.connection = None
raise ConnectionError(f"Unable to connect to OBD adapter: {exc}") from exc
async def disconnect(self) -> None:
self.sensor_connected = False
close = getattr(self.connection, "close", None)
self.connection = None
async with self._cache_lock:
self._query_cache.clear()
await self._fail_pending_queries(ConnectionError("OBD adapter is not connected"))
if callable(close):
await aio.to_thread(close)
@property
def scans(self) -> list[Scan]:
return [
Scan(
cmd=obd.commands.SPEED,
interval=0.25,
callback=lambda value: None,
transform=lambda value: _quantity_to_float(value, "mph"),
),
Scan(
cmd=obd.commands.FUEL_LEVEL,
interval=10.0,
callback=lambda value: None,
transform=lambda value: _quantity_to_float(value),
),
Scan(
cmd=obd.commands.OIL_TEMP,
interval=5.0,
callback=lambda value: None,
transform=lambda value: _quantity_to_float(value, "degF"),
),
Scan(
cmd=obd.commands.RPM,
interval=0.25,
callback=lambda value: None,
transform=lambda value: _quantity_to_float(value),
),
Scan(
cmd=obd.commands.COOLANT_TEMP,
interval=5.0,
callback=lambda value: None,
transform=lambda value: _quantity_to_float(value, "degF"),
),
]
@property
def telemetry_commands(self) -> dict[str, Any]:
return {
"speed": obd.commands.SPEED,
"rpm": obd.commands.RPM,
"fuel": obd.commands.FUEL_LEVEL,
"oil-temp": obd.commands.OIL_TEMP,
"coolant-temp": obd.commands.COOLANT_TEMP,
}
def normalize_metric_value(self, metric_id: str, value: Any) -> float:
transforms = {
"speed": lambda raw: _quantity_to_float(raw, "mph"),
"rpm": lambda raw: _quantity_to_float(raw),
"fuel": lambda raw: _quantity_to_float(raw),
"oil-temp": lambda raw: _quantity_to_float(raw, "degF"),
"coolant-temp": lambda raw: _quantity_to_float(raw, "degF"),
}
return transforms[metric_id](value)
async def query_command(self, cmd: Any) -> Any | None:
if not self.sensor_connected or self.connection is None:
raise ConnectionError("OBD adapter is not connected")
cache_key = self._cache_key(cmd)
cached_value = await self._get_cached_value(cache_key)
if cached_value is not None:
return cached_value
self._ensure_query_worker()
return await self._enqueue_query(cmd, cache_key)
def _cache_key(self, cmd: Any) -> str:
return getattr(cmd, "name", str(cmd))
async def _get_cached_value(self, cache_key: str) -> Any | None:
ttl = self._ttl_for_command(cache_key)
if ttl <= 0:
return None
now = aio.get_running_loop().time()
async with self._cache_lock:
entry = self._query_cache.get(cache_key)
if entry is None:
return None
if entry.expires_at <= now:
self._query_cache.pop(cache_key, None)
return None
return entry.value
async def _set_cached_value(self, cache_key: str, value: Any) -> None:
ttl = self._ttl_for_command(cache_key)
if ttl <= 0:
return
expires_at = aio.get_running_loop().time() + ttl
async with self._cache_lock:
self._query_cache[cache_key] = CacheEntry(value=value, expires_at=expires_at)
def _ttl_for_command(self, cache_key: str) -> float:
return self._command_ttls.get(cache_key, self.cache_ttl)
def get_command_ttl_ms(self, command_name: str) -> int:
return int(round(self._ttl_for_command(command_name) * 1000.0))
def _ensure_ttl_config_task(self) -> None:
if self.ttl_config_path is None:
return
if self._ttl_config_task is None or self._ttl_config_task.done():
self._ttl_config_task = aio.create_task(self._watch_ttl_config())
async def _watch_ttl_config(self) -> None:
await self.reload_ttl_config(force=True)
while not self._stop_event.is_set():
await aio.sleep(self.ttl_config_scan_interval)
await self.reload_ttl_config()
async def reload_ttl_config(self, force: bool = False) -> None:
if self.ttl_config_path is None:
return
try:
stat_result = await aio.to_thread(self.ttl_config_path.stat)
except FileNotFoundError:
if self._command_ttls:
self.logger.warning("TTL config file not found: %s", self.ttl_config_path)
self._command_ttls = {}
self._ttl_config_mtime = None
return
mtime = stat_result.st_mtime
if not force and self._ttl_config_mtime == mtime:
return
contents = await aio.to_thread(self.ttl_config_path.read_text, "utf-8")
overrides = self._parse_ttl_config(contents)
self._command_ttls = overrides
self._ttl_config_mtime = mtime
self.logger.info("Loaded %d TTL overrides from %s", len(overrides), self.ttl_config_path.name)
def _parse_ttl_config(self, contents: str) -> dict[str, float]:
overrides: dict[str, float] = {}
for line_number, raw_line in enumerate(contents.splitlines(), start=1):
line = raw_line.strip()
if not line or line.startswith("#"):
continue
try:
command_name, ttl_ms = [part.strip() for part in line.split(",", 1)]
except ValueError:
self.logger.warning("Invalid TTL config line %d: %s", line_number, raw_line)
continue
if not command_name:
self.logger.warning("Missing command name on TTL config line %d", line_number)
continue
try:
overrides[command_name] = max(0.0, float(ttl_ms) / 1000.0)
except ValueError:
self.logger.warning("Invalid TTL value on line %d: %s", line_number, ttl_ms)
return overrides
async def update_ttl_override(self, command_name: str, ttl_ms: int) -> None:
if self.ttl_config_path is None:
raise ValueError("TTL config path is not configured")
ttl_ms = max(0, int(ttl_ms))
lines: list[str] = []
found = False
if self.ttl_config_path.exists():
contents = await aio.to_thread(self.ttl_config_path.read_text, "utf-8")
for raw_line in contents.splitlines():
line = raw_line.strip()
if not line or line.startswith("#"):
lines.append(raw_line)
continue
try:
existing_command, _existing_ttl = [part.strip() for part in raw_line.split(",", 1)]
except ValueError:
lines.append(raw_line)
continue
if existing_command == command_name:
lines.append(f"{command_name},{ttl_ms}")
found = True
else:
lines.append(raw_line)
if not found:
if lines and lines[-1].strip():
lines.append("")
lines.append(f"{command_name},{ttl_ms}")
text = "\n".join(lines).rstrip() + "\n"
await aio.to_thread(self.ttl_config_path.write_text, text, "utf-8")
await self.reload_ttl_config(force=True)
def _ensure_query_worker(self) -> None:
if self._query_worker_task is None or self._query_worker_task.done():
self._query_worker_task = aio.create_task(self._query_worker())
async def _enqueue_query(self, cmd: Any, cache_key: str) -> Any | None:
async with self._pending_lock:
existing = self._pending_queries.get(cache_key)
if existing is not None:
return await existing
future: aio.Future[Any | None] = aio.get_running_loop().create_future()
self._pending_queries[cache_key] = future
await self._query_queue.put((cmd, cache_key, future))
return await future
async def _query_worker(self) -> None:
while not self._stop_event.is_set():
cmd: Any | None = None
cache_key = ""
future: aio.Future[Any | None] | None = None
try:
cmd, cache_key, future = await self._query_queue.get()
if future.cancelled():
continue
result = await self._execute_query(cmd, cache_key)
if not future.done():
future.set_result(result)
except aio.CancelledError:
raise
except Exception as exc:
if future is not None and not future.done():
future.set_exception(exc)
finally:
if future is not None:
async with self._pending_lock:
current = self._pending_queries.get(cache_key)
if current is future:
self._pending_queries.pop(cache_key, None)
if cmd is not None:
self._query_queue.task_done()
async def _execute_query(self, cmd: Any, cache_key: str) -> Any | None:
await self._acquire_query_slot()
connection = self.connection
if connection is None or not self.sensor_connected:
raise ConnectionError("OBD adapter is not connected")
try:
response = await aio.to_thread(connection.query, cmd)
except Exception as exc:
await self.disconnect()
raise ConnectionError(f"OBD query failed for {cmd}: {exc}") from exc
if response is None or getattr(response, "is_null", lambda: False)():
self.logger.debug("No response for %s", cmd)
return None
value = getattr(response, "value", None)
if value is None:
self.logger.debug("Empty response for %s", cmd)
return None
await self._set_cached_value(cache_key, value)
return value
async def _fail_pending_queries(self, exc: Exception) -> None:
async with self._pending_lock:
pending = list(self._pending_queries.values())
self._pending_queries.clear()
for future in pending:
if not future.done():
future.set_exception(exc)
while not self._query_queue.empty():
try:
_cmd, _cache_key, future = self._query_queue.get_nowait()
except aio.QueueEmpty:
break
if not future.done():
future.set_exception(exc)
self._query_queue.task_done()
async def _acquire_query_slot(self) -> None:
if self.query_rate_limit <= 0:
return
interval = 1.0 / self.query_rate_limit
loop = aio.get_running_loop()
async with self._rate_limit_lock:
now = loop.time()
wait_time = max(0.0, self._next_query_time - now)
scheduled_time = max(now, self._next_query_time)
self._next_query_time = scheduled_time + interval
if wait_time > 0:
await aio.sleep(wait_time)
async def query_value(self, scan: Scan) -> float | None:
value = await self.query_command(scan.cmd)
if value is None:
return None
if scan.transform is None:
return _quantity_to_float(value)
return scan.transform(value)
async def query_display_value(self, cmd: Any) -> str:
value = await self.query_command(cmd)
if value is None:
return "--"
return format_obd_value(value)
async def sensor_data_loop(self, scan: Scan) -> None:
if scan.callback is None:
return
self.logger.info(
"Starting sensor acquisition for %s at %.2fs interval",
scan.cmd,
scan.interval,
)
try:
while self.sensor_connected and not self._stop_event.is_set():
value = await self.query_value(scan)
if value is not None:
scan.callback(value)
await aio.sleep(scan.interval)
except ConnectionError as exc:
if self._stop_event.is_set():
self.logger.info("Sensor loop stopped for %s during shutdown", scan.cmd)
return
self.logger.error("Sensor loop lost connection for %s: %s", scan.cmd, exc)
self.sensor_connected = False
self._stop_event.set()
raise
except aio.CancelledError:
self.logger.info("Sensor task for %s cancelled", scan.cmd)
raise
except Exception:
self.logger.exception("Sensor loop failed for %s", scan.cmd)
self.sensor_connected = False
self._stop_event.set()
raise
finally:
self.logger.info("Sensor loop terminated for %s", scan.cmd)
async def print_report(self, report: Report, interval: float = 1.0) -> None:
try:
while not self._stop_event.is_set():
self.logger.info(report.model_dump())
await aio.sleep(interval)
except aio.CancelledError:
self.logger.info("Report logger cancelled")
raise
async def start(self, report: Report) -> None:
if not self.sensor_connected:
self.logger.warning("No OBD adapter connected; telemetry polling not started")
return
scans = self.scans
scans[0].callback = report.set_speed
scans[1].callback = report.set_fuel
scans[2].callback = report.set_oil_temp
scans[3].callback = report.set_rpm
scans[4].callback = report.set_coolant_temp
self._tasks = [aio.create_task(self.print_report(report))]
self._tasks.extend(aio.create_task(self.sensor_data_loop(scan)) for scan in scans)
try:
await aio.gather(*self._tasks)
finally:
await self.stop()
async def stop(self) -> None:
self._stop_event.set()
current = aio.current_task()
for task in self._tasks:
if task is not current and not task.done():
task.cancel()
for task in self._tasks:
if task is current:
continue
with contextlib.suppress(aio.CancelledError):
await task
self._tasks.clear()
if self._query_worker_task is not None:
self._query_worker_task.cancel()
with contextlib.suppress(aio.CancelledError):
await self._query_worker_task
self._query_worker_task = None
if self._ttl_config_task is not None:
self._ttl_config_task.cancel()
with contextlib.suppress(aio.CancelledError):
await self._ttl_config_task
self._ttl_config_task = None
await self.disconnect()
class SimulatedOBD2Interface(OBD2Interface):
def __init__(
self,
logger: logging.Logger,
query_rate_limit: float = 10.0,
ttl_config_path: str | None = None,
ttl_config_scan_interval: float = 2.0,
):
super().__init__(
logger,
connection=SimulatedOBDConnection(),
query_rate_limit=query_rate_limit,
ttl_config_path=ttl_config_path,
ttl_config_scan_interval=ttl_config_scan_interval,
)
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s.%(msecs)03d - [%(levelname)s] - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
try:
report = Report()
aio.run(OBD2Interface(logging.getLogger()).start(report))
except KeyboardInterrupt:
logging.info("Program interrupted by user (Ctrl+C).")