Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 126 additions & 55 deletions can/interfaces/vector/canlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import logging
import time
import os
from typing import Any, Dict, List, Optional, Union

try:
# Try builtin Python 3 Windows API
Expand All @@ -31,6 +32,7 @@
from can import BusABC, Message
from can.util import len2dlc, dlc2len
from .exceptions import VectorError
from can.bit_timing import BitTiming

# Define Module Logger
# ====================
Expand All @@ -53,22 +55,24 @@ class VectorBus(BusABC):

def __init__(
self,
channel,
can_filters=None,
poll_interval=0.01,
receive_own_messages=False,
bitrate=None,
rx_queue_size=2 ** 14,
app_name="CANalyzer",
serial=None,
fd=False,
data_bitrate=None,
sjwAbr=2,
tseg1Abr=6,
tseg2Abr=3,
sjwDbr=2,
tseg1Dbr=6,
tseg2Dbr=3,
channel: Union[int, List[int]],
can_filters: Optional[List[Dict[str, int]]] = None,
poll_interval: float = 0.01,
receive_own_messages: bool = False,
bitrate: Optional[int] = None,
timing: Optional[BitTiming] = None,
data_timing: Optional[BitTiming] = None,
rx_queue_size: int = 2 ** 14,
app_name: str = "CANalyzer",
serial: Optional[int] = None,
fd: bool = False,
data_bitrate: Optional[int] = None,
sjwAbr: int = 2,
tseg1Abr: int = 6,
tseg2Abr: int = 3,
sjwDbr: int = 2,
tseg1Dbr: int = 6,
tseg2Dbr: int = 3,
**kwargs,
):
"""
Expand All @@ -79,6 +83,11 @@ def __init__(
Poll interval in seconds.
:param int bitrate:
Bitrate in bits/s.
:param can.BitTiming timing:
Bit timing configuration.
For CAN-FD this also applies to arbitration/nominal phase if no data_timing is provided.
:param can.BitTiming data_timing:
Bit timing configuration for CAN FD data phase.
:param int rx_queue_size:
Number of messages in receive queue (power of 2).
CAN: range 16…32768
Expand Down Expand Up @@ -160,7 +169,7 @@ def __init__(
hw_channel,
xldefine.XL_BusTypes.XL_BUS_TYPE_CAN.value,
)
LOG.debug("Channel index %d found", channel)
LOG.debug(f"Channel index {channel} found")
idx = xldriver.xlGetChannelIndex(
hw_type.value, hw_index.value, hw_channel.value
)
Expand All @@ -184,7 +193,7 @@ def __init__(

permission_mask = xlclass.XLaccess()
# Set mask to request channel init permission if needed
if bitrate or fd:
if bitrate or fd or timing:
permission_mask.value = self.mask
if fd:
xldriver.xlOpenPort(
Expand All @@ -207,55 +216,60 @@ def __init__(
xldefine.XL_BusTypes.XL_BUS_TYPE_CAN.value,
)
LOG.debug(
"Open Port: PortHandle: %d, PermissionMask: 0x%X",
self.port_handle.value,
permission_mask.value,
f"xlOpenPort: "
f"PortHandle={self.port_handle.value}, "
f"PermissionMask={permission_mask.value}"
)

# If application has init access, set CAN settings
if permission_mask.value == self.mask:
if fd:
self.canFdConf = xlclass.XLcanFdConf()
if bitrate:
self.canFdConf.arbitrationBitRate = ctypes.c_uint(bitrate)
else:
self.canFdConf.arbitrationBitRate = ctypes.c_uint(500000)
self.canFdConf.sjwAbr = ctypes.c_uint(sjwAbr)
self.canFdConf.tseg1Abr = ctypes.c_uint(tseg1Abr)
self.canFdConf.tseg2Abr = ctypes.c_uint(tseg2Abr)
if data_bitrate:
self.canFdConf.dataBitRate = ctypes.c_uint(data_bitrate)
else:
self.canFdConf.dataBitRate = self.canFdConf.arbitrationBitRate
self.canFdConf.sjwDbr = ctypes.c_uint(sjwDbr)
self.canFdConf.tseg1Dbr = ctypes.c_uint(tseg1Dbr)
self.canFdConf.tseg2Dbr = ctypes.c_uint(tseg2Dbr)
self.canFdConf = self._get_canfdconf(
timing,
data_timing,
bitrate,
sjwAbr,
tseg1Abr,
tseg2Abr,
data_bitrate,
sjwDbr,
tseg1Dbr,
tseg2Dbr,
)

xldriver.xlCanFdSetConfiguration(
self.port_handle, self.mask, self.canFdConf
)
LOG.info(
"SetFdConfig.: ABaudr.=%u, DBaudr.=%u",
self.canFdConf.arbitrationBitRate,
self.canFdConf.dataBitRate,
)
LOG.info(
"SetFdConfig.: sjwAbr=%u, tseg1Abr=%u, tseg2Abr=%u",
self.canFdConf.sjwAbr,
self.canFdConf.tseg1Abr,
self.canFdConf.tseg2Abr,
)
LOG.info(
"SetFdConfig.: sjwDbr=%u, tseg1Dbr=%u, tseg2Dbr=%u",
self.canFdConf.sjwDbr,
self.canFdConf.tseg1Dbr,
self.canFdConf.tseg2Dbr,
f"xlCanFdSetConfiguration: "
f"arbitrationBitRate={self.canFdConf.arbitrationBitRate}, "
f"sjwAbr={self.canFdConf.sjwAbr}, "
f"tseg1Abr={self.canFdConf.tseg1Abr}, "
f"tseg2Abr={self.canFdConf.tseg2Abr}, "
f"dataBitRate={self.canFdConf.dataBitRate}, "
f"sjwDbr={self.canFdConf.sjwDbr}, "
f"tseg1Dbr={self.canFdConf.tseg1Dbr}, "
f"tseg2Dbr={self.canFdConf.tseg2Dbr}"
)
else:
if bitrate:
if timing:
self.chipParams = self._get_chipparams(timing)
xldriver.xlCanSetChannelParams(
self.port_handle, self.mask, self.chipParams
)
LOG.info(
f"xlCanSetChannelParams: "
f"bitRate={self.chipParams.bitRate}, "
f"sjwAbr={self.chipParams.sjw}, "
f"tseg1={self.chipParams.tseg1}, "
f"tseg2={self.chipParams.tseg2}, "
f"sam={self.chipParams.sam}"
)
elif bitrate:
xldriver.xlCanSetChannelBitrate(
self.port_handle, permission_mask, bitrate
)
LOG.info("SetChannelBitrate: baudr.=%u", bitrate)
LOG.info(f"xlCanSetChannelBitrate: bitrate={bitrate}")
else:
LOG.info("No init access!")

Expand Down Expand Up @@ -288,7 +302,64 @@ def __init__(
self._is_filtered = False
super().__init__(channel=channel, can_filters=can_filters, **kwargs)

def _apply_filters(self, filters):
@staticmethod
def _get_canfdconf(
timing: Optional[BitTiming] = None,
data_timing: Optional[BitTiming] = None,
bitrate: Optional[int] = None,
sjwAbr: Optional[int] = None,
tseg1Abr: Optional[int] = None,
tseg2Abr: Optional[int] = None,
data_bitrate: Optional[int] = None,
sjwDbr: Optional[int] = None,
tseg1Dbr: Optional[int] = None,
tseg2Dbr: Optional[int] = None,
) -> xlclass.XLcanFdConf:
canFdConf = xlclass.XLcanFdConf()
if timing:
canFdConf.arbitrationBitRate = ctypes.c_uint(timing.bitrate)
canFdConf.sjwAbr = ctypes.c_uint(timing.sjw)
canFdConf.tseg1Abr = ctypes.c_uint(timing.tseg1)
canFdConf.tseg2Abr = ctypes.c_uint(timing.tseg2)
if data_timing:
canFdConf.dataBitRate = ctypes.c_uint(data_timing.bitrate)
canFdConf.sjwDbr = ctypes.c_uint(data_timing.sjw)
canFdConf.tseg1Dbr = ctypes.c_uint(data_timing.tseg1)
canFdConf.tseg2Dbr = ctypes.c_uint(data_timing.tseg2)
else:
canFdConf.dataBitRate = ctypes.c_uint(timing.bitrate)
canFdConf.sjwDbr = ctypes.c_uint(timing.sjw)
canFdConf.tseg1Dbr = ctypes.c_uint(timing.tseg1)
canFdConf.tseg2Dbr = ctypes.c_uint(timing.tseg2)
else:
if bitrate:
canFdConf.arbitrationBitRate = ctypes.c_uint(bitrate)
else:
canFdConf.arbitrationBitRate = ctypes.c_uint(500000)
canFdConf.sjwAbr = ctypes.c_uint(sjwAbr)
canFdConf.tseg1Abr = ctypes.c_uint(tseg1Abr)
canFdConf.tseg2Abr = ctypes.c_uint(tseg2Abr)
if data_bitrate:
canFdConf.dataBitRate = ctypes.c_uint(data_bitrate)
else:
canFdConf.dataBitRate = canFdConf.arbitrationBitRate
canFdConf.sjwDbr = ctypes.c_uint(sjwDbr)
canFdConf.tseg1Dbr = ctypes.c_uint(tseg1Dbr)
canFdConf.tseg2Dbr = ctypes.c_uint(tseg2Dbr)

return canFdConf

@staticmethod
def _get_chipparams(timing: BitTiming) -> xlclass.XLchipParams:
chipParams = xlclass.XLchipParams()
chipParams.bitRate = ctypes.c_ulong(timing.bitrate)
chipParams.sjw = ctypes.c_ubyte(timing.sjw)
chipParams.tseg1 = ctypes.c_ubyte(timing.tseg1)
chipParams.tseg2 = ctypes.c_ubyte(timing.tseg2)
chipParams.sam = ctypes.c_ubyte(timing.nof_samples)
return chipParams

def _apply_filters(self, filters: List[Dict[str, int]]):
if filters:
# Only up to one filter per ID type allowed
if len(filters) == 1 or (
Expand Down Expand Up @@ -543,7 +614,7 @@ def reset(self):
)

@staticmethod
def _detect_available_configs():
def _detect_available_configs() -> List[Dict[str, Any]]:
configs = []
channel_configs = get_channel_configs()
LOG.info("Found %d channels", len(channel_configs))
Expand Down
Loading