diff --git a/README.rst b/README.rst index dc823e3a..43d2d5eb 100644 --- a/README.rst +++ b/README.rst @@ -126,7 +126,6 @@ Pull requests are most welcome! * More unit test coverage * Period transmits using python-can cyclic API -* SDO block transfer * XDD support diff --git a/canopen/sdo.py b/canopen/sdo.py index af015e8a..4d18c928 100644 --- a/canopen/sdo.py +++ b/canopen/sdo.py @@ -2,11 +2,15 @@ import struct import logging import io +import binascii +import time try: import queue except ImportError: import Queue as queue +import can + from . import objectdictionary from . import common @@ -22,16 +26,29 @@ REQUEST_DOWNLOAD = 1 << 5 REQUEST_UPLOAD = 2 << 5 REQUEST_SEGMENT_UPLOAD = 3 << 5 +REQUEST_ABORTED = 4 << 5 +REQUEST_BLOCK_UPLOAD = 5 << 5 +REQUEST_BLOCK_DOWNLOAD = 6 << 5 RESPONSE_SEGMENT_UPLOAD = 0 << 5 RESPONSE_SEGMENT_DOWNLOAD = 1 << 5 RESPONSE_UPLOAD = 2 << 5 RESPONSE_DOWNLOAD = 3 << 5 RESPONSE_ABORTED = 4 << 5 +RESPONSE_BLOCK_DOWNLOAD = 5 << 5 +RESPONSE_BLOCK_UPLOAD = 6 << 5 + +INITIATE_BLOCK_TRANSFER = 0 +END_BLOCK_TRANSFER = 1 +BLOCK_TRANSFER_RESPONSE = 2 +START_BLOCK_UPLOAD = 3 EXPEDITED = 0x2 SIZE_SPECIFIED = 0x1 +BLOCK_SIZE_SPECIFIED = 0x2 +CRC_SUPPORTED = 0x4 NO_MORE_DATA = 0x1 +NO_MORE_BLOCKS = 0x80 TOGGLE_BIT = 0x10 @@ -62,33 +79,57 @@ def __init__(self, rx_cobid, tx_cobid, od): def on_response(self, can_id, data, timestamp): self.responses.put(bytes(data)) - def send_request(self, sdo_request): + def send_request(self, request): retries_left = self.MAX_RETRIES - response = None - if not self.responses.empty(): - #logger.warning("There were unexpected messages in the queue") - self.responses = queue.Queue() - while retries_left: - # Wait for node to respond - self.network.send_message(self.rx_cobid, sdo_request) + while True: try: - response = self.responses.get( - block=True, timeout=self.RESPONSE_TIMEOUT) - except queue.Empty: + self.network.send_message(self.rx_cobid, request) + except can.CanError as e: + # Could be a buffer overflow. Wait some time before trying again retries_left -= 1 + if not retries_left: + raise + logger.info(str(e)) + time.sleep(0.1) else: break - if not response: + def read_response(self): + try: + response = self.responses.get( + block=True, timeout=self.RESPONSE_TIMEOUT) + except queue.Empty: raise SdoCommunicationError("No SDO response received") - if retries_left < self.MAX_RETRIES: - logger.warning("There were some issues while communicating with the node") - res_command, = struct.unpack("B", response[0:1]) + res_command, = struct.unpack_from("B", response) if res_command == RESPONSE_ABORTED: - abort_code, = struct.unpack(" 1 to indicate the size in bytes of a fixed-size chunk buffer. + :param int size: + Size of data to that will be transmitted. + :param bool block_transfer: + If block transfer should be used. :returns: A file like object. """ + buffer_size = buffering if buffering > 1 else io.DEFAULT_BUFFER_SIZE if "r" in mode: - RawStreamCls = ReadableStream - BufferedStreamCls = io.BufferedReader + if block_transfer: + raw_stream = BlockUploadStream( + self.sdo_node, self.od.index, self.od.subindex) + else: + raw_stream = ReadableStream( + self.sdo_node, self.od.index, self.od.subindex) + if buffering: + buffered_stream = io.BufferedReader(raw_stream, buffer_size=buffer_size) + else: + return raw_stream if "w" in mode: - RawStreamCls = WritableStream - BufferedStreamCls = io.BufferedWriter - raw_stream = RawStreamCls(self.sdo_node, self.od.index, self.od.subindex) - if buffering == 0: - return raw_stream - # Line buffering is not supported by BufferedReader - buffer_size = buffering if buffering > 1 else io.DEFAULT_BUFFER_SIZE - buffered_stream = BufferedStreamCls(raw_stream, buffer_size=buffer_size) + if block_transfer: + raw_stream = BlockDownloadStream( + self.sdo_node, self.od.index, self.od.subindex, size) + else: + raw_stream = WritableStream( + self.sdo_node, self.od.index, self.od.subindex, size) + if buffering: + buffered_stream = io.BufferedWriter(raw_stream, buffer_size=buffer_size) + else: + return raw_stream if "b" not in mode: # Text mode line_buffering = buffering == 1 @@ -308,7 +365,7 @@ def __init__(self, sdo_client, index, subindex=0): sdo_client.rx_cobid - 0x600) request = SDO_STRUCT.pack(REQUEST_UPLOAD, index, subindex) request += b"\x00\x00\x00\x00" - response = sdo_client.send_request(request) + response = sdo_client.request_response(request) res_command, res_index, res_subindex = SDO_STRUCT.unpack(response[0:4]) res_data = response[4:8] @@ -357,7 +414,7 @@ def read(self, size=-1): command |= self._toggle request = bytearray(8) request[0] = command - response = self.sdo_client.send_request(request) + response = self.sdo_client.request_response(request) res_command, = struct.unpack("B", response[0:1]) if res_command & 0xE0 != RESPONSE_SEGMENT_UPLOAD: raise SdoCommunicationError("Unexpected response 0x%02X" % res_command) @@ -414,7 +471,7 @@ def __init__(self, sdo_client, index, subindex=0, size=None, force_segment=False else: size_data = b"\x00\x00\x00\x00" request = SDO_STRUCT.pack(command, index, subindex) + size_data - response = sdo_client.send_request(request) + response = sdo_client.request_response(request) res_command, = struct.unpack("B", response[0:1]) if res_command != RESPONSE_DOWNLOAD: raise SdoCommunicationError( @@ -441,7 +498,7 @@ def write(self, b): assert len(b) <= 4, "More data received than expected" data = b.tobytes() if isinstance(b, memoryview) else b request = self._exp_header + data.ljust(4, b"\x00") - response = self.sdo_client.send_request(request) + response = self.sdo_client.request_response(request) res_command, = struct.unpack("B", response[0:1]) if res_command & 0xE0 != RESPONSE_DOWNLOAD: raise SdoCommunicationError( @@ -465,7 +522,7 @@ def write(self, b): command |= (7 - bytes_sent) << 1 request[0] = command request[1:bytes_sent + 1] = b[0:bytes_sent] - response = self.sdo_client.send_request(request) + response = self.sdo_client.request_response(request) res_command, = struct.unpack("B", response[0:1]) if res_command & 0xE0 != RESPONSE_SEGMENT_DOWNLOAD: raise SdoCommunicationError( @@ -488,8 +545,322 @@ def close(self): command |= 7 << 1 request = bytearray(8) request[0] = command + self.sdo_client.request_response(request) + self._done = True + + def writable(self): + return True + + +class BlockUploadStream(io.RawIOBase): + """File like object for reading from a variable using block upload.""" + + #: Total size of data or ``None`` if not specified + size = None + + blksize = 127 + + crc_supported = False + + def __init__(self, sdo_client, index, subindex=0): + """ + :param canopen.sdo.SdoClient sdo_client: + The SDO client to use for reading. + :param int index: + Object dictionary index to read from. + :param int subindex: + Object dictionary sub-index to read from. + """ + self._done = False + self.sdo_client = sdo_client + self.pos = 0 + self._crc = 0 + self._server_crc = None + self._ackseq = 0 + + logger.debug("Reading 0x%X:%d from node %d", index, subindex, + sdo_client.rx_cobid - 0x600) + # Initiate Block Upload + request = bytearray(8) + command = REQUEST_BLOCK_UPLOAD | INITIATE_BLOCK_TRANSFER | CRC_SUPPORTED + struct.pack_into("= self.blksize or res_command & NO_MORE_BLOCKS: + self._ack_block() + if res_command & NO_MORE_BLOCKS: + n = self._end_upload() + data = response[1:8 - n] + self._done = True + else: + data = response[1:8] + if self.crc_supported: + self._crc = binascii.crc_hqx(data, self._crc) + if self._done: + if self._server_crc != self._crc: + self.sdo_client.abort(0x05040004) + raise SdoCommunicationError("CRC is not OK") + logger.info("CRC is OK") + self.pos += len(data) + return data + + def _retransmit(self): + logger.info("Only %d sequences were received. Requesting retransmission", + self._ackseq) + end_time = time.time() + self.sdo_client.RESPONSE_TIMEOUT + self._ack_block() + while time.time() < end_time: + response = self.sdo_client.read_response() + res_command, = struct.unpack_from("B", response) + seqno = res_command & 0x7F + if seqno == self._ackseq + 1: + # We should be back in sync + self._ackseq = seqno + return response + raise SdoCommunicationError("Some data were lost and could not be retransmitted") + + def _ack_block(self): + request = bytearray(8) + request[0] = REQUEST_BLOCK_UPLOAD | BLOCK_TRANSFER_RESPONSE + request[1] = self._ackseq + request[2] = self.blksize + self.sdo_client.send_request(request) + if self._ackseq == self.blksize: + self._ackseq = 0 + + def _end_upload(self): + response = self.sdo_client.read_response() + res_command, self._server_crc = struct.unpack_from("> 2) & 0x7 + + def close(self): + if self.closed: + return + super(BlockUploadStream, self).close() + if self._done: + request = bytearray(8) + request[0] = REQUEST_BLOCK_UPLOAD | END_BLOCK_TRANSFER self.sdo_client.send_request(request) + + def tell(self): + return self.pos + + def readinto(self, b): + """ + Read bytes into a pre-allocated, writable bytes-like object b, + and return the number of bytes read. + """ + data = self.read(7) + b[:len(data)] = data + return len(data) + + def readable(self): + return True + + +class BlockDownloadStream(io.RawIOBase): + """File like object for block download.""" + + def __init__(self, sdo_client, index, subindex=0, size=None): + """ + :param canopen.sdo.SdoClient sdo_client: + The SDO client to use for communication. + :param int index: + Object dictionary index to read from. + :param int subindex: + Object dictionary sub-index to read from. + :param int size: + Size of data in number of bytes if known in advance. + """ + self.sdo_client = sdo_client + self.size = size + self.pos = 0 + self._done = False + self._seqno = 0 + self._crc = 0 + self._last_bytes_sent = 0 + command = REQUEST_BLOCK_DOWNLOAD | INITIATE_BLOCK_TRANSFER | CRC_SUPPORTED + request = bytearray(8) + logger.info("Initiating block download for 0x%X:%d", index, subindex) + if size is not None: + logger.debug("Expected size of data is %d bytes", size) + command |= BLOCK_SIZE_SPECIFIED + struct.pack_into("= self.size: + # This is the last data to be transmitted based on expected size + self.send(data, end=True) + elif len(data) < 7: + # We can't send less than 7 bytes in the middle of a transmission + return None + else: + self.send(data) + return len(data) + + def send(self, b, end=False): + """Send up to 7 bytes of data. + + :param bytes b: + 0 - 7 bytes of data to transmit. + :param bool end: + If this is the last data. + """ + assert len(b) <= 7 + if not end: + assert len(b) == 7 + self._seqno += 1 + command = self._seqno + if end: + command |= NO_MORE_BLOCKS self._done = True + # Change expected ACK:ed sequence + self._blksize = self._seqno + # Save how many bytes this message contains since this is the last + self._last_bytes_sent = len(b) + request = bytearray(8) + request[0] = command + request[1:len(b) + 1] = b + self.sdo_client.send_request(request) + self.pos += len(b) + if self.crc_supported: + # Calculate CRC + self._crc = binascii.crc_hqx(b, self._crc) + if self._seqno >= self._blksize: + # End of this block, wait for ACK + self._block_ack() + + def tell(self): + return self.pos + + def _block_ack(self): + logger.debug("Waiting for acknowledgement of last block...") + response = self.sdo_client.read_response() + res_command, ackseq, blksize = struct.unpack_from("BBB", response) + if res_command & 0xE0 != RESPONSE_BLOCK_DOWNLOAD: + self.sdo_client.abort(0x05040001) + raise SdoCommunicationError( + "Unexpected response 0x%02X" % res_command) + if res_command & 0x3 != BLOCK_TRANSFER_RESPONSE: + self.sdo_client.abort(0x05040001) + raise SdoCommunicationError("Server did not respond with a " + "block download response") + if ackseq != self._blksize: + self.sdo_client.abort(0x05040003) + raise SdoCommunicationError( + ("%d of %d sequences were received. " + "Retransmission is not supported yet.") % (ackseq, self._blksize)) + logger.debug("All %d sequences were received successfully", ackseq) + logger.debug("Server requested a block size of %d", blksize) + self._blksize = blksize + self._seqno = 0 + + def close(self): + """Closes the stream.""" + if self.closed: + return + super(BlockDownloadStream, self).close() + if not self._done: + logger.error("Block transfer was not finished") + command = REQUEST_BLOCK_DOWNLOAD | END_BLOCK_TRANSFER + # Specify number of bytes in last message that did not contain data + command |= (7 - self._last_bytes_sent) << 2 + request = bytearray(8) + request[0] = command + if self.crc_supported: + # Add CRC + struct.pack_into(" %s (%s)" % (binascii.hexlify(data), binascii.hexlify(next_data[1]))) + self.assertSequenceEqual(data, next_data[1]) self.assertEqual(can_id, 0x602) - self.network.notify(0x582, self.data.pop(0), 0.0) + while self.data and self.data[0][0] == RX: + #print("< %s" % binascii.hexlify(self.data[0][1])) + self.network.notify(0x582, self.data.pop(0)[1], 0.0) def setUp(self): network = canopen.Network() @@ -32,64 +40,101 @@ def setUp(self): def test_expedited_upload(self): self.data = [ - b'\x40\x18\x10\x01\x00\x00\x00\x00', - b'\x43\x18\x10\x01\x04\x00\x00\x00' + (TX, b'\x40\x18\x10\x01\x00\x00\x00\x00'), + (RX, b'\x43\x18\x10\x01\x04\x00\x00\x00') ] vendor_id = self.network[2].sdo[0x1018][1].raw self.assertEqual(vendor_id, 4) # UNSIGNED8 without padded data part (see issue #5) self.data = [ - b'\x40\x00\x14\x02\x00\x00\x00\x00', - b'\x4f\x00\x14\x02\xfe' + (TX, b'\x40\x00\x14\x02\x00\x00\x00\x00'), + (RX, b'\x4f\x00\x14\x02\xfe') ] trans_type = self.network[2].sdo[0x1400]['Transmission type RPDO 1'].raw self.assertEqual(trans_type, 254) def test_expedited_download(self): self.data = [ - b'\x2b\x17\x10\x00\xa0\x0f\x00\x00', - b'\x60\x17\x10\x00\x00\x00\x00\x00' + (TX, b'\x2b\x17\x10\x00\xa0\x0f\x00\x00'), + (RX, b'\x60\x17\x10\x00\x00\x00\x00\x00') ] self.network[2].sdo[0x1017].raw = 4000 def test_segmented_upload(self): self.data = [ - b'\x40\x08\x10\x00\x00\x00\x00\x00', - b'\x41\x08\x10\x00\x1A\x00\x00\x00', - b'\x60\x00\x00\x00\x00\x00\x00\x00', - b'\x00\x54\x69\x6E\x79\x20\x4E\x6F', - b'\x70\x00\x00\x00\x00\x00\x00\x00', - b'\x10\x64\x65\x20\x2D\x20\x4D\x65', - b'\x60\x00\x00\x00\x00\x00\x00\x00', - b'\x00\x67\x61\x20\x44\x6F\x6D\x61', - b'\x70\x00\x00\x00\x00\x00\x00\x00', - b'\x15\x69\x6E\x73\x20\x21\x00\x00' + (TX, b'\x40\x08\x10\x00\x00\x00\x00\x00'), + (RX, b'\x41\x08\x10\x00\x1A\x00\x00\x00'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x54\x69\x6E\x79\x20\x4E\x6F'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x10\x64\x65\x20\x2D\x20\x4D\x65'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x67\x61\x20\x44\x6F\x6D\x61'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x15\x69\x6E\x73\x20\x21\x00\x00') ] device_name = self.network[2].sdo[0x1008].raw self.assertEqual(device_name, "Tiny Node - Mega Domains !") def test_segmented_download(self): self.data = [ - b'\x21\x00\x20\x00\x0d\x00\x00\x00', - b'\x60\x00\x20\x00\x00\x00\x00\x00', - b'\x00\x41\x20\x6c\x6f\x6e\x67\x20', - b'\x20\x00\x20\x00\x00\x00\x00\x00', - b'\x13\x73\x74\x72\x69\x6e\x67\x00', - b'\x30\x00\x20\x00\x00\x00\x00\x00' + (TX, b'\x21\x00\x20\x00\x0d\x00\x00\x00'), + (RX, b'\x60\x00\x20\x00\x00\x00\x00\x00'), + (TX, b'\x00\x41\x20\x6c\x6f\x6e\x67\x20'), + (RX, b'\x20\x00\x20\x00\x00\x00\x00\x00'), + (TX, b'\x13\x73\x74\x72\x69\x6e\x67\x00'), + (RX, b'\x30\x00\x20\x00\x00\x00\x00\x00') ] self.network[2].sdo['Writable string'].raw = 'A long string' + def test_block_download(self): + self.data = [ + (TX, b'\xc6\x00\x20\x00\x1e\x00\x00\x00'), + (RX, b'\xa4\x00\x20\x00\x7f\x00\x00\x00'), + (TX, b'\x01\x41\x20\x72\x65\x61\x6c\x6c'), + (TX, b'\x02\x79\x20\x72\x65\x61\x6c\x6c'), + (TX, b'\x03\x79\x20\x6c\x6f\x6e\x67\x20'), + (TX, b'\x04\x73\x74\x72\x69\x6e\x67\x2e'), + (TX, b'\x85\x2e\x2e\x00\x00\x00\x00\x00'), + (RX, b'\xa2\x05\x7f\x00\x00\x00\x00\x00'), + (TX, b'\xd5\x45\x69\x00\x00\x00\x00\x00'), + (RX, b'\xa1\x00\x00\x00\x00\x00\x00\x00') + ] + data = b'A really really long string...' + fp = self.network[2].sdo['Writable string'].open( + 'wb', size=len(data), block_transfer=True) + fp.write(data) + fp.close() + + def test_block_upload(self): + self.data = [ + (TX, b'\xa4\x08\x10\x00\x7f\x00\x00\x00'), + (RX, b'\xc6\x08\x10\x00\x1a\x00\x00\x00'), + (TX, b'\xa3\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x01\x54\x69\x6e\x79\x20\x4e\x6f'), + (RX, b'\x02\x64\x65\x20\x2d\x20\x4d\x65'), + (RX, b'\x03\x67\x61\x20\x44\x6f\x6d\x61'), + (RX, b'\x84\x69\x6e\x73\x20\x21\x00\x00'), + (TX, b'\xa2\x04\x7f\x00\x00\x00\x00\x00'), + (RX, b'\xc9\x40\xe1\x00\x00\x00\x00\x00'), + (TX, b'\xa1\x00\x00\x00\x00\x00\x00\x00') + ] + fp = self.network[2].sdo[0x1008].open('r', block_transfer=True) + data = fp.read() + fp.close() + self.assertEqual(data, 'Tiny Node - Mega Domains !') + def test_writable_file(self): self.data = [ - b'\x20\x00\x20\x00\x00\x00\x00\x00', - b'\x60\x00\x20\x00\x00\x00\x00\x00', - b'\x00\x31\x32\x33\x34\x35\x36\x37', - b'\x20\x00\x20\x00\x00\x00\x00\x00', - b'\x1a\x38\x39\x00\x00\x00\x00\x00', - b'\x30\x00\x20\x00\x00\x00\x00\x00', - b'\x0f\x00\x00\x00\x00\x00\x00\x00', - b'\x20\x00\x20\x00\x00\x00\x00\x00' + (TX, b'\x20\x00\x20\x00\x00\x00\x00\x00'), + (RX, b'\x60\x00\x20\x00\x00\x00\x00\x00'), + (TX, b'\x00\x31\x32\x33\x34\x35\x36\x37'), + (RX, b'\x20\x00\x20\x00\x00\x00\x00\x00'), + (TX, b'\x1a\x38\x39\x00\x00\x00\x00\x00'), + (RX, b'\x30\x00\x20\x00\x00\x00\x00\x00'), + (TX, b'\x0f\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x20\x00\x20\x00\x00\x00\x00\x00') ] fp = self.network[2].sdo['Writable string'].open('wb') fp.write(b'1234') @@ -102,8 +147,8 @@ def test_writable_file(self): def test_abort(self): self.data = [ - b'\x40\x18\x10\x01\x00\x00\x00\x00', - b'\x80\x18\x10\x01\x11\x00\x09\x06' + (TX, b'\x40\x18\x10\x01\x00\x00\x00\x00'), + (RX, b'\x80\x18\x10\x01\x11\x00\x09\x06') ] with self.assertRaises(canopen.SdoAbortedError) as cm: vendor_id = self.network[2].sdo[0x1018][1].raw