Skip to content

Commit dcc33a7

Browse files
authored
Keep Track of Active Notifiers, Make Notifier usable as ContextManager (#1890)
1 parent 2b1f6f6 commit dcc33a7

File tree

9 files changed

+234
-63
lines changed

9 files changed

+234
-63
lines changed

can/notifier.py

Lines changed: 160 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,16 @@
88
import threading
99
import time
1010
from collections.abc import Awaitable, Iterable
11-
from typing import Any, Callable, Optional, Union
11+
from contextlib import AbstractContextManager
12+
from types import TracebackType
13+
from typing import (
14+
Any,
15+
Callable,
16+
Final,
17+
NamedTuple,
18+
Optional,
19+
Union,
20+
)
1221

1322
from can.bus import BusABC
1423
from can.listener import Listener
@@ -19,7 +28,85 @@
1928
MessageRecipient = Union[Listener, Callable[[Message], Union[Awaitable[None], None]]]
2029

2130

22-
class Notifier:
31+
class _BusNotifierPair(NamedTuple):
32+
bus: "BusABC"
33+
notifier: "Notifier"
34+
35+
36+
class _NotifierRegistry:
37+
"""A registry to manage the association between CAN buses and Notifiers.
38+
39+
This class ensures that a bus is not added to multiple active Notifiers.
40+
"""
41+
42+
def __init__(self) -> None:
43+
"""Initialize the registry with an empty list of bus-notifier pairs and a threading lock."""
44+
self.pairs: list[_BusNotifierPair] = []
45+
self.lock = threading.Lock()
46+
47+
def register(self, bus: BusABC, notifier: "Notifier") -> None:
48+
"""Register a bus and its associated notifier.
49+
50+
Ensures that a bus is not added to multiple active :class:`~can.Notifier` instances.
51+
52+
:param bus:
53+
The CAN bus to register.
54+
:param notifier:
55+
The :class:`~can.Notifier` instance associated with the bus.
56+
:raises ValueError:
57+
If the bus is already assigned to an active Notifier.
58+
"""
59+
with self.lock:
60+
for pair in self.pairs:
61+
if bus is pair.bus and not pair.notifier.stopped:
62+
raise ValueError(
63+
"A bus can not be added to multiple active Notifier instances."
64+
)
65+
self.pairs.append(_BusNotifierPair(bus, notifier))
66+
67+
def unregister(self, bus: BusABC, notifier: "Notifier") -> None:
68+
"""Unregister a bus and its associated notifier.
69+
70+
Removes the bus-notifier pair from the registry.
71+
72+
:param bus:
73+
The CAN bus to unregister.
74+
:param notifier:
75+
The :class:`~can.Notifier` instance associated with the bus.
76+
"""
77+
with self.lock:
78+
registered_pairs_to_remove: list[_BusNotifierPair] = []
79+
for pair in self.pairs:
80+
if pair.bus is bus and pair.notifier is notifier:
81+
registered_pairs_to_remove.append(pair)
82+
for pair in registered_pairs_to_remove:
83+
self.pairs.remove(pair)
84+
85+
def find_instances(self, bus: BusABC) -> tuple["Notifier", ...]:
86+
"""Find the :class:`~can.Notifier` instances associated with a given CAN bus.
87+
88+
This method searches the registry for the :class:`~can.Notifier`
89+
that is linked to the specified bus. If the bus is found, the
90+
corresponding :class:`~can.Notifier` instances are returned. If the bus is not
91+
found in the registry, an empty tuple is returned.
92+
93+
:param bus:
94+
The CAN bus for which to find the associated :class:`~can.Notifier` .
95+
:return:
96+
A tuple of :class:`~can.Notifier` instances associated with the given bus.
97+
"""
98+
instance_list = []
99+
with self.lock:
100+
for pair in self.pairs:
101+
if bus is pair.bus:
102+
instance_list.append(pair.notifier)
103+
return tuple(instance_list)
104+
105+
106+
class Notifier(AbstractContextManager):
107+
108+
_registry: Final = _NotifierRegistry()
109+
23110
def __init__(
24111
self,
25112
bus: Union[BusABC, list[BusABC]],
@@ -33,69 +120,89 @@ def __init__(
33120
34121
.. Note::
35122
36-
Remember to call `stop()` after all messages are received as
123+
Remember to call :meth:`~can.Notifier.stop` after all messages are received as
37124
many listeners carry out flush operations to persist data.
38125
39126
40-
:param bus: A :ref:`bus` or a list of buses to listen to.
127+
:param bus:
128+
A :ref:`bus` or a list of buses to consume messages from.
41129
:param listeners:
42130
An iterable of :class:`~can.Listener` or callables that receive a :class:`~can.Message`
43131
and return nothing.
44-
:param timeout: An optional maximum number of seconds to wait for any :class:`~can.Message`.
45-
:param loop: An :mod:`asyncio` event loop to schedule the ``listeners`` in.
132+
:param timeout:
133+
An optional maximum number of seconds to wait for any :class:`~can.Message`.
134+
:param loop:
135+
An :mod:`asyncio` event loop to schedule the ``listeners`` in.
136+
:raises ValueError:
137+
If a passed in *bus* is already assigned to an active :class:`~can.Notifier`.
46138
"""
47139
self.listeners: list[MessageRecipient] = list(listeners)
48-
self.bus = bus
140+
self._bus_list: list[BusABC] = []
49141
self.timeout = timeout
50142
self._loop = loop
51143

52144
#: Exception raised in thread
53145
self.exception: Optional[Exception] = None
54146

55-
self._running = True
147+
self._stopped = False
56148
self._lock = threading.Lock()
57149

58150
self._readers: list[Union[int, threading.Thread]] = []
59-
buses = self.bus if isinstance(self.bus, list) else [self.bus]
60-
for each_bus in buses:
151+
_bus_list: list[BusABC] = bus if isinstance(bus, list) else [bus]
152+
for each_bus in _bus_list:
61153
self.add_bus(each_bus)
62154

155+
@property
156+
def bus(self) -> Union[BusABC, tuple["BusABC", ...]]:
157+
"""Return the associated bus or a tuple of buses."""
158+
if len(self._bus_list) == 1:
159+
return self._bus_list[0]
160+
return tuple(self._bus_list)
161+
63162
def add_bus(self, bus: BusABC) -> None:
64163
"""Add a bus for notification.
65164
66165
:param bus:
67166
CAN bus instance.
167+
:raises ValueError:
168+
If the *bus* is already assigned to an active :class:`~can.Notifier`.
68169
"""
69-
reader: int = -1
170+
# add bus to notifier registry
171+
Notifier._registry.register(bus, self)
172+
173+
# add bus to internal bus list
174+
self._bus_list.append(bus)
175+
176+
file_descriptor: int = -1
70177
try:
71-
reader = bus.fileno()
178+
file_descriptor = bus.fileno()
72179
except NotImplementedError:
73180
# Bus doesn't support fileno, we fall back to thread based reader
74181
pass
75182

76-
if self._loop is not None and reader >= 0:
183+
if self._loop is not None and file_descriptor >= 0:
77184
# Use bus file descriptor to watch for messages
78-
self._loop.add_reader(reader, self._on_message_available, bus)
79-
self._readers.append(reader)
185+
self._loop.add_reader(file_descriptor, self._on_message_available, bus)
186+
self._readers.append(file_descriptor)
80187
else:
81188
reader_thread = threading.Thread(
82189
target=self._rx_thread,
83190
args=(bus,),
84-
name=f'can.notifier for bus "{bus.channel_info}"',
191+
name=f'{self.__class__.__qualname__} for bus "{bus.channel_info}"',
85192
)
86193
reader_thread.daemon = True
87194
reader_thread.start()
88195
self._readers.append(reader_thread)
89196

90-
def stop(self, timeout: float = 5) -> None:
197+
def stop(self, timeout: float = 5.0) -> None:
91198
"""Stop notifying Listeners when new :class:`~can.Message` objects arrive
92199
and call :meth:`~can.Listener.stop` on each Listener.
93200
94201
:param timeout:
95202
Max time in seconds to wait for receive threads to finish.
96203
Should be longer than timeout given at instantiation.
97204
"""
98-
self._running = False
205+
self._stopped = True
99206
end_time = time.time() + timeout
100207
for reader in self._readers:
101208
if isinstance(reader, threading.Thread):
@@ -109,6 +216,10 @@ def stop(self, timeout: float = 5) -> None:
109216
if hasattr(listener, "stop"):
110217
listener.stop()
111218

219+
# remove bus from registry
220+
for bus in self._bus_list:
221+
Notifier._registry.unregister(bus, self)
222+
112223
def _rx_thread(self, bus: BusABC) -> None:
113224
# determine message handling callable early, not inside while loop
114225
if self._loop:
@@ -119,7 +230,7 @@ def _rx_thread(self, bus: BusABC) -> None:
119230
else:
120231
handle_message = self._on_message_received
121232

122-
while self._running:
233+
while not self._stopped:
123234
try:
124235
if msg := bus.recv(self.timeout):
125236
with self._lock:
@@ -184,3 +295,33 @@ def remove_listener(self, listener: MessageRecipient) -> None:
184295
:raises ValueError: if `listener` was never added to this notifier
185296
"""
186297
self.listeners.remove(listener)
298+
299+
@property
300+
def stopped(self) -> bool:
301+
"""Return ``True``, if Notifier was properly shut down with :meth:`~can.Notifier.stop`."""
302+
return self._stopped
303+
304+
@staticmethod
305+
def find_instances(bus: BusABC) -> tuple["Notifier", ...]:
306+
"""Find :class:`~can.Notifier` instances associated with a given CAN bus.
307+
308+
This method searches the registry for the :class:`~can.Notifier`
309+
that is linked to the specified bus. If the bus is found, the
310+
corresponding :class:`~can.Notifier` instances are returned. If the bus is not
311+
found in the registry, an empty tuple is returned.
312+
313+
:param bus:
314+
The CAN bus for which to find the associated :class:`~can.Notifier` .
315+
:return:
316+
A tuple of :class:`~can.Notifier` instances associated with the given bus.
317+
"""
318+
return Notifier._registry.find_instances(bus)
319+
320+
def __exit__(
321+
self,
322+
exc_type: Optional[type[BaseException]],
323+
exc_value: Optional[BaseException],
324+
traceback: Optional[TracebackType],
325+
) -> None:
326+
if not self._stopped:
327+
self.stop()

examples/asyncio_demo.py

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
"""
66

77
import asyncio
8-
from typing import List
8+
from typing import TYPE_CHECKING
99

1010
import can
11-
from can.notifier import MessageRecipient
11+
12+
if TYPE_CHECKING:
13+
from can.notifier import MessageRecipient
1214

1315

1416
def print_message(msg: can.Message) -> None:
@@ -25,32 +27,28 @@ async def main() -> None:
2527
reader = can.AsyncBufferedReader()
2628
logger = can.Logger("logfile.asc")
2729

28-
listeners: List[MessageRecipient] = [
30+
listeners: list[MessageRecipient] = [
2931
print_message, # Callback function
3032
reader, # AsyncBufferedReader() listener
3133
logger, # Regular Listener object
3234
]
3335
# Create Notifier with an explicit loop to use for scheduling of callbacks
34-
loop = asyncio.get_running_loop()
35-
notifier = can.Notifier(bus, listeners, loop=loop)
36-
# Start sending first message
37-
bus.send(can.Message(arbitration_id=0))
38-
39-
print("Bouncing 10 messages...")
40-
for _ in range(10):
41-
# Wait for next message from AsyncBufferedReader
42-
msg = await reader.get_message()
43-
# Delay response
44-
await asyncio.sleep(0.5)
45-
msg.arbitration_id += 1
46-
bus.send(msg)
47-
48-
# Wait for last message to arrive
49-
await reader.get_message()
50-
print("Done!")
51-
52-
# Clean-up
53-
notifier.stop()
36+
with can.Notifier(bus, listeners, loop=asyncio.get_running_loop()):
37+
# Start sending first message
38+
bus.send(can.Message(arbitration_id=0))
39+
40+
print("Bouncing 10 messages...")
41+
for _ in range(10):
42+
# Wait for next message from AsyncBufferedReader
43+
msg = await reader.get_message()
44+
# Delay response
45+
await asyncio.sleep(0.5)
46+
msg.arbitration_id += 1
47+
bus.send(msg)
48+
49+
# Wait for last message to arrive
50+
await reader.get_message()
51+
print("Done!")
5452

5553

5654
if __name__ == "__main__":

examples/cyclic_checksum.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,5 @@ def compute_xbr_checksum(message: can.Message, counter: int) -> int:
5959

6060
if __name__ == "__main__":
6161
with can.Bus(channel=0, interface="virtual", receive_own_messages=True) as _bus:
62-
notifier = can.Notifier(bus=_bus, listeners=[print])
63-
cyclic_checksum_send(_bus)
64-
notifier.stop()
62+
with can.Notifier(bus=_bus, listeners=[print]):
63+
cyclic_checksum_send(_bus)

examples/print_notifier.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,13 @@
88
def main():
99
with can.Bus(interface="virtual", receive_own_messages=True) as bus:
1010
print_listener = can.Printer()
11-
notifier = can.Notifier(bus, [print_listener])
12-
13-
bus.send(can.Message(arbitration_id=1, is_extended_id=True))
14-
bus.send(can.Message(arbitration_id=2, is_extended_id=True))
15-
bus.send(can.Message(arbitration_id=1, is_extended_id=False))
16-
17-
time.sleep(1.0)
18-
notifier.stop()
11+
with can.Notifier(bus, listeners=[print_listener]):
12+
# using Notifier as a context manager automatically calls `Notifier.stop()`
13+
# at the end of the `with` block
14+
bus.send(can.Message(arbitration_id=1, is_extended_id=True))
15+
bus.send(can.Message(arbitration_id=2, is_extended_id=True))
16+
bus.send(can.Message(arbitration_id=1, is_extended_id=False))
17+
time.sleep(1.0)
1918

2019

2120
if __name__ == "__main__":

examples/send_multiple.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
This demo creates multiple processes of producers to spam a socketcan bus.
55
"""
66

7-
from time import sleep
87
from concurrent.futures import ProcessPoolExecutor
8+
from time import sleep
99

1010
import can
1111

examples/serial_com.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
com0com: http://com0com.sourceforge.net/
1919
"""
2020

21-
import time
2221
import threading
22+
import time
2323

2424
import can
2525

0 commit comments

Comments
 (0)