diff --git a/can/exceptions.py b/can/exceptions.py index 7d6374735..e8731737c 100644 --- a/can/exceptions.py +++ b/can/exceptions.py @@ -15,7 +15,11 @@ :class:`ValueError`. This should always be documented for the function at hand. """ + +from contextlib import contextmanager + from typing import Optional +from typing import Type class CanError(Exception): @@ -96,3 +100,18 @@ class CanTimeoutError(CanError, TimeoutError): - Some message could not be sent after the timeout elapsed - No message was read within the given time """ + + +@contextmanager +def error_check( + error_message: Optional[str] = None, + exception_type: Type[CanError] = CanOperationError, +) -> None: + """Catches any exceptions and turns them into the new type while preserving the stack trace.""" + try: + yield + except Exception as error: + if error_message is None: + raise exception_type(str(error)) from error + else: + raise exception_type(error_message) from error diff --git a/can/interfaces/cantact.py b/can/interfaces/cantact.py index 9f8bf6314..5eab7b3f3 100644 --- a/can/interfaces/cantact.py +++ b/can/interfaces/cantact.py @@ -7,14 +7,20 @@ from unittest.mock import Mock from can import BusABC, Message +from ..exceptions import ( + CanInitializationError, + CanInterfaceNotImplementedError, + error_check, +) logger = logging.getLogger(__name__) try: import cantact except ImportError: + cantact = None logger.warning( - "The CANtact module is not installed. Install it using `python3 -m pip install cantact`" + "The CANtact module is not installed. Install it using `python -m pip install cantact`" ) @@ -25,8 +31,10 @@ class CantactBus(BusABC): def _detect_available_configs(): try: interface = cantact.Interface() - except (NameError, SystemError): - # couldn't import cantact, so no configurations are available + except (NameError, SystemError, AttributeError): + logger.debug( + "Could not import or instantiate cantact, so no configurations are available" + ) return [] channels = [] @@ -42,7 +50,7 @@ def __init__( monitor=False, bit_timing=None, _testing=False, - **kwargs + **kwargs, ): """ :param int channel: @@ -58,36 +66,45 @@ def __init__( if _testing: self.interface = MockInterface() else: - self.interface = cantact.Interface() + if cantact is None: + raise CanInterfaceNotImplementedError( + "The CANtact module is not installed. Install it using `python -m pip install cantact`" + ) + with error_check( + "Cannot create the cantact.Interface", CanInitializationError + ): + self.interface = cantact.Interface() self.channel = int(channel) - self.channel_info = "CANtact: ch:%s" % channel - - # configure the interface - if bit_timing is None: - # use bitrate - self.interface.set_bitrate(int(channel), int(bitrate)) - else: - # use custom bit timing - self.interface.set_bit_timing( - int(channel), - int(bit_timing.brp), - int(bit_timing.tseg1), - int(bit_timing.tseg2), - int(bit_timing.sjw), - ) - self.interface.set_enabled(int(channel), True) - self.interface.set_monitor(int(channel), monitor) - self.interface.start() + self.channel_info = f"CANtact: ch:{channel}" + + # Configure the interface + with error_check("Cannot setup the cantact.Interface", CanInitializationError): + if bit_timing is None: + # use bitrate + self.interface.set_bitrate(int(channel), int(bitrate)) + else: + # use custom bit timing + self.interface.set_bit_timing( + int(channel), + int(bit_timing.brp), + int(bit_timing.tseg1), + int(bit_timing.tseg2), + int(bit_timing.sjw), + ) + self.interface.set_enabled(int(channel), True) + self.interface.set_monitor(int(channel), monitor) + self.interface.start() super().__init__( channel=channel, bitrate=bitrate, poll_interval=poll_interval, **kwargs ) def _recv_internal(self, timeout): - frame = self.interface.recv(int(timeout * 1000)) + with error_check("Cannot receive message"): + frame = self.interface.recv(int(timeout * 1000)) if frame is None: - # timeout occured + # timeout occurred return None, False msg = Message( @@ -103,31 +120,33 @@ def _recv_internal(self, timeout): return msg, False def send(self, msg, timeout=None): - self.interface.send( - self.channel, - msg.arbitration_id, - bool(msg.is_extended_id), - bool(msg.is_remote_frame), - msg.dlc, - msg.data, - ) + with error_check("Cannot send message"): + self.interface.send( + self.channel, + msg.arbitration_id, + bool(msg.is_extended_id), + bool(msg.is_remote_frame), + msg.dlc, + msg.data, + ) def shutdown(self): - self.interface.stop() + with error_check("Cannot shutdown interface"): + self.interface.stop() def mock_recv(timeout): if timeout > 0: - frame = {} - frame["id"] = 0x123 - frame["extended"] = False - frame["timestamp"] = time.time() - frame["loopback"] = False - frame["rtr"] = False - frame["dlc"] = 8 - frame["data"] = [1, 2, 3, 4, 5, 6, 7, 8] - frame["channel"] = 0 - return frame + return { + "id": 0x123, + "extended": False, + "timestamp": time.time(), + "loopback": False, + "rtr": False, + "dlc": 8, + "data": [1, 2, 3, 4, 5, 6, 7, 8], + "channel": 0, + } else: # simulate timeout when timeout = 0 return None @@ -144,7 +163,6 @@ class MockInterface: set_bit_timing = Mock() set_enabled = Mock() set_monitor = Mock() - start = Mock() stop = Mock() send = Mock() channel_count = Mock(return_value=1) diff --git a/can/interfaces/usb2can/usb2canabstractionlayer.py b/can/interfaces/usb2can/usb2canabstractionlayer.py index f3d7ad0ee..8a3ae34ca 100644 --- a/can/interfaces/usb2can/usb2canabstractionlayer.py +++ b/can/interfaces/usb2can/usb2canabstractionlayer.py @@ -6,9 +6,9 @@ from ctypes import * from enum import IntEnum import logging -from contextlib import contextmanager import can +from ...exceptions import error_check log = logging.getLogger("can.usb2can") @@ -102,14 +102,6 @@ class CanalMsg(Structure): ] -@contextmanager -def error_check(error_message: str) -> None: - try: - yield - except Exception as error: - raise can.CanOperationError(error_message) from error - - class Usb2CanAbstractionLayer: """A low level wrapper around the usb2can library.