diff --git a/can/notifier.py b/can/notifier.py index 088f0802e..cf8dd7540 100644 --- a/can/notifier.py +++ b/can/notifier.py @@ -7,7 +7,21 @@ import logging import threading import time -from typing import Any, Awaitable, Callable, Iterable, List, Optional, Union +from contextlib import AbstractContextManager +from types import TracebackType +from typing import ( + Any, + Awaitable, + Callable, + Final, + Iterable, + List, + NamedTuple, + Optional, + Tuple, + Type, + Union, +) from can.bus import BusABC from can.listener import Listener @@ -18,7 +32,85 @@ MessageRecipient = Union[Listener, Callable[[Message], Union[Awaitable[None], None]]] -class Notifier: +class _BusNotifierPair(NamedTuple): + bus: "BusABC" + notifier: "Notifier" + + +class _NotifierRegistry: + """A registry to manage the association between CAN buses and Notifiers. + + This class ensures that a bus is not added to multiple active Notifiers. + """ + + def __init__(self) -> None: + """Initialize the registry with an empty list of bus-notifier pairs and a threading lock.""" + self.pairs: List[_BusNotifierPair] = [] + self.lock = threading.Lock() + + def register(self, bus: BusABC, notifier: "Notifier") -> None: + """Register a bus and its associated notifier. + + Ensures that a bus is not added to multiple active :class:`~can.Notifier` instances. + + :param bus: + The CAN bus to register. + :param notifier: + The :class:`~can.Notifier` instance associated with the bus. + :raises ValueError: + If the bus is already assigned to an active Notifier. + """ + with self.lock: + for pair in self.pairs: + if bus is pair.bus and not pair.notifier.stopped: + raise ValueError( + "A bus can not be added to multiple active Notifier instances." + ) + self.pairs.append(_BusNotifierPair(bus, notifier)) + + def unregister(self, bus: BusABC, notifier: "Notifier") -> None: + """Unregister a bus and its associated notifier. + + Removes the bus-notifier pair from the registry. + + :param bus: + The CAN bus to unregister. + :param notifier: + The :class:`~can.Notifier` instance associated with the bus. + """ + with self.lock: + registered_pairs_to_remove: List[_BusNotifierPair] = [] + for pair in self.pairs: + if pair.bus is bus and pair.notifier is notifier: + registered_pairs_to_remove.append(pair) + for pair in registered_pairs_to_remove: + self.pairs.remove(pair) + + def find_instances(self, bus: BusABC) -> Tuple["Notifier", ...]: + """Find the :class:`~can.Notifier` instances associated with a given CAN bus. + + This method searches the registry for the :class:`~can.Notifier` + that is linked to the specified bus. If the bus is found, the + corresponding :class:`~can.Notifier` instances are returned. If the bus is not + found in the registry, an empty tuple is returned. + + :param bus: + The CAN bus for which to find the associated :class:`~can.Notifier` . + :return: + A tuple of :class:`~can.Notifier` instances associated with the given bus. + """ + instance_list = [] + with self.lock: + for pair in self.pairs: + if bus is pair.bus: + instance_list.append(pair.notifier) + return tuple(instance_list) + + +class Notifier(AbstractContextManager): + + _registry: Final = _NotifierRegistry() + def __init__( self, bus: Union[BusABC, List[BusABC]], @@ -32,61 +124,81 @@ def __init__( .. Note:: - Remember to call `stop()` after all messages are received as + Remember to call :meth:`~can.Notifier.stop` after all messages are received as many listeners carry out flush operations to persist data. - :param bus: A :ref:`bus` or a list of buses to listen to. + :param bus: + A :ref:`bus` or a list of buses to consume messages from. :param listeners: An iterable of :class:`~can.Listener` or callables that receive a :class:`~can.Message` and return nothing. - :param timeout: An optional maximum number of seconds to wait for any :class:`~can.Message`. - :param loop: An :mod:`asyncio` event loop to schedule the ``listeners`` in. + :param timeout: + An optional maximum number of seconds to wait for any :class:`~can.Message`. + :param loop: + An :mod:`asyncio` event loop to schedule the ``listeners`` in. + :raises ValueError: + If a passed in *bus* is already assigned to an active :class:`~can.Notifier`. """ self.listeners: List[MessageRecipient] = list(listeners) - self.bus = bus + self._bus_list: List[BusABC] = [] self.timeout = timeout self._loop = loop #: Exception raised in thread self.exception: Optional[Exception] = None - self._running = True + self._stopped = False self._lock = threading.Lock() self._readers: List[Union[int, threading.Thread]] = [] - buses = self.bus if isinstance(self.bus, list) else [self.bus] - for each_bus in buses: + _bus_list: List[BusABC] = bus if isinstance(bus, list) else [bus] + for each_bus in _bus_list: self.add_bus(each_bus) + @property + def bus(self) -> Union[BusABC, Tuple["BusABC", ...]]: + """Return the associated bus or a tuple of buses.""" + if len(self._bus_list) == 1: + return self._bus_list[0] + return tuple(self._bus_list) + def add_bus(self, bus: BusABC) -> None: """Add a bus for notification. :param bus: CAN bus instance. + :raises ValueError: + If the *bus* is already assigned to an active :class:`~can.Notifier`. """ - reader: int = -1 + # add bus to notifier registry + Notifier._registry.register(bus, self) + + # add bus to internal bus list + self._bus_list.append(bus) + + file_descriptor: int = -1 try: - reader = bus.fileno() + file_descriptor = bus.fileno() except NotImplementedError: # Bus doesn't support fileno, we fall back to thread based reader pass - if self._loop is not None and reader >= 0: + if self._loop is not None and file_descriptor >= 0: # Use bus file descriptor to watch for messages - self._loop.add_reader(reader, self._on_message_available, bus) - self._readers.append(reader) + self._loop.add_reader(file_descriptor, self._on_message_available, bus) + self._readers.append(file_descriptor) else: reader_thread = threading.Thread( target=self._rx_thread, args=(bus,), - name=f'can.notifier for bus "{bus.channel_info}"', + name=f'{self.__class__.__qualname__} for bus "{bus.channel_info}"', ) reader_thread.daemon = True reader_thread.start() self._readers.append(reader_thread) - def stop(self, timeout: float = 5) -> None: + def stop(self, timeout: float = 5.0) -> None: """Stop notifying Listeners when new :class:`~can.Message` objects arrive and call :meth:`~can.Listener.stop` on each Listener. @@ -94,7 +206,7 @@ def stop(self, timeout: float = 5) -> None: Max time in seconds to wait for receive threads to finish. Should be longer than timeout given at instantiation. """ - self._running = False + self._stopped = True end_time = time.time() + timeout for reader in self._readers: if isinstance(reader, threading.Thread): @@ -108,6 +220,10 @@ def stop(self, timeout: float = 5) -> None: if hasattr(listener, "stop"): listener.stop() + # remove bus from registry + for bus in self._bus_list: + Notifier._registry.unregister(bus, self) + def _rx_thread(self, bus: BusABC) -> None: # determine message handling callable early, not inside while loop if self._loop: @@ -118,7 +234,7 @@ def _rx_thread(self, bus: BusABC) -> None: else: handle_message = self._on_message_received - while self._running: + while not self._stopped: try: if msg := bus.recv(self.timeout): with self._lock: @@ -183,3 +299,33 @@ def remove_listener(self, listener: MessageRecipient) -> None: :raises ValueError: if `listener` was never added to this notifier """ self.listeners.remove(listener) + + @property + def stopped(self) -> bool: + """Return ``True``, if Notifier was properly shut down with :meth:`~can.Notifier.stop`.""" + return self._stopped + + @staticmethod + def find_instances(bus: BusABC) -> Tuple["Notifier", ...]: + """Find :class:`~can.Notifier` instances associated with a given CAN bus. + + This method searches the registry for the :class:`~can.Notifier` + that is linked to the specified bus. If the bus is found, the + corresponding :class:`~can.Notifier` instances are returned. If the bus is not + found in the registry, an empty tuple is returned. + + :param bus: + The CAN bus for which to find the associated :class:`~can.Notifier` . + :return: + A tuple of :class:`~can.Notifier` instances associated with the given bus. + """ + return Notifier._registry.find_instances(bus) + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: + if not self._stopped: + self.stop() diff --git a/examples/asyncio_demo.py b/examples/asyncio_demo.py index d29f03bc5..c3bfe8df5 100755 --- a/examples/asyncio_demo.py +++ b/examples/asyncio_demo.py @@ -5,10 +5,12 @@ """ import asyncio -from typing import List +from typing import TYPE_CHECKING, List import can -from can.notifier import MessageRecipient + +if TYPE_CHECKING: + from can.notifier import MessageRecipient def print_message(msg: can.Message) -> None: @@ -31,26 +33,22 @@ async def main() -> None: logger, # Regular Listener object ] # Create Notifier with an explicit loop to use for scheduling of callbacks - loop = asyncio.get_running_loop() - notifier = can.Notifier(bus, listeners, loop=loop) - # Start sending first message - bus.send(can.Message(arbitration_id=0)) - - print("Bouncing 10 messages...") - for _ in range(10): - # Wait for next message from AsyncBufferedReader - msg = await reader.get_message() - # Delay response - await asyncio.sleep(0.5) - msg.arbitration_id += 1 - bus.send(msg) - - # Wait for last message to arrive - await reader.get_message() - print("Done!") - - # Clean-up - notifier.stop() + with can.Notifier(bus, listeners, loop=asyncio.get_running_loop()): + # Start sending first message + bus.send(can.Message(arbitration_id=0)) + + print("Bouncing 10 messages...") + for _ in range(10): + # Wait for next message from AsyncBufferedReader + msg = await reader.get_message() + # Delay response + await asyncio.sleep(0.5) + msg.arbitration_id += 1 + bus.send(msg) + + # Wait for last message to arrive + await reader.get_message() + print("Done!") if __name__ == "__main__": diff --git a/examples/cyclic_checksum.py b/examples/cyclic_checksum.py index 3ab6c78ac..763fcd72b 100644 --- a/examples/cyclic_checksum.py +++ b/examples/cyclic_checksum.py @@ -59,6 +59,5 @@ def compute_xbr_checksum(message: can.Message, counter: int) -> int: if __name__ == "__main__": with can.Bus(channel=0, interface="virtual", receive_own_messages=True) as _bus: - notifier = can.Notifier(bus=_bus, listeners=[print]) - cyclic_checksum_send(_bus) - notifier.stop() + with can.Notifier(bus=_bus, listeners=[print]): + cyclic_checksum_send(_bus) diff --git a/examples/print_notifier.py b/examples/print_notifier.py index 8d55ca1dc..e6e11dbec 100755 --- a/examples/print_notifier.py +++ b/examples/print_notifier.py @@ -8,14 +8,13 @@ def main(): with can.Bus(interface="virtual", receive_own_messages=True) as bus: print_listener = can.Printer() - notifier = can.Notifier(bus, [print_listener]) - - bus.send(can.Message(arbitration_id=1, is_extended_id=True)) - bus.send(can.Message(arbitration_id=2, is_extended_id=True)) - bus.send(can.Message(arbitration_id=1, is_extended_id=False)) - - time.sleep(1.0) - notifier.stop() + with can.Notifier(bus, listeners=[print_listener]): + # using Notifier as a context manager automatically calls `Notifier.stop()` + # at the end of the `with` block + bus.send(can.Message(arbitration_id=1, is_extended_id=True)) + bus.send(can.Message(arbitration_id=2, is_extended_id=True)) + bus.send(can.Message(arbitration_id=1, is_extended_id=False)) + time.sleep(1.0) if __name__ == "__main__": diff --git a/examples/send_multiple.py b/examples/send_multiple.py index fdcaa5b59..9123e1bc8 100755 --- a/examples/send_multiple.py +++ b/examples/send_multiple.py @@ -4,8 +4,8 @@ This demo creates multiple processes of producers to spam a socketcan bus. """ -from time import sleep from concurrent.futures import ProcessPoolExecutor +from time import sleep import can diff --git a/examples/serial_com.py b/examples/serial_com.py index 538c8d12f..9f203b2e0 100755 --- a/examples/serial_com.py +++ b/examples/serial_com.py @@ -18,8 +18,8 @@ com0com: http://com0com.sourceforge.net/ """ -import time import threading +import time import can diff --git a/examples/vcan_filtered.py b/examples/vcan_filtered.py index 9c67390ab..22bca706c 100755 --- a/examples/vcan_filtered.py +++ b/examples/vcan_filtered.py @@ -18,14 +18,11 @@ def main(): # print all incoming messages, which includes the ones sent, # since we set receive_own_messages to True # assign to some variable so it does not garbage collected - notifier = can.Notifier(bus, [can.Printer()]) # pylint: disable=unused-variable - - bus.send(can.Message(arbitration_id=1, is_extended_id=True)) - bus.send(can.Message(arbitration_id=2, is_extended_id=True)) - bus.send(can.Message(arbitration_id=1, is_extended_id=False)) - - time.sleep(1.0) - notifier.stop() + with can.Notifier(bus, [can.Printer()]): # pylint: disable=unused-variable + bus.send(can.Message(arbitration_id=1, is_extended_id=True)) + bus.send(can.Message(arbitration_id=2, is_extended_id=True)) + bus.send(can.Message(arbitration_id=1, is_extended_id=False)) + time.sleep(1.0) if __name__ == "__main__": diff --git a/pyproject.toml b/pyproject.toml index f2b6ac04f..592eeef12 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -167,6 +167,7 @@ ignore = [ ] "can/logger.py" = ["T20"] # flake8-print "can/player.py" = ["T20"] # flake8-print +"examples/*" = ["T20"] # flake8-print [tool.ruff.lint.isort] known-first-party = ["can"] diff --git a/test/notifier_test.py b/test/notifier_test.py index 6982130cf..c21d51f04 100644 --- a/test/notifier_test.py +++ b/test/notifier_test.py @@ -12,16 +12,19 @@ def test_single_bus(self): with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: reader = can.BufferedReader() notifier = can.Notifier(bus, [reader], 0.1) + self.assertFalse(notifier.stopped) msg = can.Message() bus.send(msg) self.assertIsNotNone(reader.get_message(1)) notifier.stop() + self.assertTrue(notifier.stopped) def test_multiple_bus(self): with can.Bus(0, interface="virtual", receive_own_messages=True) as bus1: with can.Bus(1, interface="virtual", receive_own_messages=True) as bus2: reader = can.BufferedReader() notifier = can.Notifier([bus1, bus2], [reader], 0.1) + self.assertFalse(notifier.stopped) msg = can.Message() bus1.send(msg) time.sleep(0.1) @@ -33,6 +36,39 @@ def test_multiple_bus(self): self.assertIsNotNone(recv_msg) self.assertEqual(recv_msg.channel, 1) notifier.stop() + self.assertTrue(notifier.stopped) + + def test_context_manager(self): + with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: + reader = can.BufferedReader() + with can.Notifier(bus, [reader], 0.1) as notifier: + self.assertFalse(notifier.stopped) + msg = can.Message() + bus.send(msg) + self.assertIsNotNone(reader.get_message(1)) + notifier.stop() + self.assertTrue(notifier.stopped) + + def test_registry(self): + with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: + reader = can.BufferedReader() + with can.Notifier(bus, [reader], 0.1) as notifier: + # creating a second notifier for the same bus must fail + self.assertRaises(ValueError, can.Notifier, bus, [reader], 0.1) + + # find_instance must return the existing instance + self.assertEqual(can.Notifier.find_instances(bus), (notifier,)) + + # Notifier is stopped, find_instances() must return an empty tuple + self.assertEqual(can.Notifier.find_instances(bus), ()) + + # now the first notifier is stopped, a new notifier can be created without error: + with can.Notifier(bus, [reader], 0.1) as notifier: + # the next notifier call should fail again since there is an active notifier already + self.assertRaises(ValueError, can.Notifier, bus, [reader], 0.1) + + # find_instance must return the existing instance + self.assertEqual(can.Notifier.find_instances(bus), (notifier,)) class AsyncNotifierTest(unittest.TestCase):