diff --git a/test/test_slcan.py b/test/test_slcan.py index aa97e518b..c38848373 100644 --- a/test/test_slcan.py +++ b/test/test_slcan.py @@ -1,6 +1,13 @@ #!/usr/bin/env python +import os +import threading +import time import unittest +from platform import platform + +import pytest + import can @@ -123,5 +130,59 @@ def test_serial_number(self): self.assertIsNone(sn) +@pytest.fixture +def pseudo_terminal(): + import pty + + main, peripheral = pty.openpty() + + return main, peripheral + + +@pytest.fixture +def bus(pseudo_terminal): + _, peripheral = pseudo_terminal + return can.Bus(os.ttyname(peripheral), interface="slcan", sleep_after_open=0) + + +@pytest.fixture +def bus_writer(pseudo_terminal): + main, _ = pseudo_terminal + return os.fdopen(main, "wb") + + +@pytest.mark.skipif("linux" not in platform().lower(), reason="Requires Linux") +@pytest.mark.parametrize("timeout", [0.5, 1]) +def test_verify_recv_timeout(timeout, bus, bus_writer): + + # Recv timeout should always occur + # since the message is not delimited + msg = b"F00B4R" + + def delayed_consecutive_writes(): + bus_writer.write(msg) + bus_writer.flush() + + # Delay until we're close to the end of + # timeout before sending another message + time.sleep(timeout * 0.75) + + bus_writer.write(msg) + bus_writer.flush() + + timeout_ms = int(timeout * 1_000) + allowable_timeout_error = timeout_ms / 200 + + writer_thread = threading.Thread(target=delayed_consecutive_writes) + + start_time = time.time_ns() + writer_thread.start() + bus.recv(timeout) + stop_time = time.time_ns() + + duration_ms = int((stop_time - start_time) / 1_000_000) + assert duration_ms - timeout_ms < allowable_timeout_error + + if __name__ == "__main__": unittest.main()