From b1cd5fc68685866efb61ee0e6b5abb64648f4486 Mon Sep 17 00:00:00 2001 From: "Erlend E. Aasland" Date: Mon, 12 Aug 2024 10:41:05 +0200 Subject: [PATCH 1/2] Tests: harden test_emcy_consumer_wait Wait a little bit longer before dispatching the EMCY packet, and start the one-shot timer just before invoking the wait() call. --- test/test_emcy.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/test_emcy.py b/test/test_emcy.py index a48f24fc..704419a9 100644 --- a/test/test_emcy.py +++ b/test/test_emcy.py @@ -72,7 +72,7 @@ def test_emcy_consumer_reset(self): self.assertEqual(len(self.emcy.active), 0) def test_emcy_consumer_wait(self): - PAUSE = TIMEOUT / 4 + PAUSE = TIMEOUT / 2 def push_err(): self.emcy.on_emcy(0x81, b'\x01\x20\x01\x01\x02\x03\x04\x05', 100) @@ -89,15 +89,15 @@ def check_err(err): # Check unfiltered wait, on success. timer = threading.Timer(PAUSE, push_err) - timer.start() with self.assertLogs(level=logging.INFO): + timer.start() err = self.emcy.wait(timeout=TIMEOUT) check_err(err) # Check filtered wait, on success. timer = threading.Timer(PAUSE, push_err) - timer.start() with self.assertLogs(level=logging.INFO): + timer.start() err = self.emcy.wait(0x2001, TIMEOUT) check_err(err) From 041888c9a22952f31dad7f00d477886ea39033e3 Mon Sep 17 00:00:00 2001 From: "Erlend E. Aasland" Date: Mon, 12 Aug 2024 10:51:36 +0200 Subject: [PATCH 2/2] Better resource management --- test/test_emcy.py | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/test/test_emcy.py b/test/test_emcy.py index 704419a9..a09b8eb9 100644 --- a/test/test_emcy.py +++ b/test/test_emcy.py @@ -1,6 +1,7 @@ import logging import threading import unittest +from contextlib import contextmanager import can import canopen @@ -84,34 +85,42 @@ def check_err(err): data=bytes([1, 2, 3, 4, 5]), ts=100, ) + @contextmanager + def timer(func): + t = threading.Timer(PAUSE, func) + try: + yield t + finally: + t.join(TIMEOUT) + # Check unfiltered wait, on timeout. self.assertIsNone(self.emcy.wait(timeout=TIMEOUT)) # Check unfiltered wait, on success. - timer = threading.Timer(PAUSE, push_err) - with self.assertLogs(level=logging.INFO): - timer.start() - err = self.emcy.wait(timeout=TIMEOUT) + with timer(push_err) as t: + with self.assertLogs(level=logging.INFO): + t.start() + err = self.emcy.wait(timeout=TIMEOUT) check_err(err) # Check filtered wait, on success. - timer = threading.Timer(PAUSE, push_err) - with self.assertLogs(level=logging.INFO): - timer.start() - err = self.emcy.wait(0x2001, TIMEOUT) + with timer(push_err) as t: + with self.assertLogs(level=logging.INFO): + t.start() + err = self.emcy.wait(0x2001, TIMEOUT) check_err(err) # Check filtered wait, on timeout. - timer = threading.Timer(PAUSE, push_err) - timer.start() - self.assertIsNone(self.emcy.wait(0x9000, TIMEOUT)) + with timer(push_err) as t: + t.start() + self.assertIsNone(self.emcy.wait(0x9000, TIMEOUT)) def push_reset(): self.emcy.on_emcy(0x81, b'\x00\x00\x00\x00\x00\x00\x00\x00', 100) - timer = threading.Timer(PAUSE, push_reset) - timer.start() - self.assertIsNone(self.emcy.wait(0x9000, TIMEOUT)) + with timer(push_reset) as t: + t.start() + self.assertIsNone(self.emcy.wait(0x9000, TIMEOUT)) class TestEmcyError(unittest.TestCase):