diff --git a/can/interfaces/kvaser/canlib.py b/can/interfaces/kvaser/canlib.py index 201a4ec82..9d6d6b64a 100644 --- a/can/interfaces/kvaser/canlib.py +++ b/can/interfaces/kvaser/canlib.py @@ -11,7 +11,8 @@ import logging import ctypes -from can import CanError, BusABC +from can import BusABC +from ...exceptions import CanError, CanInitializationError, CanOperationError from can import Message from can.util import time_perfcounter_correlation from . import constants as canstat @@ -67,44 +68,51 @@ class CANLIBError(CanError): """ def __init__(self, function, error_code, arguments): - super().__init__() - self.error_code = error_code + message = CANLIBError._get_error_message(error_code) + super().__init__(f"Function {function.__name__} failed - {message}", error_code) self.function = function self.arguments = arguments - def __str__(self): - return "Function %s failed - %s" % ( - self.function.__name__, - self.__get_error_message(), - ) - - def __get_error_message(self): + @staticmethod + def _get_error_message(error_code: int) -> str: errmsg = ctypes.create_string_buffer(128) - canGetErrorText(self.error_code, errmsg, len(errmsg)) + canGetErrorText(error_code, errmsg, len(errmsg)) return errmsg.value.decode("ascii") +class CANLIBInitializationError(CANLIBError, CanInitializationError): + pass + + +class CANLIBOperationError(CANLIBError, CanOperationError): + pass + + def __convert_can_status_to_int(result): - # log.debug("converting can status to int {} ({})".format(result, type(result))) if isinstance(result, int): return result else: return result.value -def __check_status(result, function, arguments): +def __check_status_operation(result, function, arguments): + result = __convert_can_status_to_int(result) + if not canstat.CANSTATUS_SUCCESS(result): + raise CANLIBOperationError(function, result, arguments) + return result + + +def __check_status_initialization(result, function, arguments): result = __convert_can_status_to_int(result) if not canstat.CANSTATUS_SUCCESS(result): - # log.debug('Detected error while checking CAN status') - raise CANLIBError(function, result, arguments) + raise CANLIBInitializationError(function, result, arguments) return result def __check_status_read(result, function, arguments): result = __convert_can_status_to_int(result) if not canstat.CANSTATUS_SUCCESS(result) and result != canstat.canERR_NOMSG: - # log.debug('Detected error in which checking status read') - raise CANLIBError(function, result, arguments) + raise CANLIBOperationError(function, result, arguments) return result @@ -115,16 +123,12 @@ class c_canHandle(ctypes.c_int): canINVALID_HANDLE = -1 -def __handle_is_valid(handle): - return handle.value > canINVALID_HANDLE - - def __check_bus_handle_validity(handle, function, arguments): - if not __handle_is_valid(handle): - result = __convert_can_status_to_int(handle) - raise CANLIBError(function, result, arguments) - else: - return handle + if handle.value > canINVALID_HANDLE: + return handle # is valid + + result = __convert_can_status_to_int(handle) + raise CANLIBInitializationError(function, result, arguments) if __canlib is not None: @@ -134,7 +138,7 @@ def __check_bus_handle_validity(handle, function, arguments): "canGetErrorText", argtypes=[canstat.c_canStatus, ctypes.c_char_p, ctypes.c_uint], restype=canstat.c_canStatus, - errcheck=__check_status, + errcheck=__check_status_operation, ) # TODO wrap this type of function to provide a more Pythonic API @@ -142,35 +146,35 @@ def __check_bus_handle_validity(handle, function, arguments): "canGetNumberOfChannels", argtypes=[ctypes.c_void_p], restype=canstat.c_canStatus, - errcheck=__check_status, + errcheck=__check_status_initialization, ) kvReadTimer = __get_canlib_function( "kvReadTimer", argtypes=[c_canHandle, ctypes.POINTER(ctypes.c_uint)], restype=canstat.c_canStatus, - errcheck=__check_status, + errcheck=__check_status_initialization, ) canBusOff = __get_canlib_function( "canBusOff", argtypes=[c_canHandle], restype=canstat.c_canStatus, - errcheck=__check_status, + errcheck=__check_status_operation, ) canBusOn = __get_canlib_function( "canBusOn", argtypes=[c_canHandle], restype=canstat.c_canStatus, - errcheck=__check_status, + errcheck=__check_status_initialization, ) canClose = __get_canlib_function( "canClose", argtypes=[c_canHandle], restype=canstat.c_canStatus, - errcheck=__check_status, + errcheck=__check_status_operation, ) canOpenChannel = __get_canlib_function( @@ -192,7 +196,7 @@ def __check_bus_handle_validity(handle, function, arguments): ctypes.c_uint, ], restype=canstat.c_canStatus, - errcheck=__check_status, + errcheck=__check_status_initialization, ) canSetBusParamsFd = __get_canlib_function( @@ -205,21 +209,21 @@ def __check_bus_handle_validity(handle, function, arguments): ctypes.c_uint, ], restype=canstat.c_canStatus, - errcheck=__check_status, + errcheck=__check_status_initialization, ) canSetBusOutputControl = __get_canlib_function( "canSetBusOutputControl", argtypes=[c_canHandle, ctypes.c_uint], restype=canstat.c_canStatus, - errcheck=__check_status, + errcheck=__check_status_initialization, ) canSetAcceptanceFilter = __get_canlib_function( "canSetAcceptanceFilter", argtypes=[c_canHandle, ctypes.c_uint, ctypes.c_uint, ctypes.c_int], restype=canstat.c_canStatus, - errcheck=__check_status, + errcheck=__check_status_operation, ) canReadWait = __get_canlib_function( @@ -247,32 +251,39 @@ def __check_bus_handle_validity(handle, function, arguments): ctypes.c_uint, ], restype=canstat.c_canStatus, - errcheck=__check_status, + errcheck=__check_status_operation, ) canWriteSync = __get_canlib_function( "canWriteSync", argtypes=[c_canHandle, ctypes.c_ulong], restype=canstat.c_canStatus, - errcheck=__check_status, + errcheck=__check_status_operation, + ) + + canIoCtlInit = __get_canlib_function( + "canIoCtl", + argtypes=[c_canHandle, ctypes.c_uint, ctypes.c_void_p, ctypes.c_uint], + restype=canstat.c_canStatus, + errcheck=__check_status_initialization, ) canIoCtl = __get_canlib_function( "canIoCtl", argtypes=[c_canHandle, ctypes.c_uint, ctypes.c_void_p, ctypes.c_uint], restype=canstat.c_canStatus, - errcheck=__check_status, + errcheck=__check_status_operation, ) canGetVersion = __get_canlib_function( - "canGetVersion", restype=ctypes.c_short, errcheck=__check_status + "canGetVersion", restype=ctypes.c_short, errcheck=__check_status_operation ) kvFlashLeds = __get_canlib_function( "kvFlashLeds", argtypes=[c_canHandle, ctypes.c_int, ctypes.c_int], restype=ctypes.c_short, - errcheck=__check_status, + errcheck=__check_status_operation, ) if sys.platform == "win32": @@ -280,21 +291,21 @@ def __check_bus_handle_validity(handle, function, arguments): "canGetVersionEx", argtypes=[ctypes.c_uint], restype=ctypes.c_uint, - errcheck=__check_status, + errcheck=__check_status_operation, ) canGetChannelData = __get_canlib_function( "canGetChannelData", argtypes=[ctypes.c_int, ctypes.c_int, ctypes.c_void_p, ctypes.c_size_t], restype=canstat.c_canStatus, - errcheck=__check_status, + errcheck=__check_status_initialization, ) canRequestBusStatistics = __get_canlib_function( "canRequestBusStatistics", argtypes=[c_canHandle], restype=canstat.c_canStatus, - errcheck=__check_status, + errcheck=__check_status_operation, ) canGetBusStatistics = __get_canlib_function( @@ -305,7 +316,7 @@ def __check_bus_handle_validity(handle, function, arguments): ctypes.c_size_t, ], restype=canstat.c_canStatus, - errcheck=__check_status, + errcheck=__check_status_operation, ) @@ -440,7 +451,7 @@ def __init__(self, channel, can_filters=None, **kwargs): log.debug("Creating read handle to bus channel: %s", channel) self._read_handle = canOpenChannel(channel, flags) - canIoCtl( + canIoCtlInit( self._read_handle, canstat.canIOCTL_SET_TIMER_SCALE, ctypes.byref(ctypes.c_long(TIMESTAMP_RESOLUTION)), @@ -467,7 +478,7 @@ def __init__(self, channel, can_filters=None, **kwargs): local_echo = single_handle or receive_own_messages if receive_own_messages and single_handle: log.warning("receive_own_messages only works if single_handle is False") - canIoCtl( + canIoCtlInit( self._read_handle, canstat.canIOCTL_SET_LOCAL_TXECHO, ctypes.byref(ctypes.c_byte(local_echo)), @@ -533,9 +544,8 @@ def _apply_filters(self, filters): for handle in (self._read_handle, self._write_handle): for extended in (0, 1): canSetAcceptanceFilter(handle, 0, 0, extended) - except (NotImplementedError, CANLIBError): - # TODO add logging? - pass + except (NotImplementedError, CANLIBError) as e: + log.error("An error occured while disabling filtering: %s", e) def flush_tx_buffer(self): """Wipeout the transmit buffer on the Kvaser.""" @@ -570,7 +580,6 @@ def _recv_internal(self, timeout=None): ) if status == canstat.canOK: - # log.debug('read complete -> status OK') data_array = data.raw flags = flags.value is_extended = bool(flags & canstat.canMSG_EXT) @@ -671,8 +680,9 @@ def _detect_available_configs(): num_channels = ctypes.c_int(0) try: canGetNumberOfChannels(ctypes.byref(num_channels)) - except Exception: + except (CANLIBError, NameError): pass + return [ {"interface": "kvaser", "channel": channel} for channel in range(num_channels.value) diff --git a/test/test_kvaser.py b/test/test_kvaser.py index 2637ae0e7..0efdfd643 100644 --- a/test/test_kvaser.py +++ b/test/test_kvaser.py @@ -19,6 +19,7 @@ def setUp(self): canlib.canGetNumberOfChannels = KvaserTest.canGetNumberOfChannels canlib.canOpenChannel = Mock(return_value=0) canlib.canIoCtl = Mock(return_value=0) + canlib.canIoCtlInit = Mock(return_value=0) canlib.kvReadTimer = Mock() canlib.canSetBusParams = Mock() canlib.canSetBusParamsFd = Mock()