From 4a4a57a59596e5d1211b2a6a0806b3092ad3bb6e Mon Sep 17 00:00:00 2001 From: Felix Divo <4403130+felixdivo@users.noreply.github.com> Date: Wed, 27 Oct 2021 16:58:10 +0200 Subject: [PATCH] Specific Exceptions: Adapting nixnet interface Part of #1046. Since the NI-XNET interface was not released in a stable release yet (see #968), we can freely change the exception handling without deprecation notices. --- can/interfaces/nixnet.py | 52 +++++++++++++++------------------------- 1 file changed, 19 insertions(+), 33 deletions(-) diff --git a/can/interfaces/nixnet.py b/can/interfaces/nixnet.py index f50daf4c9..99d3738be 100644 --- a/can/interfaces/nixnet.py +++ b/can/interfaces/nixnet.py @@ -13,7 +13,9 @@ import time import struct -from can import CanError, BusABC, Message +from can import BusABC, Message +from ..exceptions import CanInitializationError, CanOperationError + logger = logging.getLogger(__name__) @@ -28,18 +30,15 @@ database, XnetError, ) - except ImportError: - logger.error("Error, NIXNET python module cannot be loaded.") - raise ImportError() + except ImportError as error: + raise ImportError("Error, NIXNET python module cannot be loaded") from error else: - logger.error("NI-XNET interface is only available on Windows systems") - raise NotImplementedError("NiXNET is not supported on not Win32 platforms") + raise NotImplementedError("NiXNET only supported on Win32 platforms") class NiXNETcanBus(BusABC): """ The CAN Bus implemented for the NI-XNET interface. - """ def __init__( @@ -52,7 +51,7 @@ def __init__( brs=False, can_termination=False, log_errors=True, - **kwargs + **kwargs, ): """ :param str channel: @@ -69,9 +68,8 @@ def __init__( ``is_error_frame`` set to True and ``arbitration_id`` will identify the error (default True) - :raises can.interfaces.nixnet.NiXNETError: + :raises can.exceptions.CanInitializationError: If starting communication fails - """ self._rx_queue = [] self.channel = channel @@ -120,8 +118,10 @@ def __init__( self.__session_send.start() self.__session_receive.start() - except errors.XnetError as err: - raise NiXNETError(function="__init__", error_message=err.args[0]) from None + except errors.XnetError as error: + raise CanInitializationError( + f"{error.args[0]} ({error.error_type})", error.error_code + ) from None self._is_filtered = False super(NiXNETcanBus, self).__init__( @@ -129,7 +129,7 @@ def __init__( can_filters=can_filters, bitrate=bitrate, log_errors=log_errors, - **kwargs + **kwargs, ) def _recv_internal(self, timeout): @@ -160,8 +160,7 @@ def _recv_internal(self, timeout): ) return msg, self._filters is None - except Exception as e: - # print('Error: ', e) + except Exception: return None, self._filters is None def send(self, msg, timeout=None): @@ -174,7 +173,7 @@ def send(self, msg, timeout=None): :param float timeout: Max time to wait for the device to be ready in seconds, None if time is infinite - :raises can.interfaces.nixnet.NiXNETError: + :raises can.exceptions.CanOperationError: If writing to transmit buffer fails. It does not wait for message to be ACKed currently. """ @@ -201,8 +200,10 @@ def send(self, msg, timeout=None): try: self.__session_send.frames.write([can_frame], timeout) - except errors.XnetError as err: - raise NiXNETError(function="send", error_message=err.args[0]) from None + except errors.XnetError as error: + raise CanOperationError( + f"{error.args[0]} ({error.error_type})", error.error_code + ) from None def reset(self): """ @@ -253,18 +254,3 @@ def _detect_available_configs(): logger.debug("An error occured while searching for configs: %s", str(error)) return configs - - -# To-Do review error management, I don't like this implementation -class NiXNETError(CanError): - """Error from NI-XNET driver.""" - - def __init__(self, function="", error_message=""): - super(NiXNETError, self).__init__() - #: Function that failed - self.function = function - #: Arguments passed to function - self.error_message = error_message - - def __str__(self): - return "Function %s failed:\n%s" % (self.function, self.error_message)