Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 19 additions & 33 deletions can/interfaces/nixnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import time
import struct

from can import CanError, BusABC, Message
from can import BusABC, Message
from ..exceptions import CanInitializationError, CanOperationError


logger = logging.getLogger(__name__)

Expand All @@ -28,18 +30,15 @@
database,
XnetError,
)
except ImportError:
logger.error("Error, NIXNET python module cannot be loaded.")
raise
except ImportError as error:
raise ImportError("NIXNET python module cannot be loaded") from error
else:
logger.error("NI-XNET interface is only available on Windows systems")
raise NotImplementedError("NiXNET is not supported on not Win32 platforms")
raise NotImplementedError("NiXNET only supported on Win32 platforms")


class NiXNETcanBus(BusABC):
"""
The CAN Bus implemented for the NI-XNET interface.

"""

def __init__(
Expand All @@ -52,7 +51,7 @@ def __init__(
brs=False,
can_termination=False,
log_errors=True,
**kwargs
**kwargs,
):
"""
:param str channel:
Expand All @@ -69,9 +68,8 @@ def __init__(
``is_error_frame`` set to True and ``arbitration_id`` will identify
the error (default True)

:raises can.interfaces.nixnet.NiXNETError:
:raises can.exceptions.CanInitializationError:
If starting communication fails

"""
self._rx_queue = []
self.channel = channel
Expand Down Expand Up @@ -120,16 +118,18 @@ def __init__(
self.__session_send.start()
self.__session_receive.start()

except errors.XnetError as err:
raise NiXNETError(function="__init__", error_message=err.args[0]) from None
except errors.XnetError as error:
raise CanInitializationError(
f"{error.args[0]} ({error.error_type})", error.error_code
) from None

self._is_filtered = False
super(NiXNETcanBus, self).__init__(
channel=channel,
can_filters=can_filters,
bitrate=bitrate,
log_errors=log_errors,
**kwargs
**kwargs,
)

def _recv_internal(self, timeout):
Expand Down Expand Up @@ -160,8 +160,7 @@ def _recv_internal(self, timeout):
)

return msg, self._filters is None
except Exception as e:
# print('Error: ', e)
except Exception:
return None, self._filters is None

def send(self, msg, timeout=None):
Expand All @@ -174,7 +173,7 @@ def send(self, msg, timeout=None):
:param float timeout:
Max time to wait for the device to be ready in seconds, None if time is infinite

:raises can.interfaces.nixnet.NiXNETError:
:raises can.exceptions.CanOperationError:
If writing to transmit buffer fails.
It does not wait for message to be ACKed currently.
"""
Expand All @@ -201,8 +200,10 @@ def send(self, msg, timeout=None):

try:
self.__session_send.frames.write([can_frame], timeout)
except errors.XnetError as err:
raise NiXNETError(function="send", error_message=err.args[0]) from None
except errors.XnetError as error:
raise CanOperationError(
f"{error.args[0]} ({error.error_type})", error.error_code
) from None

def reset(self):
"""
Expand Down Expand Up @@ -253,18 +254,3 @@ def _detect_available_configs():
logger.debug("An error occured while searching for configs: %s", str(error))

return configs


# To-Do review error management, I don't like this implementation
class NiXNETError(CanError):
"""Error from NI-XNET driver."""

def __init__(self, function="", error_message=""):
super(NiXNETError, self).__init__()
#: Function that failed
self.function = function
#: Arguments passed to function
self.error_message = error_message

def __str__(self):
return "Function %s failed:\n%s" % (self.function, self.error_message)