Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 19 additions & 23 deletions can/interfaces/robotell.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging

from can import BusABC, Message
from ..exceptions import CanInterfaceNotImplementedError

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -56,15 +57,17 @@ def __init__(
):
"""
:param str channel:
port of underlying serial or usb device (e.g. /dev/ttyUSB0, COM8, ...)
Must not be empty.
port of underlying serial or usb device (e.g. ``/dev/ttyUSB0``, ``COM8``, ...)
Must not be empty. Can also end with ``@115200`` (or similarly) to specify the baudrate.
:param int ttyBaudrate:
baudrate of underlying serial or usb device
baudrate of underlying serial or usb device (Ignored if set via the ``channel`` parameter)
:param int bitrate:
CAN Bitrate in bit/s. Value is stored in the adapter and will be used as default if no bitrate is specified
:param bool rtscts:
turn hardware handshake (RTS/CTS) on and off
"""
if serial is None:
raise CanInterfaceNotImplementedError("The serial module is not installed")

if not channel: # if None or empty
raise TypeError("Must specify a serial port.")
Expand All @@ -74,7 +77,7 @@ def __init__(
channel, baudrate=ttyBaudrate, rtscts=rtscts
)

## Disable flushing queued config ACKs on lookup channel (for unit tests)
# Disable flushing queued config ACKs on lookup channel (for unit tests)
self._loopback_test = channel == "loop://"

self._rxbuffer = bytearray() # raw bytes from the serial port
Expand All @@ -86,11 +89,10 @@ def __init__(
if bitrate is not None:
self.set_bitrate(bitrate)

self.channel_info = "Robotell USB-CAN s/n %s on %s" % (
self.get_serial_number(1),
channel,
self.channel_info = (
f"Robotell USB-CAN s/n {self.get_serial_number(1)} on {channel}"
)
logger.info("Using device: {}".format(self.channel_info))
logger.info("Using device: %s", self.channel_info)

super().__init__(channel=channel, **kwargs)

Expand All @@ -103,9 +105,7 @@ def set_bitrate(self, bitrate):
if bitrate <= self._MAX_CAN_BAUD:
self._writeconfig(self._CAN_BAUD_ID, bitrate)
else:
raise ValueError(
"Invalid bitrate, must be less than " + str(self._MAX_CAN_BAUD)
)
raise ValueError(f"Invalid bitrate, must be less than {self._MAX_CAN_BAUD}")

def set_auto_retransmit(self, retrans_flag):
"""
Expand All @@ -119,8 +119,8 @@ def set_auto_bus_management(self, auto_man):
:param bool auto_man:
Enable/disable automatic bus management
"""
## Not sure what "automatic bus managemenet" does. Does not seem to control
## automatic ACK of CAN frames (listen only mode)
# Not sure what "automatic bus management" does. Does not seem to control
# automatic ACK of CAN frames (listen only mode)
self._writeconfig(self._CAN_ABOM_ID, 1 if auto_man else 0)

def set_serial_rate(self, serial_bps):
Expand Down Expand Up @@ -161,7 +161,7 @@ def _getconfigsize(self, configid):
return 4
if configid == self._CAN_READ_SERIAL1 or configid <= self._CAN_READ_SERIAL2:
return 8
if configid >= self._CAN_FILTER_BASE_ID and configid <= self._CAN_FILTER_MAX_ID:
if self._CAN_FILTER_BASE_ID <= configid <= self._CAN_FILTER_MAX_ID:
return 8
return 0

Expand All @@ -178,9 +178,7 @@ def _readconfig(self, configid, timeout):
newmsg = self._readmessage(not self._loopback_test, True, timeout)
if newmsg is None:
logger.warning(
"Timeout waiting for response when reading config value {:04X}.".format(
configid
)
f"Timeout waiting for response when reading config value {configid:04X}."
)
return None
return newmsg[4:12]
Expand Down Expand Up @@ -211,8 +209,7 @@ def _writeconfig(self, configid, value, value2=0):
newmsg = self._readmessage(not self._loopback_test, True, 1)
if newmsg is None:
logger.warning(
"Timeout waiting for response when writing config value "
+ str(configid)
"Timeout waiting for response when writing config value %d", configid
)

def _readmessage(self, flushold, cfgchannel, timeout):
Expand Down Expand Up @@ -261,17 +258,16 @@ def _readmessage(self, flushold, cfgchannel, timeout):
cs = (cs + newmsg[idx]) & 0xFF
if newmsg[16] == cs:
# OK, valid message - place it in the correct queue
if newmsg[13] == 0xFF: ## Check for config channel
if newmsg[13] == 0xFF: # Check for config channel
self._configmsg.append(newmsg)
else:
self._rxmsg.append(newmsg)
else:
logger.warning("Incorrect message checksum, discarded message")
else:
logger.warning(
"Invalid message structure length "
+ str(len(newmsg))
+ ", ignoring message"
"Invalid message structure length %d, ignoring message",
len(newmsg),
)

# Check if we have a message in the desired queue - if so copy and return
Expand Down