diff --git a/can/interfaces/pcan/basic.py b/can/interfaces/pcan/basic.py index 91996680d..9fe7d1257 100644 --- a/can/interfaces/pcan/basic.py +++ b/can/interfaces/pcan/basic.py @@ -8,17 +8,12 @@ # # ------------------------------------------------------------------ # Author : Keneth Wagner -# Last change: 14.01.2021 Wagner -# -# Language: Python 2.7, 3.7 # ------------------------------------------------------------------ # # Copyright (C) 1999-2021 PEAK-System Technik GmbH, Darmstadt # more Info at http://www.peak-system.com -# # Module Imports -# from ctypes import * from ctypes.util import find_library from string import * @@ -553,6 +548,102 @@ class TPCANChannelInformation(Structure): ] # Availability status of a PCAN-Channel +# /////////////////////////////////////////////////////////// +# Additional objects +# /////////////////////////////////////////////////////////// + +PCAN_BITRATES = { + 1000000: PCAN_BAUD_1M, + 800000: PCAN_BAUD_800K, + 500000: PCAN_BAUD_500K, + 250000: PCAN_BAUD_250K, + 125000: PCAN_BAUD_125K, + 100000: PCAN_BAUD_100K, + 95000: PCAN_BAUD_95K, + 83000: PCAN_BAUD_83K, + 50000: PCAN_BAUD_50K, + 47000: PCAN_BAUD_47K, + 33000: PCAN_BAUD_33K, + 20000: PCAN_BAUD_20K, + 10000: PCAN_BAUD_10K, + 5000: PCAN_BAUD_5K, +} + +PCAN_FD_PARAMETER_LIST = ( + "nom_brp", + "nom_tseg1", + "nom_tseg2", + "nom_sjw", + "data_brp", + "data_tseg1", + "data_tseg2", + "data_sjw", +) + +PCAN_CHANNEL_NAMES = { + "PCAN_NONEBUS": PCAN_NONEBUS, + "PCAN_ISABUS1": PCAN_ISABUS1, + "PCAN_ISABUS2": PCAN_ISABUS2, + "PCAN_ISABUS3": PCAN_ISABUS3, + "PCAN_ISABUS4": PCAN_ISABUS4, + "PCAN_ISABUS5": PCAN_ISABUS5, + "PCAN_ISABUS6": PCAN_ISABUS6, + "PCAN_ISABUS7": PCAN_ISABUS7, + "PCAN_ISABUS8": PCAN_ISABUS8, + "PCAN_DNGBUS1": PCAN_DNGBUS1, + "PCAN_PCIBUS1": PCAN_PCIBUS1, + "PCAN_PCIBUS2": PCAN_PCIBUS2, + "PCAN_PCIBUS3": PCAN_PCIBUS3, + "PCAN_PCIBUS4": PCAN_PCIBUS4, + "PCAN_PCIBUS5": PCAN_PCIBUS5, + "PCAN_PCIBUS6": PCAN_PCIBUS6, + "PCAN_PCIBUS7": PCAN_PCIBUS7, + "PCAN_PCIBUS8": PCAN_PCIBUS8, + "PCAN_PCIBUS9": PCAN_PCIBUS9, + "PCAN_PCIBUS10": PCAN_PCIBUS10, + "PCAN_PCIBUS11": PCAN_PCIBUS11, + "PCAN_PCIBUS12": PCAN_PCIBUS12, + "PCAN_PCIBUS13": PCAN_PCIBUS13, + "PCAN_PCIBUS14": PCAN_PCIBUS14, + "PCAN_PCIBUS15": PCAN_PCIBUS15, + "PCAN_PCIBUS16": PCAN_PCIBUS16, + "PCAN_USBBUS1": PCAN_USBBUS1, + "PCAN_USBBUS2": PCAN_USBBUS2, + "PCAN_USBBUS3": PCAN_USBBUS3, + "PCAN_USBBUS4": PCAN_USBBUS4, + "PCAN_USBBUS5": PCAN_USBBUS5, + "PCAN_USBBUS6": PCAN_USBBUS6, + "PCAN_USBBUS7": PCAN_USBBUS7, + "PCAN_USBBUS8": PCAN_USBBUS8, + "PCAN_USBBUS9": PCAN_USBBUS9, + "PCAN_USBBUS10": PCAN_USBBUS10, + "PCAN_USBBUS11": PCAN_USBBUS11, + "PCAN_USBBUS12": PCAN_USBBUS12, + "PCAN_USBBUS13": PCAN_USBBUS13, + "PCAN_USBBUS14": PCAN_USBBUS14, + "PCAN_USBBUS15": PCAN_USBBUS15, + "PCAN_USBBUS16": PCAN_USBBUS16, + "PCAN_PCCBUS1": PCAN_PCCBUS1, + "PCAN_PCCBUS2": PCAN_PCCBUS2, + "PCAN_LANBUS1": PCAN_LANBUS1, + "PCAN_LANBUS2": PCAN_LANBUS2, + "PCAN_LANBUS3": PCAN_LANBUS3, + "PCAN_LANBUS4": PCAN_LANBUS4, + "PCAN_LANBUS5": PCAN_LANBUS5, + "PCAN_LANBUS6": PCAN_LANBUS6, + "PCAN_LANBUS7": PCAN_LANBUS7, + "PCAN_LANBUS8": PCAN_LANBUS8, + "PCAN_LANBUS9": PCAN_LANBUS9, + "PCAN_LANBUS10": PCAN_LANBUS10, + "PCAN_LANBUS11": PCAN_LANBUS11, + "PCAN_LANBUS12": PCAN_LANBUS12, + "PCAN_LANBUS13": PCAN_LANBUS13, + "PCAN_LANBUS14": PCAN_LANBUS14, + "PCAN_LANBUS15": PCAN_LANBUS15, + "PCAN_LANBUS16": PCAN_LANBUS16, +} + + # /////////////////////////////////////////////////////////// # PCAN-Basic API function declarations # /////////////////////////////////////////////////////////// @@ -601,8 +692,7 @@ def Initialize( Interrupt=c_ushort(0), ): - """ - Initializes a PCAN Channel + """Initializes a PCAN Channel Parameters: Channel : A TPCANHandle representing a PCAN Channel @@ -627,8 +717,7 @@ def Initialize( # def InitializeFD(self, Channel, BitrateFD): - """ - Initializes a FD capable PCAN Channel + """Initializes a FD capable PCAN Channel Parameters: Channel : The handle of a FD capable PCAN Channel @@ -659,8 +748,7 @@ def InitializeFD(self, Channel, BitrateFD): # def Uninitialize(self, Channel): - """ - Uninitializes one or all PCAN Channels initialized by CAN_Initialize + """Uninitializes one or all PCAN Channels initialized by CAN_Initialize Remarks: Giving the TPCANHandle value "PCAN_NONEBUS", uninitialize all initialized channels @@ -682,8 +770,7 @@ def Uninitialize(self, Channel): # def Reset(self, Channel): - """ - Resets the receive and transmit queues of the PCAN Channel + """Resets the receive and transmit queues of the PCAN Channel Remarks: A reset of the CAN controller is not performed @@ -705,8 +792,7 @@ def Reset(self, Channel): # def GetStatus(self, Channel): - """ - Gets the current status of a PCAN Channel + """Gets the current status of a PCAN Channel Parameters: Channel : A TPCANHandle representing a PCAN Channel @@ -725,8 +811,7 @@ def GetStatus(self, Channel): # def Read(self, Channel): - """ - Reads a CAN message from the receive queue of a PCAN Channel + """Reads a CAN message from the receive queue of a PCAN Channel Remarks: The return value of this method is a 3-touple, where @@ -740,7 +825,7 @@ def Read(self, Channel): Channel : A TPCANHandle representing a PCAN Channel Returns: - A touple with three values + A tuple with three values """ try: msg = TPCANMsg() @@ -755,8 +840,7 @@ def Read(self, Channel): # def ReadFD(self, Channel): - """ - Reads a CAN message from the receive queue of a FD capable PCAN Channel + """Reads a CAN message from the receive queue of a FD capable PCAN Channel Remarks: The return value of this method is a 3-touple, where @@ -770,7 +854,7 @@ def ReadFD(self, Channel): Channel : The handle of a FD capable PCAN Channel Returns: - A touple with three values + A tuple with three values """ try: msg = TPCANMsgFD() @@ -785,8 +869,7 @@ def ReadFD(self, Channel): # def Write(self, Channel, MessageBuffer): - """ - Transmits a CAN message + """Transmits a CAN message Parameters: Channel : A TPCANHandle representing a PCAN Channel @@ -806,8 +889,7 @@ def Write(self, Channel, MessageBuffer): # def WriteFD(self, Channel, MessageBuffer): - """ - Transmits a CAN message over a FD capable PCAN Channel + """Transmits a CAN message over a FD capable PCAN Channel Parameters: Channel : The handle of a FD capable PCAN Channel @@ -827,8 +909,7 @@ def WriteFD(self, Channel, MessageBuffer): # def FilterMessages(self, Channel, FromID, ToID, Mode): - """ - Configures the reception filter + """Configures the reception filter Remarks: The message filter will be expanded with every call to this function. @@ -855,8 +936,7 @@ def FilterMessages(self, Channel, FromID, ToID, Mode): # def GetValue(self, Channel, Parameter): - """ - Retrieves a PCAN Channel value + """Retrieves a PCAN Channel value Remarks: Parameters can be present or not according with the kind @@ -872,7 +952,7 @@ def GetValue(self, Channel, Parameter): Parameter : The TPCANParameter parameter to get Returns: - A touple with 2 values + A tuple with 2 values """ try: if ( @@ -912,9 +992,8 @@ def GetValue(self, Channel, Parameter): # def SetValue(self, Channel, Parameter, Buffer): - """ - Returns a descriptive text of a given TPCANStatus error - code, in any desired language + """Returns a descriptive text of a given TPCANStatus error + code, in any desired language Remarks: Parameters can be present or not according with the kind @@ -951,8 +1030,7 @@ def SetValue(self, Channel, Parameter, Buffer): def GetErrorText(self, Error, Language=0): - """ - Configures or sets a PCAN Channel value + """Configures or sets a PCAN Channel value Remarks: @@ -969,7 +1047,7 @@ def GetErrorText(self, Error, Language=0): Language : Indicates a 'Primary language ID' (Default is Neutral(0)) Returns: - A touple with 2 values + A tuple with 2 values """ try: mybuffer = create_string_buffer(256) @@ -981,8 +1059,7 @@ def GetErrorText(self, Error, Language=0): def LookUpChannel(self, Parameters): - """ - Finds a PCAN-Basic channel that matches with the given parameters + """Finds a PCAN-Basic channel that matches with the given parameters Remarks: @@ -995,7 +1072,7 @@ def LookUpChannel(self, Parameters): to be matched within a PCAN-Basic channel Returns: - A touple with 2 values + A tuple with 2 values """ try: mybuffer = TPCANHandle(0) diff --git a/can/interfaces/pcan/pcan.py b/can/interfaces/pcan/pcan.py index 670918a17..959874a64 100644 --- a/can/interfaces/pcan/pcan.py +++ b/can/interfaces/pcan/pcan.py @@ -5,12 +5,54 @@ import logging import time from datetime import datetime +import platform from typing import Optional -from can import CanError, Message, BusABC -from can.bus import BusState -from can.util import len2dlc, dlc2len -from .basic import * + +from ...message import Message +from ...bus import BusABC, BusState +from ...util import len2dlc, dlc2len +from ...exceptions import CanError, CanOperationError, CanInitializationError + + +from .basic import ( + PCAN_BITRATES, + PCAN_FD_PARAMETER_LIST, + PCAN_CHANNEL_NAMES, + PCAN_BAUD_500K, + PCAN_TYPE_ISA, + PCANBasic, + PCAN_ERROR_OK, + PCAN_ALLOW_ERROR_FRAMES, + PCAN_PARAMETER_ON, + PCAN_RECEIVE_EVENT, + PCAN_DEVICE_NUMBER, + PCAN_ERROR_QRCVEMPTY, + PCAN_ERROR_BUSLIGHT, + PCAN_ERROR_BUSHEAVY, + PCAN_MESSAGE_EXTENDED, + PCAN_MESSAGE_RTR, + PCAN_MESSAGE_FD, + PCAN_MESSAGE_BRS, + PCAN_MESSAGE_ESI, + PCAN_MESSAGE_ERRFRAME, + PCAN_MESSAGE_STANDARD, + TPCANMsgFD, + TPCANMsg, + PCAN_CHANNEL_IDENTIFYING, + PCAN_LISTEN_ONLY, + PCAN_PARAMETER_OFF, + TPCANHandle, + PCAN_PCIBUS1, + PCAN_USBBUS1, + PCAN_PCCBUS1, + PCAN_LANBUS1, + PCAN_CHANNEL_CONDITION, + PCAN_CHANNEL_AVAILABLE, + PCAN_CHANNEL_FEATURES, + FEATURE_FD_CAPABLE, + PCAN_DICT_STATUS, +) # Set up logging @@ -51,99 +93,6 @@ HAS_EVENTS = False -pcan_bitrate_objs = { - 1000000: PCAN_BAUD_1M, - 800000: PCAN_BAUD_800K, - 500000: PCAN_BAUD_500K, - 250000: PCAN_BAUD_250K, - 125000: PCAN_BAUD_125K, - 100000: PCAN_BAUD_100K, - 95000: PCAN_BAUD_95K, - 83000: PCAN_BAUD_83K, - 50000: PCAN_BAUD_50K, - 47000: PCAN_BAUD_47K, - 33000: PCAN_BAUD_33K, - 20000: PCAN_BAUD_20K, - 10000: PCAN_BAUD_10K, - 5000: PCAN_BAUD_5K, -} - - -pcan_fd_parameter_list = [ - "nom_brp", - "nom_tseg1", - "nom_tseg2", - "nom_sjw", - "data_brp", - "data_tseg1", - "data_tseg2", - "data_sjw", -] - -pcan_channel_names = { - "PCAN_NONEBUS": PCAN_NONEBUS, - "PCAN_ISABUS1": PCAN_ISABUS1, - "PCAN_ISABUS2": PCAN_ISABUS2, - "PCAN_ISABUS3": PCAN_ISABUS3, - "PCAN_ISABUS4": PCAN_ISABUS4, - "PCAN_ISABUS5": PCAN_ISABUS5, - "PCAN_ISABUS6": PCAN_ISABUS6, - "PCAN_ISABUS7": PCAN_ISABUS7, - "PCAN_ISABUS8": PCAN_ISABUS8, - "PCAN_DNGBUS1": PCAN_DNGBUS1, - "PCAN_PCIBUS1": PCAN_PCIBUS1, - "PCAN_PCIBUS2": PCAN_PCIBUS2, - "PCAN_PCIBUS3": PCAN_PCIBUS3, - "PCAN_PCIBUS4": PCAN_PCIBUS4, - "PCAN_PCIBUS5": PCAN_PCIBUS5, - "PCAN_PCIBUS6": PCAN_PCIBUS6, - "PCAN_PCIBUS7": PCAN_PCIBUS7, - "PCAN_PCIBUS8": PCAN_PCIBUS8, - "PCAN_PCIBUS9": PCAN_PCIBUS9, - "PCAN_PCIBUS10": PCAN_PCIBUS10, - "PCAN_PCIBUS11": PCAN_PCIBUS11, - "PCAN_PCIBUS12": PCAN_PCIBUS12, - "PCAN_PCIBUS13": PCAN_PCIBUS13, - "PCAN_PCIBUS14": PCAN_PCIBUS14, - "PCAN_PCIBUS15": PCAN_PCIBUS15, - "PCAN_PCIBUS16": PCAN_PCIBUS16, - "PCAN_USBBUS1": PCAN_USBBUS1, - "PCAN_USBBUS2": PCAN_USBBUS2, - "PCAN_USBBUS3": PCAN_USBBUS3, - "PCAN_USBBUS4": PCAN_USBBUS4, - "PCAN_USBBUS5": PCAN_USBBUS5, - "PCAN_USBBUS6": PCAN_USBBUS6, - "PCAN_USBBUS7": PCAN_USBBUS7, - "PCAN_USBBUS8": PCAN_USBBUS8, - "PCAN_USBBUS9": PCAN_USBBUS9, - "PCAN_USBBUS10": PCAN_USBBUS10, - "PCAN_USBBUS11": PCAN_USBBUS11, - "PCAN_USBBUS12": PCAN_USBBUS12, - "PCAN_USBBUS13": PCAN_USBBUS13, - "PCAN_USBBUS14": PCAN_USBBUS14, - "PCAN_USBBUS15": PCAN_USBBUS15, - "PCAN_USBBUS16": PCAN_USBBUS16, - "PCAN_PCCBUS1": PCAN_PCCBUS1, - "PCAN_PCCBUS2": PCAN_PCCBUS2, - "PCAN_LANBUS1": PCAN_LANBUS1, - "PCAN_LANBUS2": PCAN_LANBUS2, - "PCAN_LANBUS3": PCAN_LANBUS3, - "PCAN_LANBUS4": PCAN_LANBUS4, - "PCAN_LANBUS5": PCAN_LANBUS5, - "PCAN_LANBUS6": PCAN_LANBUS6, - "PCAN_LANBUS7": PCAN_LANBUS7, - "PCAN_LANBUS8": PCAN_LANBUS8, - "PCAN_LANBUS9": PCAN_LANBUS9, - "PCAN_LANBUS10": PCAN_LANBUS10, - "PCAN_LANBUS11": PCAN_LANBUS11, - "PCAN_LANBUS12": PCAN_LANBUS12, - "PCAN_LANBUS13": PCAN_LANBUS13, - "PCAN_LANBUS14": PCAN_LANBUS14, - "PCAN_LANBUS15": PCAN_LANBUS15, - "PCAN_LANBUS16": PCAN_LANBUS16, -} - - class PcanBus(BusABC): def __init__( self, @@ -245,14 +194,14 @@ def __init__( """ self.channel_info = str(channel) self.fd = kwargs.get("fd", False) - pcan_bitrate = pcan_bitrate_objs.get(bitrate, PCAN_BAUD_500K) + pcan_bitrate = PCAN_BITRATES.get(bitrate, PCAN_BAUD_500K) hwtype = PCAN_TYPE_ISA ioport = 0x02A0 interrupt = 11 - if type(channel) != int: - channel = pcan_channel_names[channel] + if not isinstance(channel, int): + channel = PCAN_CHANNEL_NAMES[channel] self.m_objPCANBasic = PCANBasic() self.m_PcanHandle = channel @@ -260,7 +209,7 @@ def __init__( if state is BusState.ACTIVE or state is BusState.PASSIVE: self.state = state else: - raise ArgumentError("BusState must be Active or Passive") + raise ValueError("BusState must be Active or Passive") if self.fd: f_clock_val = kwargs.get("f_clock", None) @@ -271,7 +220,7 @@ def __init__( fd_parameters_values = [f_clock] + [ "{}={}".format(key, kwargs.get(key, None)) - for key in pcan_fd_parameter_list + for key in PCAN_FD_PARAMETER_LIST if kwargs.get(key, None) is not None ] @@ -286,7 +235,7 @@ def __init__( ) if result != PCAN_ERROR_OK: - raise PcanError(self._get_formatted_error(result)) + raise PcanCanInitializationError(self._get_formatted_error(result)) result = self.m_objPCANBasic.SetValue( self.m_PcanHandle, PCAN_ALLOW_ERROR_FRAMES, PCAN_PARAMETER_ON @@ -294,7 +243,7 @@ def __init__( if result != PCAN_ERROR_OK: if platform.system() != "Darwin": - raise PcanError(self._get_formatted_error(result)) + raise PcanCanInitializationError(self._get_formatted_error(result)) else: # TODO Remove Filter when MACCan actually supports it: # https://github.com/mac-can/PCBUSB-Library/ @@ -308,7 +257,7 @@ def __init__( self.m_PcanHandle, PCAN_RECEIVE_EVENT, self._recv_event ) if result != PCAN_ERROR_OK: - raise PcanError(self._get_formatted_error(result)) + raise PcanCanInitializationError(self._get_formatted_error(result)) super().__init__(channel=channel, state=state, bitrate=bitrate, *args, **kwargs) @@ -407,7 +356,7 @@ def set_device_number(self, device_number): ) != PCAN_ERROR_OK ): - raise ValueError + raise ValueError() except ValueError: log.error("Invalid value '%s' for device number.", device_number) return False @@ -445,7 +394,7 @@ def _recv_internal(self, timeout): log.warning(self._get_formatted_error(result[0])) return None, False elif result[0] != PCAN_ERROR_OK: - raise PcanError(self._get_formatted_error(result[0])) + raise PcanCanOperationError(self._get_formatted_error(result[0])) theMsg = result[1] itsTimeStamp = result[2] @@ -554,7 +503,9 @@ def send(self, msg, timeout=None): result = self.m_objPCANBasic.Write(self.m_PcanHandle, CANMsg) if result != PCAN_ERROR_OK: - raise PcanError("Failed to send: " + self._get_formatted_error(result)) + raise PcanCanOperationError( + "Failed to send: " + self._get_formatted_error(result) + ) def flash(self, flash): """ @@ -646,15 +597,22 @@ def _detect_available_configs(): def status_string(self) -> Optional[str]: """ Query the PCAN bus status. - :return: The status in string. + + :return: The status description, if any was found. """ - if self.status() in PCAN_DICT_STATUS: + try: return PCAN_DICT_STATUS[self.status()] - else: + except KeyError: return None class PcanError(CanError): - """ - A generic error on a PCAN bus. - """ + """A generic error on a PCAN bus.""" + + +class PcanCanOperationError(CanOperationError, PcanError): + """Like :class:`can.exceptions.CanOperationError`, but specific to Pcan.""" + + +class PcanCanInitializationError(CanInitializationError, PcanError): + """Like :class:`can.exceptions.CanInitializationError`, but specific to Pcan.""" diff --git a/test/test_pcan.py b/test/test_pcan.py index 595d72b48..b9cecff26 100644 --- a/test/test_pcan.py +++ b/test/test_pcan.py @@ -42,9 +42,8 @@ def test_bus_creation(self) -> None: self.mock_pcan.InitializeFD.assert_not_called() def test_bus_creation_state_error(self) -> None: - with self.assertRaises(ctypes.ArgumentError): - self.bus = can.Bus(bustype="pcan", state=BusState.ERROR) - self.assertIsInstance(self.bus, PcanBus) + with self.assertRaises(ValueError): + can.Bus(bustype="pcan", state=BusState.ERROR) def test_bus_creation_fd(self) -> None: self.bus = can.Bus(bustype="pcan", fd=True) @@ -66,18 +65,18 @@ def test_bus_creation_fd(self) -> None: ] ) def test_get_formatted_error(self, name, status1, status2, expected_result: str): + with self.subTest(name): + self.bus = can.Bus(bustype="pcan") + self.mock_pcan.GetErrorText = Mock( + side_effect=[ + (status1, expected_result.encode("utf-8", errors="replace")), + (status2, expected_result.encode("utf-8", errors="replace")), + ] + ) - self.bus = can.Bus(bustype="pcan") - self.mock_pcan.GetErrorText = Mock( - side_effect=[ - (status1, expected_result.encode("utf-8", errors="replace")), - (status2, expected_result.encode("utf-8", errors="replace")), - ] - ) - - complete_text = self.bus._get_formatted_error(PCAN_ERROR_BUSHEAVY) + complete_text = self.bus._get_formatted_error(PCAN_ERROR_BUSHEAVY) - self.assertEqual(complete_text, expected_result) + self.assertEqual(complete_text, expected_result) def test_status(self) -> None: self.bus = can.Bus(bustype="pcan") @@ -88,43 +87,47 @@ def test_status(self) -> None: [("no_error", PCAN_ERROR_OK, True), ("error", PCAN_ERROR_UNKNOWN, False)] ) def test_status_is_ok(self, name, status, expected_result) -> None: - self.mock_pcan.GetStatus = Mock(return_value=status) - self.bus = can.Bus(bustype="pcan") - self.assertEqual(self.bus.status_is_ok(), expected_result) - self.mock_pcan.GetStatus.assert_called_once_with(PCAN_USBBUS1) + with self.subTest(name): + self.mock_pcan.GetStatus = Mock(return_value=status) + self.bus = can.Bus(bustype="pcan") + self.assertEqual(self.bus.status_is_ok(), expected_result) + self.mock_pcan.GetStatus.assert_called_once_with(PCAN_USBBUS1) @parameterized.expand( [("no_error", PCAN_ERROR_OK, True), ("error", PCAN_ERROR_UNKNOWN, False)] ) def test_reset(self, name, status, expected_result) -> None: - self.mock_pcan.Reset = Mock(return_value=status) - self.bus = can.Bus(bustype="pcan", fd=True) - self.assertEqual(self.bus.reset(), expected_result) - self.mock_pcan.Reset.assert_called_once_with(PCAN_USBBUS1) + with self.subTest(name): + self.mock_pcan.Reset = Mock(return_value=status) + self.bus = can.Bus(bustype="pcan", fd=True) + self.assertEqual(self.bus.reset(), expected_result) + self.mock_pcan.Reset.assert_called_once_with(PCAN_USBBUS1) @parameterized.expand( [("no_error", PCAN_ERROR_OK, 1), ("error", PCAN_ERROR_UNKNOWN, None)] ) def test_get_device_number(self, name, status, expected_result) -> None: - self.mock_pcan.GetValue = Mock(return_value=(status, 1)) - self.bus = can.Bus(bustype="pcan", fd=True) - self.assertEqual(self.bus.get_device_number(), expected_result) - self.mock_pcan.GetValue.assert_called_once_with( - PCAN_USBBUS1, PCAN_DEVICE_NUMBER - ) + with self.subTest(name): + self.mock_pcan.GetValue = Mock(return_value=(status, 1)) + self.bus = can.Bus(bustype="pcan", fd=True) + self.assertEqual(self.bus.get_device_number(), expected_result) + self.mock_pcan.GetValue.assert_called_once_with( + PCAN_USBBUS1, PCAN_DEVICE_NUMBER + ) @parameterized.expand( [("no_error", PCAN_ERROR_OK, True), ("error", PCAN_ERROR_UNKNOWN, False)] ) def test_set_device_number(self, name, status, expected_result) -> None: - self.bus = can.Bus(bustype="pcan") - self.mock_pcan.SetValue = Mock(return_value=status) - self.assertEqual(self.bus.set_device_number(3), expected_result) - # check last SetValue call - self.assertEqual( - self.mock_pcan.SetValue.call_args_list[-1][0], - (PCAN_USBBUS1, PCAN_DEVICE_NUMBER, 3), - ) + with self.subTest(name): + self.bus = can.Bus(bustype="pcan") + self.mock_pcan.SetValue = Mock(return_value=status) + self.assertEqual(self.bus.set_device_number(3), expected_result) + # check last SetValue call + self.assertEqual( + self.mock_pcan.SetValue.call_args_list[-1][0], + (PCAN_USBBUS1, PCAN_DEVICE_NUMBER, 3), + ) def test_recv(self): data = (ctypes.c_ubyte * 8)(*[x for x in range(8)]) @@ -219,32 +222,33 @@ def test_send_fd(self) -> None: ] ) def test_send_type(self, name, msg_type, expected_value) -> None: - ( - is_extended_id, - is_remote_frame, - is_error_frame, - is_fd, - bitrate_switch, - error_state_indicator, - ) = msg_type - - self.mock_pcan.Write = Mock(return_value=PCAN_ERROR_OK) - - self.bus = can.Bus(bustype="pcan") - msg = can.Message( - arbitration_id=0xC0FFEF, - data=[1, 2, 3, 4, 5, 6, 7, 8], - is_extended_id=is_extended_id, - is_remote_frame=is_remote_frame, - is_error_frame=is_error_frame, - bitrate_switch=bitrate_switch, - error_state_indicator=error_state_indicator, - is_fd=is_fd, - ) - self.bus.send(msg) - # self.mock_m_objPCANBasic.Write.assert_called_once() - CANMsg = self.mock_pcan.Write.call_args_list[0][0][1] - self.assertEqual(CANMsg.MSGTYPE, expected_value.value) + with self.subTest(name): + ( + is_extended_id, + is_remote_frame, + is_error_frame, + is_fd, + bitrate_switch, + error_state_indicator, + ) = msg_type + + self.mock_pcan.Write = Mock(return_value=PCAN_ERROR_OK) + + self.bus = can.Bus(bustype="pcan") + msg = can.Message( + arbitration_id=0xC0FFEF, + data=[1, 2, 3, 4, 5, 6, 7, 8], + is_extended_id=is_extended_id, + is_remote_frame=is_remote_frame, + is_error_frame=is_error_frame, + bitrate_switch=bitrate_switch, + error_state_indicator=error_state_indicator, + is_fd=is_fd, + ) + self.bus.send(msg) + # self.mock_m_objPCANBasic.Write.assert_called_once() + CANMsg = self.mock_pcan.Write.call_args_list[0][0][1] + self.assertEqual(CANMsg.MSGTYPE, expected_value.value) def test_send_error(self) -> None: self.mock_pcan.Write = Mock(return_value=PCAN_ERROR_BUSHEAVY) @@ -258,13 +262,14 @@ def test_send_error(self) -> None: @parameterized.expand([("on", True), ("off", False)]) def test_flash(self, name, flash) -> None: - self.bus = can.Bus(bustype="pcan") - self.bus.flash(flash) - call_list = self.mock_pcan.SetValue.call_args_list - last_call_args_list = call_list[-1][0] - self.assertEqual( - last_call_args_list, (PCAN_USBBUS1, PCAN_CHANNEL_IDENTIFYING, flash) - ) + with self.subTest(name): + self.bus = can.Bus(bustype="pcan") + self.bus.flash(flash) + call_list = self.mock_pcan.SetValue.call_args_list + last_call_args_list = call_list[-1][0] + self.assertEqual( + last_call_args_list, (PCAN_USBBUS1, PCAN_CHANNEL_IDENTIFYING, flash) + ) def test_shutdown(self) -> None: self.bus = can.Bus(bustype="pcan") @@ -278,14 +283,16 @@ def test_shutdown(self) -> None: ] ) def test_state(self, name, bus_state: BusState, expected_parameter) -> None: - self.bus = can.Bus(bustype="pcan") + with self.subTest(name): + self.bus = can.Bus(bustype="pcan") - self.bus.state = bus_state - call_list = self.mock_pcan.SetValue.call_args_list - last_call_args_list = call_list[-1][0] - self.assertEqual( - last_call_args_list, (PCAN_USBBUS1, PCAN_LISTEN_ONLY, expected_parameter) - ) + self.bus.state = bus_state + call_list = self.mock_pcan.SetValue.call_args_list + last_call_args_list = call_list[-1][0] + self.assertEqual( + last_call_args_list, + (PCAN_USBBUS1, PCAN_LISTEN_ONLY, expected_parameter), + ) def test_detect_available_configs(self) -> None: self.mock_pcan.GetValue = Mock( @@ -296,10 +303,11 @@ def test_detect_available_configs(self) -> None: @parameterized.expand([("valid", PCAN_ERROR_OK, "OK"), ("invalid", 0x00005, None)]) def test_status_string(self, name, status, expected_result) -> None: - self.bus = can.Bus(bustype="pcan") - self.mock_pcan.GetStatus = Mock(return_value=status) - self.assertEqual(self.bus.status_string(), expected_result) - self.mock_pcan.GetStatus.assert_called() + with self.subTest(name): + self.bus = can.Bus(bustype="pcan") + self.mock_pcan.GetStatus = Mock(return_value=status) + self.assertEqual(self.bus.status_string(), expected_result) + self.mock_pcan.GetStatus.assert_called() if __name__ == "__main__":