From 3181b04a896504495b2491594d373b65fcf54a27 Mon Sep 17 00:00:00 2001 From: "Erlend E. Aasland" Date: Mon, 8 Jul 2024 16:02:56 +0200 Subject: [PATCH 1/5] Add tests for NMT base --- test/test_nmt.py | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/test/test_nmt.py b/test/test_nmt.py index 50db1187..e4bb607b 100644 --- a/test/test_nmt.py +++ b/test/test_nmt.py @@ -2,9 +2,46 @@ import unittest import canopen +from canopen.nmt import NMT_STATES, NMT_COMMANDS from .util import SAMPLE_EDS +class TestNmtBase(unittest.TestCase): + def setUp(self): + node_id = 2 + self.node_id = node_id + self.nmt = canopen.nmt.NmtBase(node_id) + + def test_send_command(self): + dataset = ( + "OPERATIONAL", + "PRE-OPERATIONAL", + "SLEEP", + "STANDBY", + "STOPPED", + ) + for cmd in dataset: + with self.subTest(cmd=cmd): + code = NMT_COMMANDS[cmd] + self.nmt.send_command(code) + self.assertNotEqual(self.nmt.state, "INITIALISING") + + def test_state_getset(self): + for state in NMT_STATES.values(): + with self.subTest(state=state): + self.nmt.state = state + self.assertEqual(self.nmt.state, state) + + def test_state_set_invalid(self): + with self.assertRaisesRegex(ValueError, "INVALID"): + self.nmt.state = "INVALID" + + def test_state_get_invalid(self): + # This is a known bug; it will be changed in gh-500. + self.nmt._state = 255 + self.assertEqual(self.nmt.state, 255) + + class TestNmtSlave(unittest.TestCase): def setUp(self): self.network1 = canopen.Network() From aae8f4f7c63d355361fad03fd95cf230593f5274 Mon Sep 17 00:00:00 2001 From: "Erlend E. Aasland" Date: Tue, 9 Jul 2024 00:49:46 +0200 Subject: [PATCH 2/5] Add tests for NMT master --- test/test_nmt.py | 95 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 87 insertions(+), 8 deletions(-) diff --git a/test/test_nmt.py b/test/test_nmt.py index e4bb607b..5acda27d 100644 --- a/test/test_nmt.py +++ b/test/test_nmt.py @@ -1,6 +1,7 @@ import time import unittest +import can import canopen from canopen.nmt import NMT_STATES, NMT_COMMANDS from .util import SAMPLE_EDS @@ -36,23 +37,101 @@ def test_state_set_invalid(self): with self.assertRaisesRegex(ValueError, "INVALID"): self.nmt.state = "INVALID" - def test_state_get_invalid(self): - # This is a known bug; it will be changed in gh-500. - self.nmt._state = 255 - self.assertEqual(self.nmt.state, 255) + +class TestNmtMaster(unittest.TestCase): + NODE_ID = 2 + COB_ID = 0x700 + NODE_ID + PERIOD = 0.01 + TIMEOUT = PERIOD * 2 + + def setUp(self): + bus = can.ThreadSafeBus( + interface="virtual", + channel="test", + receive_own_messages=True, + ) + net = canopen.Network(bus) + net.connect() + with self.assertLogs(): + node = net.add_node(self.NODE_ID, SAMPLE_EDS) + + self.bus = bus + self.net = net + self.node = node + + def tearDown(self): + self.net.disconnect() + + def test_nmt_master_no_heartbeat(self): + with self.assertRaisesRegex(canopen.nmt.NmtError, "heartbeat"): + self.node.nmt.wait_for_heartbeat(self.TIMEOUT) + with self.assertRaisesRegex(canopen.nmt.NmtError, "boot-up"): + self.node.nmt.wait_for_bootup(self.TIMEOUT) + + def test_nmt_master_on_heartbeat(self): + # Skip the special INITIALISING case. + for code in [st for st in NMT_STATES if st != 0]: + with self.subTest(code=code): + data = bytes([code]) + task = self.net.send_periodic(self.COB_ID, data, self.PERIOD) + self.addCleanup(task.stop) + actual = self.node.nmt.wait_for_heartbeat(self.TIMEOUT) + task.stop() + expected = NMT_STATES[code] + self.assertEqual(actual, expected) + + def test_nmt_master_on_heartbeat_initialising(self): + task = self.net.send_periodic(self.COB_ID, b"\x00", self.PERIOD) + self.addCleanup(task.stop) + self.node.nmt.wait_for_bootup(self.TIMEOUT) + state = self.node.nmt.wait_for_heartbeat(self.TIMEOUT) + self.assertEqual(state, "PRE-OPERATIONAL") + + def test_nmt_master_on_heartbeat_unknown_state(self): + task = self.net.send_periodic(self.COB_ID, b"\xcb", self.PERIOD) + self.addCleanup(task.stop) + state = self.node.nmt.wait_for_heartbeat(self.TIMEOUT) + # Expect the high bit to be masked out, and the resulting integer + # returned as it is. See gh-500 for the data type inconsistency. + self.assertEqual(state, 0x4b) + + def test_nmt_master_add_heartbeat_callback(self): + from threading import Event + event = Event() + state = None + def hook(st): + nonlocal state + state = st + event.set() + self.node.nmt.add_heartbeat_callback(hook) + self.net.send_message(self.COB_ID, bytes([127])) + self.assertTrue(event.wait(self.TIMEOUT)) + self.assertEqual(state, 127) + + def test_nmt_master_node_guarding(self): + self.node.nmt.start_node_guarding(self.PERIOD) + msg = self.bus.recv(self.TIMEOUT) + self.assertIsNotNone(msg) + self.assertEqual(msg.arbitration_id, self.COB_ID) + self.assertEqual(msg.dlc, 0) + + self.node.nmt.stop_node_guarding() + self.assertIsNone(self.bus.recv(self.TIMEOUT)) class TestNmtSlave(unittest.TestCase): def setUp(self): self.network1 = canopen.Network() self.network1.connect("test", interface="virtual") - self.remote_node = self.network1.add_node(2, SAMPLE_EDS) + with self.assertLogs(): + self.remote_node = self.network1.add_node(2, SAMPLE_EDS) self.network2 = canopen.Network() self.network2.connect("test", interface="virtual") - self.local_node = self.network2.create_node(2, SAMPLE_EDS) - self.remote_node2 = self.network1.add_node(3, SAMPLE_EDS) - self.local_node2 = self.network2.create_node(3, SAMPLE_EDS) + with self.assertLogs(): + self.local_node = self.network2.create_node(2, SAMPLE_EDS) + self.remote_node2 = self.network1.add_node(3, SAMPLE_EDS) + self.local_node2 = self.network2.create_node(3, SAMPLE_EDS) def tearDown(self): self.network1.disconnect() From dde2abc8f46c5fe4c11ee4e92fc5f8350068be0d Mon Sep 17 00:00:00 2001 From: "Erlend E. Aasland" Date: Tue, 9 Jul 2024 01:15:10 +0200 Subject: [PATCH 3/5] Simplify --- test/test_nmt.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/test/test_nmt.py b/test/test_nmt.py index 5acda27d..5a88a659 100644 --- a/test/test_nmt.py +++ b/test/test_nmt.py @@ -72,8 +72,7 @@ def test_nmt_master_on_heartbeat(self): # Skip the special INITIALISING case. for code in [st for st in NMT_STATES if st != 0]: with self.subTest(code=code): - data = bytes([code]) - task = self.net.send_periodic(self.COB_ID, data, self.PERIOD) + task = self.net.send_periodic(self.COB_ID, [code], self.PERIOD) self.addCleanup(task.stop) actual = self.node.nmt.wait_for_heartbeat(self.TIMEOUT) task.stop() @@ -81,14 +80,14 @@ def test_nmt_master_on_heartbeat(self): self.assertEqual(actual, expected) def test_nmt_master_on_heartbeat_initialising(self): - task = self.net.send_periodic(self.COB_ID, b"\x00", self.PERIOD) + task = self.net.send_periodic(self.COB_ID, [0], self.PERIOD) self.addCleanup(task.stop) self.node.nmt.wait_for_bootup(self.TIMEOUT) state = self.node.nmt.wait_for_heartbeat(self.TIMEOUT) self.assertEqual(state, "PRE-OPERATIONAL") def test_nmt_master_on_heartbeat_unknown_state(self): - task = self.net.send_periodic(self.COB_ID, b"\xcb", self.PERIOD) + task = self.net.send_periodic(self.COB_ID, [0xcb], self.PERIOD) self.addCleanup(task.stop) state = self.node.nmt.wait_for_heartbeat(self.TIMEOUT) # Expect the high bit to be masked out, and the resulting integer From c2ff667bbb06a4fb2b6a10d5703ce5df211d29f9 Mon Sep 17 00:00:00 2001 From: "Erlend E. Aasland" Date: Tue, 9 Jul 2024 09:57:38 +0200 Subject: [PATCH 4/5] Address review --- test/test_nmt.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/test/test_nmt.py b/test/test_nmt.py index 5a88a659..44d30664 100644 --- a/test/test_nmt.py +++ b/test/test_nmt.py @@ -3,7 +3,7 @@ import can import canopen -from canopen.nmt import NMT_STATES, NMT_COMMANDS +from canopen.nmt import NMT_STATES, NMT_COMMANDS, NmtError from .util import SAMPLE_EDS @@ -63,9 +63,9 @@ def tearDown(self): self.net.disconnect() def test_nmt_master_no_heartbeat(self): - with self.assertRaisesRegex(canopen.nmt.NmtError, "heartbeat"): + with self.assertRaisesRegex(NmtError, "heartbeat"): self.node.nmt.wait_for_heartbeat(self.TIMEOUT) - with self.assertRaisesRegex(canopen.nmt.NmtError, "boot-up"): + with self.assertRaisesRegex(NmtError, "boot-up"): self.node.nmt.wait_for_bootup(self.TIMEOUT) def test_nmt_master_on_heartbeat(self): @@ -73,9 +73,12 @@ def test_nmt_master_on_heartbeat(self): for code in [st for st in NMT_STATES if st != 0]: with self.subTest(code=code): task = self.net.send_periodic(self.COB_ID, [code], self.PERIOD) - self.addCleanup(task.stop) - actual = self.node.nmt.wait_for_heartbeat(self.TIMEOUT) - task.stop() + try: + actual = self.node.nmt.wait_for_heartbeat(self.TIMEOUT) + except NmtError: + self.fail("Timed out waiting for heartbeat") + finally: + task.stop() expected = NMT_STATES[code] self.assertEqual(actual, expected) @@ -86,13 +89,14 @@ def test_nmt_master_on_heartbeat_initialising(self): state = self.node.nmt.wait_for_heartbeat(self.TIMEOUT) self.assertEqual(state, "PRE-OPERATIONAL") + @unittest.expectedFailure def test_nmt_master_on_heartbeat_unknown_state(self): task = self.net.send_periodic(self.COB_ID, [0xcb], self.PERIOD) self.addCleanup(task.stop) state = self.node.nmt.wait_for_heartbeat(self.TIMEOUT) - # Expect the high bit to be masked out, and the resulting integer - # returned as it is. See gh-500 for the data type inconsistency. - self.assertEqual(state, 0x4b) + # Expect the high bit to be masked out, and and unknown state string to + # be returned. + self.assertEqual(state, "UNKNOWN STATE '75'") def test_nmt_master_add_heartbeat_callback(self): from threading import Event From 6fa2986d6253b47315e96cf8f7cee93e5dd84298 Mon Sep 17 00:00:00 2001 From: "Erlend E. Aasland" Date: Tue, 9 Jul 2024 12:10:30 +0200 Subject: [PATCH 5/5] Harden test_send_command() and relay NmtError exceptions --- test/test_nmt.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/test/test_nmt.py b/test/test_nmt.py index 44d30664..c24e8d96 100644 --- a/test/test_nmt.py +++ b/test/test_nmt.py @@ -3,7 +3,7 @@ import can import canopen -from canopen.nmt import NMT_STATES, NMT_COMMANDS, NmtError +from canopen.nmt import COMMAND_TO_STATE, NMT_STATES, NMT_COMMANDS, NmtError from .util import SAMPLE_EDS @@ -25,7 +25,8 @@ def test_send_command(self): with self.subTest(cmd=cmd): code = NMT_COMMANDS[cmd] self.nmt.send_command(code) - self.assertNotEqual(self.nmt.state, "INITIALISING") + expected = NMT_STATES[COMMAND_TO_STATE[code]] + self.assertEqual(self.nmt.state, expected) def test_state_getset(self): for state in NMT_STATES.values(): @@ -75,8 +76,6 @@ def test_nmt_master_on_heartbeat(self): task = self.net.send_periodic(self.COB_ID, [code], self.PERIOD) try: actual = self.node.nmt.wait_for_heartbeat(self.TIMEOUT) - except NmtError: - self.fail("Timed out waiting for heartbeat") finally: task.stop() expected = NMT_STATES[code]