Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 38 additions & 14 deletions test/test_network.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import unittest
from threading import Event
import threading

import canopen
import can
Expand Down Expand Up @@ -231,34 +231,58 @@ def test_network_send_periodic(self):
DATA2 = bytes([4, 5, 6])
COB_ID = 0x123
PERIOD = 0.1
TIMEOUT = PERIOD * 10
self.network.connect(interface="virtual", receive_own_messages=True)
self.addCleanup(self.network.disconnect)

acc = []
event = Event()
condition = threading.Condition()

def hook(_, data, ts):
acc.append((data, ts))
event.set()
with condition:
item = data, ts
acc.append(item)
condition.notify_all()

self.network.subscribe(COB_ID, hook)
self.addCleanup(self.network.unsubscribe, COB_ID)

task = self.network.send_periodic(COB_ID, DATA1, PERIOD)
self.addCleanup(task.stop)

event.wait(PERIOD*2)

# Update task data.
def periodicity():
# Check if periodicity is established; flakiness has been observed
# on macOS.
if len(acc) >= 2:
delta = acc[-1][1] - acc[-2][1]
return round(delta, ndigits=1) == PERIOD
return False

# Wait for frames to arrive; then check the result.
with condition:
condition.wait_for(periodicity, TIMEOUT)
self.assertTrue(all(v[0] == DATA1 for v in acc))

# Update task data, which may implicitly restart the timer.
# Wait for frames to arrive; then check the result.
task.update(DATA2)
event.clear()
event.wait(PERIOD*2)
task.stop()

with condition:
acc.clear()
condition.wait_for(periodicity, TIMEOUT)
# Find the first message with new data, and verify that all subsequent
# messages also carry the new payload.
data = [v[0] for v in acc]
self.assertEqual(data, [DATA1, DATA2])
ts = [v[1] for v in acc]
self.assertAlmostEqual(ts[1]-ts[0], PERIOD, places=1)
idx = data.index(DATA2)
self.assertTrue(all(v[0] == DATA2 for v in acc[idx:]))

# Stop the task.
task.stop()
# A message may have been in flight when we stopped the timer,
# so allow a single failure.
bus = self.network.bus
msg = bus.recv(TIMEOUT)
if msg is not None:
self.assertIsNone(bus.recv(TIMEOUT))


class TestScanner(unittest.TestCase):
Expand Down
6 changes: 5 additions & 1 deletion test/test_nmt.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ def test_nmt_master_node_guarding(self):
self.assertEqual(msg.dlc, 0)

self.node.nmt.stop_node_guarding()
self.assertIsNone(self.bus.recv(self.TIMEOUT))
# A message may have been in flight when we stopped the timer,
# so allow a single failure.
msg = self.bus.recv(self.TIMEOUT)
if msg is not None:
self.assertIsNone(self.bus.recv(self.TIMEOUT))


class TestNmtSlave(unittest.TestCase):
Expand Down