Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions can/interfaces/usb2can/serial_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""

import logging
from typing import List

try:
import win32com.client
Expand All @@ -10,7 +11,7 @@
raise


def WMIDateStringToDate(dtmDate):
def WMIDateStringToDate(dtmDate) -> str:
if dtmDate[4] == 0:
strDateTime = dtmDate[5] + "/"
else:
Expand Down Expand Up @@ -39,14 +40,12 @@ def WMIDateStringToDate(dtmDate):
return strDateTime


def find_serial_devices(serial_matcher="ED"):
def find_serial_devices(serial_matcher: str = "ED") -> List[str]:
"""
Finds a list of USB devices where the serial number (partially) matches the given string.

:param str serial_matcher (optional):
:param serial_matcher:
only device IDs starting with this string are returned

:rtype: List[str]
"""
objWMIService = win32com.client.Dispatch("WbemScripting.SWbemLocator")
objSWbemServices = objWMIService.ConnectServer(".", "root\\cimv2")
Expand Down
44 changes: 25 additions & 19 deletions can/interfaces/usb2can/usb2canInterface.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
"""
This interface is for Windows only, otherwise use socketCAN.
This interface is for Windows only, otherwise use SocketCAN.
"""

import logging
from ctypes import byref

from can import BusABC, Message, CanError
from .usb2canabstractionlayer import *
from typing import Optional

from can import BusABC, Message, CanInitializationError, CanOperationError
from .usb2canabstractionlayer import Usb2CanAbstractionLayer, CanalMsg, CanalError
from .usb2canabstractionlayer import (
flags_t,
IS_ERROR_FRAME,
IS_REMOTE_FRAME,
IS_ID_TYPE,
)
from .serial_selector import find_serial_devices

# Set up logging
Expand Down Expand Up @@ -90,7 +97,7 @@ def __init__(
flags=0x00000008,
*args,
bitrate=500000,
**kwargs
**kwargs,
):

self.can = Usb2CanAbstractionLayer(dll)
Expand All @@ -102,15 +109,15 @@ def __init__(
if not device_id:
devices = find_serial_devices()
if not devices:
raise CanError("could not automatically find any device")
raise CanInitializationError("could not automatically find any device")
device_id = devices[0]

# convert to kb/s and cap: max rate is 1000 kb/s
baudrate = min(int(bitrate // 1000), 1000)

self.channel_info = "USB2CAN device {}".format(device_id)
self.channel_info = f"USB2CAN device {device_id}"

connector = "{}; {}".format(device_id, baudrate)
connector = f"{device_id}; {baudrate}"
self.handle = self.can.open(connector, flags_t)

super().__init__(
Expand All @@ -126,7 +133,7 @@ def send(self, msg, timeout=None):
status = self.can.send(self.handle, byref(tx))

if status != CanalError.SUCCESS:
raise CanError("could not send message: status == {}".format(status))
raise CanOperationError("could not send message", error_code=status)

def _recv_internal(self, timeout):

Expand All @@ -148,37 +155,36 @@ def _recv_internal(self, timeout):
):
rx = None
else:
log.error("Canal Error %s", status)
rx = None
raise CanOperationError("could not receive message", error_code=status)

return rx, False

def shutdown(self):
"""
Shuts down connection to the device safely.

:raise cam.CanError: is closing the connection did not work
:raise cam.CanOperationError: is closing the connection did not work
"""
status = self.can.close(self.handle)

if status != CanalError.SUCCESS:
raise CanError("could not shut down bus: status == {}".format(status))
raise CanOperationError("could not shut down bus", error_code=status)

@staticmethod
def _detect_available_configs():
return Usb2canBus.detect_available_configs()

@staticmethod
def detect_available_configs(serial_matcher=None):
def detect_available_configs(serial_matcher: Optional[str] = None):
"""
Uses the Windows Management Instrumentation to identify serial devices.
Uses the *Windows Management Instrumentation* to identify serial devices.

:param str serial_matcher (optional):
:param serial_matcher:
search string for automatic detection of the device serial
"""
if serial_matcher:
channels = find_serial_devices(serial_matcher)
else:
if serial_matcher is None:
channels = find_serial_devices()
else:
channels = find_serial_devices(serial_matcher)

return [{"interface": "usb2can", "channel": c} for c in channels]
144 changes: 58 additions & 86 deletions can/interfaces/usb2can/usb2canabstractionlayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
"""

from ctypes import *
from struct import *
from enum import Enum
from enum import IntEnum
import logging
from contextlib import contextmanager

import can

Expand All @@ -25,7 +25,7 @@
IS_ID_TYPE = 1


class CanalError(Enum):
class CanalError(IntEnum):
SUCCESS = 0
BAUDRATE = 1
BUS_OFF = 2
Expand Down Expand Up @@ -102,6 +102,14 @@ class CanalMsg(Structure):
]


@contextmanager
def error_check(error_message: str) -> None:
try:
yield
except Exception as error:
raise can.CanOperationError(error_message) from error
Comment on lines +105 to +110
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's neat

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank's Brian for the chunk of reviews!



class Usb2CanAbstractionLayer:
"""A low level wrapper around the usb2can library.

Expand All @@ -112,21 +120,26 @@ def __init__(self, dll="usb2can.dll"):
"""
:type dll: str or path-like
:param dll (optional): the path to the usb2can DLL to load
:raises OSError: if the DLL could not be loaded

:raises can.CanInterfaceNotImplementedError: if the DLL could not be loaded
"""
self.__m_dllBasic = windll.LoadLibrary(dll)
try:
self.__m_dllBasic = windll.LoadLibrary(dll)
if self.__m_dllBasic is None:
raise Exception("__m_dllBasic is None")

if self.__m_dllBasic is None:
log.warning("DLL failed to load at path: {}".format(dll))
except Exception as error:
message = f"DLL failed to load at path: {dll}"
raise can.CanInterfaceNotImplementedError(message) from error

def open(self, configuration, flags):
def open(self, configuration: str, flags: int):
"""
Opens a CAN connection using `CanalOpen()`.

:param str configuration: the configuration: "device_id; baudrate"
:param int flags: the flags to be set
:param configuration: the configuration: "device_id; baudrate"
:param flags: the flags to be set

:raises can.CanError: if any error occurred
:raises can.CanInitializationError: if any error occurred
:returns: Valid handle for CANAL API functions on success
"""
try:
Expand All @@ -136,100 +149,59 @@ def open(self, configuration, flags):
result = self.__m_dllBasic.CanalOpen(config_ascii, flags)
except Exception as ex:
# catch any errors thrown by this call and re-raise
raise can.CanError(
'CanalOpen() failed, configuration: "{}", error: {}'.format(
configuration, ex
)
raise can.CanInitializationError(
f'CanalOpen() failed, configuration: "{configuration}", error: {ex}'
)
else:
# any greater-than-zero return value indicates a success
# (see https://grodansparadis.gitbooks.io/the-vscp-daemon/canal_interface_specification.html)
# raise an error if the return code is <= 0
if result <= 0:
raise can.CanError(
'CanalOpen() failed, configuration: "{}", return code: {}'.format(
configuration, result
)
raise can.CanInitializationError(
f'CanalOpen() failed, configuration: "{configuration}"',
error_code=result,
)
else:
return result

def close(self, handle):
try:
res = self.__m_dllBasic.CanalClose(handle)
return CanalError(res)
except:
log.warning("Failed to close")
raise
def close(self, handle) -> CanalError:
with error_check("Failed to close"):
return CanalError(self.__m_dllBasic.CanalClose(handle))

def send(self, handle, msg):
try:
res = self.__m_dllBasic.CanalSend(handle, msg)
return CanalError(res)
except:
log.warning("Sending error")
raise can.CanError("Failed to transmit frame")
def send(self, handle, msg) -> CanalError:
with error_check("Failed to transmit frame"):
return CanalError(self.__m_dllBasic.CanalSend(handle, msg))

def receive(self, handle, msg):
try:
res = self.__m_dllBasic.CanalReceive(handle, msg)
return CanalError(res)
except:
log.warning("Receive error")
raise
def receive(self, handle, msg) -> CanalError:
with error_check("Receive error"):
return CanalError(self.__m_dllBasic.CanalReceive(handle, msg))

def blocking_send(self, handle, msg, timeout):
try:
res = self.__m_dllBasic.CanalBlockingSend(handle, msg, timeout)
return CanalError(res)
except:
log.warning("Blocking send error")
raise
def blocking_send(self, handle, msg, timeout) -> CanalError:
with error_check("Blocking send error"):
return CanalError(self.__m_dllBasic.CanalBlockingSend(handle, msg, timeout))

def blocking_receive(self, handle, msg, timeout):
try:
res = self.__m_dllBasic.CanalBlockingReceive(handle, msg, timeout)
return CanalError(res)
except:
log.warning("Blocking Receive Failed")
raise
def blocking_receive(self, handle, msg, timeout) -> CanalError:
with error_check("Blocking Receive Failed"):
return CanalError(
self.__m_dllBasic.CanalBlockingReceive(handle, msg, timeout)
)

def get_status(self, handle, status):
try:
res = self.__m_dllBasic.CanalGetStatus(handle, status)
return CanalError(res)
except:
log.warning("Get status failed")
raise
def get_status(self, handle, status) -> CanalError:
with error_check("Get status failed"):
return CanalError(self.__m_dllBasic.CanalGetStatus(handle, status))

def get_statistics(self, handle, statistics):
try:
res = self.__m_dllBasic.CanalGetStatistics(handle, statistics)
return CanalError(res)
except:
log.warning("Get Statistics failed")
raise
def get_statistics(self, handle, statistics) -> CanalError:
with error_check("Get Statistics failed"):
return CanalError(self.__m_dllBasic.CanalGetStatistics(handle, statistics))

def get_version(self):
try:
res = self.__m_dllBasic.CanalGetVersion()
return res
except:
log.warning("Failed to get version info")
raise
with error_check("Failed to get version info"):
return self.__m_dllBasic.CanalGetVersion()

def get_library_version(self):
try:
res = self.__m_dllBasic.CanalGetDllVersion()
return res
except:
log.warning("Failed to get DLL version")
raise
with error_check("Failed to get DLL version"):
return self.__m_dllBasic.CanalGetDllVersion()

def get_vendor_string(self):
try:
res = self.__m_dllBasic.CanalGetVendorString()
return res
except:
log.warning("Failed to get vendor string")
raise
with error_check("Failed to get vendor string"):
return self.__m_dllBasic.CanalGetVendorString()