From a70f1b122b1fac12fb3f5827e258ed398ebe1114 Mon Sep 17 00:00:00 2001 From: Christian Sandberg Date: Sun, 30 Apr 2017 21:22:16 +0200 Subject: [PATCH 1/6] First implementation of SDO block download Addresses part of issue #14. --- canopen/sdo.py | 234 ++++++++++++++++++++++++++++++++++++++++------- test/test_sdo.py | 97 +++++++++++++------- 2 files changed, 262 insertions(+), 69 deletions(-) diff --git a/canopen/sdo.py b/canopen/sdo.py index af015e8a..88021f44 100644 --- a/canopen/sdo.py +++ b/canopen/sdo.py @@ -2,11 +2,15 @@ import struct import logging import io +import binascii +import time try: import queue except ImportError: import Queue as queue +import can + from . import objectdictionary from . import common @@ -22,16 +26,22 @@ REQUEST_DOWNLOAD = 1 << 5 REQUEST_UPLOAD = 2 << 5 REQUEST_SEGMENT_UPLOAD = 3 << 5 +REQUEST_BLOCK_DOWNLOAD = 6 << 5 RESPONSE_SEGMENT_UPLOAD = 0 << 5 RESPONSE_SEGMENT_DOWNLOAD = 1 << 5 RESPONSE_UPLOAD = 2 << 5 RESPONSE_DOWNLOAD = 3 << 5 RESPONSE_ABORTED = 4 << 5 +RESPONSE_BLOCK_DOWNLOAD = 5 << 5 EXPEDITED = 0x2 SIZE_SPECIFIED = 0x1 +BLOCK_SIZE_SPECIFIED = 0x2 +CRC_SUPPORTED = 0x4 NO_MORE_DATA = 0x1 +NO_MORE_BLOCKS = 0x80 +END_BLOCK_DOWNLOAD = 0x1 TOGGLE_BIT = 0x10 @@ -62,33 +72,50 @@ def __init__(self, rx_cobid, tx_cobid, od): def on_response(self, can_id, data, timestamp): self.responses.put(bytes(data)) - def send_request(self, sdo_request): + def send_request(self, request): retries_left = self.MAX_RETRIES - response = None - if not self.responses.empty(): - #logger.warning("There were unexpected messages in the queue") - self.responses = queue.Queue() while retries_left: - # Wait for node to respond - self.network.send_message(self.rx_cobid, sdo_request) try: - response = self.responses.get( - block=True, timeout=self.RESPONSE_TIMEOUT) - except queue.Empty: - retries_left -= 1 + self.network.send_message(self.rx_cobid, request) + except can.CanError as e: + # Could be a buffer overflow. Wait some time before trying again + logger.info(str(e)) + time.sleep(0.1) + if retries_left: + retries_left -= 1 + else: + raise else: break - if not response: + def read_response(self): + try: + response = self.responses.get( + block=True, timeout=self.RESPONSE_TIMEOUT) + except queue.Empty: raise SdoCommunicationError("No SDO response received") - if retries_left < self.MAX_RETRIES: - logger.warning("There were some issues while communicating with the node") - res_command, = struct.unpack("B", response[0:1]) + res_command, = struct.unpack_from("B", response) if res_command == RESPONSE_ABORTED: - abort_code, = struct.unpack(" 1 to indicate the size in bytes of a fixed-size chunk buffer. + :param int size: + Size of data to that will be transmitted. + :param bool block_transfer: + If block transfer should be used. :returns: A file like object. """ + buffer_size = buffering if buffering > 1 else io.DEFAULT_BUFFER_SIZE if "r" in mode: - RawStreamCls = ReadableStream - BufferedStreamCls = io.BufferedReader + if block_transfer: + raise NotImplementedError("Block upload not supported") + else: + raw_stream = ReadableStream( + self.sdo_node, self.od.index, self.od.subindex) + if buffering: + buffered_stream = io.BufferedReader(raw_stream, buffer_size=buffer_size) + else: + return raw_stream if "w" in mode: - RawStreamCls = WritableStream - BufferedStreamCls = io.BufferedWriter - raw_stream = RawStreamCls(self.sdo_node, self.od.index, self.od.subindex) - if buffering == 0: - return raw_stream - # Line buffering is not supported by BufferedReader - buffer_size = buffering if buffering > 1 else io.DEFAULT_BUFFER_SIZE - buffered_stream = BufferedStreamCls(raw_stream, buffer_size=buffer_size) + if block_transfer: + raw_stream = BlockDownloadStream( + self.sdo_node, self.od.index, self.od.subindex, size) + else: + raw_stream = WritableStream( + self.sdo_node, self.od.index, self.od.subindex, size) + if buffering: + buffered_stream = io.BufferedWriter(raw_stream, buffer_size=buffer_size) + else: + return raw_stream if "b" not in mode: # Text mode line_buffering = buffering == 1 @@ -308,7 +350,7 @@ def __init__(self, sdo_client, index, subindex=0): sdo_client.rx_cobid - 0x600) request = SDO_STRUCT.pack(REQUEST_UPLOAD, index, subindex) request += b"\x00\x00\x00\x00" - response = sdo_client.send_request(request) + response = sdo_client.request_response(request) res_command, res_index, res_subindex = SDO_STRUCT.unpack(response[0:4]) res_data = response[4:8] @@ -357,7 +399,7 @@ def read(self, size=-1): command |= self._toggle request = bytearray(8) request[0] = command - response = self.sdo_client.send_request(request) + response = self.sdo_client.request_response(request) res_command, = struct.unpack("B", response[0:1]) if res_command & 0xE0 != RESPONSE_SEGMENT_UPLOAD: raise SdoCommunicationError("Unexpected response 0x%02X" % res_command) @@ -414,7 +456,7 @@ def __init__(self, sdo_client, index, subindex=0, size=None, force_segment=False else: size_data = b"\x00\x00\x00\x00" request = SDO_STRUCT.pack(command, index, subindex) + size_data - response = sdo_client.send_request(request) + response = sdo_client.request_response(request) res_command, = struct.unpack("B", response[0:1]) if res_command != RESPONSE_DOWNLOAD: raise SdoCommunicationError( @@ -441,7 +483,7 @@ def write(self, b): assert len(b) <= 4, "More data received than expected" data = b.tobytes() if isinstance(b, memoryview) else b request = self._exp_header + data.ljust(4, b"\x00") - response = self.sdo_client.send_request(request) + response = self.sdo_client.request_response(request) res_command, = struct.unpack("B", response[0:1]) if res_command & 0xE0 != RESPONSE_DOWNLOAD: raise SdoCommunicationError( @@ -465,7 +507,7 @@ def write(self, b): command |= (7 - bytes_sent) << 1 request[0] = command request[1:bytes_sent + 1] = b[0:bytes_sent] - response = self.sdo_client.send_request(request) + response = self.sdo_client.request_response(request) res_command, = struct.unpack("B", response[0:1]) if res_command & 0xE0 != RESPONSE_SEGMENT_DOWNLOAD: raise SdoCommunicationError( @@ -488,8 +530,132 @@ def close(self): command |= 7 << 1 request = bytearray(8) request[0] = command + self.sdo_client.request_response(request) + self._done = True + + def writable(self): + return True + + +class BlockDownloadStream(io.RawIOBase): + """File like object for block download.""" + + def __init__(self, sdo_client, index, subindex=0, size=None): + """ + :param canopen.sdo.SdoClient sdo_client: + The SDO client to use for communication. + :param int index: + Object dictionary index to read from. + :param int subindex: + Object dictionary sub-index to read from. + :param int size: + Size of data in number of bytes if known in advance. + """ + self.sdo_client = sdo_client + self.size = size + self.pos = 0 + self._done = False + self._seqno = 0 + self._crc = 0 + self._last_bytes_sent = 0 + self._unconsumed = bytearray() + command = REQUEST_BLOCK_DOWNLOAD | CRC_SUPPORTED + request = bytearray(8) + if size is not None: + command |= BLOCK_SIZE_SPECIFIED + struct.pack_into("= self.size: + # No more blocks after this message + command |= NO_MORE_BLOCKS + self._done = True + # Change expected number of ACK:ed sequences + self._blksize = self._seqno + elif bytes_sent < 7: + # We can't send less than 7 bytes in the middle of a transmission + # Save this data for next transmission + self._unconsumed = bytearray(data) + # Calculate CRC + self._crc = binascii.crc_hqx(data, self._crc) + request = bytearray(8) + request[0] = command + request[1:bytes_sent + 1] = data + self.sdo_client.send_request(request) + if self._seqno >= self._blksize or self._done: + # End of this block, wait for ACK + self._block_ack() + return bytes_sent + + def _block_ack(self): + response = self.sdo_client.read_response() + res_command, ackseq, blksize = struct.unpack_from("BBB", response) + if res_command & 0xE0 != RESPONSE_BLOCK_DOWNLOAD: + raise SdoCommunicationError( + "Unexpected response 0x%02X" % res_command) + if ackseq != self._blksize: + raise SdoCommunicationError( + ("%d of %d sequences were received. " + "Retransmission is not supported yet.") % (ackseq, self._blksize)) + self._blksize = blksize + self._seqno = 0 + + def close(self): + """Closes the stream.""" + super(BlockDownloadStream, self).close() + if not self._done: + # This might happen if size is unknown or an error occurred + self._seqno += 1 + request = bytearray(8) + request[0] = self._seqno | NO_MORE_BLOCKS + request[1:len(self._unconsumed) + 1] = self._unconsumed self.sdo_client.send_request(request) self._done = True + self._last_bytes_sent = len(self._unconsumed) + self._blksize = self._seqno + self._block_ack() + command = REQUEST_BLOCK_DOWNLOAD | END_BLOCK_DOWNLOAD + # Specify number of bytes in last message that did not contain data + command |= (7 - self._last_bytes_sent) << 2 + request = bytearray(8) + request[0] = command + # Add CRC + struct.pack_into(" %s (%s)" % (binascii.hexlify(data), binascii.hexlify(next_data[1]))) + self.assertSequenceEqual(data, next_data[1]) self.assertEqual(can_id, 0x602) - self.network.notify(0x582, self.data.pop(0), 0.0) + while self.data and self.data[0][0] == RX: + #print("< %s" % binascii.hexlify(self.data[0][1])) + self.network.notify(0x582, self.data.pop(0)[1], 0.0) def setUp(self): network = canopen.Network() @@ -32,64 +40,83 @@ def setUp(self): def test_expedited_upload(self): self.data = [ - b'\x40\x18\x10\x01\x00\x00\x00\x00', - b'\x43\x18\x10\x01\x04\x00\x00\x00' + (TX, b'\x40\x18\x10\x01\x00\x00\x00\x00'), + (RX, b'\x43\x18\x10\x01\x04\x00\x00\x00') ] vendor_id = self.network[2].sdo[0x1018][1].raw self.assertEqual(vendor_id, 4) # UNSIGNED8 without padded data part (see issue #5) self.data = [ - b'\x40\x00\x14\x02\x00\x00\x00\x00', - b'\x4f\x00\x14\x02\xfe' + (TX, b'\x40\x00\x14\x02\x00\x00\x00\x00'), + (RX, b'\x4f\x00\x14\x02\xfe') ] trans_type = self.network[2].sdo[0x1400]['Transmission type RPDO 1'].raw self.assertEqual(trans_type, 254) def test_expedited_download(self): self.data = [ - b'\x2b\x17\x10\x00\xa0\x0f\x00\x00', - b'\x60\x17\x10\x00\x00\x00\x00\x00' + (TX, b'\x2b\x17\x10\x00\xa0\x0f\x00\x00'), + (RX, b'\x60\x17\x10\x00\x00\x00\x00\x00') ] self.network[2].sdo[0x1017].raw = 4000 def test_segmented_upload(self): self.data = [ - b'\x40\x08\x10\x00\x00\x00\x00\x00', - b'\x41\x08\x10\x00\x1A\x00\x00\x00', - b'\x60\x00\x00\x00\x00\x00\x00\x00', - b'\x00\x54\x69\x6E\x79\x20\x4E\x6F', - b'\x70\x00\x00\x00\x00\x00\x00\x00', - b'\x10\x64\x65\x20\x2D\x20\x4D\x65', - b'\x60\x00\x00\x00\x00\x00\x00\x00', - b'\x00\x67\x61\x20\x44\x6F\x6D\x61', - b'\x70\x00\x00\x00\x00\x00\x00\x00', - b'\x15\x69\x6E\x73\x20\x21\x00\x00' + (TX, b'\x40\x08\x10\x00\x00\x00\x00\x00'), + (RX, b'\x41\x08\x10\x00\x1A\x00\x00\x00'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x54\x69\x6E\x79\x20\x4E\x6F'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x10\x64\x65\x20\x2D\x20\x4D\x65'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x67\x61\x20\x44\x6F\x6D\x61'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x15\x69\x6E\x73\x20\x21\x00\x00') ] device_name = self.network[2].sdo[0x1008].raw self.assertEqual(device_name, "Tiny Node - Mega Domains !") def test_segmented_download(self): self.data = [ - b'\x21\x00\x20\x00\x0d\x00\x00\x00', - b'\x60\x00\x20\x00\x00\x00\x00\x00', - b'\x00\x41\x20\x6c\x6f\x6e\x67\x20', - b'\x20\x00\x20\x00\x00\x00\x00\x00', - b'\x13\x73\x74\x72\x69\x6e\x67\x00', - b'\x30\x00\x20\x00\x00\x00\x00\x00' + (TX, b'\x21\x00\x20\x00\x0d\x00\x00\x00'), + (RX, b'\x60\x00\x20\x00\x00\x00\x00\x00'), + (TX, b'\x00\x41\x20\x6c\x6f\x6e\x67\x20'), + (RX, b'\x20\x00\x20\x00\x00\x00\x00\x00'), + (TX, b'\x13\x73\x74\x72\x69\x6e\x67\x00'), + (RX, b'\x30\x00\x20\x00\x00\x00\x00\x00') ] self.network[2].sdo['Writable string'].raw = 'A long string' + def test_block_download(self): + self.data = [ + (TX, b'\xc6\x00\x20\x00\x1e\x00\x00\x00'), + (RX, b'\xa4\x00\x20\x00\x7f\x00\x00\x00'), + (TX, b'\x01\x41\x20\x72\x65\x61\x6c\x6c'), + (TX, b'\x02\x79\x20\x72\x65\x61\x6c\x6c'), + (TX, b'\x03\x79\x20\x6c\x6f\x6e\x67\x20'), + (TX, b'\x04\x73\x74\x72\x69\x6e\x67\x2e'), + (TX, b'\x85\x2e\x2e\x00\x00\x00\x00\x00'), + (RX, b'\xa2\x05\x7f\x00\x00\x00\x00\x00'), + (TX, b'\xd5\x45\x69\x00\x00\x00\x00\x00'), + (RX, b'\xa1\x00\x00\x00\x00\x00\x00\x00') + ] + data = b'A really really long string...' + fp = self.network[2].sdo['Writable string'].open( + 'wb', size=len(data), block_transfer=True) + fp.write(data) + fp.close() + def test_writable_file(self): self.data = [ - b'\x20\x00\x20\x00\x00\x00\x00\x00', - b'\x60\x00\x20\x00\x00\x00\x00\x00', - b'\x00\x31\x32\x33\x34\x35\x36\x37', - b'\x20\x00\x20\x00\x00\x00\x00\x00', - b'\x1a\x38\x39\x00\x00\x00\x00\x00', - b'\x30\x00\x20\x00\x00\x00\x00\x00', - b'\x0f\x00\x00\x00\x00\x00\x00\x00', - b'\x20\x00\x20\x00\x00\x00\x00\x00' + (TX, b'\x20\x00\x20\x00\x00\x00\x00\x00'), + (RX, b'\x60\x00\x20\x00\x00\x00\x00\x00'), + (TX, b'\x00\x31\x32\x33\x34\x35\x36\x37'), + (RX, b'\x20\x00\x20\x00\x00\x00\x00\x00'), + (TX, b'\x1a\x38\x39\x00\x00\x00\x00\x00'), + (RX, b'\x30\x00\x20\x00\x00\x00\x00\x00'), + (TX, b'\x0f\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x20\x00\x20\x00\x00\x00\x00\x00') ] fp = self.network[2].sdo['Writable string'].open('wb') fp.write(b'1234') @@ -102,8 +129,8 @@ def test_writable_file(self): def test_abort(self): self.data = [ - b'\x40\x18\x10\x01\x00\x00\x00\x00', - b'\x80\x18\x10\x01\x11\x00\x09\x06' + (TX, b'\x40\x18\x10\x01\x00\x00\x00\x00'), + (RX, b'\x80\x18\x10\x01\x11\x00\x09\x06') ] with self.assertRaises(canopen.SdoAbortedError) as cm: vendor_id = self.network[2].sdo[0x1018][1].raw From 27627011d17cc271be47184d1b9127d8f596a0ee Mon Sep 17 00:00:00 2001 From: Christian Sandberg Date: Sun, 30 Apr 2017 21:43:47 +0200 Subject: [PATCH 2/6] Fix Python 3 issue --- canopen/sdo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/canopen/sdo.py b/canopen/sdo.py index 88021f44..5dbb0d1e 100644 --- a/canopen/sdo.py +++ b/canopen/sdo.py @@ -576,7 +576,7 @@ def __init__(self, sdo_client, index, subindex=0, size=None): "Node returned a value for 0x{:X}:{:d} instead, " "maybe there is another SDO client communicating " "on the same SDO channel?").format(res_index, res_subindex)) - self._blksize = struct.unpack_from("B", response, 4) + self._blksize, = struct.unpack_from("B", response, 4) def write(self, b): """ From 28e172be321aeb0f74360843fab5d4bff99452d9 Mon Sep 17 00:00:00 2001 From: Christian Sandberg Date: Mon, 1 May 2017 10:52:33 +0200 Subject: [PATCH 3/6] Block download refactoring Return None if data could not be sent Add debug messages Add some block transfer abort codes --- canopen/sdo.py | 85 ++++++++++++++++++++++++++++++-------------------- 1 file changed, 52 insertions(+), 33 deletions(-) diff --git a/canopen/sdo.py b/canopen/sdo.py index 5dbb0d1e..9d9a803b 100644 --- a/canopen/sdo.py +++ b/canopen/sdo.py @@ -558,10 +558,11 @@ def __init__(self, sdo_client, index, subindex=0, size=None): self._seqno = 0 self._crc = 0 self._last_bytes_sent = 0 - self._unconsumed = bytearray() command = REQUEST_BLOCK_DOWNLOAD | CRC_SUPPORTED request = bytearray(8) + logger.info("Initiating block download for 0x%X:%d", index, subindex) if size is not None: + logger.debug("Expected size of data is %d bytes", size) command |= BLOCK_SIZE_SPECIFIED struct.pack_into("= self.size: + # This is the last data to be transmitted based on expected size + self.send(data, end=True) + elif len(data) < 7: + # We can't send less than 7 bytes in the middle of a transmission + return None + else: + self.send(data) + return len(data) + + def send(self, b, end=False): + """Send up to 7 bytes of data. + + :param bytes b: + 0 - 7 bytes of data to transmit. + :param bool end: + If this is the last data. + """ + assert len(b) <= 7 self._seqno += 1 command = self._seqno - # Can send up to 7 bytes at a time - bytes_sent = min(len(b), 7) - data = b[0:bytes_sent] - if self._unconsumed: - # There were uncomsumed data from last time - data = self._unconsumed + data[0:7 - len(self._unconsumed)] - self._unconsumed = bytearray() - # Save how many bytes this message contains if this should be the last - self._last_bytes_sent = bytes_sent - self.pos += bytes_sent - if self.size is not None and self.pos >= self.size: - # No more blocks after this message + if end: command |= NO_MORE_BLOCKS self._done = True - # Change expected number of ACK:ed sequences + # Change expected ACK:ed sequence self._blksize = self._seqno - elif bytes_sent < 7: - # We can't send less than 7 bytes in the middle of a transmission - # Save this data for next transmission - self._unconsumed = bytearray(data) - # Calculate CRC - self._crc = binascii.crc_hqx(data, self._crc) + # Save how many bytes this message contains since this is the last + self._last_bytes_sent = len(b) request = bytearray(8) request[0] = command - request[1:bytes_sent + 1] = data + request[1:len(b) + 1] = b self.sdo_client.send_request(request) - if self._seqno >= self._blksize or self._done: + self.pos += len(b) + # Calculate CRC + self._crc = binascii.crc_hqx(b, self._crc) + if self._seqno >= self._blksize: # End of this block, wait for ACK self._block_ack() - return bytes_sent + + def tell(self): + return self.pos def _block_ack(self): + logger.debug("Waiting for acknowledgement of last block...") response = self.sdo_client.read_response() res_command, ackseq, blksize = struct.unpack_from("BBB", response) if res_command & 0xE0 != RESPONSE_BLOCK_DOWNLOAD: @@ -628,6 +649,8 @@ def _block_ack(self): raise SdoCommunicationError( ("%d of %d sequences were received. " "Retransmission is not supported yet.") % (ackseq, self._blksize)) + logger.debug("All %d sequences were received successfully", ackseq) + logger.debug("Server requested a block size of %d", blksize) self._blksize = blksize self._seqno = 0 @@ -635,16 +658,8 @@ def close(self): """Closes the stream.""" super(BlockDownloadStream, self).close() if not self._done: - # This might happen if size is unknown or an error occurred - self._seqno += 1 - request = bytearray(8) - request[0] = self._seqno | NO_MORE_BLOCKS - request[1:len(self._unconsumed) + 1] = self._unconsumed - self.sdo_client.send_request(request) - self._done = True - self._last_bytes_sent = len(self._unconsumed) - self._blksize = self._seqno - self._block_ack() + # Send an empty sequence with end flag + self.send(b"", end=True) command = REQUEST_BLOCK_DOWNLOAD | END_BLOCK_DOWNLOAD # Specify number of bytes in last message that did not contain data command |= (7 - self._last_bytes_sent) << 2 @@ -652,10 +667,12 @@ def close(self): request[0] = command # Add CRC struct.pack_into(" Date: Mon, 1 May 2017 21:06:21 +0200 Subject: [PATCH 4/6] Fix issue when .close is called many times --- canopen/sdo.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/canopen/sdo.py b/canopen/sdo.py index 9d9a803b..730b52cb 100644 --- a/canopen/sdo.py +++ b/canopen/sdo.py @@ -579,6 +579,8 @@ def __init__(self, sdo_client, index, subindex=0, size=None): "on the same SDO channel?").format(res_index, res_subindex)) self._blksize, = struct.unpack_from("B", response, 4) logger.debug("Server requested a block size of %d", self._blksize) + if res_command & CRC_SUPPORTED: + logger.debug("The server supports CRC verification") def write(self, b): """ @@ -656,6 +658,8 @@ def _block_ack(self): def close(self): """Closes the stream.""" + if self.closed: + return super(BlockDownloadStream, self).close() if not self._done: # Send an empty sequence with end flag @@ -671,8 +675,8 @@ def close(self): response = self.sdo_client.request_response(request) res_command, = struct.unpack_from("B", response) if not res_command & END_BLOCK_DOWNLOAD: - raise SdoCommunicationError("SDO block download unsuccessful") - logger.info("Block transfer successful") + raise SdoCommunicationError("Block download unsuccessful") + logger.info("Block download successful") def writable(self): return True From c66e79f0d23e96298afa99b94542dd4b2d51d55c Mon Sep 17 00:00:00 2001 From: Christian Sandberg Date: Thu, 4 May 2017 21:12:14 +0200 Subject: [PATCH 5/6] Implement block upload --- canopen/sdo.py | 149 ++++++++++++++++++++++++++++++++++++++++++++++- test/test_sdo.py | 18 ++++++ 2 files changed, 166 insertions(+), 1 deletion(-) diff --git a/canopen/sdo.py b/canopen/sdo.py index 730b52cb..e3e1f5c1 100644 --- a/canopen/sdo.py +++ b/canopen/sdo.py @@ -26,6 +26,7 @@ REQUEST_DOWNLOAD = 1 << 5 REQUEST_UPLOAD = 2 << 5 REQUEST_SEGMENT_UPLOAD = 3 << 5 +REQUEST_BLOCK_UPLOAD = 5 << 5 REQUEST_BLOCK_DOWNLOAD = 6 << 5 RESPONSE_SEGMENT_UPLOAD = 0 << 5 @@ -34,6 +35,12 @@ RESPONSE_DOWNLOAD = 3 << 5 RESPONSE_ABORTED = 4 << 5 RESPONSE_BLOCK_DOWNLOAD = 5 << 5 +RESPONSE_BLOCK_UPLOAD = 6 << 5 + +INITIATE_BLOCK_UPLOAD = 0 +END_BLOCK_UPLOAD = 1 +BLOCK_UPLOAD_RESPONSE = 2 +START_BLOCK_UPLOAD = 3 EXPEDITED = 0x2 SIZE_SPECIFIED = 0x1 @@ -300,7 +307,8 @@ def open(self, mode="rb", encoding="ascii", buffering=8192, size=None, buffer_size = buffering if buffering > 1 else io.DEFAULT_BUFFER_SIZE if "r" in mode: if block_transfer: - raise NotImplementedError("Block upload not supported") + raw_stream = BlockUploadStream( + self.sdo_node, self.od.index, self.od.subindex) else: raw_stream = ReadableStream( self.sdo_node, self.od.index, self.od.subindex) @@ -537,6 +545,145 @@ def writable(self): return True +class BlockUploadStream(io.RawIOBase): + """File like object for reading from a variable using block upload.""" + + #: Total size of data or ``None`` if not specified + size = None + + blksize = 127 + + crc_supported = False + + def __init__(self, sdo_client, index, subindex=0): + """ + :param canopen.sdo.SdoClient sdo_client: + The SDO client to use for reading. + :param int index: + Object dictionary index to read from. + :param int subindex: + Object dictionary sub-index to read from. + """ + self._done = False + self.sdo_client = sdo_client + self._crc = 0 + self._server_crc = None + self._ackseq = 0 + + logger.debug("Reading 0x%X:%d from node %d", index, subindex, + sdo_client.rx_cobid - 0x600) + # Initiate Block Upload + request = bytearray(8) + command = REQUEST_BLOCK_UPLOAD | INITIATE_BLOCK_UPLOAD | CRC_SUPPORTED + struct.pack_into("= self.blksize or res_command & NO_MORE_BLOCKS: + self._ack_block() + if res_command & NO_MORE_BLOCKS: + n = self._end_upload() + data = response[1:8 - n] + self._done = True + else: + data = response[1:8] + if self.crc_supported: + self._crc = binascii.crc_hqx(data, self._crc) + if self._done: + if self._server_crc != self._crc: + raise SdoCommunicationError("CRC is not OK") + logger.info("CRC is OK") + return data + + def _retransmit(self): + end_time = time.time() + self.sdo_client.RESPONSE_TIMEOUT + self._ack_block() + while time.time() < end_time: + response = self.sdo_client.read_response() + res_command, = struct.unpack_from("B", response) + seqno = res_command & 0x7F + if seqno == self._ackseq + 1: + # We should be back in sync + self._ackseq = seqno + return + raise SdoCommunicationError("Some data were lost and could not be retransmitted") + + def _ack_block(self): + command = REQUEST_BLOCK_UPLOAD | BLOCK_UPLOAD_RESPONSE + request = bytearray(8) + struct.pack_into("BBB", request, 0, command, self._ackseq, self.blksize) + self.sdo_client.send_request(request) + if self._ackseq == self.blksize: + self._ackseq = 0 + + def _end_upload(self): + response = self.sdo_client.read_response() + res_command, crc = struct.unpack_from("> 2) & 0x7 + + def readinto(self, b): + """ + Read bytes into a pre-allocated, writable bytes-like object b, + and return the number of bytes read. + """ + data = self.read(7) + b[:len(data)] = data + return len(data) + + def readable(self): + return True + + class BlockDownloadStream(io.RawIOBase): """File like object for block download.""" diff --git a/test/test_sdo.py b/test/test_sdo.py index a354b10c..3f02b50d 100644 --- a/test/test_sdo.py +++ b/test/test_sdo.py @@ -107,6 +107,24 @@ def test_block_download(self): fp.write(data) fp.close() + def test_block_upload(self): + self.data = [ + (TX, b'\xa4\x08\x10\x00\x7f\x00\x00\x00'), + (RX, b'\xc6\x08\x10\x00\x1a\x00\x00\x00'), + (TX, b'\xa3\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x01\x54\x69\x6e\x79\x20\x4e\x6f'), + (RX, b'\x02\x64\x65\x20\x2d\x20\x4d\x65'), + (RX, b'\x03\x67\x61\x20\x44\x6f\x6d\x61'), + (RX, b'\x84\x69\x6e\x73\x20\x21\x00\x00'), + (TX, b'\xa2\x04\x7f\x00\x00\x00\x00\x00'), + (RX, b'\xc9\x40\xe1\x00\x00\x00\x00\x00'), + (TX, b'\xa1\x00\x00\x00\x00\x00\x00\x00') + ] + fp = self.network[2].sdo[0x1008].open('r', block_transfer=True) + data = fp.read() + fp.close() + self.assertEqual(data, 'Tiny Node - Mega Domains !') + def test_writable_file(self): self.data = [ (TX, b'\x20\x00\x20\x00\x00\x00\x00\x00'), From db0cef359d1d12599a17cfcbdffb036572303f92 Mon Sep 17 00:00:00 2001 From: Christian Sandberg Date: Wed, 17 May 2017 13:54:13 +0200 Subject: [PATCH 6/6] SDO block transfer refactoring Start sending some abort codes from client on errors Add block transfer to docs --- README.rst | 1 - canopen/sdo.py | 119 ++++++++++++++++++++++++++++++++----------------- doc/sdo.rst | 33 ++++++++++++-- 3 files changed, 108 insertions(+), 45 deletions(-) diff --git a/README.rst b/README.rst index dc823e3a..43d2d5eb 100644 --- a/README.rst +++ b/README.rst @@ -126,7 +126,6 @@ Pull requests are most welcome! * More unit test coverage * Period transmits using python-can cyclic API -* SDO block transfer * XDD support diff --git a/canopen/sdo.py b/canopen/sdo.py index e3e1f5c1..4d18c928 100644 --- a/canopen/sdo.py +++ b/canopen/sdo.py @@ -26,6 +26,7 @@ REQUEST_DOWNLOAD = 1 << 5 REQUEST_UPLOAD = 2 << 5 REQUEST_SEGMENT_UPLOAD = 3 << 5 +REQUEST_ABORTED = 4 << 5 REQUEST_BLOCK_UPLOAD = 5 << 5 REQUEST_BLOCK_DOWNLOAD = 6 << 5 @@ -37,9 +38,9 @@ RESPONSE_BLOCK_DOWNLOAD = 5 << 5 RESPONSE_BLOCK_UPLOAD = 6 << 5 -INITIATE_BLOCK_UPLOAD = 0 -END_BLOCK_UPLOAD = 1 -BLOCK_UPLOAD_RESPONSE = 2 +INITIATE_BLOCK_TRANSFER = 0 +END_BLOCK_TRANSFER = 1 +BLOCK_TRANSFER_RESPONSE = 2 START_BLOCK_UPLOAD = 3 EXPEDITED = 0x2 @@ -48,7 +49,6 @@ CRC_SUPPORTED = 0x4 NO_MORE_DATA = 0x1 NO_MORE_BLOCKS = 0x80 -END_BLOCK_DOWNLOAD = 0x1 TOGGLE_BIT = 0x10 @@ -81,17 +81,16 @@ def on_response(self, can_id, data, timestamp): def send_request(self, request): retries_left = self.MAX_RETRIES - while retries_left: + while True: try: self.network.send_message(self.rx_cobid, request) except can.CanError as e: # Could be a buffer overflow. Wait some time before trying again + retries_left -= 1 + if not retries_left: + raise logger.info(str(e)) time.sleep(0.1) - if retries_left: - retries_left -= 1 - else: - raise else: break @@ -112,17 +111,25 @@ def request_response(self, sdo_request): if not self.responses.empty(): #logger.warning("There were unexpected messages in the queue") self.responses = queue.Queue() - while retries_left: + while True: self.send_request(sdo_request) # Wait for node to respond try: return self.read_response() except SdoCommunicationError as e: - logger.warning(str(e)) - if retries_left: - retries_left -= 1 - else: + retries_left -= 1 + if not retries_left: raise + logger.warning(str(e)) + + def abort(self, abort_code=0x08000000): + """Abort current transfer.""" + request = bytearray(8) + request[0] = REQUEST_ABORTED + # TODO: Is it necessary to include index and subindex? + struct.pack_into("= self.blksize or res_command & NO_MORE_BLOCKS: self._ack_block() if res_command & NO_MORE_BLOCKS: @@ -631,11 +642,15 @@ def read(self, size=-1): self._crc = binascii.crc_hqx(data, self._crc) if self._done: if self._server_crc != self._crc: + self.sdo_client.abort(0x05040004) raise SdoCommunicationError("CRC is not OK") logger.info("CRC is OK") + self.pos += len(data) return data def _retransmit(self): + logger.info("Only %d sequences were received. Requesting retransmission", + self._ackseq) end_time = time.time() + self.sdo_client.RESPONSE_TIMEOUT self._ack_block() while time.time() < end_time: @@ -645,32 +660,42 @@ def _retransmit(self): if seqno == self._ackseq + 1: # We should be back in sync self._ackseq = seqno - return + return response raise SdoCommunicationError("Some data were lost and could not be retransmitted") def _ack_block(self): - command = REQUEST_BLOCK_UPLOAD | BLOCK_UPLOAD_RESPONSE request = bytearray(8) - struct.pack_into("BBB", request, 0, command, self._ackseq, self.blksize) + request[0] = REQUEST_BLOCK_UPLOAD | BLOCK_TRANSFER_RESPONSE + request[1] = self._ackseq + request[2] = self.blksize self.sdo_client.send_request(request) if self._ackseq == self.blksize: self._ackseq = 0 def _end_upload(self): response = self.sdo_client.read_response() - res_command, crc = struct.unpack_from("> 2) & 0x7 + def close(self): + if self.closed: + return + super(BlockUploadStream, self).close() + if self._done: + request = bytearray(8) + request[0] = REQUEST_BLOCK_UPLOAD | END_BLOCK_TRANSFER + self.sdo_client.send_request(request) + + def tell(self): + return self.pos + def readinto(self, b): """ Read bytes into a pre-allocated, writable bytes-like object b, @@ -705,29 +730,32 @@ def __init__(self, sdo_client, index, subindex=0, size=None): self._seqno = 0 self._crc = 0 self._last_bytes_sent = 0 - command = REQUEST_BLOCK_DOWNLOAD | CRC_SUPPORTED + command = REQUEST_BLOCK_DOWNLOAD | INITIATE_BLOCK_TRANSFER | CRC_SUPPORTED request = bytearray(8) logger.info("Initiating block download for 0x%X:%d", index, subindex) if size is not None: logger.debug("Expected size of data is %d bytes", size) command |= BLOCK_SIZE_SPECIFIED struct.pack_into("= self._blksize: # End of this block, wait for ACK self._block_ack() @@ -792,9 +823,15 @@ def _block_ack(self): response = self.sdo_client.read_response() res_command, ackseq, blksize = struct.unpack_from("BBB", response) if res_command & 0xE0 != RESPONSE_BLOCK_DOWNLOAD: + self.sdo_client.abort(0x05040001) raise SdoCommunicationError( "Unexpected response 0x%02X" % res_command) + if res_command & 0x3 != BLOCK_TRANSFER_RESPONSE: + self.sdo_client.abort(0x05040001) + raise SdoCommunicationError("Server did not respond with a " + "block download response") if ackseq != self._blksize: + self.sdo_client.abort(0x05040003) raise SdoCommunicationError( ("%d of %d sequences were received. " "Retransmission is not supported yet.") % (ackseq, self._blksize)) @@ -809,19 +846,19 @@ def close(self): return super(BlockDownloadStream, self).close() if not self._done: - # Send an empty sequence with end flag - self.send(b"", end=True) - command = REQUEST_BLOCK_DOWNLOAD | END_BLOCK_DOWNLOAD + logger.error("Block transfer was not finished") + command = REQUEST_BLOCK_DOWNLOAD | END_BLOCK_TRANSFER # Specify number of bytes in last message that did not contain data command |= (7 - self._last_bytes_sent) << 2 request = bytearray(8) request[0] = command - # Add CRC - struct.pack_into("