diff --git a/can/broadcastmanager.py b/can/broadcastmanager.py index 52f0550f2..173727789 100644 --- a/can/broadcastmanager.py +++ b/can/broadcastmanager.py @@ -5,7 +5,7 @@ :meth:`can.BusABC.send_periodic`. """ -from typing import Optional, Sequence, Tuple, Union, TYPE_CHECKING +from typing import Optional, Sequence, Tuple, Union, Callable, TYPE_CHECKING from can import typechecking @@ -198,7 +198,7 @@ def __init__( class ThreadBasedCyclicSendTask( ModifiableCyclicTaskABC, LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC ): - """Fallback cyclic send task using thread.""" + """Fallback cyclic send task using daemon thread.""" def __init__( self, @@ -207,13 +207,28 @@ def __init__( messages: Union[Sequence[Message], Message], period: float, duration: Optional[float] = None, + on_error: Optional[Callable[[Exception], bool]] = None, ): + """Transmits `messages` with a `period` seconds for `duration` seconds on a `bus`. + + The `on_error` is called if any error happens on `bus` while sending `messages`. + If `on_error` present, and returns ``False`` when invoked, thread is + stopped immediately, otherwise, thread continuiously tries to send `messages` + ignoring errors on a `bus`. Absence of `on_error` means that thread exits immediately + on error. + + :param on_error: The callable that accepts an exception if any + error happened on a `bus` while sending `messages`, + it shall return either ``True`` or ``False`` depending + on desired behaviour of `ThreadBasedCyclicSendTask`. + """ super().__init__(messages, period, duration) self.bus = bus self.send_lock = lock self.stopped = True self.thread = None self.end_time = time.perf_counter() + duration if duration else None + self.on_error = on_error if HAS_EVENTS: self.period_ms: int = int(round(period * 1000, 0)) @@ -250,7 +265,11 @@ def _run(self): self.bus.send(self.messages[msg_index]) except Exception as exc: log.exception(exc) - break + if self.on_error: + if not self.on_error(exc): + break + else: + break if self.end_time is not None and time.perf_counter() >= self.end_time: break msg_index = (msg_index + 1) % len(self.messages) diff --git a/test/simplecyclic_test.py b/test/simplecyclic_test.py index 95bbd0a99..4e41e5e03 100644 --- a/test/simplecyclic_test.py +++ b/test/simplecyclic_test.py @@ -7,6 +7,7 @@ from time import sleep import unittest +from unittest.mock import MagicMock import gc import can @@ -151,6 +152,43 @@ def test_stopping_perodic_tasks(self): bus.shutdown() + def test_thread_based_cyclic_send_task(self): + bus = can.ThreadSafeBus(bustype="virtual") + msg = can.Message( + is_extended_id=False, arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5, 6, 7] + ) + + # good case, bus is up + on_error_mock = MagicMock(return_value=False) + task = can.broadcastmanager.ThreadBasedCyclicSendTask( + bus, bus._lock_send_periodic, msg, 0.1, 3, on_error_mock + ) + task.start() + sleep(1) + on_error_mock.assert_not_called() + task.stop() + bus.shutdown() + + # bus has been shutted down + on_error_mock.reset_mock() + task = can.broadcastmanager.ThreadBasedCyclicSendTask( + bus, bus._lock_send_periodic, msg, 0.1, 3, on_error_mock + ) + task.start() + sleep(1) + self.assertTrue(on_error_mock.call_count is 1) + task.stop() + + # bus is still shutted down, but on_error returns True + on_error_mock = MagicMock(return_value=True) + task = can.broadcastmanager.ThreadBasedCyclicSendTask( + bus, bus._lock_send_periodic, msg, 0.1, 3, on_error_mock + ) + task.start() + sleep(1) + self.assertTrue(on_error_mock.call_count > 1) + task.stop() + if __name__ == "__main__": unittest.main()