Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 98 additions & 54 deletions can/interfaces/nican.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,16 @@
import logging
import sys

from can import CanError, BusABC, Message
from can import BusABC, Message
import can.typechecking
from ..exceptions import (
CanError,
CanInterfaceNotImplementedError,
CanOperationError,
CanInitializationError,
)
from typing import Optional, Tuple, Type


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -74,15 +83,48 @@ class TxMessageStruct(ctypes.Structure):
]


def check_status(result, function, arguments):
class NicanError(CanError):
"""Error from NI-CAN driver."""

def __init__(self, function, error_code: int, arguments) -> None:
super().__init__(
message=f"{function} failed: {get_error_message(self.error_code)}",
error_code=error_code,
)

#: Function that failed
self.function = function

#: Arguments passed to function
self.arguments = arguments


class NicanInitializationError(NicanError, CanInitializationError):
pass


class NicanOperationError(NicanError, CanOperationError):
pass


def check_status(
result: int,
function,
arguments,
error_class: Type[NicanError] = NicanOperationError,
) -> int:
if result > 0:
logger.warning(get_error_message(result))
elif result < 0:
raise NicanError(function, result, arguments)
raise error_class(function, result, arguments)
return result


def get_error_message(status_code):
def check_status_init(*args, **kwargs) -> int:
return check_status(*args, **kwargs, error_class=NicanInitializationError)


def get_error_message(status_code: int) -> str:
"""Convert status code to descriptive string."""
errmsg = ctypes.create_string_buffer(1024)
nican.ncStatusToString(status_code, len(errmsg), errmsg)
Expand All @@ -102,21 +144,28 @@ def get_error_message(status_code):
ctypes.c_void_p,
ctypes.c_void_p,
]
nican.ncConfig.errcheck = check_status
nican.ncConfig.errcheck = check_status_init

nican.ncOpenObject.argtypes = [ctypes.c_char_p, ctypes.c_void_p]
nican.ncOpenObject.errcheck = check_status
nican.ncOpenObject.errcheck = check_status_init

nican.ncCloseObject.errcheck = check_status

nican.ncAction.argtypes = [ctypes.c_ulong, ctypes.c_ulong, ctypes.c_ulong]
nican.ncAction.errcheck = check_status

nican.ncRead.errcheck = check_status

nican.ncWrite.errcheck = check_status

nican.ncWaitForState.argtypes = [
ctypes.c_ulong,
ctypes.c_ulong,
ctypes.c_ulong,
ctypes.c_void_p,
]
nican.ncWaitForState.errcheck = check_status

nican.ncStatusToString.argtypes = [ctypes.c_int, ctypes.c_uint, ctypes.c_char_p]
else:
nican = None
Expand All @@ -137,37 +186,42 @@ class NicanBus(BusABC):
"""

def __init__(
self, channel, can_filters=None, bitrate=None, log_errors=True, **kwargs
):
self,
channel: str,
can_filters: Optional[can.typechecking.CanFilters] = None,
bitrate: Optional[int] = None,
log_errors: bool = True,
**kwargs,
) -> None:
"""
:param str channel:
Name of the object to open (e.g. 'CAN0')
:param channel:
Name of the object to open (e.g. `"CAN0"`)

:param int bitrate:
Bitrate in bits/s
:param bitrate:
Bitrate in bit/s

:param list can_filters:
:param can_filters:
See :meth:`can.BusABC.set_filters`.

:param bool log_errors:
:param log_errors:
If True, communication errors will appear as CAN messages with
``is_error_frame`` set to True and ``arbitration_id`` will identify
the error (default True)

:raises can.interfaces.nican.NicanError:
If starting communication fails

:raise can.CanInterfaceNotImplementedError:
If the current operating system is not supported or the driver could not be loaded.
:raise can.interfaces.nican.NicanInitializationError:
If the bus could not be set up.
"""
if nican is None:
raise ImportError(
raise CanInterfaceNotImplementedError(
"The NI-CAN driver could not be loaded. "
"Check that you are using 32-bit Python on Windows."
)

self.channel = channel
self.channel_info = "NI-CAN: " + channel
if not isinstance(channel, bytes):
channel = channel.encode()
self.channel_info = f"NI-CAN: {channel}"
channel_bytes = channel.encode("ascii")

config = [(NC_ATTR_START_ON_OPEN, True), (NC_ATTR_LOG_COMM_ERRS, log_errors)]

Expand Down Expand Up @@ -208,31 +262,33 @@ def __init__(
attr_id_list = AttrList(*(row[0] for row in config))
attr_value_list = AttrList(*(row[1] for row in config))
nican.ncConfig(
channel,
channel_bytes,
len(config),
ctypes.byref(attr_id_list),
ctypes.byref(attr_value_list),
)

self.handle = ctypes.c_ulong()
nican.ncOpenObject(channel, ctypes.byref(self.handle))
nican.ncOpenObject(channel_bytes, ctypes.byref(self.handle))

super().__init__(
channel=channel,
can_filters=can_filters,
bitrate=bitrate,
log_errors=log_errors,
**kwargs
**kwargs,
)

def _recv_internal(self, timeout):
def _recv_internal(
self, timeout: Optional[float]
) -> Tuple[Optional[Message], bool]:
"""
Read a message from a NI-CAN bus.

:param float timeout:
Max time to wait in seconds or None if infinite
:param timeout:
Max time to wait in seconds or ``None`` if infinite

:raises can.interfaces.nican.NicanError:
:raises can.interfaces.nican.NicanOperationError:
If reception fails
"""
if timeout is None:
Expand Down Expand Up @@ -274,14 +330,19 @@ def _recv_internal(self, timeout):
)
return msg, True

def send(self, msg, timeout=None):
def send(self, msg: Message, timeout: Optional[float] = None) -> None:
"""
Send a message to NI-CAN.

:param can.Message msg:
:param msg:
Message to send

:raises can.interfaces.nican.NicanError:
:param timeout:
The timeout

.. warning:: This gets ignored.

:raises can.interfaces.nican.NicanOperationError:
If writing to transmit buffer fails.
It does not wait for message to be ACKed currently.
"""
Expand All @@ -299,36 +360,19 @@ def send(self, msg, timeout=None):
# Maybe it is possible to use ncCreateNotification instead but seems a
# bit overkill at the moment.
# state = ctypes.c_ulong()
# nican.ncWaitForState(
# self.handle, NC_ST_WRITE_SUCCESS, int(timeout * 1000), ctypes.byref(state))
# nican.ncWaitForState(self.handle, NC_ST_WRITE_SUCCESS, int(timeout * 1000), ctypes.byref(state))

def reset(self):
def reset(self) -> None:
"""
Resets network interface. Stops network interface, then resets the CAN
chip to clear the CAN error counters (clear error passive state).
Resetting includes clearing all entries from read and write queues.

:raises can.interfaces.nican.NicanOperationError:
If resetting fails.
"""
nican.ncAction(self.handle, NC_OP_RESET, 0)

def shutdown(self):
def shutdown(self) -> None:
"""Close object."""
nican.ncCloseObject(self.handle)


class NicanError(CanError):
"""Error from NI-CAN driver."""

def __init__(self, function, error_code, arguments):
super().__init__()
#: Status code
self.error_code = error_code
#: Function that failed
self.function = function
#: Arguments passed to function
self.arguments = arguments

def __str__(self):
return "Function %s failed:\n%s" % (
self.function.__name__,
get_error_message(self.error_code),
)