From 54b65088f7eb7187ade9753a1beb0c4183636f67 Mon Sep 17 00:00:00 2001 From: TJ Bruno Date: Wed, 25 Jan 2023 16:05:52 -0800 Subject: [PATCH 1/2] Add unittest --- test/test_slcan.py | 57 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/test/test_slcan.py b/test/test_slcan.py index aa97e518b..bd0d8deb9 100644 --- a/test/test_slcan.py +++ b/test/test_slcan.py @@ -1,6 +1,13 @@ #!/usr/bin/env python +import os +import threading +import time import unittest +from platform import platform + +import pytest + import can @@ -123,5 +130,55 @@ def test_serial_number(self): self.assertIsNone(sn) +@pytest.fixture +def pseudo_terminal(): + import pty + + main, peripheral = pty.openpty() + + return main, peripheral + + +@pytest.fixture +def bus(pseudo_terminal): + _, peripheral = pseudo_terminal + return can.Bus(os.ttyname(peripheral), interface="slcan", sleep_after_open=0) + + +@pytest.fixture +def bus_writer(pseudo_terminal): + main, _ = pseudo_terminal + writer = os.fdopen(main, "wb") + return writer + + +@pytest.mark.skipif("linux" not in platform().lower(), reason="Requires Linux") +@pytest.mark.parametrize("timeout", [0.5, 1]) +def test_verify_recv_timeout(timeout, bus, bus_writer): + msg = b"Hello" + unterminated = msg + terminated = msg + b"\r" + + def consecutive_writes(): + bus_writer.write(terminated) + bus_writer.flush() + time.sleep(timeout / 2) + bus_writer.write(unterminated) + bus_writer.flush() + + timeout_ms = int(timeout * 1_000) + allowable_timeout_error = timeout_ms / 200 + + writer_thread = threading.Thread(target=consecutive_writes) + + start_time = time.time_ns() + writer_thread.start() + bus.recv(timeout) + stop_time = time.time_ns() + + duration_ms = int((stop_time - start_time) / 1_000_000) + assert duration_ms - timeout_ms < allowable_timeout_error + + if __name__ == "__main__": unittest.main() From a57e2a556122f2cc5a74d74989d9bb67c787d484 Mon Sep 17 00:00:00 2001 From: TJ Bruno Date: Thu, 26 Jan 2023 08:32:19 -0800 Subject: [PATCH 2/2] Improve test logic --- test/test_slcan.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/test/test_slcan.py b/test/test_slcan.py index bd0d8deb9..c38848373 100644 --- a/test/test_slcan.py +++ b/test/test_slcan.py @@ -148,28 +148,32 @@ def bus(pseudo_terminal): @pytest.fixture def bus_writer(pseudo_terminal): main, _ = pseudo_terminal - writer = os.fdopen(main, "wb") - return writer + return os.fdopen(main, "wb") @pytest.mark.skipif("linux" not in platform().lower(), reason="Requires Linux") @pytest.mark.parametrize("timeout", [0.5, 1]) def test_verify_recv_timeout(timeout, bus, bus_writer): - msg = b"Hello" - unterminated = msg - terminated = msg + b"\r" - def consecutive_writes(): - bus_writer.write(terminated) + # Recv timeout should always occur + # since the message is not delimited + msg = b"F00B4R" + + def delayed_consecutive_writes(): + bus_writer.write(msg) bus_writer.flush() - time.sleep(timeout / 2) - bus_writer.write(unterminated) + + # Delay until we're close to the end of + # timeout before sending another message + time.sleep(timeout * 0.75) + + bus_writer.write(msg) bus_writer.flush() timeout_ms = int(timeout * 1_000) allowable_timeout_error = timeout_ms / 200 - writer_thread = threading.Thread(target=consecutive_writes) + writer_thread = threading.Thread(target=delayed_consecutive_writes) start_time = time.time_ns() writer_thread.start()