diff --git a/can/interfaces/vector/canlib.py b/can/interfaces/vector/canlib.py index 9cecaa83d..333380a6f 100644 --- a/can/interfaces/vector/canlib.py +++ b/can/interfaces/vector/canlib.py @@ -21,6 +21,7 @@ Any, Dict, Callable, + cast, ) WaitForSingleObject: Optional[Callable[[int, int], int]] @@ -43,7 +44,7 @@ deprecated_args_alias, time_perfcounter_correlation, ) -from can.typechecking import AutoDetectedConfig, CanFilters, Channel +from can.typechecking import AutoDetectedConfig, CanFilters # Define Module Logger # ==================== @@ -152,6 +153,7 @@ def __init__( if xldriver is None: raise CanInterfaceNotImplementedError("The Vector API has not been loaded") self.xldriver = xldriver # keep reference so mypy knows it is not None + self.xldriver.xlOpenDriver() self.poll_interval = poll_interval @@ -165,7 +167,7 @@ def __init__( self.channels = [int(ch) for ch in channel] else: raise TypeError( - f"Invalid type for channels parameter: {type(channel).__name__}" + f"Invalid type for parameter 'channel': {type(channel).__name__}" ) self._app_name = app_name.encode() if app_name is not None else b"" @@ -174,136 +176,71 @@ def __init__( ", ".join(f"CAN {ch + 1}" for ch in self.channels), ) - if serial is not None: - app_name = None - channel_index = [] - channel_configs = get_channel_configs() - for channel_config in channel_configs: - if channel_config.serialNumber == serial: - if channel_config.hwChannel in self.channels: - channel_index.append(channel_config.channelIndex) - if channel_index: - if len(channel_index) != len(self.channels): - LOG.info( - "At least one defined channel wasn't found on the specified hardware." - ) - self.channels = channel_index - else: - # Is there any better way to raise the error? - raise CanInitializationError( - "None of the configured channels could be found on the specified hardware." - ) + channel_configs = get_channel_configs() - self.xldriver.xlOpenDriver() - self.port_handle = xlclass.XLportHandle(xldefine.XL_INVALID_PORTHANDLE) self.mask = 0 self.fd = fd - # Get channels masks - self.channel_masks: Dict[Optional[Channel], int] = {} - self.index_to_channel = {} + self.channel_masks: Dict[int, int] = {} + self.index_to_channel: Dict[int, int] = {} for channel in self.channels: - if app_name: - # Get global channel index from application channel - hw_type, hw_index, hw_channel = self.get_application_config( - app_name, channel - ) - LOG.debug("Channel index %d found", channel) - idx = self.xldriver.xlGetChannelIndex(hw_type, hw_index, hw_channel) - if idx < 0: - # Undocumented behavior! See issue #353. - # If hardware is unavailable, this function returns -1. - # Raise an exception as if the driver - # would have signalled XL_ERR_HW_NOT_PRESENT. - raise VectorInitializationError( - xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT, - xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT.name, - "xlGetChannelIndex", - ) - else: - # Channel already given as global channel - idx = channel - mask = 1 << idx - self.channel_masks[channel] = mask - self.index_to_channel[idx] = channel - self.mask |= mask + channel_index = self._find_global_channel_idx( + channel=channel, + serial=serial, + app_name=app_name, + channel_configs=channel_configs, + ) + LOG.debug("Channel index %d found", channel) + + channel_mask = 1 << channel_index + self.channel_masks[channel] = channel_mask + self.index_to_channel[channel_index] = channel + self.mask |= channel_mask permission_mask = xlclass.XLaccess() # Set mask to request channel init permission if needed if bitrate or fd: permission_mask.value = self.mask - if fd: - self.xldriver.xlOpenPort( - self.port_handle, - self._app_name, - self.mask, - permission_mask, - rx_queue_size, - xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION_V4, - xldefine.XL_BusTypes.XL_BUS_TYPE_CAN, - ) - else: - self.xldriver.xlOpenPort( - self.port_handle, - self._app_name, - self.mask, - permission_mask, - rx_queue_size, - xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION, - xldefine.XL_BusTypes.XL_BUS_TYPE_CAN, - ) + + interface_version = ( + xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION_V4 + if fd + else xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION + ) + + self.port_handle = xlclass.XLportHandle(xldefine.XL_INVALID_PORTHANDLE) + self.xldriver.xlOpenPort( + self.port_handle, + self._app_name, + self.mask, + permission_mask, + rx_queue_size, + interface_version, + xldefine.XL_BusTypes.XL_BUS_TYPE_CAN, + ) + LOG.debug( "Open Port: PortHandle: %d, PermissionMask: 0x%X", self.port_handle.value, permission_mask.value, ) - if permission_mask.value == self.mask: - if fd: - self.canFdConf = xlclass.XLcanFdConf() - if bitrate: - self.canFdConf.arbitrationBitRate = int(bitrate) - else: - self.canFdConf.arbitrationBitRate = 500000 - self.canFdConf.sjwAbr = int(sjw_abr) - self.canFdConf.tseg1Abr = int(tseg1_abr) - self.canFdConf.tseg2Abr = int(tseg2_abr) - if data_bitrate: - self.canFdConf.dataBitRate = int(data_bitrate) - else: - self.canFdConf.dataBitRate = self.canFdConf.arbitrationBitRate - self.canFdConf.sjwDbr = int(sjw_dbr) - self.canFdConf.tseg1Dbr = int(tseg1_dbr) - self.canFdConf.tseg2Dbr = int(tseg2_dbr) - - self.xldriver.xlCanFdSetConfiguration( - self.port_handle, self.mask, self.canFdConf - ) - LOG.info( - "SetFdConfig.: ABaudr.=%u, DBaudr.=%u", - self.canFdConf.arbitrationBitRate, - self.canFdConf.dataBitRate, - ) - LOG.info( - "SetFdConfig.: sjwAbr=%u, tseg1Abr=%u, tseg2Abr=%u", - self.canFdConf.sjwAbr, - self.canFdConf.tseg1Abr, - self.canFdConf.tseg2Abr, - ) - LOG.info( - "SetFdConfig.: sjwDbr=%u, tseg1Dbr=%u, tseg2Dbr=%u", - self.canFdConf.sjwDbr, - self.canFdConf.tseg1Dbr, - self.canFdConf.tseg2Dbr, - ) - else: - if bitrate: - self.xldriver.xlCanSetChannelBitrate( - self.port_handle, permission_mask, bitrate + for channel in self.channels: + if permission_mask.value & self.channel_masks[channel]: + if fd: + self._set_bitrate_canfd( + channel=channel, + bitrate=bitrate, + data_bitrate=data_bitrate, + sjw_abr=sjw_abr, + tseg1_abr=tseg1_abr, + tseg2_abr=tseg2_abr, + sjw_dbr=sjw_dbr, + tseg1_dbr=tseg1_dbr, + tseg2_dbr=tseg2_dbr, ) - LOG.info("SetChannelBitrate: baudr.=%u", bitrate) - else: - LOG.info("No init access!") + elif bitrate: + self._set_bitrate_can(channel=channel, bitrate=bitrate) # Enable/disable TX receipts tx_receipts = 1 if receive_own_messages else 0 @@ -348,6 +285,153 @@ def __init__( self._is_filtered = False super().__init__(channel=channel, can_filters=can_filters, **kwargs) + def _find_global_channel_idx( + self, + channel: int, + serial: Optional[int], + app_name: Optional[str], + channel_configs: List["VectorChannelConfig"], + ) -> int: + if serial is not None: + hw_type: Optional[xldefine.XL_HardwareType] = None + for channel_config in channel_configs: + if channel_config.serialNumber != serial: + continue + + hw_type = xldefine.XL_HardwareType(channel_config.hwType) + if channel_config.hwChannel == channel: + return channel_config.channelIndex + + if hw_type is None: + err_msg = f"No interface with serial {serial} found." + else: + err_msg = f"Channel {channel} not found on interface {hw_type.name} ({serial})." + raise CanInitializationError( + err_msg, error_code=xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT + ) + + if app_name: + hw_type, hw_index, hw_channel = self.get_application_config( + app_name, channel + ) + idx = cast( + int, self.xldriver.xlGetChannelIndex(hw_type, hw_index, hw_channel) + ) + if idx < 0: + # Undocumented behavior! See issue #353. + # If hardware is unavailable, this function returns -1. + # Raise an exception as if the driver + # would have signalled XL_ERR_HW_NOT_PRESENT. + raise VectorInitializationError( + xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT, + xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT.name, + "xlGetChannelIndex", + ) + return idx + + # check if channel is a valid global channel index + for channel_config in channel_configs: + if channel == channel_config.channelIndex: + return channel + + raise CanInitializationError( + f"Channel {channel} not found. The 'channel' parameter must be " + f"a valid global channel index if neither 'app_name' nor 'serial' were given.", + error_code=xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT, + ) + + def _set_bitrate_can( + self, + channel: int, + bitrate: int, + sjw: Optional[int] = None, + tseg1: Optional[int] = None, + tseg2: Optional[int] = None, + sam: int = 1, + ) -> None: + kwargs = [sjw, tseg1, tseg2] + if any(kwargs) and not all(kwargs): + raise ValueError( + f"Either all of sjw, tseg1, tseg2 must be set or none of them." + ) + + # set parameters if channel has init access + if any(kwargs): + chip_params = xlclass.XLchipParams() + chip_params.bitRate = bitrate + chip_params.sjw = sjw + chip_params.tseg1 = tseg1 + chip_params.tseg2 = tseg2 + chip_params.sam = sam + self.xldriver.xlCanSetChannelParams( + self.port_handle, + self.channel_masks[channel], + chip_params, + ) + LOG.info( + "xlCanSetChannelParams: baudr.=%u, sjwAbr=%u, tseg1Abr=%u, tseg2Abr=%u", + chip_params.bitRate, + chip_params.sjw, + chip_params.tseg1, + chip_params.tseg2, + ) + else: + self.xldriver.xlCanSetChannelBitrate( + self.port_handle, + self.channel_masks[channel], + bitrate, + ) + LOG.info("xlCanSetChannelBitrate: baudr.=%u", bitrate) + + def _set_bitrate_canfd( + self, + channel: int, + bitrate: Optional[int] = None, + data_bitrate: Optional[int] = None, + sjw_abr: int = 2, + tseg1_abr: int = 6, + tseg2_abr: int = 3, + sjw_dbr: int = 2, + tseg1_dbr: int = 6, + tseg2_dbr: int = 3, + ) -> None: + # set parameters if channel has init access + canfd_conf = xlclass.XLcanFdConf() + if bitrate: + canfd_conf.arbitrationBitRate = int(bitrate) + else: + canfd_conf.arbitrationBitRate = 500_000 + canfd_conf.sjwAbr = int(sjw_abr) + canfd_conf.tseg1Abr = int(tseg1_abr) + canfd_conf.tseg2Abr = int(tseg2_abr) + if data_bitrate: + canfd_conf.dataBitRate = int(data_bitrate) + else: + canfd_conf.dataBitRate = int(canfd_conf.arbitrationBitRate) + canfd_conf.sjwDbr = int(sjw_dbr) + canfd_conf.tseg1Dbr = int(tseg1_dbr) + canfd_conf.tseg2Dbr = int(tseg2_dbr) + self.xldriver.xlCanFdSetConfiguration( + self.port_handle, self.channel_masks[channel], canfd_conf + ) + LOG.info( + "xlCanFdSetConfiguration.: ABaudr.=%u, DBaudr.=%u", + canfd_conf.arbitrationBitRate, + canfd_conf.dataBitRate, + ) + LOG.info( + "xlCanFdSetConfiguration.: sjwAbr=%u, tseg1Abr=%u, tseg2Abr=%u", + canfd_conf.sjwAbr, + canfd_conf.tseg1Abr, + canfd_conf.tseg2Abr, + ) + LOG.info( + "xlCanFdSetConfiguration.: sjwDbr=%u, tseg1Dbr=%u, tseg2Dbr=%u", + canfd_conf.sjwDbr, + canfd_conf.tseg1Dbr, + canfd_conf.tseg2Dbr, + ) + def _apply_filters(self, filters: Optional[CanFilters]) -> None: if filters: # Only up to one filter per ID type allowed @@ -544,7 +628,7 @@ def _send_sequence(self, msgs: Sequence[Message]) -> int: def _get_tx_channel_mask(self, msgs: Sequence[Message]) -> int: if len(msgs) == 1: - return self.channel_masks.get(msgs[0].channel, self.mask) + return self.channel_masks.get(msgs[0].channel, self.mask) # type: ignore[arg-type] else: return self.mask