-
Notifications
You must be signed in to change notification settings - Fork 635
Keep Track of Active Notifiers. Make Notifier usable as ContextManager. #1890
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 5 commits
328435d
bcd4778
2b8154a
6fc0d7d
e35610a
f9095a0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -7,7 +7,20 @@ | |||||
import logging | ||||||
import threading | ||||||
import time | ||||||
from typing import Any, Awaitable, Callable, Iterable, List, Optional, Union | ||||||
from contextlib import AbstractContextManager | ||||||
from types import TracebackType | ||||||
from typing import ( | ||||||
Any, | ||||||
Awaitable, | ||||||
Callable, | ||||||
Final, | ||||||
Iterable, | ||||||
List, | ||||||
NamedTuple, | ||||||
Optional, | ||||||
Type, | ||||||
Union, | ||||||
) | ||||||
|
||||||
from can.bus import BusABC | ||||||
from can.listener import Listener | ||||||
|
@@ -18,7 +31,85 @@ | |||||
MessageRecipient = Union[Listener, Callable[[Message], Union[Awaitable[None], None]]] | ||||||
|
||||||
|
||||||
class Notifier: | ||||||
class _BusNotifierPair(NamedTuple): | ||||||
bus: "BusABC" | ||||||
notifier: "Notifier" | ||||||
|
||||||
|
||||||
class _NotifierRegistry: | ||||||
"""A registry to manage the association between CAN buses and Notifiers. | ||||||
|
||||||
This class ensures that a bus is not added to multiple active Notifiers. | ||||||
""" | ||||||
|
||||||
def __init__(self) -> None: | ||||||
"""Initialize the registry with an empty list of bus-notifier pairs and a threading lock.""" | ||||||
self.pairs: List[_BusNotifierPair] = [] | ||||||
self.lock = threading.Lock() | ||||||
|
||||||
def register(self, bus: BusABC, notifier: "Notifier") -> None: | ||||||
"""Register a bus and its associated notifier. | ||||||
|
||||||
Ensures that a bus is not added to multiple active :class:`~can.Notifier` instances. | ||||||
|
||||||
:param bus: | ||||||
The CAN bus to register. | ||||||
:param notifier: | ||||||
The :class:`~can.Notifier` instance associated with the bus. | ||||||
:raises ValueError: | ||||||
If the bus is already assigned to an active Notifier. | ||||||
""" | ||||||
with self.lock: | ||||||
for pair in self.pairs: | ||||||
if bus is pair.bus and not pair.notifier.stopped: | ||||||
raise ValueError( | ||||||
"A bus can not be added to multiple active Notifier instances." | ||||||
) | ||||||
self.pairs.append(_BusNotifierPair(bus, notifier)) | ||||||
|
||||||
def unregister(self, bus: BusABC, notifier: "Notifier") -> None: | ||||||
"""Unregister a bus and its associated notifier. | ||||||
|
||||||
Removes the bus-notifier pair from the registry. | ||||||
|
||||||
:param bus: | ||||||
The CAN bus to unregister. | ||||||
:param notifier: | ||||||
The :class:`~can.Notifier` instance associated with the bus. | ||||||
""" | ||||||
with self.lock: | ||||||
registered_pairs_to_remove: List[_BusNotifierPair] = [] | ||||||
for pair in self.pairs: | ||||||
if pair.bus is bus and pair.notifier is notifier: | ||||||
registered_pairs_to_remove.append(pair) | ||||||
for pair in registered_pairs_to_remove: | ||||||
self.pairs.remove(pair) | ||||||
|
||||||
def find_instance(self, bus: BusABC) -> Optional["Notifier"]: | ||||||
"""Find the :class:`~can.Notifier` instance associated with a given CAN bus. | ||||||
|
||||||
This method searches the registry for the :class:`~can.Notifier` | ||||||
that is linked to the specified bus. If the bus is found, the | ||||||
corresponding :class:`~can.Notifier` instance is returned. If the bus is not | ||||||
found in the registry, `None` is returned. | ||||||
|
||||||
:param bus: | ||||||
The CAN bus for which to find the associated :class:`~can.Notifier` . | ||||||
:return: | ||||||
The :class:`~can.Notifier` instance associated with the given bus, | ||||||
or `None` if no such association exists. | ||||||
""" | ||||||
with self.lock: | ||||||
for pair in self.pairs: | ||||||
if bus is pair.bus: | ||||||
return pair.notifier | ||||||
return None | ||||||
|
||||||
|
||||||
class Notifier(AbstractContextManager): | ||||||
|
||||||
_registry: Final = _NotifierRegistry() | ||||||
|
||||||
def __init__( | ||||||
self, | ||||||
bus: Union[BusABC, List[BusABC]], | ||||||
|
@@ -32,16 +123,21 @@ def __init__( | |||||
|
||||||
.. Note:: | ||||||
|
||||||
Remember to call `stop()` after all messages are received as | ||||||
Remember to call :meth:`~can.Notifier.stop` after all messages are received as | ||||||
many listeners carry out flush operations to persist data. | ||||||
|
||||||
|
||||||
:param bus: A :ref:`bus` or a list of buses to listen to. | ||||||
:param bus: | ||||||
A :ref:`bus` or a list of buses to listen to. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
:param listeners: | ||||||
An iterable of :class:`~can.Listener` or callables that receive a :class:`~can.Message` | ||||||
and return nothing. | ||||||
:param timeout: An optional maximum number of seconds to wait for any :class:`~can.Message`. | ||||||
:param loop: An :mod:`asyncio` event loop to schedule the ``listeners`` in. | ||||||
:param timeout: | ||||||
An optional maximum number of seconds to wait for any :class:`~can.Message`. | ||||||
:param loop: | ||||||
An :mod:`asyncio` event loop to schedule the ``listeners`` in. | ||||||
:raises ValueError: | ||||||
If the *bus* is already assigned to an active :class:`~can.Notifier`. | ||||||
zariiii9003 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
""" | ||||||
self.listeners: List[MessageRecipient] = list(listeners) | ||||||
self.bus = bus | ||||||
|
@@ -51,31 +147,36 @@ def __init__( | |||||
#: Exception raised in thread | ||||||
self.exception: Optional[Exception] = None | ||||||
|
||||||
self._running = True | ||||||
self._stopped = False | ||||||
self._lock = threading.Lock() | ||||||
|
||||||
self._readers: List[Union[int, threading.Thread]] = [] | ||||||
buses = self.bus if isinstance(self.bus, list) else [self.bus] | ||||||
for each_bus in buses: | ||||||
self._bus_list = self.bus if isinstance(self.bus, list) else [self.bus] | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch. The same was true for |
||||||
for each_bus in self._bus_list: | ||||||
self.add_bus(each_bus) | ||||||
|
||||||
def add_bus(self, bus: BusABC) -> None: | ||||||
"""Add a bus for notification. | ||||||
|
||||||
:param bus: | ||||||
CAN bus instance. | ||||||
:raises ValueError: | ||||||
If the *bus* is already assigned to an active :class:`~can.Notifier`. | ||||||
""" | ||||||
reader: int = -1 | ||||||
# add bus to notifier registry | ||||||
self._registry.register(bus, self) | ||||||
zariiii9003 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
file_descriptor: int = -1 | ||||||
try: | ||||||
reader = bus.fileno() | ||||||
file_descriptor = bus.fileno() | ||||||
zariiii9003 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
except NotImplementedError: | ||||||
# Bus doesn't support fileno, we fall back to thread based reader | ||||||
pass | ||||||
|
||||||
if self._loop is not None and reader >= 0: | ||||||
if self._loop is not None and file_descriptor >= 0: | ||||||
# Use bus file descriptor to watch for messages | ||||||
self._loop.add_reader(reader, self._on_message_available, bus) | ||||||
self._readers.append(reader) | ||||||
self._loop.add_reader(file_descriptor, self._on_message_available, bus) | ||||||
self._readers.append(file_descriptor) | ||||||
else: | ||||||
reader_thread = threading.Thread( | ||||||
target=self._rx_thread, | ||||||
|
@@ -86,15 +187,15 @@ def add_bus(self, bus: BusABC) -> None: | |||||
reader_thread.start() | ||||||
self._readers.append(reader_thread) | ||||||
|
||||||
def stop(self, timeout: float = 5) -> None: | ||||||
def stop(self, timeout: float = 5.0) -> None: | ||||||
"""Stop notifying Listeners when new :class:`~can.Message` objects arrive | ||||||
and call :meth:`~can.Listener.stop` on each Listener. | ||||||
|
||||||
:param timeout: | ||||||
Max time in seconds to wait for receive threads to finish. | ||||||
Should be longer than timeout given at instantiation. | ||||||
""" | ||||||
self._running = False | ||||||
self._stopped = True | ||||||
end_time = time.time() + timeout | ||||||
for reader in self._readers: | ||||||
if isinstance(reader, threading.Thread): | ||||||
|
@@ -108,6 +209,10 @@ def stop(self, timeout: float = 5) -> None: | |||||
if hasattr(listener, "stop"): | ||||||
listener.stop() | ||||||
|
||||||
# remove bus from registry | ||||||
for bus in self._bus_list: | ||||||
self._registry.unregister(bus, self) | ||||||
|
||||||
def _rx_thread(self, bus: BusABC) -> None: | ||||||
# determine message handling callable early, not inside while loop | ||||||
if self._loop: | ||||||
|
@@ -118,7 +223,7 @@ def _rx_thread(self, bus: BusABC) -> None: | |||||
else: | ||||||
handle_message = self._on_message_received | ||||||
|
||||||
while self._running: | ||||||
while not self._stopped: | ||||||
try: | ||||||
if msg := bus.recv(self.timeout): | ||||||
with self._lock: | ||||||
|
@@ -183,3 +288,34 @@ def remove_listener(self, listener: MessageRecipient) -> None: | |||||
:raises ValueError: if `listener` was never added to this notifier | ||||||
""" | ||||||
self.listeners.remove(listener) | ||||||
|
||||||
@property | ||||||
def stopped(self) -> bool: | ||||||
"""Return ``True``, if Notifier was properly shut down with :meth:`~can.Notifier.stop`.""" | ||||||
return self._stopped | ||||||
|
||||||
@classmethod | ||||||
def find_instance(cls, bus: BusABC) -> Optional["Notifier"]: | ||||||
"""Find the :class:`~can.Notifier` instance associated with a given CAN bus. | ||||||
|
||||||
This method searches the registry for the :class:`~can.Notifier` | ||||||
that is linked to the specified bus. If the bus is found, the | ||||||
corresponding :class:`~can.Notifier` instance is returned. If the bus is not | ||||||
found in the registry, `None` is returned. | ||||||
|
||||||
:param bus: | ||||||
The CAN bus for which to find the associated :class:`~can.Notifier` . | ||||||
:return: | ||||||
The :class:`~can.Notifier` instance associated with the given bus, | ||||||
or `None` if no such association exists. | ||||||
""" | ||||||
return cls._registry.find_instance(bus) | ||||||
|
||||||
def __exit__( | ||||||
self, | ||||||
exc_type: Optional[Type[BaseException]], | ||||||
exc_value: Optional[BaseException], | ||||||
traceback: Optional[TracebackType], | ||||||
) -> None: | ||||||
if not self._stopped: | ||||||
self.stop() |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,8 +18,8 @@ | |
com0com: http://com0com.sourceforge.net/ | ||
""" | ||
|
||
import time | ||
import threading | ||
import time | ||
|
||
import can | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.