diff --git a/can/interfaces/vector/canlib.py b/can/interfaces/vector/canlib.py index ed366b4c8..827e94215 100644 --- a/can/interfaces/vector/canlib.py +++ b/can/interfaces/vector/canlib.py @@ -10,6 +10,7 @@ import logging import time import os +from typing import Any, Dict, List, Optional, Union try: # Try builtin Python 3 Windows API @@ -31,6 +32,7 @@ from can import BusABC, Message from can.util import len2dlc, dlc2len from .exceptions import VectorError +from can.bit_timing import BitTiming # Define Module Logger # ==================== @@ -53,22 +55,24 @@ class VectorBus(BusABC): def __init__( self, - channel, - can_filters=None, - poll_interval=0.01, - receive_own_messages=False, - bitrate=None, - rx_queue_size=2 ** 14, - app_name="CANalyzer", - serial=None, - fd=False, - data_bitrate=None, - sjwAbr=2, - tseg1Abr=6, - tseg2Abr=3, - sjwDbr=2, - tseg1Dbr=6, - tseg2Dbr=3, + channel: Union[int, List[int]], + can_filters: Optional[List[Dict[str, int]]] = None, + poll_interval: float = 0.01, + receive_own_messages: bool = False, + bitrate: Optional[int] = None, + timing: Optional[BitTiming] = None, + data_timing: Optional[BitTiming] = None, + rx_queue_size: int = 2 ** 14, + app_name: str = "CANalyzer", + serial: Optional[int] = None, + fd: bool = False, + data_bitrate: Optional[int] = None, + sjwAbr: int = 2, + tseg1Abr: int = 6, + tseg2Abr: int = 3, + sjwDbr: int = 2, + tseg1Dbr: int = 6, + tseg2Dbr: int = 3, **kwargs, ): """ @@ -79,6 +83,11 @@ def __init__( Poll interval in seconds. :param int bitrate: Bitrate in bits/s. + :param can.BitTiming timing: + Bit timing configuration. + For CAN-FD this also applies to arbitration/nominal phase if no data_timing is provided. + :param can.BitTiming data_timing: + Bit timing configuration for CAN FD data phase. :param int rx_queue_size: Number of messages in receive queue (power of 2). CAN: range 16…32768 @@ -160,7 +169,7 @@ def __init__( hw_channel, xldefine.XL_BusTypes.XL_BUS_TYPE_CAN.value, ) - LOG.debug("Channel index %d found", channel) + LOG.debug(f"Channel index {channel} found") idx = xldriver.xlGetChannelIndex( hw_type.value, hw_index.value, hw_channel.value ) @@ -184,7 +193,7 @@ def __init__( permission_mask = xlclass.XLaccess() # Set mask to request channel init permission if needed - if bitrate or fd: + if bitrate or fd or timing: permission_mask.value = self.mask if fd: xldriver.xlOpenPort( @@ -207,55 +216,60 @@ def __init__( xldefine.XL_BusTypes.XL_BUS_TYPE_CAN.value, ) LOG.debug( - "Open Port: PortHandle: %d, PermissionMask: 0x%X", - self.port_handle.value, - permission_mask.value, + f"xlOpenPort: " + f"PortHandle={self.port_handle.value}, " + f"PermissionMask={permission_mask.value}" ) + # If application has init access, set CAN settings if permission_mask.value == self.mask: if fd: - self.canFdConf = xlclass.XLcanFdConf() - if bitrate: - self.canFdConf.arbitrationBitRate = ctypes.c_uint(bitrate) - else: - self.canFdConf.arbitrationBitRate = ctypes.c_uint(500000) - self.canFdConf.sjwAbr = ctypes.c_uint(sjwAbr) - self.canFdConf.tseg1Abr = ctypes.c_uint(tseg1Abr) - self.canFdConf.tseg2Abr = ctypes.c_uint(tseg2Abr) - if data_bitrate: - self.canFdConf.dataBitRate = ctypes.c_uint(data_bitrate) - else: - self.canFdConf.dataBitRate = self.canFdConf.arbitrationBitRate - self.canFdConf.sjwDbr = ctypes.c_uint(sjwDbr) - self.canFdConf.tseg1Dbr = ctypes.c_uint(tseg1Dbr) - self.canFdConf.tseg2Dbr = ctypes.c_uint(tseg2Dbr) + self.canFdConf = self._get_canfdconf( + timing, + data_timing, + bitrate, + sjwAbr, + tseg1Abr, + tseg2Abr, + data_bitrate, + sjwDbr, + tseg1Dbr, + tseg2Dbr, + ) xldriver.xlCanFdSetConfiguration( self.port_handle, self.mask, self.canFdConf ) LOG.info( - "SetFdConfig.: ABaudr.=%u, DBaudr.=%u", - self.canFdConf.arbitrationBitRate, - self.canFdConf.dataBitRate, - ) - LOG.info( - "SetFdConfig.: sjwAbr=%u, tseg1Abr=%u, tseg2Abr=%u", - self.canFdConf.sjwAbr, - self.canFdConf.tseg1Abr, - self.canFdConf.tseg2Abr, - ) - LOG.info( - "SetFdConfig.: sjwDbr=%u, tseg1Dbr=%u, tseg2Dbr=%u", - self.canFdConf.sjwDbr, - self.canFdConf.tseg1Dbr, - self.canFdConf.tseg2Dbr, + f"xlCanFdSetConfiguration: " + f"arbitrationBitRate={self.canFdConf.arbitrationBitRate}, " + f"sjwAbr={self.canFdConf.sjwAbr}, " + f"tseg1Abr={self.canFdConf.tseg1Abr}, " + f"tseg2Abr={self.canFdConf.tseg2Abr}, " + f"dataBitRate={self.canFdConf.dataBitRate}, " + f"sjwDbr={self.canFdConf.sjwDbr}, " + f"tseg1Dbr={self.canFdConf.tseg1Dbr}, " + f"tseg2Dbr={self.canFdConf.tseg2Dbr}" ) else: - if bitrate: + if timing: + self.chipParams = self._get_chipparams(timing) + xldriver.xlCanSetChannelParams( + self.port_handle, self.mask, self.chipParams + ) + LOG.info( + f"xlCanSetChannelParams: " + f"bitRate={self.chipParams.bitRate}, " + f"sjwAbr={self.chipParams.sjw}, " + f"tseg1={self.chipParams.tseg1}, " + f"tseg2={self.chipParams.tseg2}, " + f"sam={self.chipParams.sam}" + ) + elif bitrate: xldriver.xlCanSetChannelBitrate( self.port_handle, permission_mask, bitrate ) - LOG.info("SetChannelBitrate: baudr.=%u", bitrate) + LOG.info(f"xlCanSetChannelBitrate: bitrate={bitrate}") else: LOG.info("No init access!") @@ -288,7 +302,64 @@ def __init__( self._is_filtered = False super().__init__(channel=channel, can_filters=can_filters, **kwargs) - def _apply_filters(self, filters): + @staticmethod + def _get_canfdconf( + timing: Optional[BitTiming] = None, + data_timing: Optional[BitTiming] = None, + bitrate: Optional[int] = None, + sjwAbr: Optional[int] = None, + tseg1Abr: Optional[int] = None, + tseg2Abr: Optional[int] = None, + data_bitrate: Optional[int] = None, + sjwDbr: Optional[int] = None, + tseg1Dbr: Optional[int] = None, + tseg2Dbr: Optional[int] = None, + ) -> xlclass.XLcanFdConf: + canFdConf = xlclass.XLcanFdConf() + if timing: + canFdConf.arbitrationBitRate = ctypes.c_uint(timing.bitrate) + canFdConf.sjwAbr = ctypes.c_uint(timing.sjw) + canFdConf.tseg1Abr = ctypes.c_uint(timing.tseg1) + canFdConf.tseg2Abr = ctypes.c_uint(timing.tseg2) + if data_timing: + canFdConf.dataBitRate = ctypes.c_uint(data_timing.bitrate) + canFdConf.sjwDbr = ctypes.c_uint(data_timing.sjw) + canFdConf.tseg1Dbr = ctypes.c_uint(data_timing.tseg1) + canFdConf.tseg2Dbr = ctypes.c_uint(data_timing.tseg2) + else: + canFdConf.dataBitRate = ctypes.c_uint(timing.bitrate) + canFdConf.sjwDbr = ctypes.c_uint(timing.sjw) + canFdConf.tseg1Dbr = ctypes.c_uint(timing.tseg1) + canFdConf.tseg2Dbr = ctypes.c_uint(timing.tseg2) + else: + if bitrate: + canFdConf.arbitrationBitRate = ctypes.c_uint(bitrate) + else: + canFdConf.arbitrationBitRate = ctypes.c_uint(500000) + canFdConf.sjwAbr = ctypes.c_uint(sjwAbr) + canFdConf.tseg1Abr = ctypes.c_uint(tseg1Abr) + canFdConf.tseg2Abr = ctypes.c_uint(tseg2Abr) + if data_bitrate: + canFdConf.dataBitRate = ctypes.c_uint(data_bitrate) + else: + canFdConf.dataBitRate = canFdConf.arbitrationBitRate + canFdConf.sjwDbr = ctypes.c_uint(sjwDbr) + canFdConf.tseg1Dbr = ctypes.c_uint(tseg1Dbr) + canFdConf.tseg2Dbr = ctypes.c_uint(tseg2Dbr) + + return canFdConf + + @staticmethod + def _get_chipparams(timing: BitTiming) -> xlclass.XLchipParams: + chipParams = xlclass.XLchipParams() + chipParams.bitRate = ctypes.c_ulong(timing.bitrate) + chipParams.sjw = ctypes.c_ubyte(timing.sjw) + chipParams.tseg1 = ctypes.c_ubyte(timing.tseg1) + chipParams.tseg2 = ctypes.c_ubyte(timing.tseg2) + chipParams.sam = ctypes.c_ubyte(timing.nof_samples) + return chipParams + + def _apply_filters(self, filters: List[Dict[str, int]]): if filters: # Only up to one filter per ID type allowed if len(filters) == 1 or ( @@ -543,7 +614,7 @@ def reset(self): ) @staticmethod - def _detect_available_configs(): + def _detect_available_configs() -> List[Dict[str, Any]]: configs = [] channel_configs = get_channel_configs() LOG.info("Found %d channels", len(channel_configs)) diff --git a/test/test_vector.py b/test/test_vector.py index d99509df8..1f22ac037 100644 --- a/test/test_vector.py +++ b/test/test_vector.py @@ -35,6 +35,9 @@ def setUp(self) -> None: can.interfaces.vector.canlib.xldriver.xlCanFdSetConfiguration = Mock( return_value=0 ) + can.interfaces.vector.canlib.xldriver.xlCanSetChannelParams = Mock( + return_value=0 + ) can.interfaces.vector.canlib.xldriver.xlCanSetChannelMode = Mock(return_value=0) can.interfaces.vector.canlib.xldriver.xlActivateChannel = Mock(return_value=0) can.interfaces.vector.canlib.xldriver.xlGetSyncTime = Mock( @@ -91,7 +94,6 @@ def test_bus_creation_bitrate(self) -> None: self.assertIsInstance(self.bus, canlib.VectorBus) can.interfaces.vector.canlib.xldriver.xlOpenDriver.assert_called() can.interfaces.vector.canlib.xldriver.xlGetApplConfig.assert_called() - can.interfaces.vector.canlib.xldriver.xlOpenPort.assert_called() xlOpenPort_args = can.interfaces.vector.canlib.xldriver.xlOpenPort.call_args[0] self.assertEqual( @@ -106,12 +108,109 @@ def test_bus_creation_bitrate(self) -> None: ] self.assertEqual(xlCanSetChannelBitrate_args[2], 200000) + def test_bus_creation_bittiming_class(self) -> None: + timing = can.BitTiming( + f_clock=8000000, bitrate=250000, tseg1=13, tseg2=2, sjw=1 + ) + self.bus = self.bus = can.Bus( + channel=0, bustype="vector", timing=timing, _testing=True + ) + self.assertIsInstance(self.bus, canlib.VectorBus) + can.interfaces.vector.canlib.xldriver.xlOpenDriver.assert_called() + can.interfaces.vector.canlib.xldriver.xlGetApplConfig.assert_called() + can.interfaces.vector.canlib.xldriver.xlOpenPort.assert_called() + xlOpenPort_args = can.interfaces.vector.canlib.xldriver.xlOpenPort.call_args[0] + self.assertEqual( + xlOpenPort_args[5], xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION.value + ) + self.assertEqual(xlOpenPort_args[6], xldefine.XL_BusTypes.XL_BUS_TYPE_CAN.value) + can.interfaces.vector.canlib.xldriver.xlCanSetChannelParams.assert_called() + xlCanSetChannelParams_args = can.interfaces.vector.canlib.xldriver.xlCanSetChannelParams.call_args[ + 0 + ] + chipParams = xlCanSetChannelParams_args[2] + self.assertEqual(chipParams.bitRate, 250000) + self.assertEqual(chipParams.tseg1, 13) + self.assertEqual(chipParams.tseg2, 2) + self.assertEqual(chipParams.sam, 1) + + def test_bus_creation_bittiming_class_fd(self) -> None: + timing = can.BitTiming( + f_clock=8000000, bitrate=250000, tseg1=13, tseg2=2, sjw=1 + ) + data_timing = can.BitTiming( + f_clock=80000000, bitrate=2000000, tseg1=29, tseg2=10, sjw=10 + ) + self.bus = self.bus = can.Bus( + channel=0, + bustype="vector", + timing=timing, + data_timing=data_timing, + fd=True, + _testing=True, + ) + self.assertIsInstance(self.bus, canlib.VectorBus) + can.interfaces.vector.canlib.xldriver.xlOpenDriver.assert_called() + can.interfaces.vector.canlib.xldriver.xlGetApplConfig.assert_called() + can.interfaces.vector.canlib.xldriver.xlOpenPort.assert_called() + xlOpenPort_args = can.interfaces.vector.canlib.xldriver.xlOpenPort.call_args[0] + self.assertEqual( + xlOpenPort_args[5], + xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION_V4.value, + ) + self.assertEqual(xlOpenPort_args[6], xldefine.XL_BusTypes.XL_BUS_TYPE_CAN.value) + can.interfaces.vector.canlib.xldriver.xlCanFdSetConfiguration.assert_called() + can.interfaces.vector.canlib.xldriver.xlCanSetChannelBitrate.assert_not_called() + xlCanFdSetConfiguration_args = can.interfaces.vector.canlib.xldriver.xlCanFdSetConfiguration.call_args[ + 0 + ] + canFdConf = xlCanFdSetConfiguration_args[2] + self.assertEqual(canFdConf.arbitrationBitRate, 250000) + self.assertEqual(canFdConf.dataBitRate, 2000000) + self.assertEqual(canFdConf.sjwAbr, 1) + self.assertEqual(canFdConf.tseg1Abr, 13) + self.assertEqual(canFdConf.tseg2Abr, 2) + self.assertEqual(canFdConf.sjwDbr, 10) + self.assertEqual(canFdConf.tseg1Dbr, 29) + self.assertEqual(canFdConf.tseg2Dbr, 10) + + def test_bus_creation_bittiming_class_fd_2(self) -> None: + timing = can.BitTiming( + f_clock=8000000, bitrate=250000, tseg1=13, tseg2=2, sjw=1 + ) + self.bus = self.bus = can.Bus( + channel=0, bustype="vector", timing=timing, fd=True, _testing=True + ) + self.assertIsInstance(self.bus, canlib.VectorBus) + can.interfaces.vector.canlib.xldriver.xlOpenDriver.assert_called() + can.interfaces.vector.canlib.xldriver.xlGetApplConfig.assert_called() + can.interfaces.vector.canlib.xldriver.xlOpenPort.assert_called() + xlOpenPort_args = can.interfaces.vector.canlib.xldriver.xlOpenPort.call_args[0] + self.assertEqual( + xlOpenPort_args[5], + xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION_V4.value, + ) + self.assertEqual(xlOpenPort_args[6], xldefine.XL_BusTypes.XL_BUS_TYPE_CAN.value) + can.interfaces.vector.canlib.xldriver.xlCanFdSetConfiguration.assert_called() + can.interfaces.vector.canlib.xldriver.xlCanSetChannelBitrate.assert_not_called() + xlCanFdSetConfiguration_args = can.interfaces.vector.canlib.xldriver.xlCanFdSetConfiguration.call_args[ + 0 + ] + canFdConf = xlCanFdSetConfiguration_args[2] + self.assertEqual(canFdConf.arbitrationBitRate, 250000) + self.assertEqual(canFdConf.dataBitRate, 250000) + self.assertEqual(canFdConf.sjwAbr, 1) + self.assertEqual(canFdConf.tseg1Abr, 13) + self.assertEqual(canFdConf.tseg2Abr, 2) + self.assertEqual(canFdConf.sjwDbr, 1) + self.assertEqual(canFdConf.tseg1Dbr, 13) + self.assertEqual(canFdConf.tseg2Dbr, 2) + def test_bus_creation_fd(self) -> None: self.bus = can.Bus(channel=0, bustype="vector", fd=True, _testing=True) self.assertIsInstance(self.bus, canlib.VectorBus) can.interfaces.vector.canlib.xldriver.xlOpenDriver.assert_called() can.interfaces.vector.canlib.xldriver.xlGetApplConfig.assert_called() - can.interfaces.vector.canlib.xldriver.xlOpenPort.assert_called() xlOpenPort_args = can.interfaces.vector.canlib.xldriver.xlOpenPort.call_args[0] self.assertEqual( @@ -119,7 +218,6 @@ def test_bus_creation_fd(self) -> None: xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION_V4.value, ) self.assertEqual(xlOpenPort_args[6], xldefine.XL_BusTypes.XL_BUS_TYPE_CAN.value) - can.interfaces.vector.canlib.xldriver.xlCanFdSetConfiguration.assert_called() can.interfaces.vector.canlib.xldriver.xlCanSetChannelBitrate.assert_not_called() @@ -141,7 +239,6 @@ def test_bus_creation_fd_bitrate_timings(self) -> None: self.assertIsInstance(self.bus, canlib.VectorBus) can.interfaces.vector.canlib.xldriver.xlOpenDriver.assert_called() can.interfaces.vector.canlib.xldriver.xlGetApplConfig.assert_called() - can.interfaces.vector.canlib.xldriver.xlOpenPort.assert_called() xlOpenPort_args = can.interfaces.vector.canlib.xldriver.xlOpenPort.call_args[0] self.assertEqual( @@ -149,10 +246,8 @@ def test_bus_creation_fd_bitrate_timings(self) -> None: xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION_V4.value, ) self.assertEqual(xlOpenPort_args[6], xldefine.XL_BusTypes.XL_BUS_TYPE_CAN.value) - can.interfaces.vector.canlib.xldriver.xlCanFdSetConfiguration.assert_called() can.interfaces.vector.canlib.xldriver.xlCanSetChannelBitrate.assert_not_called() - xlCanFdSetConfiguration_args = can.interfaces.vector.canlib.xldriver.xlCanFdSetConfiguration.call_args[ 0 ]