diff --git a/can/interfaces/robotell.py b/can/interfaces/robotell.py index cb35aa774..ba3c40dcd 100644 --- a/can/interfaces/robotell.py +++ b/can/interfaces/robotell.py @@ -6,6 +6,7 @@ import logging from can import BusABC, Message +from ..exceptions import CanInterfaceNotImplementedError logger = logging.getLogger(__name__) @@ -56,15 +57,17 @@ def __init__( ): """ :param str channel: - port of underlying serial or usb device (e.g. /dev/ttyUSB0, COM8, ...) - Must not be empty. + port of underlying serial or usb device (e.g. ``/dev/ttyUSB0``, ``COM8``, ...) + Must not be empty. Can also end with ``@115200`` (or similarly) to specify the baudrate. :param int ttyBaudrate: - baudrate of underlying serial or usb device + baudrate of underlying serial or usb device (Ignored if set via the ``channel`` parameter) :param int bitrate: CAN Bitrate in bit/s. Value is stored in the adapter and will be used as default if no bitrate is specified :param bool rtscts: turn hardware handshake (RTS/CTS) on and off """ + if serial is None: + raise CanInterfaceNotImplementedError("The serial module is not installed") if not channel: # if None or empty raise TypeError("Must specify a serial port.") @@ -74,7 +77,7 @@ def __init__( channel, baudrate=ttyBaudrate, rtscts=rtscts ) - ## Disable flushing queued config ACKs on lookup channel (for unit tests) + # Disable flushing queued config ACKs on lookup channel (for unit tests) self._loopback_test = channel == "loop://" self._rxbuffer = bytearray() # raw bytes from the serial port @@ -86,11 +89,10 @@ def __init__( if bitrate is not None: self.set_bitrate(bitrate) - self.channel_info = "Robotell USB-CAN s/n %s on %s" % ( - self.get_serial_number(1), - channel, + self.channel_info = ( + f"Robotell USB-CAN s/n {self.get_serial_number(1)} on {channel}" ) - logger.info("Using device: {}".format(self.channel_info)) + logger.info("Using device: %s", self.channel_info) super().__init__(channel=channel, **kwargs) @@ -103,9 +105,7 @@ def set_bitrate(self, bitrate): if bitrate <= self._MAX_CAN_BAUD: self._writeconfig(self._CAN_BAUD_ID, bitrate) else: - raise ValueError( - "Invalid bitrate, must be less than " + str(self._MAX_CAN_BAUD) - ) + raise ValueError(f"Invalid bitrate, must be less than {self._MAX_CAN_BAUD}") def set_auto_retransmit(self, retrans_flag): """ @@ -119,8 +119,8 @@ def set_auto_bus_management(self, auto_man): :param bool auto_man: Enable/disable automatic bus management """ - ## Not sure what "automatic bus managemenet" does. Does not seem to control - ## automatic ACK of CAN frames (listen only mode) + # Not sure what "automatic bus management" does. Does not seem to control + # automatic ACK of CAN frames (listen only mode) self._writeconfig(self._CAN_ABOM_ID, 1 if auto_man else 0) def set_serial_rate(self, serial_bps): @@ -161,7 +161,7 @@ def _getconfigsize(self, configid): return 4 if configid == self._CAN_READ_SERIAL1 or configid <= self._CAN_READ_SERIAL2: return 8 - if configid >= self._CAN_FILTER_BASE_ID and configid <= self._CAN_FILTER_MAX_ID: + if self._CAN_FILTER_BASE_ID <= configid <= self._CAN_FILTER_MAX_ID: return 8 return 0 @@ -178,9 +178,7 @@ def _readconfig(self, configid, timeout): newmsg = self._readmessage(not self._loopback_test, True, timeout) if newmsg is None: logger.warning( - "Timeout waiting for response when reading config value {:04X}.".format( - configid - ) + f"Timeout waiting for response when reading config value {configid:04X}." ) return None return newmsg[4:12] @@ -211,8 +209,7 @@ def _writeconfig(self, configid, value, value2=0): newmsg = self._readmessage(not self._loopback_test, True, 1) if newmsg is None: logger.warning( - "Timeout waiting for response when writing config value " - + str(configid) + "Timeout waiting for response when writing config value %d", configid ) def _readmessage(self, flushold, cfgchannel, timeout): @@ -261,7 +258,7 @@ def _readmessage(self, flushold, cfgchannel, timeout): cs = (cs + newmsg[idx]) & 0xFF if newmsg[16] == cs: # OK, valid message - place it in the correct queue - if newmsg[13] == 0xFF: ## Check for config channel + if newmsg[13] == 0xFF: # Check for config channel self._configmsg.append(newmsg) else: self._rxmsg.append(newmsg) @@ -269,9 +266,8 @@ def _readmessage(self, flushold, cfgchannel, timeout): logger.warning("Incorrect message checksum, discarded message") else: logger.warning( - "Invalid message structure length " - + str(len(newmsg)) - + ", ignoring message" + "Invalid message structure length %d, ignoring message", + len(newmsg), ) # Check if we have a message in the desired queue - if so copy and return