Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 81 additions & 41 deletions can/interfaces/seeedstudio/seeedstudio.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import struct
import io
from time import time

import can
from can import BusABC, Message

logger = logging.getLogger("seeedbus")
Expand Down Expand Up @@ -84,19 +86,32 @@ def __init__(
:param bitrate
CAN bus bit rate, selected from available list.

:raises can.CanInitializationError: If the given parameters are invalid.
:raises can.CanInterfaceNotImplementedError: If the serial module is not installed.
"""

if serial is None:
raise can.CanInterfaceNotImplementedError(
"the serial module is not installed"
)

self.bit_rate = bitrate
self.frame_type = frame_type
self.op_mode = operation_mode
self.filter_id = bytearray([0x00, 0x00, 0x00, 0x00])
self.mask_id = bytearray([0x00, 0x00, 0x00, 0x00])
if not channel:
raise ValueError("Must specify a serial port.")
raise can.CanInitializationError("Must specify a serial port.")

self.channel_info = "Serial interface: " + channel
self.ser = serial.Serial(
channel, baudrate=baudrate, timeout=timeout, rtscts=False
)
try:
self.ser = serial.Serial(
channel, baudrate=baudrate, timeout=timeout, rtscts=False
)
except ValueError as error:
raise can.CanInitializationError(
"could not create the serial device"
) from error

super(SeeedBus, self).__init__(channel=channel, *args, **kwargs)
self.init_frame()
Expand Down Expand Up @@ -133,7 +148,10 @@ def init_frame(self, timeout=None):
byte_msg.append(crc)

logger.debug("init_frm:\t%s", byte_msg.hex())
self.ser.write(byte_msg)
try:
self.ser.write(byte_msg)
except Exception as error:
raise can.CanInitializationError("could send init frame") from error

def flush_buffer(self):
self.ser.flushInput()
Expand All @@ -160,7 +178,7 @@ def status_frame(self, timeout=None):
byte_msg.append(crc)

logger.debug("status_frm:\t%s", byte_msg.hex())
self.ser.write(byte_msg)
self._write(byte_msg)

def send(self, msg, timeout=None):
"""
Expand Down Expand Up @@ -197,7 +215,15 @@ def send(self, msg, timeout=None):
byte_msg.append(0x55)

logger.debug("sending:\t%s", byte_msg.hex())
self.ser.write(byte_msg)
self._write(byte_msg)

def _write(self, byte_msg: bytearray) -> None:
try:
self.ser.write(byte_msg)
except serial.PortNotOpenError as error:
raise can.CanOperationError("writing to closed port") from error
except serial.SerialTimeoutException as error:
raise can.CanTimeoutError() from error

def _recv_internal(self, timeout):
"""
Expand All @@ -220,50 +246,64 @@ def _recv_internal(self, timeout):
# or raise a SerialException
rx_byte_1 = self.ser.read()

except serial.PortNotOpenError as error:
raise can.CanOperationError("reading from closed port") from error
except serial.SerialException:
return None, False

if rx_byte_1 and ord(rx_byte_1) == 0xAA:
rx_byte_2 = ord(self.ser.read())
time_stamp = time()
if rx_byte_2 == 0x55:
status = bytearray([0xAA, 0x55])
status += bytearray(self.ser.read(18))
logger.debug("status resp:\t%s", status.hex())

else:
length = int(rx_byte_2 & 0x0F)
is_extended = bool(rx_byte_2 & 0x20)
is_remote = bool(rx_byte_2 & 0x10)
if is_extended:
s_3_4_5_6 = bytearray(self.ser.read(4))
arb_id = (struct.unpack("<I", s_3_4_5_6))[0]
try:
rx_byte_2 = ord(self.ser.read())

else:
s_3_4 = bytearray(self.ser.read(2))
arb_id = (struct.unpack("<H", s_3_4))[0]

data = bytearray(self.ser.read(length))
end_packet = ord(self.ser.read())
if end_packet == 0x55:
msg = Message(
timestamp=time_stamp,
arbitration_id=arb_id,
is_extended_id=is_extended,
is_remote_frame=is_remote,
dlc=length,
data=data,
)
logger.debug("recv message: %s", str(msg))
return msg, False
time_stamp = time()
if rx_byte_2 == 0x55:
status = bytearray([0xAA, 0x55])
status += bytearray(self.ser.read(18))
logger.debug("status resp:\t%s", status.hex())

else:
return None, False
length = int(rx_byte_2 & 0x0F)
is_extended = bool(rx_byte_2 & 0x20)
is_remote = bool(rx_byte_2 & 0x10)
if is_extended:
s_3_4_5_6 = bytearray(self.ser.read(4))
arb_id = (struct.unpack("<I", s_3_4_5_6))[0]

else:
s_3_4 = bytearray(self.ser.read(2))
arb_id = (struct.unpack("<H", s_3_4))[0]

data = bytearray(self.ser.read(length))
end_packet = ord(self.ser.read())
if end_packet == 0x55:
msg = Message(
timestamp=time_stamp,
arbitration_id=arb_id,
is_extended_id=is_extended,
is_remote_frame=is_remote,
dlc=length,
data=data,
)
logger.debug("recv message: %s", str(msg))
return msg, False

else:
return None, False

except serial.PortNotOpenError as error:
raise can.CanOperationError("reading from closed port") from error
except serial.SerialException as error:
raise can.CanOperationError(
"failed to read message information"
) from error

return None, None

def fileno(self):
try:
return self.ser.fileno()
except io.UnsupportedOperation:
raise NotImplementedError("fileno is not implemented using current CAN bus")
except io.UnsupportedOperation as excption:
logger.warning(
"fileno is not implemented using current CAN bus: %s", str(excption)
)
return -1