diff --git a/can/interfaces/slcan.py b/can/interfaces/slcan.py index 2ea12745e..be5d672d3 100644 --- a/can/interfaces/slcan.py +++ b/can/interfaces/slcan.py @@ -47,6 +47,9 @@ class slcanBus(BusABC): _SLEEP_AFTER_SERIAL_OPEN = 2 # in seconds + _OK = b"\r" + _ERROR = b"\a" + LINE_TERMINATOR = b"\r" def __init__( @@ -81,10 +84,8 @@ def __init__( if not channel: # if None or empty raise TypeError("Must specify a serial port.") - if "@" in channel: (channel, ttyBaudrate) = channel.split("@") - self.serialPortOrig = serial.serial_for_url( channel, baudrate=ttyBaudrate, rtscts=rtscts ) @@ -95,90 +96,123 @@ def __init__( if bitrate is not None and btr is not None: raise ValueError("Bitrate and btr mutually exclusive.") - if bitrate is not None: - self.close() - if bitrate in self._BITRATES: - self.write(self._BITRATES[bitrate]) - else: - raise ValueError( - "Invalid bitrate, choose one of " - + (", ".join(self._BITRATES)) - + "." - ) - + self.set_bitrate(self, bitrate) if btr is not None: - self.close() - self.write("s" + btr) - + self.set_bitrate_reg(self, btr) self.open() super().__init__( channel, ttyBaudrate=115200, bitrate=None, rtscts=False, **kwargs ) - def write(self, string): + def set_bitrate(self, bitrate): + """ + :raise ValueError: if both *bitrate* is not among the possible values + + :param int bitrate: + Bitrate in bit/s + """ + self.close() + if bitrate in self._BITRATES: + self._write(self._BITRATES[bitrate]) + else: + raise ValueError( + "Invalid bitrate, choose one of " + (", ".join(self._BITRATES)) + "." + ) + self.open() + + def set_bitrate_reg(self, btr): + """ + :param str btr: + BTR register value to set custom can speed + """ + self.close() + self._write("s" + btr) + self.open() + + def _write(self, string): self.serialPortOrig.write(string.encode() + self.LINE_TERMINATOR) self.serialPortOrig.flush() + def _read(self, timeout): + + # first read what is already in receive buffer + while self.serialPortOrig.in_waiting: + self._buffer += self.serialPortOrig.read() + # if we still don't have a complete message, do a blocking read + start = time.time() + time_left = timeout + while not (ord(self._OK) in self._buffer or ord(self._ERROR) in self._buffer): + self.serialPortOrig.timeout = time_left + byte = self.serialPortOrig.read() + if byte: + self._buffer += byte + # if timeout is None, try indefinitely + if timeout is None: + continue + # try next one only if there still is time, and with + # reduced timeout + else: + time_left = timeout - (time.time() - start) + if time_left > 0: + continue + else: + return None + # return first message + for i in range(len(self._buffer)): + if self._buffer[i] == ord(self._OK) or self._buffer[i] == ord(self._ERROR): + string = self._buffer[: i + 1].decode() + del self._buffer[: i + 1] + break + return string + + def flush(self): + del self._buffer[:] + while self.serialPortOrig.in_waiting: + self.serialPortOrig.read() + def open(self): - self.write("O") + self._write("O") def close(self): - self.write("C") + self._write("C") def _recv_internal(self, timeout): - if timeout != self.serialPortOrig.timeout: - self.serialPortOrig.timeout = timeout canId = None remote = False extended = False frame = [] - # First read what is already in the receive buffer - while ( - self.serialPortOrig.in_waiting and self.LINE_TERMINATOR not in self._buffer - ): - self._buffer += self.serialPortOrig.read(1) - - # If we still don't have a complete message, do a blocking read - if self.LINE_TERMINATOR not in self._buffer: - self._buffer += self.serialPortOrig.read_until(self.LINE_TERMINATOR) + string = self._read(timeout) - if self.LINE_TERMINATOR not in self._buffer: - # Timed out - return None, False - - readStr = self._buffer.decode() - del self._buffer[:] - if not readStr: + if not string: pass - elif readStr[0] == "T": + elif string[0] == "T": # extended frame - canId = int(readStr[1:9], 16) - dlc = int(readStr[9]) + canId = int(string[1:9], 16) + dlc = int(string[9]) extended = True for i in range(0, dlc): - frame.append(int(readStr[10 + i * 2 : 12 + i * 2], 16)) - elif readStr[0] == "t": + frame.append(int(string[10 + i * 2 : 12 + i * 2], 16)) + elif string[0] == "t": # normal frame - canId = int(readStr[1:4], 16) - dlc = int(readStr[4]) + canId = int(string[1:4], 16) + dlc = int(string[4]) for i in range(0, dlc): - frame.append(int(readStr[5 + i * 2 : 7 + i * 2], 16)) - elif readStr[0] == "r": + frame.append(int(string[5 + i * 2 : 7 + i * 2], 16)) + elif string[0] == "r": # remote frame - canId = int(readStr[1:4], 16) - dlc = int(readStr[4]) + canId = int(string[1:4], 16) + dlc = int(string[4]) remote = True - elif readStr[0] == "R": + elif string[0] == "R": # remote extended frame - canId = int(readStr[1:9], 16) - dlc = int(readStr[9]) + canId = int(string[1:9], 16) + dlc = int(string[9]) extended = True remote = True - if canId is not None: msg = Message( arbitration_id=canId, @@ -194,7 +228,6 @@ def _recv_internal(self, timeout): def send(self, msg, timeout=None): if timeout != self.serialPortOrig.write_timeout: self.serialPortOrig.write_timeout = timeout - if msg.is_remote_frame: if msg.is_extended_id: sendStr = "R%08X%d" % (msg.arbitration_id, msg.dlc) @@ -205,9 +238,8 @@ def send(self, msg, timeout=None): sendStr = "T%08X%d" % (msg.arbitration_id, msg.dlc) else: sendStr = "t%03X%d" % (msg.arbitration_id, msg.dlc) - sendStr += "".join(["%02X" % b for b in msg.data]) - self.write(sendStr) + self._write(sendStr) def shutdown(self): self.close() @@ -218,3 +250,78 @@ def fileno(self): return self.serialPortOrig.fileno() # Return an invalid file descriptor on Windows return -1 + + def get_version(self, timeout): + """Get HW and SW version of the slcan interface. + + :type timeout: int or None + :param timeout: + seconds to wait for version or None to wait indefinitely + + :returns: tuple (hw_version, sw_version) + WHERE + int hw_version is the hardware version or None on timeout + int sw_version is the software version or None on timeout + """ + cmd = "V" + self._write(cmd) + + start = time.time() + time_left = timeout + while True: + string = self._read(time_left) + + if not string: + pass + elif string[0] == cmd and len(string) == 6: + # convert ASCII coded version + hw_version = int(string[1:3]) + sw_version = int(string[3:5]) + return hw_version, sw_version + # if timeout is None, try indefinitely + if timeout is None: + continue + # try next one only if there still is time, and with + # reduced timeout + else: + time_left = timeout - (time.time() - start) + if time_left > 0: + continue + else: + return None, None + + def get_serial_number(self, timeout): + """Get serial number of the slcan interface. + + :type timeout: int or None + :param timeout: + seconds to wait for serial number or None to wait indefinitely + + :rtype str or None + :return: + None on timeout or a str object. + """ + cmd = "N" + self._write(cmd) + + start = time.time() + time_left = timeout + while True: + string = self._read(time_left) + + if not string: + pass + elif string[0] == cmd and len(string) == 6: + serial_number = string[1:-1] + return serial_number + # if timeout is None, try indefinitely + if timeout is None: + continue + # try next one only if there still is time, and with + # reduced timeout + else: + time_left = timeout - (time.time() - start) + if time_left > 0: + continue + else: + return None diff --git a/test/test_slcan.py b/test/test_slcan.py index c5d0d47e0..781fa75df 100644 --- a/test/test_slcan.py +++ b/test/test_slcan.py @@ -105,6 +105,24 @@ def test_partial_recv(self): msg = self.bus.recv(0) self.assertIsNotNone(msg) + def test_version(self): + self.serial.write(b"V1013\r") + hw_ver, sw_ver = self.bus.get_version(0) + self.assertEqual(hw_ver, 10) + self.assertEqual(sw_ver, 13) + + hw_ver, sw_ver = self.bus.get_version(0) + self.assertIsNone(hw_ver) + self.assertIsNone(sw_ver) + + def test_serial_number(self): + self.serial.write(b"NA123\r") + sn = self.bus.get_serial_number(0) + self.assertEqual(sn, "A123") + + sn = self.bus.get_serial_number(0) + self.assertIsNone(sn) + if __name__ == "__main__": unittest.main()