diff --git a/can/interfaces/iscan.py b/can/interfaces/iscan.py index a0bb413f2..376ccc3fa 100644 --- a/can/interfaces/iscan.py +++ b/can/interfaces/iscan.py @@ -1,12 +1,19 @@ """ -Interface for isCAN from Thorsis Technologies GmbH, former ifak system GmbH. +Interface for isCAN from *Thorsis Technologies GmbH*, former *ifak system GmbH*. """ import ctypes import time import logging +from typing import Optional, Tuple, Union -from can import CanError, BusABC, Message +from can import BusABC, Message +from can import ( + CanError, + CanInterfaceNotImplementedError, + CanInitializationError, + CanOperationError, +) logger = logging.getLogger(__name__) @@ -23,9 +30,15 @@ class MessageExStruct(ctypes.Structure): ] -def check_status(result, function, arguments): +def check_status_initialization(result: int, function, arguments) -> int: if result > 0: - raise IscanError(function, result, arguments) + raise IscanInitializationError(function, result, arguments) + return result + + +def check_status(result: int, function, arguments) -> int: + if result > 0: + raise IscanOperationError(function, result, arguments) return result @@ -36,12 +49,15 @@ def check_status(result, function, arguments): logger.warning("Failed to load IS-CAN driver: %s", e) else: iscan.isCAN_DeviceInitEx.argtypes = [ctypes.c_ubyte, ctypes.c_ubyte] - iscan.isCAN_DeviceInitEx.errcheck = check_status + iscan.isCAN_DeviceInitEx.errcheck = check_status_initialization iscan.isCAN_DeviceInitEx.restype = ctypes.c_ubyte + iscan.isCAN_ReceiveMessageEx.errcheck = check_status iscan.isCAN_ReceiveMessageEx.restype = ctypes.c_ubyte + iscan.isCAN_TransmitMessageEx.errcheck = check_status iscan.isCAN_TransmitMessageEx.restype = ctypes.c_ubyte + iscan.isCAN_CloseDevice.errcheck = check_status iscan.isCAN_CloseDevice.restype = ctypes.c_ubyte @@ -62,24 +78,29 @@ class IscanBus(BusABC): 1000000: 9, } - def __init__(self, channel, bitrate=500000, poll_interval=0.01, **kwargs): + def __init__( + self, + channel: Union[str, int], + bitrate: int = 500000, + poll_interval: float = 0.01, + **kwargs, + ) -> None: """ - :param int channel: + :param channel: Device number - :param int bitrate: + :param bitrate: Bitrate in bits/s - :param float poll_interval: + :param poll_interval: Poll interval in seconds when reading messages """ if iscan is None: - raise ImportError("Could not load isCAN driver") + raise CanInterfaceNotImplementedError("Could not load isCAN driver") self.channel = ctypes.c_ubyte(int(channel)) - self.channel_info = "IS-CAN: %s" % channel + self.channel_info = f"IS-CAN: {self.channel}" if bitrate not in self.BAUDRATES: - valid_bitrates = ", ".join(str(bitrate) for bitrate in self.BAUDRATES) - raise ValueError("Invalid bitrate, choose one of " + valid_bitrates) + raise ValueError(f"Invalid bitrate, choose one of {set(self.BAUDRATES)}") self.poll_interval = poll_interval iscan.isCAN_DeviceInitEx(self.channel, self.BAUDRATES[bitrate]) @@ -88,14 +109,16 @@ def __init__(self, channel, bitrate=500000, poll_interval=0.01, **kwargs): channel=channel, bitrate=bitrate, poll_interval=poll_interval, **kwargs ) - def _recv_internal(self, timeout): + def _recv_internal( + self, timeout: Optional[float] + ) -> Tuple[Optional[Message], bool]: raw_msg = MessageExStruct() end_time = time.time() + timeout if timeout is not None else None while True: try: iscan.isCAN_ReceiveMessageEx(self.channel, ctypes.byref(raw_msg)) except IscanError as e: - if e.error_code != 8: + if e.error_code != 8: # "No message received" # An error occurred raise if end_time is not None and time.time() > end_time: @@ -118,7 +141,7 @@ def _recv_internal(self, timeout): ) return msg, False - def send(self, msg, timeout=None): + def send(self, msg: Message, timeout: Optional[float] = None) -> None: raw_msg = MessageExStruct( msg.arbitration_id, bool(msg.is_extended_id), @@ -128,14 +151,14 @@ def send(self, msg, timeout=None): ) iscan.isCAN_TransmitMessageEx(self.channel, ctypes.byref(raw_msg)) - def shutdown(self): + def shutdown(self) -> None: iscan.isCAN_CloseDevice(self.channel) class IscanError(CanError): - # TODO: document ERROR_CODES = { + 0: "Success", 1: "No access to device", 2: "Device with ID not found", 3: "Driver operation failed", @@ -161,17 +184,28 @@ class IscanError(CanError): 40: "Need a licence number under NT4", } - def __init__(self, function, error_code, arguments): - super().__init__() - # :Status code + def __init__(self, function, error_code: int, arguments) -> None: + try: + description = ": " + self.ERROR_CODES[self.error_code] + except KeyError: + description = "" + + super().__init__( + f"Function {self.function.__name__} failed{description}", + error_code=error_code, + ) + + #: Status code self.error_code = error_code - # :Function that failed + #: Function that failed self.function = function - # :Arguments passed to function + #: Arguments passed to function self.arguments = arguments - def __str__(self): - description = self.ERROR_CODES.get( - self.error_code, "Error code %d" % self.error_code - ) - return "Function %s failed: %s" % (self.function.__name__, description) + +class IscanOperationError(IscanError, CanOperationError): + pass + + +class IscanInitializationError(IscanError, CanInitializationError): + pass