diff --git a/can/interfaces/usb2can/serial_selector.py b/can/interfaces/usb2can/serial_selector.py index d9beb5df4..fcc951262 100644 --- a/can/interfaces/usb2can/serial_selector.py +++ b/can/interfaces/usb2can/serial_selector.py @@ -2,6 +2,7 @@ """ import logging +from typing import List try: import win32com.client @@ -10,7 +11,7 @@ raise -def WMIDateStringToDate(dtmDate): +def WMIDateStringToDate(dtmDate) -> str: if dtmDate[4] == 0: strDateTime = dtmDate[5] + "/" else: @@ -39,14 +40,12 @@ def WMIDateStringToDate(dtmDate): return strDateTime -def find_serial_devices(serial_matcher="ED"): +def find_serial_devices(serial_matcher: str = "ED") -> List[str]: """ Finds a list of USB devices where the serial number (partially) matches the given string. - :param str serial_matcher (optional): + :param serial_matcher: only device IDs starting with this string are returned - - :rtype: List[str] """ objWMIService = win32com.client.Dispatch("WbemScripting.SWbemLocator") objSWbemServices = objWMIService.ConnectServer(".", "root\\cimv2") diff --git a/can/interfaces/usb2can/usb2canInterface.py b/can/interfaces/usb2can/usb2canInterface.py index 479c54764..4c086adea 100644 --- a/can/interfaces/usb2can/usb2canInterface.py +++ b/can/interfaces/usb2can/usb2canInterface.py @@ -1,12 +1,19 @@ """ -This interface is for Windows only, otherwise use socketCAN. +This interface is for Windows only, otherwise use SocketCAN. """ import logging from ctypes import byref - -from can import BusABC, Message, CanError -from .usb2canabstractionlayer import * +from typing import Optional + +from can import BusABC, Message, CanInitializationError, CanOperationError +from .usb2canabstractionlayer import Usb2CanAbstractionLayer, CanalMsg, CanalError +from .usb2canabstractionlayer import ( + flags_t, + IS_ERROR_FRAME, + IS_REMOTE_FRAME, + IS_ID_TYPE, +) from .serial_selector import find_serial_devices # Set up logging @@ -90,7 +97,7 @@ def __init__( flags=0x00000008, *args, bitrate=500000, - **kwargs + **kwargs, ): self.can = Usb2CanAbstractionLayer(dll) @@ -102,15 +109,15 @@ def __init__( if not device_id: devices = find_serial_devices() if not devices: - raise CanError("could not automatically find any device") + raise CanInitializationError("could not automatically find any device") device_id = devices[0] # convert to kb/s and cap: max rate is 1000 kb/s baudrate = min(int(bitrate // 1000), 1000) - self.channel_info = "USB2CAN device {}".format(device_id) + self.channel_info = f"USB2CAN device {device_id}" - connector = "{}; {}".format(device_id, baudrate) + connector = f"{device_id}; {baudrate}" self.handle = self.can.open(connector, flags_t) super().__init__( @@ -126,7 +133,7 @@ def send(self, msg, timeout=None): status = self.can.send(self.handle, byref(tx)) if status != CanalError.SUCCESS: - raise CanError("could not send message: status == {}".format(status)) + raise CanOperationError("could not send message", error_code=status) def _recv_internal(self, timeout): @@ -148,8 +155,7 @@ def _recv_internal(self, timeout): ): rx = None else: - log.error("Canal Error %s", status) - rx = None + raise CanOperationError("could not receive message", error_code=status) return rx, False @@ -157,28 +163,28 @@ def shutdown(self): """ Shuts down connection to the device safely. - :raise cam.CanError: is closing the connection did not work + :raise cam.CanOperationError: is closing the connection did not work """ status = self.can.close(self.handle) if status != CanalError.SUCCESS: - raise CanError("could not shut down bus: status == {}".format(status)) + raise CanOperationError("could not shut down bus", error_code=status) @staticmethod def _detect_available_configs(): return Usb2canBus.detect_available_configs() @staticmethod - def detect_available_configs(serial_matcher=None): + def detect_available_configs(serial_matcher: Optional[str] = None): """ - Uses the Windows Management Instrumentation to identify serial devices. + Uses the *Windows Management Instrumentation* to identify serial devices. - :param str serial_matcher (optional): + :param serial_matcher: search string for automatic detection of the device serial """ - if serial_matcher: - channels = find_serial_devices(serial_matcher) - else: + if serial_matcher is None: channels = find_serial_devices() + else: + channels = find_serial_devices(serial_matcher) return [{"interface": "usb2can", "channel": c} for c in channels] diff --git a/can/interfaces/usb2can/usb2canabstractionlayer.py b/can/interfaces/usb2can/usb2canabstractionlayer.py index ce8157a09..f3d7ad0ee 100644 --- a/can/interfaces/usb2can/usb2canabstractionlayer.py +++ b/can/interfaces/usb2can/usb2canabstractionlayer.py @@ -4,9 +4,9 @@ """ from ctypes import * -from struct import * -from enum import Enum +from enum import IntEnum import logging +from contextlib import contextmanager import can @@ -25,7 +25,7 @@ IS_ID_TYPE = 1 -class CanalError(Enum): +class CanalError(IntEnum): SUCCESS = 0 BAUDRATE = 1 BUS_OFF = 2 @@ -102,6 +102,14 @@ class CanalMsg(Structure): ] +@contextmanager +def error_check(error_message: str) -> None: + try: + yield + except Exception as error: + raise can.CanOperationError(error_message) from error + + class Usb2CanAbstractionLayer: """A low level wrapper around the usb2can library. @@ -112,21 +120,26 @@ def __init__(self, dll="usb2can.dll"): """ :type dll: str or path-like :param dll (optional): the path to the usb2can DLL to load - :raises OSError: if the DLL could not be loaded + + :raises can.CanInterfaceNotImplementedError: if the DLL could not be loaded """ - self.__m_dllBasic = windll.LoadLibrary(dll) + try: + self.__m_dllBasic = windll.LoadLibrary(dll) + if self.__m_dllBasic is None: + raise Exception("__m_dllBasic is None") - if self.__m_dllBasic is None: - log.warning("DLL failed to load at path: {}".format(dll)) + except Exception as error: + message = f"DLL failed to load at path: {dll}" + raise can.CanInterfaceNotImplementedError(message) from error - def open(self, configuration, flags): + def open(self, configuration: str, flags: int): """ Opens a CAN connection using `CanalOpen()`. - :param str configuration: the configuration: "device_id; baudrate" - :param int flags: the flags to be set + :param configuration: the configuration: "device_id; baudrate" + :param flags: the flags to be set - :raises can.CanError: if any error occurred + :raises can.CanInitializationError: if any error occurred :returns: Valid handle for CANAL API functions on success """ try: @@ -136,100 +149,59 @@ def open(self, configuration, flags): result = self.__m_dllBasic.CanalOpen(config_ascii, flags) except Exception as ex: # catch any errors thrown by this call and re-raise - raise can.CanError( - 'CanalOpen() failed, configuration: "{}", error: {}'.format( - configuration, ex - ) + raise can.CanInitializationError( + f'CanalOpen() failed, configuration: "{configuration}", error: {ex}' ) else: # any greater-than-zero return value indicates a success # (see https://grodansparadis.gitbooks.io/the-vscp-daemon/canal_interface_specification.html) # raise an error if the return code is <= 0 if result <= 0: - raise can.CanError( - 'CanalOpen() failed, configuration: "{}", return code: {}'.format( - configuration, result - ) + raise can.CanInitializationError( + f'CanalOpen() failed, configuration: "{configuration}"', + error_code=result, ) else: return result - def close(self, handle): - try: - res = self.__m_dllBasic.CanalClose(handle) - return CanalError(res) - except: - log.warning("Failed to close") - raise + def close(self, handle) -> CanalError: + with error_check("Failed to close"): + return CanalError(self.__m_dllBasic.CanalClose(handle)) - def send(self, handle, msg): - try: - res = self.__m_dllBasic.CanalSend(handle, msg) - return CanalError(res) - except: - log.warning("Sending error") - raise can.CanError("Failed to transmit frame") + def send(self, handle, msg) -> CanalError: + with error_check("Failed to transmit frame"): + return CanalError(self.__m_dllBasic.CanalSend(handle, msg)) - def receive(self, handle, msg): - try: - res = self.__m_dllBasic.CanalReceive(handle, msg) - return CanalError(res) - except: - log.warning("Receive error") - raise + def receive(self, handle, msg) -> CanalError: + with error_check("Receive error"): + return CanalError(self.__m_dllBasic.CanalReceive(handle, msg)) - def blocking_send(self, handle, msg, timeout): - try: - res = self.__m_dllBasic.CanalBlockingSend(handle, msg, timeout) - return CanalError(res) - except: - log.warning("Blocking send error") - raise + def blocking_send(self, handle, msg, timeout) -> CanalError: + with error_check("Blocking send error"): + return CanalError(self.__m_dllBasic.CanalBlockingSend(handle, msg, timeout)) - def blocking_receive(self, handle, msg, timeout): - try: - res = self.__m_dllBasic.CanalBlockingReceive(handle, msg, timeout) - return CanalError(res) - except: - log.warning("Blocking Receive Failed") - raise + def blocking_receive(self, handle, msg, timeout) -> CanalError: + with error_check("Blocking Receive Failed"): + return CanalError( + self.__m_dllBasic.CanalBlockingReceive(handle, msg, timeout) + ) - def get_status(self, handle, status): - try: - res = self.__m_dllBasic.CanalGetStatus(handle, status) - return CanalError(res) - except: - log.warning("Get status failed") - raise + def get_status(self, handle, status) -> CanalError: + with error_check("Get status failed"): + return CanalError(self.__m_dllBasic.CanalGetStatus(handle, status)) - def get_statistics(self, handle, statistics): - try: - res = self.__m_dllBasic.CanalGetStatistics(handle, statistics) - return CanalError(res) - except: - log.warning("Get Statistics failed") - raise + def get_statistics(self, handle, statistics) -> CanalError: + with error_check("Get Statistics failed"): + return CanalError(self.__m_dllBasic.CanalGetStatistics(handle, statistics)) def get_version(self): - try: - res = self.__m_dllBasic.CanalGetVersion() - return res - except: - log.warning("Failed to get version info") - raise + with error_check("Failed to get version info"): + return self.__m_dllBasic.CanalGetVersion() def get_library_version(self): - try: - res = self.__m_dllBasic.CanalGetDllVersion() - return res - except: - log.warning("Failed to get DLL version") - raise + with error_check("Failed to get DLL version"): + return self.__m_dllBasic.CanalGetDllVersion() def get_vendor_string(self): - try: - res = self.__m_dllBasic.CanalGetVendorString() - return res - except: - log.warning("Failed to get vendor string") - raise + with error_check("Failed to get vendor string"): + return self.__m_dllBasic.CanalGetVendorString()