diff --git a/can/interfaces/vector/canlib.py b/can/interfaces/vector/canlib.py index 55aeb5da4..559f9974d 100644 --- a/can/interfaces/vector/canlib.py +++ b/can/interfaces/vector/canlib.py @@ -878,7 +878,40 @@ def _build_xl_can_tx_event(msg: Message) -> xlclass.XLcanTxEvent: return xl_can_tx_event def flush_tx_buffer(self) -> None: - self.xldriver.xlCanFlushTransmitQueue(self.port_handle, self.mask) + """ + Flush the TX buffer of the bus. + + Implementation does not use function ``xlCanFlushTransmitQueue`` of the XL driver, as it works only + for XL family devices. + + .. warning:: + Using this function will flush the queue and send a high voltage message (ID = 0, DLC = 0, no data). + """ + if self._can_protocol is CanProtocol.CAN_FD: + xl_can_tx_event = xlclass.XLcanTxEvent() + xl_can_tx_event.tag = xldefine.XL_CANFD_TX_EventTags.XL_CAN_EV_TAG_TX_MSG + xl_can_tx_event.tagData.canMsg.msgFlags |= ( + xldefine.XL_CANFD_TX_MessageFlags.XL_CAN_TXMSG_FLAG_HIGHPRIO + ) + + self.xldriver.xlCanTransmitEx( + self.port_handle, + self.mask, + ctypes.c_uint(1), + ctypes.c_uint(0), + xl_can_tx_event, + ) + else: + xl_event = xlclass.XLevent() + xl_event.tag = xldefine.XL_EventTags.XL_TRANSMIT_MSG + xl_event.tagData.msg.flags |= ( + xldefine.XL_MessageFlags.XL_CAN_MSG_FLAG_OVERRUN + | xldefine.XL_MessageFlags.XL_CAN_MSG_FLAG_WAKEUP + ) + + self.xldriver.xlCanTransmit( + self.port_handle, self.mask, ctypes.c_uint(1), xl_event + ) def shutdown(self) -> None: super().shutdown() diff --git a/test/test_vector.py b/test/test_vector.py index 93aba9c7b..2b37a479e 100644 --- a/test/test_vector.py +++ b/test/test_vector.py @@ -613,7 +613,38 @@ def test_receive_fd_non_msg_event() -> None: def test_flush_tx_buffer_mocked(mock_xldriver) -> None: bus = can.Bus(channel=0, interface="vector", _testing=True) bus.flush_tx_buffer() - can.interfaces.vector.canlib.xldriver.xlCanFlushTransmitQueue.assert_called() + transmit_args = can.interfaces.vector.canlib.xldriver.xlCanTransmit.call_args[0] + + num_msg = transmit_args[2] + assert num_msg.value == ctypes.c_uint(1).value + + event = transmit_args[3] + assert isinstance(event, xlclass.XLevent) + assert event.tag & xldefine.XL_EventTags.XL_TRANSMIT_MSG + assert event.tagData.msg.flags & ( + xldefine.XL_MessageFlags.XL_CAN_MSG_FLAG_OVERRUN + | xldefine.XL_MessageFlags.XL_CAN_MSG_FLAG_WAKEUP + ) + + +def test_flush_tx_buffer_fd_mocked(mock_xldriver) -> None: + bus = can.Bus(channel=0, interface="vector", fd=True, _testing=True) + bus.flush_tx_buffer() + transmit_args = can.interfaces.vector.canlib.xldriver.xlCanTransmitEx.call_args[0] + + num_msg = transmit_args[2] + assert num_msg.value == ctypes.c_uint(1).value + + num_msg_sent = transmit_args[3] + assert num_msg_sent.value == ctypes.c_uint(0).value + + event = transmit_args[4] + assert isinstance(event, xlclass.XLcanTxEvent) + assert event.tag & xldefine.XL_CANFD_TX_EventTags.XL_CAN_EV_TAG_TX_MSG + assert ( + event.tagData.canMsg.msgFlags + & xldefine.XL_CANFD_TX_MessageFlags.XL_CAN_TXMSG_FLAG_HIGHPRIO + ) @pytest.mark.skipif(not XLDRIVER_FOUND, reason="Vector XL API is unavailable")