Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 62 additions & 28 deletions can/interfaces/iscan.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
"""
Interface for isCAN from Thorsis Technologies GmbH, former ifak system GmbH.
Interface for isCAN from *Thorsis Technologies GmbH*, former *ifak system GmbH*.
"""

import ctypes
import time
import logging
from typing import Optional, Tuple, Union

from can import CanError, BusABC, Message
from can import BusABC, Message
from can import (
CanError,
CanInterfaceNotImplementedError,
CanInitializationError,
CanOperationError,
)

logger = logging.getLogger(__name__)

Expand All @@ -23,9 +30,15 @@ class MessageExStruct(ctypes.Structure):
]


def check_status(result, function, arguments):
def check_status_initialization(result: int, function, arguments) -> int:
if result > 0:
raise IscanError(function, result, arguments)
raise IscanInitializationError(function, result, arguments)
return result


def check_status(result: int, function, arguments) -> int:
if result > 0:
raise IscanOperationError(function, result, arguments)
return result


Expand All @@ -36,12 +49,15 @@ def check_status(result, function, arguments):
logger.warning("Failed to load IS-CAN driver: %s", e)
else:
iscan.isCAN_DeviceInitEx.argtypes = [ctypes.c_ubyte, ctypes.c_ubyte]
iscan.isCAN_DeviceInitEx.errcheck = check_status
iscan.isCAN_DeviceInitEx.errcheck = check_status_initialization
iscan.isCAN_DeviceInitEx.restype = ctypes.c_ubyte

iscan.isCAN_ReceiveMessageEx.errcheck = check_status
iscan.isCAN_ReceiveMessageEx.restype = ctypes.c_ubyte

iscan.isCAN_TransmitMessageEx.errcheck = check_status
iscan.isCAN_TransmitMessageEx.restype = ctypes.c_ubyte

iscan.isCAN_CloseDevice.errcheck = check_status
iscan.isCAN_CloseDevice.restype = ctypes.c_ubyte

Expand All @@ -62,24 +78,29 @@ class IscanBus(BusABC):
1000000: 9,
}

def __init__(self, channel, bitrate=500000, poll_interval=0.01, **kwargs):
def __init__(
self,
channel: Union[str, int],
bitrate: int = 500000,
poll_interval: float = 0.01,
**kwargs,
) -> None:
"""
:param int channel:
:param channel:
Device number
:param int bitrate:
:param bitrate:
Bitrate in bits/s
:param float poll_interval:
:param poll_interval:
Poll interval in seconds when reading messages
"""
if iscan is None:
raise ImportError("Could not load isCAN driver")
raise CanInterfaceNotImplementedError("Could not load isCAN driver")

self.channel = ctypes.c_ubyte(int(channel))
self.channel_info = "IS-CAN: %s" % channel
self.channel_info = f"IS-CAN: {self.channel}"

if bitrate not in self.BAUDRATES:
valid_bitrates = ", ".join(str(bitrate) for bitrate in self.BAUDRATES)
raise ValueError("Invalid bitrate, choose one of " + valid_bitrates)
raise ValueError(f"Invalid bitrate, choose one of {set(self.BAUDRATES)}")

self.poll_interval = poll_interval
iscan.isCAN_DeviceInitEx(self.channel, self.BAUDRATES[bitrate])
Expand All @@ -88,14 +109,16 @@ def __init__(self, channel, bitrate=500000, poll_interval=0.01, **kwargs):
channel=channel, bitrate=bitrate, poll_interval=poll_interval, **kwargs
)

def _recv_internal(self, timeout):
def _recv_internal(
self, timeout: Optional[float]
) -> Tuple[Optional[Message], bool]:
raw_msg = MessageExStruct()
end_time = time.time() + timeout if timeout is not None else None
while True:
try:
iscan.isCAN_ReceiveMessageEx(self.channel, ctypes.byref(raw_msg))
except IscanError as e:
if e.error_code != 8:
if e.error_code != 8: # "No message received"
# An error occurred
raise
if end_time is not None and time.time() > end_time:
Expand All @@ -118,7 +141,7 @@ def _recv_internal(self, timeout):
)
return msg, False

def send(self, msg, timeout=None):
def send(self, msg: Message, timeout: Optional[float] = None) -> None:
raw_msg = MessageExStruct(
msg.arbitration_id,
bool(msg.is_extended_id),
Expand All @@ -128,14 +151,14 @@ def send(self, msg, timeout=None):
)
iscan.isCAN_TransmitMessageEx(self.channel, ctypes.byref(raw_msg))

def shutdown(self):
def shutdown(self) -> None:
iscan.isCAN_CloseDevice(self.channel)


class IscanError(CanError):
# TODO: document

ERROR_CODES = {
0: "Success",
1: "No access to device",
2: "Device with ID not found",
3: "Driver operation failed",
Expand All @@ -161,17 +184,28 @@ class IscanError(CanError):
40: "Need a licence number under NT4",
}

def __init__(self, function, error_code, arguments):
super().__init__()
# :Status code
def __init__(self, function, error_code: int, arguments) -> None:
try:
description = ": " + self.ERROR_CODES[self.error_code]
except KeyError:
description = ""

super().__init__(
f"Function {self.function.__name__} failed{description}",
error_code=error_code,
)

#: Status code
self.error_code = error_code
# :Function that failed
#: Function that failed
self.function = function
# :Arguments passed to function
#: Arguments passed to function
self.arguments = arguments

def __str__(self):
description = self.ERROR_CODES.get(
self.error_code, "Error code %d" % self.error_code
)
return "Function %s failed: %s" % (self.function.__name__, description)

class IscanOperationError(IscanError, CanOperationError):
pass


class IscanInitializationError(IscanError, CanInitializationError):
pass