# Listing metadata: 270 lines, 10 KiB, Python.
import asyncio
|
|
import logging
|
|
import tempfile
|
|
import unittest
|
|
from pathlib import Path
|
|
|
|
from models import Report
|
|
from obd2_interface import OBD2Interface, SimulatedOBD2Interface, SimulatedOBDConnection, format_obd_value
|
|
from obd2_tui import OBD2App, format_metric_value, parse_duration_ms
|
|
|
|
|
|
class FakeQuantity:
    """Minimal stand-in for a unit-carrying quantity used by the tests.

    Only the two members the tests rely on are provided: a ``magnitude``
    attribute and a ``to`` method.
    """

    def __init__(self, magnitude: float) -> None:
        """Store *magnitude* unchanged."""
        self.magnitude = magnitude

    def to(self, _unit: str) -> "FakeQuantity":
        """Return this same object; the requested unit is ignored."""
        return self
|
|
|
|
|
|
class FakeResponse:
    """Minimal stand-in for a query response: a ``value`` plus ``is_null``."""

    def __init__(self, value) -> None:
        """Store *value* unchanged; ``None`` marks an empty response."""
        self.value = value

    def is_null(self) -> bool:
        """Return ``True`` only when the stored value is ``None``."""
        return self.value is None
|
|
|
|
|
|
class FakeConnection:
    """Scripted connection double that replays canned responses in order.

    Attributes:
        values: Remaining responses, consumed from the front by ``query``.
        closed: Set to ``True`` once ``close`` has been called.
        query_count: Number of ``query`` calls made so far.
    """

    def __init__(self, values) -> None:
        """Copy *values* so the caller's sequence is never mutated."""
        self.values = list(values)
        self.closed = False
        self.query_count = 0

    def is_connected(self) -> bool:
        """Always report a live connection."""
        return True

    def query(self, _cmd):
        """Return the next scripted response; the command itself is ignored.

        Every call is counted. Once the script is exhausted an empty
        ``FakeResponse(None)`` is returned instead of raising.
        """
        self.query_count += 1
        if self.values:
            return self.values.pop(0)
        return FakeResponse(None)

    def close(self) -> None:
        """Record that the connection was closed."""
        self.closed = True
|
|
|
|
|
|
class ReportTests(unittest.TestCase):
    """Tests for the ``Report`` model's setter methods."""

    def test_setters_normalize_quantities(self) -> None:
        """Each setter stores the quantity's magnitude on its report field."""
        report = Report()
        report.set_speed(FakeQuantity(32.5))
        report.set_rpm(FakeQuantity(1800))
        report.set_fuel(FakeQuantity(67.0))
        report.set_oil_temp(FakeQuantity(194))
        report.set_coolant_temp(FakeQuantity(201))

        self.assertEqual(report.speed_mph, 32.5)
        self.assertEqual(report.rpm, 1800.0)
        self.assertEqual(report.fuel_level_pct, 67.0)
        self.assertEqual(report.oil_temp_f, 194.0)
        self.assertEqual(report.coolant_temp_f, 201.0)
|
|
|
|
|
|
class InterfaceTests(unittest.IsolatedAsyncioTestCase):
    """Async tests for ``OBD2Interface`` and its simulated variants.

    Real hardware is never touched: the tests inject ``FakeConnection`` or
    use ``SimulatedOBD2Interface`` / ``SimulatedOBDConnection``.
    """

    async def test_query_value_returns_none_for_empty_response(self) -> None:
        """A response whose value is ``None`` makes ``query_value`` return ``None``."""
        interface = OBD2Interface(
            logging.getLogger("test"),
            connection=FakeConnection([FakeResponse(None)]),
        )
        value = await interface.query_value(interface.scans[0])
        self.assertIsNone(value)

    async def test_query_rate_limit_defaults_to_ten_per_second(self) -> None:
        """Two concurrent queries under the default limit take at least 0.09 s."""
        interface = OBD2Interface(
            logging.getLogger("test"),
            connection=FakeConnection([FakeResponse(1), FakeResponse(2)]),
        )

        start = asyncio.get_running_loop().time()
        await asyncio.gather(
            interface.query_command(interface.telemetry_commands["speed"]),
            interface.query_command(interface.telemetry_commands["rpm"]),
        )
        elapsed = asyncio.get_running_loop().time() - start

        # Slightly below 0.1 s to tolerate timer granularity.
        self.assertGreaterEqual(elapsed, 0.09)

    async def test_query_command_uses_ttl_cache(self) -> None:
        """A repeat query inside the TTL is served without hitting the connection."""
        connection = FakeConnection([FakeResponse(12.3)])
        interface = OBD2Interface(
            logging.getLogger("test"),
            connection=connection,
            cache_ttl=0.25,
        )

        command = interface.telemetry_commands["speed"]
        first = await interface.query_command(command)
        second = await interface.query_command(command)

        self.assertEqual(first, 12.3)
        self.assertEqual(second, 12.3)
        self.assertEqual(connection.query_count, 1)

    async def test_query_command_cache_expires(self) -> None:
        """Once the TTL has elapsed the connection is queried again."""
        connection = FakeConnection([FakeResponse(12.3), FakeResponse(45.6)])
        interface = OBD2Interface(
            logging.getLogger("test"),
            connection=connection,
            cache_ttl=0.01,
        )

        command = interface.telemetry_commands["speed"]
        first = await interface.query_command(command)
        # Sleep past the 0.01 s TTL so the cached entry is stale.
        await asyncio.sleep(0.02)
        second = await interface.query_command(command)

        self.assertEqual(first, 12.3)
        self.assertEqual(second, 45.6)
        self.assertEqual(connection.query_count, 2)

    async def test_ttl_config_overrides_cache_ttl(self) -> None:
        """A per-command entry in the TTL config beats the global ``cache_ttl``."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            config_path = Path(tmp_dir) / "command_ttl.conf"
            config_path.write_text("SPEED,10\n", encoding="utf-8")
            connection = FakeConnection([FakeResponse(12.3), FakeResponse(45.6)])
            interface = OBD2Interface(
                logging.getLogger("test"),
                connection=connection,
                cache_ttl=10.0,
                ttl_config_path=str(config_path),
            )

            await interface.reload_ttl_config(force=True)
            command = interface.telemetry_commands["speed"]
            first = await interface.query_command(command)
            # With the 10 s global TTL alone this sleep would not expire the
            # entry; a second real query proves the SPEED override applied.
            await asyncio.sleep(0.02)
            second = await interface.query_command(command)

            self.assertEqual(first, 12.3)
            self.assertEqual(second, 45.6)
            self.assertEqual(connection.query_count, 2)

    async def test_ttl_config_reload_updates_overrides(self) -> None:
        """Reloading a rewritten config replaces and extends the overrides."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            config_path = Path(tmp_dir) / "command_ttl.conf"
            config_path.write_text("SPEED,250\n", encoding="utf-8")
            interface = OBD2Interface(
                logging.getLogger("test"),
                connection=FakeConnection([FakeResponse(12.3)]),
                ttl_config_path=str(config_path),
            )

            await interface.reload_ttl_config(force=True)
            # A config value of 250 is reported as 0.25 by _ttl_for_command.
            self.assertEqual(interface._ttl_for_command("SPEED"), 0.25)

            await asyncio.sleep(0.02)
            config_path.write_text("SPEED,1000\nRPM,100\n", encoding="utf-8")
            await interface.reload_ttl_config(force=True)

            self.assertEqual(interface._ttl_for_command("SPEED"), 1.0)
            self.assertEqual(interface._ttl_for_command("RPM"), 0.1)

    async def test_update_ttl_override_writes_config_and_reloads(self) -> None:
        """``update_ttl_override`` persists to the file and is visible at once."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            config_path = Path(tmp_dir) / "command_ttl.conf"
            config_path.write_text("SPEED,250\nRPM,500\n", encoding="utf-8")
            interface = OBD2Interface(
                logging.getLogger("test"),
                connection=FakeConnection([FakeResponse(1)]),
                ttl_config_path=str(config_path),
            )

            await interface.reload_ttl_config(force=True)
            # One existing command is changed and one new command is added.
            await interface.update_ttl_override("SPEED", 750)
            await interface.update_ttl_override("FUEL_LEVEL", 1000)

            self.assertEqual(interface.get_command_ttl_ms("SPEED"), 750)
            self.assertEqual(interface.get_command_ttl_ms("FUEL_LEVEL"), 1000)
            contents = config_path.read_text(encoding="utf-8")
            self.assertIn("SPEED,750", contents)
            self.assertIn("FUEL_LEVEL,1000", contents)

    async def test_sensor_loop_updates_report_without_fake_values(self) -> None:
        """The sensor loop feeds the queried value to the scan's callback.

        After the single scripted response is consumed the fake connection
        only returns empty responses; the report must still hold 41.2, and
        stopping the interface must close the connection.
        """
        connection = FakeConnection([FakeResponse(FakeQuantity(41.2))])
        interface = OBD2Interface(logging.getLogger("test"), connection=connection)
        report = Report()
        scan = interface.scans[0].model_copy(
            update={"callback": report.set_speed, "interval": 0.01}
        )

        task = asyncio.create_task(interface.sensor_data_loop(scan))
        await asyncio.sleep(0.03)
        await interface.stop()
        await task

        self.assertEqual(report.speed_mph, 41.2)
        self.assertTrue(connection.closed)

    async def test_simulated_connection_generates_all_telemetry(self) -> None:
        """The simulated interface yields a positive value for every metric."""
        interface = SimulatedOBD2Interface(logging.getLogger("test"), query_rate_limit=200.0)
        report = Report()

        scans = interface.scans
        scans[0].callback = report.set_speed
        scans[1].callback = report.set_fuel
        scans[2].callback = report.set_oil_temp
        scans[3].callback = report.set_rpm
        scans[4].callback = report.set_coolant_temp

        # Run every scan on a short interval so the test finishes quickly.
        tasks = [
            asyncio.create_task(
                interface.sensor_data_loop(scan.model_copy(update={"interval": 0.01}))
            )
            for scan in scans
        ]
        await asyncio.sleep(0.05)
        await interface.stop()
        await asyncio.gather(*tasks)

        self.assertGreater(report.speed_mph, 0.0)
        self.assertGreater(report.rpm, 0.0)
        self.assertGreater(report.fuel_level_pct, 0.0)
        self.assertGreater(report.oil_temp_f, 0.0)
        self.assertGreater(report.coolant_temp_f, 0.0)

    def test_simulated_connection_reports_connected_until_closed(self) -> None:
        """``is_connected`` is true on creation and false after ``close``."""
        connection = SimulatedOBDConnection()
        self.assertTrue(connection.is_connected())
        connection.close()
        self.assertFalse(connection.is_connected())

    async def test_query_display_value_formats_non_numeric_commands(self) -> None:
        """VIN and DTC queries against the simulator return display strings."""
        interface = SimulatedOBD2Interface(logging.getLogger("test"))
        import obd

        vin = await interface.query_display_value(obd.commands.VIN)
        dtc = await interface.query_display_value(obd.commands.GET_DTC)

        self.assertEqual(vin, "1HGBH41JXMN109186")
        self.assertIn("P0301", dtc)
|
|
|
|
|
|
class DashboardTests(unittest.TestCase):
    """Tests for the display and formatting helpers and ``OBD2App`` lookups."""

    def test_format_metric_value_uses_report_fields(self) -> None:
        """Each metric key renders its report field as a five-character string."""
        report = Report(
            speed_mph=55.4,
            rpm=2100,
            fuel_level_pct=75.0,
            oil_temp_f=195.2,
            coolant_temp_f=201.8,
        )

        self.assertEqual(format_metric_value(report, "speed"), "055.4")
        self.assertEqual(format_metric_value(report, "rpm"), "02100")
        self.assertEqual(format_metric_value(report, "fuel"), "075.0")
        self.assertEqual(format_metric_value(report, "oil-temp"), "195.2")
        self.assertEqual(format_metric_value(report, "coolant-temp"), "201.8")

    def test_format_obd_value_formats_numeric_and_none(self) -> None:
        """``None`` renders as ``--``; whole floats drop the fractional part."""
        self.assertEqual(format_obd_value(None), "--")
        self.assertEqual(format_obd_value(42.0), "42")
        self.assertEqual(format_obd_value(42.5), "42.5")

    def test_mode_6_command_list_excludes_none_entries(self) -> None:
        """The mode 6 command list is non-empty and contains no ``None``."""
        app = OBD2App(Report())
        commands = app.get_mode_commands(6)

        self.assertTrue(commands)
        self.assertTrue(all(command is not None for command in commands))

    def test_parse_duration_ms_supports_duration_suffixes(self) -> None:
        """``ms``, ``s``, ``m`` and ``h`` suffixes, and bare numbers, give milliseconds."""
        self.assertEqual(parse_duration_ms("10ms"), 10)
        self.assertEqual(parse_duration_ms("30s"), 30000)
        self.assertEqual(parse_duration_ms("1m"), 60000)
        self.assertEqual(parse_duration_ms("1h"), 3600000)
        self.assertEqual(parse_duration_ms("250"), 250)
|
|
|
|
|
|
# Run the whole suite when this file is executed directly.
if __name__ == "__main__":
    unittest.main()
|