Skip to content

Commit 33a1ec7

Browse files
authored
Fix CI failures (#1898)
* implement join_threads helper method * simplify network_test.py * update tox.ini
1 parent 805f3fb commit 33a1ec7

File tree

3 files changed

+71
-76
lines changed

3 files changed

+71
-76
lines changed

test/network_test.py

Lines changed: 39 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,14 @@
66
import unittest
77

88
import can
9+
from test.config import IS_PYPY
910

1011
logging.getLogger(__file__).setLevel(logging.WARNING)
1112

1213

1314
# make a random bool:
1415
def rbool():
15-
return bool(round(random.random()))
16-
17-
18-
channel = "vcan0"
16+
return random.choice([False, True])
1917

2018

2119
class ControllerAreaNetworkTestCase(unittest.TestCase):
@@ -51,74 +49,51 @@ def tearDown(self):
5149
# Restore the defaults
5250
can.rc = self._can_rc
5351

54-
def producer(self, ready_event, msg_read):
55-
self.client_bus = can.interface.Bus(channel=channel)
56-
ready_event.wait()
57-
for i in range(self.num_messages):
58-
m = can.Message(
59-
arbitration_id=self.ids[i],
60-
is_remote_frame=self.remote_flags[i],
61-
is_error_frame=self.error_flags[i],
62-
is_extended_id=self.extended_flags[i],
63-
data=self.data[i],
64-
)
65-
# logging.debug("writing message: {}".format(m))
66-
if msg_read is not None:
67-
# Don't send until the other thread is ready
68-
msg_read.wait()
69-
msg_read.clear()
70-
71-
self.client_bus.send(m)
52+
def producer(self, channel: str):
53+
with can.interface.Bus(channel=channel) as client_bus:
54+
for i in range(self.num_messages):
55+
m = can.Message(
56+
arbitration_id=self.ids[i],
57+
is_remote_frame=self.remote_flags[i],
58+
is_error_frame=self.error_flags[i],
59+
is_extended_id=self.extended_flags[i],
60+
data=self.data[i],
61+
)
62+
client_bus.send(m)
7263

7364
def testProducer(self):
7465
"""Verify that we can send arbitrary messages on the bus"""
7566
logging.debug("testing producer alone")
76-
ready = threading.Event()
77-
ready.set()
78-
self.producer(ready, None)
79-
67+
self.producer(channel="testProducer")
8068
logging.debug("producer test complete")
8169

8270
def testProducerConsumer(self):
8371
logging.debug("testing producer/consumer")
84-
ready = threading.Event()
85-
msg_read = threading.Event()
86-
87-
self.server_bus = can.interface.Bus(channel=channel, interface="virtual")
88-
89-
t = threading.Thread(target=self.producer, args=(ready, msg_read))
90-
t.start()
91-
92-
# Ensure there are no messages on the bus
93-
while True:
94-
m = self.server_bus.recv(timeout=0.5)
95-
if m is None:
96-
print("No messages... lets go")
97-
break
98-
else:
99-
self.fail("received messages before the test has started ...")
100-
ready.set()
101-
i = 0
102-
while i < self.num_messages:
103-
msg_read.set()
104-
msg = self.server_bus.recv(timeout=0.5)
105-
self.assertIsNotNone(msg, "Didn't receive a message")
106-
# logging.debug("Received message {} with data: {}".format(i, msg.data))
107-
108-
self.assertEqual(msg.is_extended_id, self.extended_flags[i])
109-
if not msg.is_remote_frame:
110-
self.assertEqual(msg.data, self.data[i])
111-
self.assertEqual(msg.arbitration_id, self.ids[i])
112-
113-
self.assertEqual(msg.is_error_frame, self.error_flags[i])
114-
self.assertEqual(msg.is_remote_frame, self.remote_flags[i])
115-
116-
i += 1
117-
t.join()
118-
119-
with contextlib.suppress(NotImplementedError):
120-
self.server_bus.flush_tx_buffer()
121-
self.server_bus.shutdown()
72+
read_timeout = 2.0 if IS_PYPY else 0.5
73+
channel = "testProducerConsumer"
74+
75+
with can.interface.Bus(channel=channel, interface="virtual") as server_bus:
76+
t = threading.Thread(target=self.producer, args=(channel,))
77+
t.start()
78+
79+
i = 0
80+
while i < self.num_messages:
81+
msg = server_bus.recv(timeout=read_timeout)
82+
self.assertIsNotNone(msg, "Didn't receive a message")
83+
84+
self.assertEqual(msg.is_extended_id, self.extended_flags[i])
85+
if not msg.is_remote_frame:
86+
self.assertEqual(msg.data, self.data[i])
87+
self.assertEqual(msg.arbitration_id, self.ids[i])
88+
89+
self.assertEqual(msg.is_error_frame, self.error_flags[i])
90+
self.assertEqual(msg.is_remote_frame, self.remote_flags[i])
91+
92+
i += 1
93+
t.join()
94+
95+
with contextlib.suppress(NotImplementedError):
96+
server_bus.flush_tx_buffer()
12297

12398

12499
if __name__ == "__main__":

test/simplecyclic_test.py

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55
"""
66

77
import gc
8+
import sys
89
import time
10+
import traceback
911
import unittest
12+
from threading import Thread
1013
from time import sleep
1114
from typing import List
1215
from unittest.mock import MagicMock
@@ -87,6 +90,8 @@ def test_removing_bus_tasks(self):
8790
# Note calling task.stop will remove the task from the Bus's internal task management list
8891
task.stop()
8992

93+
self.join_threads([task.thread for task in tasks], 5.0)
94+
9095
assert len(bus._periodic_tasks) == 0
9196
bus.shutdown()
9297

@@ -115,8 +120,7 @@ def test_managed_tasks(self):
115120
for task in tasks:
116121
task.stop()
117122

118-
for task in tasks:
119-
assert task.thread.join(5.0) is None, "Task didn't stop before timeout"
123+
self.join_threads([task.thread for task in tasks], 5.0)
120124

121125
bus.shutdown()
122126

@@ -142,9 +146,7 @@ def test_stopping_perodic_tasks(self):
142146

143147
# stop the other half using the bus api
144148
bus.stop_all_periodic_tasks(remove_tasks=False)
145-
146-
for task in tasks:
147-
assert task.thread.join(5.0) is None, "Task didn't stop before timeout"
149+
self.join_threads([task.thread for task in tasks], 5.0)
148150

149151
# Tasks stopped via `stop_all_periodic_tasks` with remove_tasks=False should
150152
# still be associated with the bus (e.g. for restarting)
@@ -161,7 +163,7 @@ def test_restart_perodic_tasks(self):
161163
is_extended_id=False, arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5, 6, 7]
162164
)
163165

164-
def _read_all_messages(_bus: can.interfaces.virtual.VirtualBus) -> None:
166+
def _read_all_messages(_bus: "can.interfaces.virtual.VirtualBus") -> None:
165167
sleep(safe_timeout)
166168
while not _bus.queue.empty():
167169
_bus.recv(timeout=period)
@@ -207,9 +209,8 @@ def _read_all_messages(_bus: can.interfaces.virtual.VirtualBus) -> None:
207209

208210
# Stop all tasks and wait for the thread to exit
209211
bus.stop_all_periodic_tasks()
210-
if isinstance(task, can.broadcastmanager.ThreadBasedCyclicSendTask):
211-
# Avoids issues where the thread is still running when the bus is shutdown
212-
task.thread.join(safe_timeout)
212+
# Avoids issues where the thread is still running when the bus is shutdown
213+
self.join_threads([task.thread], 5.0)
213214

214215
@unittest.skipIf(IS_CI, "fails randomly when run on CI server")
215216
def test_thread_based_cyclic_send_task(self):
@@ -288,6 +289,27 @@ def increment_first_byte(msg: can.Message) -> None:
288289
self.assertEqual(b"\x06\x00\x00\x00\x00\x00\x00\x00", bytes(msg_list[5].data))
289290
self.assertEqual(b"\x07\x00\x00\x00\x00\x00\x00\x00", bytes(msg_list[6].data))
290291

292+
@staticmethod
293+
def join_threads(threads: List[Thread], timeout: float) -> None:
294+
stuck_threads: List[Thread] = []
295+
t0 = time.perf_counter()
296+
for thread in threads:
297+
time_left = timeout - (time.perf_counter() - t0)
298+
if time_left > 0.0:
299+
thread.join(time_left)
300+
if thread.is_alive():
301+
if platform.python_implementation() == "CPython":
302+
# print thread frame to help with debugging
303+
frame = sys._current_frames()[thread.ident]
304+
traceback.print_stack(frame, file=sys.stderr)
305+
stuck_threads.append(thread)
306+
if stuck_threads:
307+
err_message = (
308+
f"Threads did not stop within {timeout:.1f} seconds: "
309+
f"[{', '.join([str(t) for t in stuck_threads])}]"
310+
)
311+
raise RuntimeError(err_message)
312+
291313

292314
if __name__ == "__main__":
293315
unittest.main()

tox.ini

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,14 @@ deps =
1111
pyserial~=3.5
1212
parameterized~=0.8
1313
asammdf>=6.0; platform_python_implementation=="CPython" and python_version<"3.13"
14-
pywin32>=305; platform_system=="Windows" and platform_python_implementation=="CPython" and python_version<"3.13"
14+
pywin32>=305; platform_system=="Windows" and platform_python_implementation=="CPython" and python_version<"3.14"
1515

1616
commands =
1717
pytest {posargs}
1818

1919
extras =
2020
canalystii
2121

22-
recreate = True
23-
2422
[testenv:gh]
2523
passenv =
2624
CI

0 commit comments

Comments
 (0)