Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 62 additions & 52 deletions can/interfaces/kvaser/canlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
import logging
import ctypes

from can import CanError, BusABC
from can import BusABC
from ...exceptions import CanError, CanInitializationError, CanOperationError
from can import Message
from can.util import time_perfcounter_correlation
from . import constants as canstat
Expand Down Expand Up @@ -67,44 +68,51 @@ class CANLIBError(CanError):
"""

def __init__(self, function, error_code, arguments):
super().__init__()
self.error_code = error_code
message = CANLIBError._get_error_message(error_code)
super().__init__(f"Function {function.__name__} failed - {message}", error_code)
self.function = function
self.arguments = arguments

def __str__(self):
return "Function %s failed - %s" % (
self.function.__name__,
self.__get_error_message(),
)

def __get_error_message(self):
@staticmethod
def _get_error_message(error_code: int) -> str:
errmsg = ctypes.create_string_buffer(128)
canGetErrorText(self.error_code, errmsg, len(errmsg))
canGetErrorText(error_code, errmsg, len(errmsg))
return errmsg.value.decode("ascii")


class CANLIBInitializationError(CANLIBError, CanInitializationError):
pass


class CANLIBOperationError(CANLIBError, CanOperationError):
pass


def __convert_can_status_to_int(result):
# log.debug("converting can status to int {} ({})".format(result, type(result)))
if isinstance(result, int):
return result
else:
return result.value


def __check_status(result, function, arguments):
def __check_status_operation(result, function, arguments):
result = __convert_can_status_to_int(result)
if not canstat.CANSTATUS_SUCCESS(result):
raise CANLIBOperationError(function, result, arguments)
return result


def __check_status_initialization(result, function, arguments):
result = __convert_can_status_to_int(result)
if not canstat.CANSTATUS_SUCCESS(result):
# log.debug('Detected error while checking CAN status')
raise CANLIBError(function, result, arguments)
raise CANLIBInitializationError(function, result, arguments)
return result


def __check_status_read(result, function, arguments):
result = __convert_can_status_to_int(result)
if not canstat.CANSTATUS_SUCCESS(result) and result != canstat.canERR_NOMSG:
# log.debug('Detected error in which checking status read')
raise CANLIBError(function, result, arguments)
raise CANLIBOperationError(function, result, arguments)
return result


Expand All @@ -115,16 +123,12 @@ class c_canHandle(ctypes.c_int):
canINVALID_HANDLE = -1


def __handle_is_valid(handle):
return handle.value > canINVALID_HANDLE


def __check_bus_handle_validity(handle, function, arguments):
if not __handle_is_valid(handle):
result = __convert_can_status_to_int(handle)
raise CANLIBError(function, result, arguments)
else:
return handle
if handle.value > canINVALID_HANDLE:
return handle # is valid

result = __convert_can_status_to_int(handle)
raise CANLIBInitializationError(function, result, arguments)


if __canlib is not None:
Expand All @@ -134,43 +138,43 @@ def __check_bus_handle_validity(handle, function, arguments):
"canGetErrorText",
argtypes=[canstat.c_canStatus, ctypes.c_char_p, ctypes.c_uint],
restype=canstat.c_canStatus,
errcheck=__check_status,
errcheck=__check_status_operation,
)

# TODO wrap this type of function to provide a more Pythonic API
canGetNumberOfChannels = __get_canlib_function(
"canGetNumberOfChannels",
argtypes=[ctypes.c_void_p],
restype=canstat.c_canStatus,
errcheck=__check_status,
errcheck=__check_status_initialization,
)

kvReadTimer = __get_canlib_function(
"kvReadTimer",
argtypes=[c_canHandle, ctypes.POINTER(ctypes.c_uint)],
restype=canstat.c_canStatus,
errcheck=__check_status,
errcheck=__check_status_initialization,
)

canBusOff = __get_canlib_function(
"canBusOff",
argtypes=[c_canHandle],
restype=canstat.c_canStatus,
errcheck=__check_status,
errcheck=__check_status_operation,
)

canBusOn = __get_canlib_function(
"canBusOn",
argtypes=[c_canHandle],
restype=canstat.c_canStatus,
errcheck=__check_status,
errcheck=__check_status_initialization,
)

canClose = __get_canlib_function(
"canClose",
argtypes=[c_canHandle],
restype=canstat.c_canStatus,
errcheck=__check_status,
errcheck=__check_status_operation,
)

canOpenChannel = __get_canlib_function(
Expand All @@ -192,7 +196,7 @@ def __check_bus_handle_validity(handle, function, arguments):
ctypes.c_uint,
],
restype=canstat.c_canStatus,
errcheck=__check_status,
errcheck=__check_status_initialization,
)

canSetBusParamsFd = __get_canlib_function(
Expand All @@ -205,21 +209,21 @@ def __check_bus_handle_validity(handle, function, arguments):
ctypes.c_uint,
],
restype=canstat.c_canStatus,
errcheck=__check_status,
errcheck=__check_status_initialization,
)

canSetBusOutputControl = __get_canlib_function(
"canSetBusOutputControl",
argtypes=[c_canHandle, ctypes.c_uint],
restype=canstat.c_canStatus,
errcheck=__check_status,
errcheck=__check_status_initialization,
)

canSetAcceptanceFilter = __get_canlib_function(
"canSetAcceptanceFilter",
argtypes=[c_canHandle, ctypes.c_uint, ctypes.c_uint, ctypes.c_int],
restype=canstat.c_canStatus,
errcheck=__check_status,
errcheck=__check_status_operation,
)

canReadWait = __get_canlib_function(
Expand Down Expand Up @@ -247,54 +251,61 @@ def __check_bus_handle_validity(handle, function, arguments):
ctypes.c_uint,
],
restype=canstat.c_canStatus,
errcheck=__check_status,
errcheck=__check_status_operation,
)

canWriteSync = __get_canlib_function(
"canWriteSync",
argtypes=[c_canHandle, ctypes.c_ulong],
restype=canstat.c_canStatus,
errcheck=__check_status,
errcheck=__check_status_operation,
)

canIoCtlInit = __get_canlib_function(
"canIoCtl",
argtypes=[c_canHandle, ctypes.c_uint, ctypes.c_void_p, ctypes.c_uint],
restype=canstat.c_canStatus,
errcheck=__check_status_initialization,
)

canIoCtl = __get_canlib_function(
"canIoCtl",
argtypes=[c_canHandle, ctypes.c_uint, ctypes.c_void_p, ctypes.c_uint],
restype=canstat.c_canStatus,
errcheck=__check_status,
errcheck=__check_status_operation,
)

canGetVersion = __get_canlib_function(
"canGetVersion", restype=ctypes.c_short, errcheck=__check_status
"canGetVersion", restype=ctypes.c_short, errcheck=__check_status_operation
)

kvFlashLeds = __get_canlib_function(
"kvFlashLeds",
argtypes=[c_canHandle, ctypes.c_int, ctypes.c_int],
restype=ctypes.c_short,
errcheck=__check_status,
errcheck=__check_status_operation,
)

if sys.platform == "win32":
canGetVersionEx = __get_canlib_function(
"canGetVersionEx",
argtypes=[ctypes.c_uint],
restype=ctypes.c_uint,
errcheck=__check_status,
errcheck=__check_status_operation,
)

canGetChannelData = __get_canlib_function(
"canGetChannelData",
argtypes=[ctypes.c_int, ctypes.c_int, ctypes.c_void_p, ctypes.c_size_t],
restype=canstat.c_canStatus,
errcheck=__check_status,
errcheck=__check_status_initialization,
)

canRequestBusStatistics = __get_canlib_function(
"canRequestBusStatistics",
argtypes=[c_canHandle],
restype=canstat.c_canStatus,
errcheck=__check_status,
errcheck=__check_status_operation,
)

canGetBusStatistics = __get_canlib_function(
Expand All @@ -305,7 +316,7 @@ def __check_bus_handle_validity(handle, function, arguments):
ctypes.c_size_t,
],
restype=canstat.c_canStatus,
errcheck=__check_status,
errcheck=__check_status_operation,
)


Expand Down Expand Up @@ -440,7 +451,7 @@ def __init__(self, channel, can_filters=None, **kwargs):

log.debug("Creating read handle to bus channel: %s", channel)
self._read_handle = canOpenChannel(channel, flags)
canIoCtl(
canIoCtlInit(
self._read_handle,
canstat.canIOCTL_SET_TIMER_SCALE,
ctypes.byref(ctypes.c_long(TIMESTAMP_RESOLUTION)),
Expand All @@ -467,7 +478,7 @@ def __init__(self, channel, can_filters=None, **kwargs):
local_echo = single_handle or receive_own_messages
if receive_own_messages and single_handle:
log.warning("receive_own_messages only works if single_handle is False")
canIoCtl(
canIoCtlInit(
self._read_handle,
canstat.canIOCTL_SET_LOCAL_TXECHO,
ctypes.byref(ctypes.c_byte(local_echo)),
Expand Down Expand Up @@ -533,9 +544,8 @@ def _apply_filters(self, filters):
for handle in (self._read_handle, self._write_handle):
for extended in (0, 1):
canSetAcceptanceFilter(handle, 0, 0, extended)
except (NotImplementedError, CANLIBError):
# TODO add logging?
pass
except (NotImplementedError, CANLIBError) as e:
log.error("An error occured while disabling filtering: %s", e)

def flush_tx_buffer(self):
"""Wipeout the transmit buffer on the Kvaser."""
Expand Down Expand Up @@ -570,7 +580,6 @@ def _recv_internal(self, timeout=None):
)

if status == canstat.canOK:
# log.debug('read complete -> status OK')
data_array = data.raw
flags = flags.value
is_extended = bool(flags & canstat.canMSG_EXT)
Expand Down Expand Up @@ -671,8 +680,9 @@ def _detect_available_configs():
num_channels = ctypes.c_int(0)
try:
canGetNumberOfChannels(ctypes.byref(num_channels))
except Exception:
except (CANLIBError, NameError):
pass

return [
{"interface": "kvaser", "channel": channel}
for channel in range(num_channels.value)
Expand Down
1 change: 1 addition & 0 deletions test/test_kvaser.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def setUp(self):
canlib.canGetNumberOfChannels = KvaserTest.canGetNumberOfChannels
canlib.canOpenChannel = Mock(return_value=0)
canlib.canIoCtl = Mock(return_value=0)
canlib.canIoCtlInit = Mock(return_value=0)
canlib.kvReadTimer = Mock()
canlib.canSetBusParams = Mock()
canlib.canSetBusParamsFd = Mock()
Expand Down