From 3439aea08a9e785f007ebab5256b894754d81666 Mon Sep 17 00:00:00 2001 From: Mikhail Kulinich Date: Thu, 27 Feb 2020 19:52:27 +0300 Subject: [PATCH 1/6] Add an error callback to ThreadBasedCyclicSendTask --- can/broadcastmanager.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/can/broadcastmanager.py b/can/broadcastmanager.py index 52f0550f2..12ca8426c 100644 --- a/can/broadcastmanager.py +++ b/can/broadcastmanager.py @@ -5,7 +5,7 @@ :meth:`can.BusABC.send_periodic`. """ -from typing import Optional, Sequence, Tuple, Union, TYPE_CHECKING +from typing import Optional, Sequence, Tuple, Union, Callable, TYPE_CHECKING from can import typechecking @@ -207,6 +207,7 @@ def __init__( messages: Union[Sequence[Message], Message], period: float, duration: Optional[float] = None, + error_callback: Optional[Callable[[Exception], None]] = None ): super().__init__(messages, period, duration) self.bus = bus @@ -214,6 +215,7 @@ def __init__( self.stopped = True self.thread = None self.end_time = time.perf_counter() + duration if duration else None + self.error_callback = error_callback if HAS_EVENTS: self.period_ms: int = int(round(period * 1000, 0)) @@ -250,6 +252,8 @@ def _run(self): self.bus.send(self.messages[msg_index]) except Exception as exc: log.exception(exc) + if self.error_callback: + self.error_callback(exc) break if self.end_time is not None and time.perf_counter() >= self.end_time: break From 4dbbda6713b0f8705e8bf55d266356a9149be7ba Mon Sep 17 00:00:00 2001 From: Mikhail Kulinich Date: Thu, 27 Feb 2020 20:05:39 +0300 Subject: [PATCH 2/6] Fix formatting error reported by black --- can/broadcastmanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/can/broadcastmanager.py b/can/broadcastmanager.py index 12ca8426c..2ba36fab3 100644 --- a/can/broadcastmanager.py +++ b/can/broadcastmanager.py @@ -207,7 +207,7 @@ def __init__( messages: Union[Sequence[Message], Message], period: float, duration: Optional[float] = None, - error_callback: Optional[Callable[[Exception], None]] = None + error_callback: Optional[Callable[[Exception], None]] = None, ): super().__init__(messages, period, duration) self.bus = bus From 5e527c5c07e298c0b60de1ae3c472c11cb0d2614 Mon Sep 17 00:00:00 2001 From: Mikhail Kulinich Date: Sat, 29 Feb 2020 16:39:05 +0300 Subject: [PATCH 3/6] Add return value for on_error --- can/broadcastmanager.py | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/can/broadcastmanager.py b/can/broadcastmanager.py index 2ba36fab3..92364ecc0 100644 --- a/can/broadcastmanager.py +++ b/can/broadcastmanager.py @@ -198,7 +198,7 @@ def __init__( class ThreadBasedCyclicSendTask( ModifiableCyclicTaskABC, LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC ): - """Fallback cyclic send task using thread.""" + """Fallback cyclic send task using daemon thread.""" def __init__( self, @@ -207,15 +207,28 @@ def __init__( messages: Union[Sequence[Message], Message], period: float, duration: Optional[float] = None, - error_callback: Optional[Callable[[Exception], None]] = None, + on_error: Optional[Callable[[Exception], bool]] = None, ): + """Transmits `messages` with a `period` seconds for `duration` seconds on a `bus`. + + The `on_error` is called if any error happens on `bus` while sending `messages`. + If `on_error` present, and returns ``True`` when invoked, thread is + stopped immediately, otherwise, thread continuiously tries to send `messages` + ignoring errors on a `bus`. Absence of `on_error` means that thread exits immediately + on error. + + :param on_error: The callable that accepts an exception if any + error happened on a `bus` while sending `messages`, + it shall return either ``True`` or ``False`` depending + on desired behaviour of `ThreadBasedCyclicSendTask`. + """ super().__init__(messages, period, duration) self.bus = bus self.send_lock = lock self.stopped = True self.thread = None self.end_time = time.perf_counter() + duration if duration else None - self.error_callback = error_callback + self.on_error = on_error if HAS_EVENTS: self.period_ms: int = int(round(period * 1000, 0)) @@ -252,9 +265,11 @@ def _run(self): self.bus.send(self.messages[msg_index]) except Exception as exc: log.exception(exc) - if self.error_callback: - self.error_callback(exc) - break + if self.on_error: + if self.on_error(exc) is True: + break + else: + break if self.end_time is not None and time.perf_counter() >= self.end_time: break msg_index = (msg_index + 1) % len(self.messages) From e3a875a7cbd4aef390eb5c7e4b45057df88ca233 Mon Sep 17 00:00:00 2001 From: Mikhail Kulinich Date: Sat, 29 Feb 2020 18:21:46 +0300 Subject: [PATCH 4/6] unit tests --- test/simplecyclic_test.py | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/test/simplecyclic_test.py b/test/simplecyclic_test.py index 95bbd0a99..375c8ef9b 100644 --- a/test/simplecyclic_test.py +++ b/test/simplecyclic_test.py @@ -7,6 +7,7 @@ from time import sleep import unittest +from unittest.mock import MagicMock import gc import can @@ -151,6 +152,42 @@ def test_stopping_perodic_tasks(self): bus.shutdown() + def test_thread_based_cyclic_send_task(self): + bus = can.ThreadSafeBus(bustype="virtual") + message = can.Message( + is_extended_id=False, + arbitration_id=0x123, + data=[0, 1, 2, 3, 4, 5, 6, 7], + ) + + # good case, bus is up + on_error_mock = MagicMock(return_value=True) + task = can.broadcastmanager.ThreadBasedCyclicSendTask( + bus, bus._lock_send_periodic, message, 0.1, 3, on_error_mock) + task.start() + sleep(1) + on_error_mock.assert_not_called() + task.stop() + bus.shutdown() + + # bus has been shutted down + on_error_mock.reset_mock() + task = can.broadcastmanager.ThreadBasedCyclicSendTask( + bus, bus._lock_send_periodic, message, 0.1, 3, on_error_mock) + task.start() + sleep(1) + self.assertTrue(on_error_mock.call_count is 1) + task.stop() + + # bus is still shutted down, but on_error returns False + on_error_mock = MagicMock(return_value=False) + task = can.broadcastmanager.ThreadBasedCyclicSendTask( + bus, bus._lock_send_periodic, message, 0.1, 3, on_error_mock) + task.start() + sleep(1) + self.assertTrue(on_error_mock.call_count > 1) + task.stop() + if __name__ == "__main__": unittest.main() From ad9bc878d07ff0c438f8f4baccdc03f933341ffc Mon Sep 17 00:00:00 2001 From: Mikhail Kulinich Date: Sat, 29 Feb 2020 18:37:28 +0300 Subject: [PATCH 5/6] black --- test/simplecyclic_test.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/test/simplecyclic_test.py b/test/simplecyclic_test.py index 375c8ef9b..a0b477226 100644 --- a/test/simplecyclic_test.py +++ b/test/simplecyclic_test.py @@ -154,16 +154,15 @@ def test_stopping_perodic_tasks(self): def test_thread_based_cyclic_send_task(self): bus = can.ThreadSafeBus(bustype="virtual") - message = can.Message( - is_extended_id=False, - arbitration_id=0x123, - data=[0, 1, 2, 3, 4, 5, 6, 7], + msg = can.Message( + is_extended_id=False, arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5, 6, 7] ) # good case, bus is up on_error_mock = MagicMock(return_value=True) task = can.broadcastmanager.ThreadBasedCyclicSendTask( - bus, bus._lock_send_periodic, message, 0.1, 3, on_error_mock) + bus, bus._lock_send_periodic, msg, 0.1, 3, on_error_mock + ) task.start() sleep(1) on_error_mock.assert_not_called() @@ -173,7 +172,8 @@ def test_thread_based_cyclic_send_task(self): # bus has been shutted down on_error_mock.reset_mock() task = can.broadcastmanager.ThreadBasedCyclicSendTask( - bus, bus._lock_send_periodic, message, 0.1, 3, on_error_mock) + bus, bus._lock_send_periodic, msg, 0.1, 3, on_error_mock + ) task.start() sleep(1) self.assertTrue(on_error_mock.call_count is 1) @@ -182,7 +182,8 @@ def test_thread_based_cyclic_send_task(self): # bus is still shutted down, but on_error returns False on_error_mock = MagicMock(return_value=False) task = can.broadcastmanager.ThreadBasedCyclicSendTask( - bus, bus._lock_send_periodic, message, 0.1, 3, on_error_mock) + bus, bus._lock_send_periodic, msg, 0.1, 3, on_error_mock + ) task.start() sleep(1) self.assertTrue(on_error_mock.call_count > 1) From 5439407081e27993d3690f857e81635da634cc23 Mon Sep 17 00:00:00 2001 From: Mikhail Kulinich Date: Mon, 2 Mar 2020 17:40:46 +0300 Subject: [PATCH 6/6] review comments --- can/broadcastmanager.py | 4 ++-- test/simplecyclic_test.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/can/broadcastmanager.py b/can/broadcastmanager.py index 92364ecc0..173727789 100644 --- a/can/broadcastmanager.py +++ b/can/broadcastmanager.py @@ -212,7 +212,7 @@ def __init__( """Transmits `messages` with a `period` seconds for `duration` seconds on a `bus`. The `on_error` is called if any error happens on `bus` while sending `messages`. - If `on_error` present, and returns ``True`` when invoked, thread is + If `on_error` present, and returns ``False`` when invoked, thread is stopped immediately, otherwise, thread continuiously tries to send `messages` ignoring errors on a `bus`. Absence of `on_error` means that thread exits immediately on error. @@ -266,7 +266,7 @@ def _run(self): except Exception as exc: log.exception(exc) if self.on_error: - if self.on_error(exc) is True: + if not self.on_error(exc): break else: break diff --git a/test/simplecyclic_test.py b/test/simplecyclic_test.py index a0b477226..4e41e5e03 100644 --- a/test/simplecyclic_test.py +++ b/test/simplecyclic_test.py @@ -159,7 +159,7 @@ def test_thread_based_cyclic_send_task(self): ) # good case, bus is up - on_error_mock = MagicMock(return_value=True) + on_error_mock = MagicMock(return_value=False) task = can.broadcastmanager.ThreadBasedCyclicSendTask( bus, bus._lock_send_periodic, msg, 0.1, 3, on_error_mock ) @@ -179,8 +179,8 @@ def test_thread_based_cyclic_send_task(self): self.assertTrue(on_error_mock.call_count is 1) task.stop() - # bus is still shutted down, but on_error returns False - on_error_mock = MagicMock(return_value=False) + # bus is still shutted down, but on_error returns True + on_error_mock = MagicMock(return_value=True) task = can.broadcastmanager.ThreadBasedCyclicSendTask( bus, bus._lock_send_periodic, msg, 0.1, 3, on_error_mock )