Skip to content

Commit 94f07d2

Browse files
committed
implement NotifierRegistry
1 parent 9a766ce commit 94f07d2

File tree

1 file changed

+106
-17
lines changed

1 file changed

+106
-17
lines changed

can/notifier.py

Lines changed: 106 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@
77
import logging
88
import threading
99
import time
10-
from typing import Any, Awaitable, Callable, Iterable, List, Optional, Union
10+
import typing
11+
from contextlib import AbstractContextManager
12+
from types import TracebackType
13+
from typing import Any, Awaitable, Callable, Iterable, List, NamedTuple, Optional, Union
1114

1215
from can.bus import BusABC
1316
from can.listener import Listener
@@ -18,7 +21,65 @@
1821
MessageRecipient = Union[Listener, Callable[[Message], Union[Awaitable[None], None]]]
1922

2023

21-
class Notifier:
24+
class _BusNotifierPair(NamedTuple):
25+
bus: "BusABC"
26+
notifier: "Notifier"
27+
28+
29+
class _NotifierRegistry:
30+
"""A registry to manage the association between CAN buses and Notifiers.
31+
32+
This class ensures that a bus is not added to multiple active Notifiers.
33+
"""
34+
35+
def __init__(self) -> None:
36+
"""Initialize the registry with an empty list of bus-notifier pairs and a threading lock."""
37+
self.pairs: typing.List[_BusNotifierPair] = []
38+
self.lock = threading.Lock()
39+
40+
def register(self, bus: BusABC, notifier: "Notifier") -> None:
41+
"""Register a bus and its associated notifier.
42+
43+
Ensures that a bus is not added to multiple active Notifier instances.
44+
45+
:param bus:
46+
The CAN bus to register.
47+
:param notifier:
48+
The Notifier instance associated with the bus.
49+
:raises ValueError:
50+
If the bus is already assigned to an active Notifier.
51+
"""
52+
with self.lock:
53+
for pair in self.pairs:
54+
if bus is pair.bus and not pair.notifier.stopped:
55+
raise ValueError(
56+
"A bus can not be added to multiple active Notifier instances."
57+
)
58+
self.pairs.append(_BusNotifierPair(bus, notifier))
59+
60+
def unregister(self, bus: BusABC, notifier: "Notifier") -> None:
61+
"""Unregister a bus and its associated notifier.
62+
63+
Removes the bus-notifier pair from the registry.
64+
65+
:param bus:
66+
The CAN bus to unregister.
67+
:param notifier:
68+
The Notifier instance associated with the bus.
69+
"""
70+
with self.lock:
71+
registered_pairs_to_remove: typing.List[_BusNotifierPair] = []
72+
for pair in self.pairs:
73+
if pair.bus is bus and pair.notifier is notifier:
74+
registered_pairs_to_remove.append(pair)
75+
for pair in registered_pairs_to_remove:
76+
self.pairs.remove(pair)
77+
78+
79+
class Notifier(AbstractContextManager):
80+
81+
_registry: typing.Final = _NotifierRegistry()
82+
2283
def __init__(
2384
self,
2485
bus: Union[BusABC, List[BusABC]],
@@ -32,16 +93,21 @@ def __init__(
3293
3394
.. Note::
3495
35-
Remember to call `stop()` after all messages are received as
96+
Remember to call :meth:`~can.Notifier.stop` after all messages are received as
3697
many listeners carry out flush operations to persist data.
3798
3899
39-
:param bus: A :ref:`bus` or a list of buses to listen to.
100+
:param bus:
101+
A :ref:`bus` or a list of buses to listen to.
40102
:param listeners:
41103
An iterable of :class:`~can.Listener` or callables that receive a :class:`~can.Message`
42104
and return nothing.
43-
:param timeout: An optional maximum number of seconds to wait for any :class:`~can.Message`.
44-
:param loop: An :mod:`asyncio` event loop to schedule the ``listeners`` in.
105+
:param timeout:
106+
An optional maximum number of seconds to wait for any :class:`~can.Message`.
107+
:param loop:
108+
An :mod:`asyncio` event loop to schedule the ``listeners`` in.
109+
:raises ValueError:
110+
If the *bus* is already assigned to an active :class:`~can.Notifier`.
45111
"""
46112
self.listeners: List[MessageRecipient] = list(listeners)
47113
self.bus = bus
@@ -51,31 +117,36 @@ def __init__(
51117
#: Exception raised in thread
52118
self.exception: Optional[Exception] = None
53119

54-
self._running = True
120+
self._stopped = False
55121
self._lock = threading.Lock()
56122

57123
self._readers: List[Union[int, threading.Thread]] = []
58-
buses = self.bus if isinstance(self.bus, list) else [self.bus]
59-
for each_bus in buses:
124+
self._bus_list = self.bus if isinstance(self.bus, list) else [self.bus]
125+
for each_bus in self._bus_list:
60126
self.add_bus(each_bus)
61127

62128
def add_bus(self, bus: BusABC) -> None:
63129
"""Add a bus for notification.
64130
65131
:param bus:
66132
CAN bus instance.
133+
:raises ValueError:
134+
If the *bus* is already assigned to an active :class:`~can.Notifier`.
67135
"""
68-
reader: int = -1
136+
# add bus to notifier registry
137+
self._registry.register(bus, self)
138+
139+
file_descriptor: int = -1
69140
try:
70-
reader = bus.fileno()
141+
file_descriptor = bus.fileno()
71142
except NotImplementedError:
72143
# Bus doesn't support fileno, we fall back to thread based reader
73144
pass
74145

75-
if self._loop is not None and reader >= 0:
146+
if self._loop is not None and file_descriptor >= 0:
76147
# Use bus file descriptor to watch for messages
77-
self._loop.add_reader(reader, self._on_message_available, bus)
78-
self._readers.append(reader)
148+
self._loop.add_reader(file_descriptor, self._on_message_available, bus)
149+
self._readers.append(file_descriptor)
79150
else:
80151
reader_thread = threading.Thread(
81152
target=self._rx_thread,
@@ -86,15 +157,15 @@ def add_bus(self, bus: BusABC) -> None:
86157
reader_thread.start()
87158
self._readers.append(reader_thread)
88159

89-
def stop(self, timeout: float = 5) -> None:
160+
def stop(self, timeout: float = 5.0) -> None:
90161
"""Stop notifying Listeners when new :class:`~can.Message` objects arrive
91162
and call :meth:`~can.Listener.stop` on each Listener.
92163
93164
:param timeout:
94165
Max time in seconds to wait for receive threads to finish.
95166
Should be longer than timeout given at instantiation.
96167
"""
97-
self._running = False
168+
self._stopped = True
98169
end_time = time.time() + timeout
99170
for reader in self._readers:
100171
if isinstance(reader, threading.Thread):
@@ -108,6 +179,10 @@ def stop(self, timeout: float = 5) -> None:
108179
if hasattr(listener, "stop"):
109180
listener.stop()
110181

182+
# remove bus from registry
183+
for bus in self._bus_list:
184+
self._registry.unregister(bus, self)
185+
111186
def _rx_thread(self, bus: BusABC) -> None:
112187
# determine message handling callable early, not inside while loop
113188
if self._loop:
@@ -118,7 +193,7 @@ def _rx_thread(self, bus: BusABC) -> None:
118193
else:
119194
handle_message = self._on_message_received
120195

121-
while self._running:
196+
while not self._stopped:
122197
try:
123198
if msg := bus.recv(self.timeout):
124199
with self._lock:
@@ -183,3 +258,17 @@ def remove_listener(self, listener: MessageRecipient) -> None:
183258
:raises ValueError: if `listener` was never added to this notifier
184259
"""
185260
self.listeners.remove(listener)
261+
262+
@property
263+
def stopped(self) -> bool:
264+
"""Return ``True``, if Notifier was properly shut down with :meth:`~can.Notifier.stop`."""
265+
return self._stopped
266+
267+
def __exit__(
268+
self,
269+
exc_type: Optional[typing.Type[BaseException]],
270+
exc_value: Optional[BaseException],
271+
traceback: Optional[TracebackType],
272+
) -> None:
273+
if not self._stopped:
274+
self.stop()

0 commit comments

Comments
 (0)