From 982d58ba4aead7e0546afd3c09d43d1565075f92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frieder=20Sch=C3=BCler?= Date: Fri, 17 Feb 2023 15:12:51 +0100 Subject: [PATCH 1/3] Fixed incorrect min (LowLImit) and max (HighLimit) values when using signed integer types. --- canopen/objectdictionary/eds.py | 34 ++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/canopen/objectdictionary/eds.py b/canopen/objectdictionary/eds.py index 872df234..0a5d8f25 100644 --- a/canopen/objectdictionary/eds.py +++ b/canopen/objectdictionary/eds.py @@ -1,7 +1,9 @@ -import re -import io -import logging import copy +import logging +import re + +from canopen.objectdictionary import datatypes + try: from configparser import RawConfigParser, NoOptionError, NoSectionError except ImportError: @@ -190,6 +192,28 @@ def import_from_node(node_id, network): return od +def _calc_bit_length(data_type: int) -> int: + if data_type == datatypes.INTEGER8: + return 8 + elif data_type == datatypes.INTEGER16: + return 16 + elif data_type == datatypes.INTEGER32: + return 32 + elif data_type == datatypes.INTEGER64: + return 64 + else: + raise ValueError(f"Invalid data_type '{data_type}', expecting an signed integer data_type.") + + +def _signed_int_from_hex(hex_str: str, bit_length: int): + number = int(hex_str, 0) + limit = ((1 << bit_length - 1) - 1) + if number > limit: + return limit - number + else: + return number + + def _convert_variable(node_id, var_type, value): if var_type in (objectdictionary.OCTET_STRING, objectdictionary.DOMAIN): return bytes.fromhex(value) @@ -251,12 +275,12 @@ def build_variable(eds, section, node_id, index, subindex=0): if eds.has_option(section, "LowLimit"): try: - var.min = int(eds.get(section, "LowLimit"), 0) + var.min = _signed_int_from_hex(eds.get(section, "LowLimit"), _calc_bit_length(var.data_type)) except ValueError: pass if eds.has_option(section, "HighLimit"): try: - var.max = int(eds.get(section, "HighLimit"), 0) + var.max = _signed_int_from_hex(eds.get(section, "HighLimit"), _calc_bit_length(var.data_type)) except ValueError: pass if eds.has_option(section, "DefaultValue"): From 4cf240ba1b3747f0a160a955994e16804b554817 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frieder=20Sch=C3=BCler?= Date: Fri, 17 Feb 2023 16:30:13 +0100 Subject: [PATCH 2/3] Fixed min/max for non-signed datatypes and added tests --- canopen/objectdictionary/eds.py | 12 ++++- test/sample.eds | 36 +++++++++++++ test/test_eds.py | 92 ++++++++++++++++++++------------- 3 files changed, 101 insertions(+), 39 deletions(-) diff --git a/canopen/objectdictionary/eds.py b/canopen/objectdictionary/eds.py index 0a5d8f25..2518c6f9 100644 --- a/canopen/objectdictionary/eds.py +++ b/canopen/objectdictionary/eds.py @@ -275,12 +275,20 @@ def build_variable(eds, section, node_id, index, subindex=0): if eds.has_option(section, "LowLimit"): try: - var.min = _signed_int_from_hex(eds.get(section, "LowLimit"), _calc_bit_length(var.data_type)) + min_string = eds.get(section, "LowLimit") + if var.data_type in objectdictionary.SIGNED_TYPES: + var.min = _signed_int_from_hex(min_string, _calc_bit_length(var.data_type)) + else: + var.min = int(min_string, 0) except ValueError: pass if eds.has_option(section, "HighLimit"): try: - var.max = _signed_int_from_hex(eds.get(section, "HighLimit"), _calc_bit_length(var.data_type)) + max_string = eds.get(section, "HighLimit") + if var.data_type in objectdictionary.SIGNED_TYPES: + var.max = _signed_int_from_hex(max_string, _calc_bit_length(var.data_type)) + else: + var.max = int(max_string, 0) except ValueError: pass if eds.has_option(section, "DefaultValue"): diff --git a/test/sample.eds b/test/sample.eds index bea6b9c3..671a559e 100644 --- a/test/sample.eds +++ b/test/sample.eds @@ -902,3 +902,39 @@ DataType=0x0008 AccessType=ro DefaultValue=0 PDOMapping=1 + +[3020] +ParameterName=INTEGER8 only positive values +ObjectType=0x7 +DataType=0x02 +AccessType=rw +HighLimit=0x7F +LowLimit=0x00 +PDOMapping=0 + +[3021] +ParameterName=UNSIGNED8 value range +2 to +10 +ObjectType=0x7 +DataType=0x05 +AccessType=rw +HighLimit=0x0A +LowLimit=0x02 +PDOMapping=0 + +[3030] +ParameterName=INTEGER32 only negative values +ObjectType=0x7 +DataType=0x04 +AccessType=rw +HighLimit=0x00000000 +LowLimit=0xFFFFFFFF +PDOMapping=0 + +[3040] +ParameterName=INTEGER64 value range -10 to +10 +ObjectType=0x7 +DataType=0x15 +AccessType=rw +HighLimit=0x000000000000000A +LowLimit=0x8000000000000009 +PDOMapping=0 diff --git a/test/test_eds.py b/test/test_eds.py index e5f6c89e..2a6d5098 100644 --- a/test/test_eds.py +++ b/test/test_eds.py @@ -4,6 +4,7 @@ EDS_PATH = os.path.join(os.path.dirname(__file__), 'sample.eds') + class TestEDS(unittest.TestCase): def setUp(self): @@ -47,6 +48,20 @@ def test_record(self): self.assertEqual(var.data_type, canopen.objectdictionary.UNSIGNED32) self.assertEqual(var.access_type, 'ro') + def test_record_with_limits(self): + int8 = self.od[0x3020] + self.assertEqual(int8.min, 0) + self.assertEqual(int8.max, 127) + uint8 = self.od[0x3021] + self.assertEqual(uint8.min, 2) + self.assertEqual(uint8.max, 10) + int32 = self.od[0x3030] + self.assertEqual(int32.min, -2147483648) + self.assertEqual(int32.max, 0) + int64 = self.od[0x3040] + self.assertEqual(int64.min, -10) + self.assertEqual(int64.max, +10) + def test_array_compact_subobj(self): array = self.od[0x1003] self.assertIsInstance(array, canopen.objectdictionary.Array) @@ -98,18 +113,16 @@ def test_dummy_variable_undefined(self): def test_comments(self): self.assertEqual(self.od.comments, -""" + """ |-------------| | Don't panic | |-------------| -""".strip() - ) - +""".strip()) def test_export_eds(self): import tempfile for doctype in {"eds", "dcf"}: - with tempfile.NamedTemporaryFile(suffix="."+doctype, mode="w+") as tempeds: + with tempfile.NamedTemporaryFile(suffix="." + doctype, mode="w+") as tempeds: print("exporting %s to " % doctype + tempeds.name) canopen.export_od(self.od, tempeds, doc_type=doctype) tempeds.flush() @@ -117,54 +130,59 @@ def test_export_eds(self): for index in exported_od: self.assertIn(exported_od[index].name, self.od) - self.assertIn(index , self.od) + self.assertIn(index, self.od) for index in self.od: if index < 0x0008: # ignore dummies continue self.assertIn(self.od[index].name, exported_od) - self.assertIn(index , exported_od) + self.assertIn(index, exported_od) - actual_object = exported_od[index] - expected_object = self.od[index] + actual_object = exported_od[index] + expected_object = self.od[index] self.assertEqual(type(actual_object), type(expected_object)) self.assertEqual(actual_object.name, expected_object.name) if type(actual_object) is canopen.objectdictionary.Variable: expected_vars = [expected_object] - actual_vars = [actual_object ] - else : + actual_vars = [actual_object] + else: expected_vars = [expected_object[idx] for idx in expected_object] - actual_vars = [actual_object [idx] for idx in actual_object] + actual_vars = [actual_object[idx] for idx in actual_object] for prop in [ - "allowed_baudrates", - "vendor_name", - "vendor_number", - "product_name", - "product_number", - "revision_number", - "order_code", - "simple_boot_up_master", - "simple_boot_up_slave", - "granularity", - "dynamic_channels_supported", - "group_messaging", - "nr_of_RXPDO", - "nr_of_TXPDO", - "LSS_supported", + "allowed_baudrates", + "vendor_name", + "vendor_number", + "product_name", + "product_number", + "revision_number", + "order_code", + "simple_boot_up_master", + "simple_boot_up_slave", + "granularity", + "dynamic_channels_supported", + "group_messaging", + "nr_of_RXPDO", + "nr_of_TXPDO", + "LSS_supported", ]: - self.assertEqual(getattr(self.od.device_information, prop), getattr(exported_od.device_information, prop), f"prop {prop!r} mismatch on DeviceInfo") - - - for evar,avar in zip(expected_vars,actual_vars): - self. assertEqual(getattr(avar, "data_type" , None) , getattr(evar,"data_type" ,None) , " mismatch on %04X:%X"%(evar.index, evar.subindex)) - self. assertEqual(getattr(avar, "default_raw", None) , getattr(evar,"default_raw",None) , " mismatch on %04X:%X"%(evar.index, evar.subindex)) - self. assertEqual(getattr(avar, "min" , None) , getattr(evar,"min" ,None) , " mismatch on %04X:%X"%(evar.index, evar.subindex)) - self. assertEqual(getattr(avar, "max" , None) , getattr(evar,"max" ,None) , " mismatch on %04X:%X"%(evar.index, evar.subindex)) + self.assertEqual(getattr(self.od.device_information, prop), + getattr(exported_od.device_information, prop), + f"prop {prop!r} mismatch on DeviceInfo") + + for evar, avar in zip(expected_vars, actual_vars): + self.assertEqual(getattr(avar, "data_type", None), getattr(evar, "data_type", None), + " mismatch on %04X:%X" % (evar.index, evar.subindex)) + self.assertEqual(getattr(avar, "default_raw", None), getattr(evar, "default_raw", None), + " mismatch on %04X:%X" % (evar.index, evar.subindex)) + self.assertEqual(getattr(avar, "min", None), getattr(evar, "min", None), + " mismatch on %04X:%X" % (evar.index, evar.subindex)) + self.assertEqual(getattr(avar, "max", None), getattr(evar, "max", None), + " mismatch on %04X:%X" % (evar.index, evar.subindex)) if doctype == "dcf": - self.assertEqual(getattr(avar, "value" , None) , getattr(evar,"value" ,None) , " mismatch on %04X:%X"%(evar.index, evar.subindex)) + self.assertEqual(getattr(avar, "value", None), getattr(evar, "value", None), + " mismatch on %04X:%X" % (evar.index, evar.subindex)) self.assertEqual(self.od.comments, exported_od.comments) - From 544a07dd820ea1794345e206da5bdd6ee1131a2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frieder=20Sch=C3=BCler?= Date: Wed, 22 Feb 2023 16:14:38 +0100 Subject: [PATCH 3/3] Removed type hints --- canopen/objectdictionary/eds.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/canopen/objectdictionary/eds.py b/canopen/objectdictionary/eds.py index 2518c6f9..c1c54d78 100644 --- a/canopen/objectdictionary/eds.py +++ b/canopen/objectdictionary/eds.py @@ -192,7 +192,7 @@ def import_from_node(node_id, network): return od -def _calc_bit_length(data_type: int) -> int: +def _calc_bit_length(data_type): if data_type == datatypes.INTEGER8: return 8 elif data_type == datatypes.INTEGER16: @@ -202,10 +202,10 @@ def _calc_bit_length(data_type: int) -> int: elif data_type == datatypes.INTEGER64: return 64 else: - raise ValueError(f"Invalid data_type '{data_type}', expecting an signed integer data_type.") + raise ValueError(f"Invalid data_type '{data_type}', expecting a signed integer data_type.") -def _signed_int_from_hex(hex_str: str, bit_length: int): +def _signed_int_from_hex(hex_str, bit_length): number = int(hex_str, 0) limit = ((1 << bit_length - 1) - 1) if number > limit: