From 3c141666aded97a7a5ab4a6577e9f56857d6a58c Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 7 Nov 2024 14:57:12 +0100 Subject: [PATCH 01/41] Use zmq-anyio --- ipykernel/debugger.py | 4 +- ipykernel/inprocess/ipkernel.py | 2 +- ipykernel/inprocess/session.py | 2 +- ipykernel/iostream.py | 72 +++++++++++----------- ipykernel/ipkernel.py | 9 +-- ipykernel/kernelapp.py | 59 ++++-------------- ipykernel/kernelbase.py | 103 ++++++++++++++++++-------------- ipykernel/shellchannel.py | 12 +++- ipykernel/subshell.py | 14 +++-- ipykernel/subshell_manager.py | 56 +++++++++++------ ipykernel/thread.py | 5 ++ pyproject.toml | 3 +- tests/conftest.py | 25 +++----- tests/test_async.py | 16 ++--- tests/test_io.py | 97 ++++++++++++++++-------------- 15 files changed, 244 insertions(+), 235 deletions(-) diff --git a/ipykernel/debugger.py b/ipykernel/debugger.py index 780d18015..36aced05d 100644 --- a/ipykernel/debugger.py +++ b/ipykernel/debugger.py @@ -241,7 +241,7 @@ async def _send_request(self, msg): self.log.debug("DEBUGPYCLIENT:") self.log.debug(self.routing_id) self.log.debug(buf) - await self.debugpy_socket.send_multipart((self.routing_id, buf)) + await self.debugpy_socket.asend_multipart((self.routing_id, buf)) async def _wait_for_response(self): # Since events are never pushed to the message_queue @@ -437,7 +437,7 @@ async def start(self): (self.shell_socket.getsockopt(ROUTING_ID)), ) - msg = await self.shell_socket.recv_multipart() + msg = await self.shell_socket.arecv_multipart() ident, msg = self.session.feed_identities(msg, copy=True) try: msg = self.session.deserialize(msg, content=True, copy=True) diff --git a/ipykernel/inprocess/ipkernel.py b/ipykernel/inprocess/ipkernel.py index c6f8c6128..5abb691c2 100644 --- a/ipykernel/inprocess/ipkernel.py +++ b/ipykernel/inprocess/ipkernel.py @@ -54,7 +54,7 @@ class InProcessKernel(IPythonKernel): _underlying_iopub_socket = Instance(DummySocket, (False,)) iopub_thread: IOPubThread = Instance(IOPubThread) # type:ignore[assignment] - shell_socket = Instance(DummySocket, (True,)) # type:ignore[arg-type] + shell_socket = Instance(DummySocket, (True,)) @default("iopub_thread") def _default_iopub_thread(self): diff --git a/ipykernel/inprocess/session.py b/ipykernel/inprocess/session.py index 0eaed2c60..70b135742 100644 --- a/ipykernel/inprocess/session.py +++ b/ipykernel/inprocess/session.py @@ -3,7 +3,7 @@ class Session(_Session): async def recv(self, socket, copy=True): - return await socket.recv_multipart() + return await socket.arecv_multipart() def send( self, diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index d81710175..193342129 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -20,6 +20,7 @@ from typing import Any, Callable import zmq +import zmq_anyio from anyio import create_task_group, run, sleep, to_thread from jupyter_client.session import extract_header @@ -55,11 +56,11 @@ def run(self): run(self._main) async def _main(self): - async with create_task_group() as tg: + async with create_task_group() as self._task_group: for task in self._tasks: - tg.start_soon(task) + self._task_group.start_soon(task) await to_thread.run_sync(self.__stop.wait) - tg.cancel_scope.cancel() + self._task_group.cancel_scope.cancel() def stop(self): """Stop the thread. @@ -78,7 +79,7 @@ class IOPubThread: whose IO is always run in a thread. """ - def __init__(self, socket, pipe=False): + def __init__(self, socket: zmq_anyio.Socket, pipe=False): """Create IOPub thread Parameters @@ -91,10 +92,7 @@ def __init__(self, socket, pipe=False): """ # ensure all of our sockets as sync zmq.Sockets # don't create async wrappers until we are within the appropriate coroutines - self.socket: zmq.Socket[bytes] | None = zmq.Socket(socket) - if self.socket.context is None: - # bug in pyzmq, shadow socket doesn't always inherit context attribute - self.socket.context = socket.context # type:ignore[unreachable] + self.socket: zmq_anyio.Socket = socket self._context = socket.context self.background_socket = BackgroundSocket(self) @@ -108,14 +106,14 @@ def __init__(self, socket, pipe=False): self._event_pipe_gc_lock: threading.Lock = threading.Lock() self._event_pipe_gc_seconds: float = 10 self._setup_event_pipe() - tasks = [self._handle_event, self._run_event_pipe_gc] + tasks = [self._handle_event, self._run_event_pipe_gc, self.socket.start] if pipe: tasks.append(self._handle_pipe_msgs) self.thread = _IOPubThread(tasks) def _setup_event_pipe(self): """Create the PULL socket listening for events that should fire in this thread.""" - self._pipe_in0 = self._context.socket(zmq.PULL, socket_class=zmq.Socket) + self._pipe_in0 = self._context.socket(zmq.PULL) self._pipe_in0.linger = 0 _uuid = b2a_hex(os.urandom(16)).decode("ascii") @@ -150,7 +148,7 @@ def _event_pipe(self): except AttributeError: # new thread, new event pipe # create sync base socket - event_pipe = self._context.socket(zmq.PUSH, socket_class=zmq.Socket) + event_pipe = self._context.socket(zmq.PUSH) event_pipe.linger = 0 event_pipe.connect(self._event_interface) self._local.event_pipe = event_pipe @@ -169,30 +167,28 @@ async def _handle_event(self): Whenever *an* event arrives on the event stream, *all* waiting events are processed in order. """ - # create async wrapper within coroutine - pipe_in = zmq.asyncio.Socket(self._pipe_in0) - try: - while True: - await pipe_in.recv() - # freeze event count so new writes don't extend the queue - # while we are processing - n_events = len(self._events) - for _ in range(n_events): - event_f = self._events.popleft() - event_f() - except Exception: - if self.thread.__stop.is_set(): - return - raise + pipe_in = zmq_anyio.Socket(self._pipe_in0) + async with pipe_in: + try: + while True: + await pipe_in.arecv() + # freeze event count so new writes don't extend the queue + # while we are processing + n_events = len(self._events) + for _ in range(n_events): + event_f = self._events.popleft() + event_f() + except Exception: + if self.thread.__stop.is_set(): + return + raise def _setup_pipe_in(self): """setup listening pipe for IOPub from forked subprocesses""" - ctx = self._context - # use UUID to authenticate pipe messages self._pipe_uuid = os.urandom(16) - self._pipe_in1 = ctx.socket(zmq.PULL, socket_class=zmq.Socket) + self._pipe_in1 = zmq_anyio.Socket(self._context.socket(zmq.PULL)) self._pipe_in1.linger = 0 try: @@ -210,18 +206,18 @@ def _setup_pipe_in(self): async def _handle_pipe_msgs(self): """handle pipe messages from a subprocess""" # create async wrapper within coroutine - self._async_pipe_in1 = zmq.asyncio.Socket(self._pipe_in1) - try: - while True: - await self._handle_pipe_msg() - except Exception: - if self.thread.__stop.is_set(): - return - raise + async with self._pipe_in1: + try: + while True: + await self._handle_pipe_msg() + except Exception: + if self.thread.__stop.is_set(): + return + raise async def _handle_pipe_msg(self, msg=None): """handle a pipe message from a subprocess""" - msg = msg or await self._async_pipe_in1.recv_multipart() + msg = msg or await self._pipe_in1.arecv_multipart() if not self._pipe_flag or not self._is_main_process(): return if msg[0] != self._pipe_uuid: diff --git a/ipykernel/ipkernel.py b/ipykernel/ipkernel.py index 48efa6cd6..d8d2ba5d5 100644 --- a/ipykernel/ipkernel.py +++ b/ipykernel/ipkernel.py @@ -12,7 +12,7 @@ from dataclasses import dataclass import comm -import zmq.asyncio +import zmq_anyio from anyio import TASK_STATUS_IGNORED, create_task_group, to_thread from anyio.abc import TaskStatus from IPython.core import release @@ -76,7 +76,7 @@ class IPythonKernel(KernelBase): help="Set this flag to False to deactivate the use of experimental IPython completion APIs.", ).tag(config=True) - debugpy_socket = Instance(zmq.asyncio.Socket, allow_none=True) + debugpy_socket = Instance(zmq_anyio.Socket, allow_none=True) user_module = Any() @@ -212,7 +212,8 @@ def __init__(self, **kwargs): } async def process_debugpy(self): - async with create_task_group() as tg: + assert self.debugpy_socket is not None + async with self.debug_shell_socket, self.debugpy_socket, create_task_group() as tg: tg.start_soon(self.receive_debugpy_messages) tg.start_soon(self.poll_stopped_queue) await to_thread.run_sync(self.debugpy_stop.wait) @@ -235,7 +236,7 @@ async def receive_debugpy_message(self, msg=None): if msg is None: assert self.debugpy_socket is not None - msg = await self.debugpy_socket.recv_multipart() + msg = await self.debugpy_socket.arecv_multipart() # The first frame is the socket id, we can drop it frame = msg[1].decode("utf-8") self.log.debug("Debugpy received: %s", frame) diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index 8bb047339..055c4bf8e 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -19,7 +19,7 @@ from typing import Optional import zmq -import zmq.asyncio +import zmq_anyio from anyio import create_task_group, run from IPython.core.application import ( # type:ignore[attr-defined] BaseIPythonApplication, @@ -328,15 +328,15 @@ def init_sockets(self): """Create a context, a session, and the kernel sockets.""" self.log.info("Starting the kernel at pid: %i", os.getpid()) assert self.context is None, "init_sockets cannot be called twice!" - self.context = context = zmq.asyncio.Context() + self.context = context = zmq.Context() atexit.register(self.close) - self.shell_socket = context.socket(zmq.ROUTER) + self.shell_socket = zmq_anyio.Socket(context.socket(zmq.ROUTER)) self.shell_socket.linger = 1000 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port) self.log.debug("shell ROUTER Channel on port: %i" % self.shell_port) - self.stdin_socket = zmq.Context(context).socket(zmq.ROUTER) + self.stdin_socket = context.socket(zmq.ROUTER) self.stdin_socket.linger = 1000 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port) self.log.debug("stdin ROUTER Channel on port: %i" % self.stdin_port) @@ -352,18 +352,19 @@ def init_sockets(self): def init_control(self, context): """Initialize the control channel.""" - self.control_socket = context.socket(zmq.ROUTER) + self.control_socket = zmq_anyio.Socket(context.socket(zmq.ROUTER)) self.control_socket.linger = 1000 self.control_port = self._bind_socket(self.control_socket, self.control_port) self.log.debug("control ROUTER Channel on port: %i" % self.control_port) - self.debugpy_socket = context.socket(zmq.STREAM) + self.debugpy_socket = zmq_anyio.Socket(context, zmq.STREAM) self.debugpy_socket.linger = 1000 - self.debug_shell_socket = context.socket(zmq.DEALER) + self.debug_shell_socket = zmq_anyio.Socket(context.socket(zmq.DEALER)) self.debug_shell_socket.linger = 1000 - if self.shell_socket.getsockopt(zmq.LAST_ENDPOINT): - self.debug_shell_socket.connect(self.shell_socket.getsockopt(zmq.LAST_ENDPOINT)) + last_endpoint = self.shell_socket.getsockopt(zmq.LAST_ENDPOINT) + if last_endpoint: + self.debug_shell_socket.connect(last_endpoint) if hasattr(zmq, "ROUTER_HANDOVER"): # set router-handover to workaround zeromq reconnect problems @@ -376,7 +377,7 @@ def init_control(self, context): def init_iopub(self, context): """Initialize the iopub channel.""" - self.iopub_socket = context.socket(zmq.PUB) + self.iopub_socket = zmq_anyio.Socket(context.socket(zmq.PUB)) self.iopub_socket.linger = 1000 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port) self.log.debug("iopub PUB Channel on port: %i" % self.iopub_port) @@ -637,43 +638,6 @@ def configure_tornado_logger(self): handler.setFormatter(formatter) logger.addHandler(handler) - def _init_asyncio_patch(self): - """set default asyncio policy to be compatible with tornado - - Tornado 6 (at least) is not compatible with the default - asyncio implementation on Windows - - Pick the older SelectorEventLoopPolicy on Windows - if the known-incompatible default policy is in use. - - Support for Proactor via a background thread is available in tornado 6.1, - but it is still preferable to run the Selector in the main thread - instead of the background. - - do this as early as possible to make it a low priority and overridable - - ref: https://github.com/tornadoweb/tornado/issues/2608 - - FIXME: if/when tornado supports the defaults in asyncio without threads, - remove and bump tornado requirement for py38. - Most likely, this will mean a new Python version - where asyncio.ProactorEventLoop supports add_reader and friends. - - """ - if sys.platform.startswith("win"): - import asyncio - - try: - from asyncio import WindowsProactorEventLoopPolicy, WindowsSelectorEventLoopPolicy - except ImportError: - pass - # not affected - else: - if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy: - # WindowsProactorEventLoopPolicy is not compatible with tornado 6 - # fallback to the pre-3.8 default of Selector - asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy()) - def init_pdb(self): """Replace pdb with IPython's version that is interruptible. @@ -693,7 +657,6 @@ def init_pdb(self): @catch_config_error def initialize(self, argv=None): """Initialize the application.""" - self._init_asyncio_patch() super().initialize(argv) if self.subapp is not None: return diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index d496e0c91..3c7324d22 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -35,6 +35,7 @@ import psutil import zmq +import zmq_anyio from anyio import TASK_STATUS_IGNORED, Event, create_task_group, sleep, to_thread from anyio.abc import TaskStatus from IPython.core.error import StdinNotImplementedError @@ -88,7 +89,7 @@ class Kernel(SingletonConfigurable): session = Instance(Session, allow_none=True) profile_dir = Instance("IPython.core.profiledir.ProfileDir", allow_none=True) - shell_socket = Instance(zmq.asyncio.Socket, allow_none=True) + shell_socket = Instance(zmq_anyio.Socket, allow_none=True) implementation: str implementation_version: str @@ -96,7 +97,7 @@ class Kernel(SingletonConfigurable): _is_test = Bool(False) - control_socket = Instance(zmq.asyncio.Socket, allow_none=True) + control_socket = Instance(zmq_anyio.Socket, allow_none=True) control_tasks: t.Any = List() debug_shell_socket = Any() @@ -267,7 +268,7 @@ async def process_control_message(self, msg=None): assert self.session is not None assert self.control_thread is None or threading.current_thread() == self.control_thread - msg = msg or await self.control_socket.recv_multipart() + msg = msg or await self.control_socket.arecv_multipart() idents, msg = self.session.feed_identities(msg) try: msg = self.session.deserialize(msg, content=True) @@ -364,26 +365,29 @@ async def shell_channel_thread_main(self): assert self.shell_channel_thread is not None assert threading.current_thread() == self.shell_channel_thread - try: - while True: - msg = await self.shell_socket.recv_multipart(copy=False) - # deserialize only the header to get subshell_id - # Keep original message to send to subshell_id unmodified. - _, msg2 = self.session.feed_identities(msg, copy=False) - try: - msg3 = self.session.deserialize(msg2, content=False, copy=False) - subshell_id = msg3["header"].get("subshell_id") - - # Find inproc pair socket to use to send message to correct subshell. - socket = self.shell_channel_thread.manager.get_shell_channel_socket(subshell_id) - assert socket is not None - socket.send_multipart(msg, copy=False) - except Exception: - self.log.error("Invalid message", exc_info=True) # noqa: G201 - except BaseException: - if self.shell_stop.is_set(): - return - raise + async with self.shell_socket: + try: + while True: + msg = await self.shell_socket.arecv_multipart(copy=False) + # deserialize only the header to get subshell_id + # Keep original message to send to subshell_id unmodified. + _, msg2 = self.session.feed_identities(msg, copy=False) + try: + msg3 = self.session.deserialize(msg2, content=False, copy=False) + subshell_id = msg3["header"].get("subshell_id") + + # Find inproc pair socket to use to send message to correct subshell. + socket = self.shell_channel_thread.manager.get_shell_channel_socket( + subshell_id + ) + assert socket is not None + socket.send_multipart(msg, copy=False) + except Exception: + self.log.error("Invalid message", exc_info=True) # noqa: G201 + except BaseException: + if self.shell_stop.is_set(): + return + raise async def shell_main(self, subshell_id: str | None): """Main loop for a single subshell.""" @@ -411,13 +415,15 @@ async def shell_main(self, subshell_id: str | None): async def process_shell(self, socket=None): # socket=None is valid if kernel subshells are not supported. - try: - while True: - await self.process_shell_message(socket=socket) - except BaseException: - if self.shell_stop.is_set(): - return - raise + _socket = self.shell_socket if socket is None else socket + async with _socket: + try: + while True: + await self.process_shell_message(socket=socket) + except BaseException: + if self.shell_stop.is_set(): + return + raise async def process_shell_message(self, msg=None, socket=None): # If socket is None kernel subshells are not supported so use socket=shell_socket. @@ -435,8 +441,8 @@ async def process_shell_message(self, msg=None, socket=None): assert socket is None socket = self.shell_socket - no_msg = msg is None if self._is_test else not await socket.poll(0) - msg = msg or await socket.recv_multipart(copy=False) + no_msg = msg is None if self._is_test else not await socket.apoll(0) + msg = msg or await socket.arecv_multipart(copy=False) received_time = time.monotonic() copy = not isinstance(msg[0], zmq.Message) @@ -490,7 +496,7 @@ async def process_shell_message(self, msg=None, socket=None): try: result = handler(socket, idents, msg) if inspect.isawaitable(result): - await result + result = await result except Exception: self.log.error("Exception in message handler:", exc_info=True) # noqa: G201 except KeyboardInterrupt: @@ -509,7 +515,8 @@ async def process_shell_message(self, msg=None, socket=None): self._publish_status("idle", "shell") async def control_main(self): - async with create_task_group() as tg: + assert self.control_socket is not None + async with self.control_socket, create_task_group() as tg: for task in self.control_tasks: tg.start_soon(task) tg.start_soon(self.process_control) @@ -545,7 +552,9 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: # Assign tasks to and start shell channel thread. manager = self.shell_channel_thread.manager self.shell_channel_thread.add_task(self.shell_channel_thread_main) - self.shell_channel_thread.add_task(manager.listen_from_control, self.shell_main) + self.shell_channel_thread.add_task( + manager.listen_from_control, self.shell_main, self.shell_channel_thread + ) self.shell_channel_thread.add_task(manager.listen_from_subshells) self.shell_channel_thread.start() else: @@ -1075,9 +1084,11 @@ async def create_subshell_request(self, socket, ident, parent) -> None: # This should only be called in the control thread if it exists. # Request is passed to shell channel thread to process. - other_socket = self.shell_channel_thread.manager.get_control_other_socket() - await other_socket.send_json({"type": "create"}) - reply = await other_socket.recv_json() + other_socket = await self.shell_channel_thread.manager.get_control_other_socket( + self.control_thread.get_task_group() + ) + await other_socket.asend_json({"type": "create"}) + reply = await other_socket.arecv_json() self.session.send(socket, "create_subshell_reply", reply, parent, ident) @@ -1097,9 +1108,11 @@ async def delete_subshell_request(self, socket, ident, parent) -> None: # This should only be called in the control thread if it exists. # Request is passed to shell channel thread to process. - other_socket = self.shell_channel_thread.manager.get_control_other_socket() - await other_socket.send_json({"type": "delete", "subshell_id": subshell_id}) - reply = await other_socket.recv_json() + other_socket = await self.shell_channel_thread.manager.get_control_other_socket( + self.control_thread.get_task_group() + ) + await other_socket.asend_json({"type": "delete", "subshell_id": subshell_id}) + reply = await other_socket.arecv_json() self.session.send(socket, "delete_subshell_reply", reply, parent, ident) @@ -1112,9 +1125,11 @@ async def list_subshell_request(self, socket, ident, parent) -> None: # This should only be called in the control thread if it exists. # Request is passed to shell channel thread to process. - other_socket = self.shell_channel_thread.manager.get_control_other_socket() - await other_socket.send_json({"type": "list"}) - reply = await other_socket.recv_json() + other_socket = await self.shell_channel_thread.manager.get_control_other_socket( + self.control_thread.get_task_group() + ) + await other_socket.asend_json({"type": "list"}) + reply = await other_socket.arecv_json() self.session.send(socket, "list_subshell_reply", reply, parent, ident) diff --git a/ipykernel/shellchannel.py b/ipykernel/shellchannel.py index bc0459c46..789a88757 100644 --- a/ipykernel/shellchannel.py +++ b/ipykernel/shellchannel.py @@ -1,5 +1,6 @@ """A thread for a shell channel.""" -import zmq.asyncio +import zmq +import zmq_anyio from .subshell_manager import SubshellManager from .thread import SHELL_CHANNEL_THREAD_NAME, BaseThread @@ -11,7 +12,12 @@ class ShellChannelThread(BaseThread): Communicates with shell/subshell threads via pairs of ZMQ inproc sockets. """ - def __init__(self, context: zmq.asyncio.Context, shell_socket: zmq.asyncio.Socket, **kwargs): + def __init__( + self, + context: zmq.Context, # type: ignore[type-arg] + shell_socket: zmq_anyio.Socket, + **kwargs, + ): """Initialize the thread.""" super().__init__(name=SHELL_CHANNEL_THREAD_NAME, **kwargs) self._manager: SubshellManager | None = None @@ -22,7 +28,7 @@ def __init__(self, context: zmq.asyncio.Context, shell_socket: zmq.asyncio.Socke def manager(self) -> SubshellManager: # Lazy initialisation. if self._manager is None: - self._manager = SubshellManager(self._context, self._shell_socket) + self._manager = SubshellManager(self._context, self._shell_socket, self.get_task_group) return self._manager def run(self) -> None: diff --git a/ipykernel/subshell.py b/ipykernel/subshell.py index 18e15ab38..e84f54987 100644 --- a/ipykernel/subshell.py +++ b/ipykernel/subshell.py @@ -2,7 +2,8 @@ from threading import current_thread -import zmq.asyncio +import zmq +import zmq_anyio from .thread import BaseThread @@ -15,17 +16,22 @@ def __init__(self, subshell_id: str, **kwargs): super().__init__(name=f"subshell-{subshell_id}", **kwargs) # Inproc PAIR socket, for communication with shell channel thread. - self._pair_socket: zmq.asyncio.Socket | None = None + self._pair_socket: zmq_anyio.Socket | None = None - async def create_pair_socket(self, context: zmq.asyncio.Context, address: str) -> None: + async def create_pair_socket( + self, + context: zmq.Context, # type: ignore[type-arg] + address: str, + ) -> None: """Create inproc PAIR socket, for communication with shell channel thread. Should be called from this thread, so usually via add_task before the thread is started. """ assert current_thread() == self - self._pair_socket = context.socket(zmq.PAIR) + self._pair_socket = zmq_anyio.Socket(context, zmq.PAIR) self._pair_socket.connect(address) + self.add_task(self._pair_socket.start) def run(self) -> None: try: diff --git a/ipykernel/subshell_manager.py b/ipykernel/subshell_manager.py index 805d6f812..dbd3da762 100644 --- a/ipykernel/subshell_manager.py +++ b/ipykernel/subshell_manager.py @@ -8,19 +8,21 @@ import uuid from dataclasses import dataclass from threading import Lock, current_thread, main_thread +from typing import Callable import zmq -import zmq.asyncio +import zmq_anyio from anyio import create_memory_object_stream, create_task_group +from anyio.abc import TaskGroup from .subshell import SubshellThread -from .thread import SHELL_CHANNEL_THREAD_NAME +from .thread import SHELL_CHANNEL_THREAD_NAME, BaseThread @dataclass class Subshell: thread: SubshellThread - shell_channel_socket: zmq.asyncio.Socket + shell_channel_socket: zmq_anyio.Socket class SubshellManager: @@ -38,11 +40,17 @@ class SubshellManager: against multiple subshells attempting to send at the same time. """ - def __init__(self, context: zmq.asyncio.Context, shell_socket: zmq.asyncio.Socket): + def __init__( + self, + context: zmq.Context, # type: ignore[type-arg] + shell_socket: zmq_anyio.Socket, + get_task_group: Callable[[], TaskGroup], + ): assert current_thread() == main_thread() - self._context: zmq.asyncio.Context = context + self._context: zmq.Context = context # type: ignore[type-arg] self._shell_socket = shell_socket + self._get_task_group = get_task_group self._cache: dict[str, Subshell] = {} self._lock_cache = Lock() self._lock_shell_socket = Lock() @@ -83,10 +91,13 @@ def close(self) -> None: break self._stop_subshell(subshell) - def get_control_other_socket(self) -> zmq.asyncio.Socket: + async def get_control_other_socket(self, task_group: TaskGroup) -> zmq_anyio.Socket: + if not self._control_other_socket.started.is_set(): + task_group.start_soon(self._control_other_socket.start) + await self._control_other_socket.started.wait() return self._control_other_socket - def get_other_socket(self, subshell_id: str | None) -> zmq.asyncio.Socket: + def get_other_socket(self, subshell_id: str | None) -> zmq_anyio.Socket: """Return the other inproc pair socket for a subshell. This socket is accessed from the subshell thread. @@ -98,7 +109,7 @@ def get_other_socket(self, subshell_id: str | None) -> zmq.asyncio.Socket: assert socket is not None return socket - def get_shell_channel_socket(self, subshell_id: str | None) -> zmq.asyncio.Socket: + def get_shell_channel_socket(self, subshell_id: str | None) -> zmq_anyio.Socket: """Return the shell channel inproc pair socket for a subshell. This socket is accessed from the shell channel thread. @@ -116,17 +127,20 @@ def list_subshell(self) -> list[str]: with self._lock_cache: return list(self._cache) - async def listen_from_control(self, subshell_task: t.Any) -> None: + async def listen_from_control(self, subshell_task: t.Any, thread: BaseThread) -> None: """Listen for messages on the control inproc socket, handle those messages and return replies on the same socket. Runs in the shell channel thread. """ assert current_thread().name == SHELL_CHANNEL_THREAD_NAME + if not self._control_shell_channel_socket.started.is_set(): + thread.get_task_group().start_soon(self._control_shell_channel_socket.start) + await self._control_shell_channel_socket.started.wait() socket = self._control_shell_channel_socket while True: - request = await socket.recv_json() # type: ignore[misc] + request = await socket.arecv_json() reply = await self._process_control_request(request, subshell_task) - await socket.send_json(reply) # type: ignore[func-returns-value] + await socket.asend_json(reply) async def listen_from_subshells(self) -> None: """Listen for reply messages on inproc sockets of all subshells and resend @@ -137,9 +151,9 @@ async def listen_from_subshells(self) -> None: assert current_thread().name == SHELL_CHANNEL_THREAD_NAME async with create_task_group() as tg: - tg.start_soon(self._listen_for_subshell_reply, None) + tg.start_soon(self._listen_for_subshell_reply, None, tg) async for subshell_id in self._receive_stream: - tg.start_soon(self._listen_for_subshell_reply, subshell_id) + tg.start_soon(self._listen_for_subshell_reply, subshell_id, tg) def subshell_id_from_thread_id(self, thread_id: int) -> str | None: """Return subshell_id of the specified thread_id. @@ -159,10 +173,10 @@ def subshell_id_from_thread_id(self, thread_id: int) -> str | None: def _create_inproc_pair_socket( self, name: str | None, shell_channel_end: bool - ) -> zmq.asyncio.Socket: + ) -> zmq_anyio.Socket: """Create and return a single ZMQ inproc pair socket.""" address = self._get_inproc_socket_address(name) - socket = self._context.socket(zmq.PAIR) + socket = zmq_anyio.Socket(self._context, zmq.PAIR) if shell_channel_end: socket.bind(address) else: @@ -208,7 +222,7 @@ def _get_inproc_socket_address(self, name: str | None) -> str: full_name = f"subshell-{name}" if name else "subshell" return f"inproc://{full_name}" - def _get_shell_channel_socket(self, subshell_id: str | None) -> zmq.asyncio.Socket: + def _get_shell_channel_socket(self, subshell_id: str | None) -> zmq_anyio.Socket: if subshell_id is None: return self._parent_shell_channel_socket with self._lock_cache: @@ -220,7 +234,9 @@ def _is_subshell(self, subshell_id: str | None) -> bool: with self._lock_cache: return subshell_id in self._cache - async def _listen_for_subshell_reply(self, subshell_id: str | None) -> None: + async def _listen_for_subshell_reply( + self, subshell_id: str | None, task_group: TaskGroup + ) -> None: """Listen for reply messages on specified subshell inproc socket and resend to the client via the shell_socket. @@ -230,11 +246,13 @@ async def _listen_for_subshell_reply(self, subshell_id: str | None) -> None: shell_channel_socket = self._get_shell_channel_socket(subshell_id) + task_group.start_soon(shell_channel_socket.start) + await shell_channel_socket.started.wait() try: while True: - msg = await shell_channel_socket.recv_multipart(copy=False) + msg = await shell_channel_socket.arecv_multipart(copy=False) with self._lock_shell_socket: - await self._shell_socket.send_multipart(msg) + await self._shell_socket.asend_multipart(msg) except BaseException: if not self._is_subshell(subshell_id): # Subshell no longer exists so exit gracefully diff --git a/ipykernel/thread.py b/ipykernel/thread.py index 40509eced..f55ee7c78 100644 --- a/ipykernel/thread.py +++ b/ipykernel/thread.py @@ -3,6 +3,7 @@ from threading import Event, Thread from anyio import create_task_group, run, to_thread +from anyio.abc import TaskGroup CONTROL_THREAD_NAME = "Control" SHELL_CHANNEL_THREAD_NAME = "Shell channel" @@ -19,6 +20,9 @@ def __init__(self, **kwargs): self.__stop = Event() self._tasks_and_args: list[tuple[t.Any, t.Any]] = [] + def get_task_group(self) -> TaskGroup: + return self._task_group + def add_task(self, task: t.Any, *args: t.Any) -> None: # May only add tasks before the thread is started. self._tasks_and_args.append((task, args)) @@ -29,6 +33,7 @@ def run(self) -> t.Any: async def _main(self) -> None: async with create_task_group() as tg: + self._task_group = tg for task, args in self._tasks_and_args: tg.start_soon(task, *args) await to_thread.run_sync(self.__stop.wait) diff --git a/pyproject.toml b/pyproject.toml index 2360b6684..89b9ea32d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,10 +30,11 @@ dependencies = [ "nest_asyncio>=1.4", "matplotlib-inline>=0.1", 'appnope>=0.1.2;platform_system=="Darwin"', - "pyzmq>=25.0", + "pyzmq>=26.0", "psutil>=5.7", "packaging>=22", "anyio>=4.2.0", + "zmq-anyio >=0.2.3", ] [project.urls] diff --git a/tests/conftest.py b/tests/conftest.py index 2c2665551..fc798e74d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,14 +1,12 @@ -import asyncio import logging -import os from math import inf from typing import Any, Callable, no_type_check from unittest.mock import MagicMock import pytest import zmq -import zmq.asyncio -from anyio import create_memory_object_stream, create_task_group +import zmq_anyio +from anyio import create_memory_object_stream, create_task_group, sleep from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from jupyter_client.session import Session @@ -46,11 +44,6 @@ def anyio_backend(): resource.setrlimit(resource.RLIMIT_NOFILE, (soft, hard)) -# Enforce selector event loop on Windows. -if os.name == "nt": - asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # type:ignore - - class TestSession(Session): """A session that copies sent messages to an internal stream, so that they can be accessed later. @@ -77,21 +70,21 @@ def send(self, socket, *args, **kwargs): class KernelMixin: - shell_socket: zmq.asyncio.Socket - control_socket: zmq.asyncio.Socket + shell_socket: zmq_anyio.Socket + control_socket: zmq_anyio.Socket stop: Callable[[], None] log = logging.getLogger() def _initialize(self): self._is_test = True - self.context = context = zmq.asyncio.Context() - self.iopub_socket = context.socket(zmq.PUB) - self.stdin_socket = context.socket(zmq.ROUTER) + self.context = context = zmq.Context() + self.iopub_socket = zmq_anyio.Socket(context.socket(zmq.PUB)) + self.stdin_socket = zmq_anyio.Socket(context.socket(zmq.ROUTER)) self.test_sockets = [self.iopub_socket] for name in ["shell", "control"]: - socket = context.socket(zmq.ROUTER) + socket = zmq_anyio.Socket(context.socket(zmq.ROUTER)) self.test_sockets.append(socket) setattr(self, f"{name}_socket", socket) @@ -142,7 +135,7 @@ def _prep_msg(self, *args, **kwargs): async def _wait_for_msg(self): while not self._reply: - await asyncio.sleep(0.1) + await sleep(0.1) _, msg = self.session.feed_identities(self._reply) return self.session.deserialize(msg) diff --git a/tests/test_async.py b/tests/test_async.py index a40db4a00..f1d91c5eb 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -8,6 +8,8 @@ from .test_message_spec import validate_message from .utils import TIMEOUT, execute, flush_channels, start_new_kernel +pytestmark = pytest.mark.anyio + KC = KM = None @@ -30,24 +32,22 @@ def test_async_await(): assert content["status"] == "ok", content -# FIXME: @pytest.mark.parametrize("asynclib", ["asyncio", "trio", "curio"]) @pytest.mark.skipif(os.name == "nt", reason="Cannot interrupt on Windows") -@pytest.mark.parametrize("asynclib", ["asyncio"]) -def test_async_interrupt(asynclib, request): +def test_async_interrupt(anyio_backend, request): assert KC is not None assert KM is not None try: - __import__(asynclib) + __import__(anyio_backend) except ImportError: - pytest.skip("Requires %s" % asynclib) - request.addfinalizer(lambda: execute("%autoawait asyncio", KC)) + pytest.skip("Requires %s" % anyio_backend) + request.addfinalizer(lambda: execute(f"%autoawait {anyio_backend}", KC)) flush_channels(KC) - msg_id, content = execute("%autoawait " + asynclib, KC) + msg_id, content = execute(f"%autoawait {anyio_backend}", KC) assert content["status"] == "ok", content flush_channels(KC) - msg_id = KC.execute(f"print('begin'); import {asynclib}; await {asynclib}.sleep(5)") + msg_id = KC.execute(f"print('begin'); import {anyio_backend}; await {anyio_backend}.sleep(5)") busy = KC.get_iopub_msg(timeout=TIMEOUT) validate_message(busy, "status", msg_id) assert busy["content"]["execution_state"] == "busy" diff --git a/tests/test_io.py b/tests/test_io.py index e3ff28159..09add95f0 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -12,22 +12,24 @@ import pytest import zmq -import zmq.asyncio +import zmq_anyio from jupyter_client.session import Session from ipykernel.iostream import _PARENT, BackgroundSocket, IOPubThread, OutStream +pytestmark = pytest.mark.anyio + @pytest.fixture() def ctx(): - ctx = zmq.asyncio.Context() + ctx = zmq.Context() yield ctx ctx.destroy() @pytest.fixture() -def iopub_thread(ctx): - with ctx.socket(zmq.PUB) as pub: +async def iopub_thread(ctx): + async with zmq_anyio.Socket(ctx.socket(zmq.PUB)) as pub: thread = IOPubThread(pub) thread.start() @@ -36,7 +38,7 @@ def iopub_thread(ctx): thread.close() -def test_io_api(iopub_thread): +async def test_io_api(iopub_thread): """Test that wrapped stdout has the same API as a normal TextIO object""" session = Session() stream = OutStream(session, iopub_thread, "stdout") @@ -59,12 +61,13 @@ def test_io_api(iopub_thread): stream.write(b"") # type:ignore -def test_io_isatty(iopub_thread): +async def test_io_isatty(iopub_thread): session = Session() stream = OutStream(session, iopub_thread, "stdout", isatty=True) assert stream.isatty() +@pytest.mark.skip(reason="FIXME") async def test_io_thread(anyio_backend, iopub_thread): thread = iopub_thread thread._setup_pipe_in() @@ -150,61 +153,63 @@ async def test_event_pipe_gc(iopub_thread): # assert iopub_thread._event_pipes == {} -def subprocess_test_echo_watch(): +async def subprocess_test_echo_watch(): # handshake Pub subscription session = Session(key=b"abc") # use PUSH socket to avoid subscription issues - with zmq.asyncio.Context() as ctx, ctx.socket(zmq.PUSH) as pub: - pub.connect(os.environ["IOPUB_URL"]) - iopub_thread = IOPubThread(pub) - iopub_thread.start() - stdout_fd = sys.stdout.fileno() - sys.stdout.flush() - stream = OutStream( - session, - iopub_thread, - "stdout", - isatty=True, - echo=sys.stdout, - watchfd="force", - ) - save_stdout = sys.stdout - with stream, mock.patch.object(sys, "stdout", stream): - # write to low-level FD - os.write(stdout_fd, b"fd\n") - # print (writes to stream) - print("print\n", end="") + with zmq.Context() as ctx: + async with zmq_anyio.Socket(ctx.socket(zmq.PUSH)) as pub: + pub.connect(os.environ["IOPUB_URL"]) + iopub_thread = IOPubThread(pub) + iopub_thread.start() + stdout_fd = sys.stdout.fileno() sys.stdout.flush() - # write to unwrapped __stdout__ (should also go to original FD) - sys.__stdout__.write("__stdout__\n") - sys.__stdout__.flush() - # write to original sys.stdout (should be the same as __stdout__) - save_stdout.write("stdout\n") - save_stdout.flush() - # is there another way to flush on the FD? - fd_file = os.fdopen(stdout_fd, "w") - fd_file.flush() - # we don't have a sync flush on _reading_ from the watched pipe - time.sleep(1) - stream.flush() - iopub_thread.stop() - iopub_thread.close() + stream = OutStream( + session, + iopub_thread, + "stdout", + isatty=True, + echo=sys.stdout, + watchfd="force", + ) + save_stdout = sys.stdout + with stream, mock.patch.object(sys, "stdout", stream): + # write to low-level FD + os.write(stdout_fd, b"fd\n") + # print (writes to stream) + print("print\n", end="") + sys.stdout.flush() + # write to unwrapped __stdout__ (should also go to original FD) + sys.__stdout__.write("__stdout__\n") + sys.__stdout__.flush() + # write to original sys.stdout (should be the same as __stdout__) + save_stdout.write("stdout\n") + save_stdout.flush() + # is there another way to flush on the FD? + fd_file = os.fdopen(stdout_fd, "w") + fd_file.flush() + # we don't have a sync flush on _reading_ from the watched pipe + time.sleep(1) + stream.flush() + iopub_thread.stop() + iopub_thread.close() @pytest.mark.anyio() -@pytest.mark.skipif(sys.platform.startswith("win"), reason="Windows") +# @pytest.mark.skipif(sys.platform.startswith("win"), reason="Windows") +@pytest.mark.skip(reason="FIXME") async def test_echo_watch(ctx): """Test echo on underlying FD while capturing the same FD Test runs in a subprocess to avoid messing with pytest output capturing. """ - s = ctx.socket(zmq.PULL) + s = zmq_anyio.Socket(ctx.socket(zmq.PULL)) port = s.bind_to_random_port("tcp://127.0.0.1") url = f"tcp://127.0.0.1:{port}" session = Session(key=b"abc") stdout_chunks = [] - with s: + async with s: env = dict(os.environ) env["IOPUB_URL"] = url env["PYTHONUNBUFFERED"] = "1" @@ -224,8 +229,8 @@ async def test_echo_watch(ctx): print(f"{p.stdout=}") print(f"{p.stderr}=", file=sys.stderr) assert p.returncode == 0 - while await s.poll(timeout=100): - msg = await s.recv_multipart() + while await s.apoll(timeout=100): + msg = await s.arecv_multipart() ident, msg = session.feed_identities(msg, copy=True) msg = session.deserialize(msg, content=True, copy=True) assert msg is not None # for type narrowing From b3c1fcf7c72ebf320e4519c430a9ff303c8ed298 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 15 Nov 2024 09:57:18 +0100 Subject: [PATCH 02/41] Replace thread add_task with start_soon --- ipykernel/kernelbase.py | 17 +++++++++-------- ipykernel/shellchannel.py | 2 +- ipykernel/subshell.py | 4 ++-- ipykernel/subshell_manager.py | 14 ++++++-------- ipykernel/thread.py | 35 +++++++++++++++++------------------ 5 files changed, 35 insertions(+), 37 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 3c7324d22..fcceef261 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -16,6 +16,7 @@ import uuid import warnings from datetime import datetime +from functools import partial from signal import SIGINT, SIGTERM, Signals from .thread import CONTROL_THREAD_NAME @@ -536,7 +537,7 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: self.control_stop = threading.Event() if not self._is_test and self.control_socket is not None: if self.control_thread: - self.control_thread.add_task(self.control_main) + self.control_thread.start_soon(self.control_main) self.control_thread.start() else: tg.start_soon(self.control_main) @@ -551,11 +552,11 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: # Assign tasks to and start shell channel thread. manager = self.shell_channel_thread.manager - self.shell_channel_thread.add_task(self.shell_channel_thread_main) - self.shell_channel_thread.add_task( - manager.listen_from_control, self.shell_main, self.shell_channel_thread + self.shell_channel_thread.start_soon(self.shell_channel_thread_main) + self.shell_channel_thread.start_soon( + partial(manager.listen_from_control, self.shell_main, self.shell_channel_thread) ) - self.shell_channel_thread.add_task(manager.listen_from_subshells) + self.shell_channel_thread.start_soon(manager.listen_from_subshells) self.shell_channel_thread.start() else: if not self._is_test and self.shell_socket is not None: @@ -1085,7 +1086,7 @@ async def create_subshell_request(self, socket, ident, parent) -> None: # This should only be called in the control thread if it exists. # Request is passed to shell channel thread to process. other_socket = await self.shell_channel_thread.manager.get_control_other_socket( - self.control_thread.get_task_group() + self.control_thread ) await other_socket.asend_json({"type": "create"}) reply = await other_socket.arecv_json() @@ -1109,7 +1110,7 @@ async def delete_subshell_request(self, socket, ident, parent) -> None: # This should only be called in the control thread if it exists. # Request is passed to shell channel thread to process. other_socket = await self.shell_channel_thread.manager.get_control_other_socket( - self.control_thread.get_task_group() + self.control_thread ) await other_socket.asend_json({"type": "delete", "subshell_id": subshell_id}) reply = await other_socket.arecv_json() @@ -1126,7 +1127,7 @@ async def list_subshell_request(self, socket, ident, parent) -> None: # This should only be called in the control thread if it exists. # Request is passed to shell channel thread to process. other_socket = await self.shell_channel_thread.manager.get_control_other_socket( - self.control_thread.get_task_group() + self.control_thread ) await other_socket.asend_json({"type": "list"}) reply = await other_socket.arecv_json() diff --git a/ipykernel/shellchannel.py b/ipykernel/shellchannel.py index 789a88757..819a0aecb 100644 --- a/ipykernel/shellchannel.py +++ b/ipykernel/shellchannel.py @@ -28,7 +28,7 @@ def __init__( def manager(self) -> SubshellManager: # Lazy initialisation. if self._manager is None: - self._manager = SubshellManager(self._context, self._shell_socket, self.get_task_group) + self._manager = SubshellManager(self._context, self._shell_socket) return self._manager def run(self) -> None: diff --git a/ipykernel/subshell.py b/ipykernel/subshell.py index e84f54987..180e9ecb3 100644 --- a/ipykernel/subshell.py +++ b/ipykernel/subshell.py @@ -25,13 +25,13 @@ async def create_pair_socket( ) -> None: """Create inproc PAIR socket, for communication with shell channel thread. - Should be called from this thread, so usually via add_task before the + Should be called from this thread, so usually via start_soon before the thread is started. """ assert current_thread() == self self._pair_socket = zmq_anyio.Socket(context, zmq.PAIR) self._pair_socket.connect(address) - self.add_task(self._pair_socket.start) + self.start_soon(self._pair_socket.start) def run(self) -> None: try: diff --git a/ipykernel/subshell_manager.py b/ipykernel/subshell_manager.py index dbd3da762..505c2f408 100644 --- a/ipykernel/subshell_manager.py +++ b/ipykernel/subshell_manager.py @@ -7,8 +7,8 @@ import typing as t import uuid from dataclasses import dataclass +from functools import partial from threading import Lock, current_thread, main_thread -from typing import Callable import zmq import zmq_anyio @@ -44,13 +44,11 @@ def __init__( self, context: zmq.Context, # type: ignore[type-arg] shell_socket: zmq_anyio.Socket, - get_task_group: Callable[[], TaskGroup], ): assert current_thread() == main_thread() self._context: zmq.Context = context # type: ignore[type-arg] self._shell_socket = shell_socket - self._get_task_group = get_task_group self._cache: dict[str, Subshell] = {} self._lock_cache = Lock() self._lock_shell_socket = Lock() @@ -91,9 +89,9 @@ def close(self) -> None: break self._stop_subshell(subshell) - async def get_control_other_socket(self, task_group: TaskGroup) -> zmq_anyio.Socket: + async def get_control_other_socket(self, thread: BaseThread) -> zmq_anyio.Socket: if not self._control_other_socket.started.is_set(): - task_group.start_soon(self._control_other_socket.start) + thread.start_soon(self._control_other_socket.start) await self._control_other_socket.started.wait() return self._control_other_socket @@ -134,7 +132,7 @@ async def listen_from_control(self, subshell_task: t.Any, thread: BaseThread) -> assert current_thread().name == SHELL_CHANNEL_THREAD_NAME if not self._control_shell_channel_socket.started.is_set(): - thread.get_task_group().start_soon(self._control_shell_channel_socket.start) + thread.start_soon(self._control_shell_channel_socket.start) await self._control_shell_channel_socket.started.wait() socket = self._control_shell_channel_socket while True: @@ -200,8 +198,8 @@ async def _create_subshell(self, subshell_task: t.Any) -> str: await self._send_stream.send(subshell_id) address = self._get_inproc_socket_address(subshell_id) - thread.add_task(thread.create_pair_socket, self._context, address) - thread.add_task(subshell_task, subshell_id) + thread.start_soon(partial(thread.create_pair_socket, self._context, address)) + thread.start_soon(partial(subshell_task, subshell_id)) thread.start() return subshell_id diff --git a/ipykernel/thread.py b/ipykernel/thread.py index f55ee7c78..df8fa4122 100644 --- a/ipykernel/thread.py +++ b/ipykernel/thread.py @@ -1,9 +1,12 @@ """Base class for threads.""" -import typing as t -from threading import Event, Thread +from __future__ import annotations + +from collections.abc import Awaitable +from queue import Queue +from threading import Thread +from typing import Callable from anyio import create_task_group, run, to_thread -from anyio.abc import TaskGroup CONTROL_THREAD_NAME = "Control" SHELL_CHANNEL_THREAD_NAME = "Shell channel" @@ -17,26 +20,22 @@ def __init__(self, **kwargs): super().__init__(**kwargs) self.pydev_do_not_trace = True self.is_pydev_daemon_thread = True - self.__stop = Event() - self._tasks_and_args: list[tuple[t.Any, t.Any]] = [] - - def get_task_group(self) -> TaskGroup: - return self._task_group + self._tasks: Queue[Callable[[], Awaitable[None]] | None] = Queue() - def add_task(self, task: t.Any, *args: t.Any) -> None: - # May only add tasks before the thread is started. - self._tasks_and_args.append((task, args)) + def start_soon(self, task: Callable[[], Awaitable[None]] | None) -> None: + self._tasks.put(task) - def run(self) -> t.Any: + def run(self) -> None: """Run the thread.""" - return run(self._main) + run(self._main) async def _main(self) -> None: async with create_task_group() as tg: - self._task_group = tg - for task, args in self._tasks_and_args: - tg.start_soon(task, *args) - await to_thread.run_sync(self.__stop.wait) + while True: + task = await to_thread.run_sync(self._tasks.get) + if task is None: + break + tg.start_soon(task) tg.cancel_scope.cancel() def stop(self) -> None: @@ -44,4 +43,4 @@ def stop(self) -> None: This method is threadsafe. """ - self.__stop.set() + self._tasks.put(None) From d9ae3fc75204f6c962440649ab92f8236220e2b5 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 15 Nov 2024 10:50:07 +0100 Subject: [PATCH 03/41] Replace _IOPubThread with BaseThread --- ipykernel/inprocess/socket.py | 6 ++++- ipykernel/iostream.py | 46 +++++++---------------------------- ipykernel/thread.py | 6 ++++- 3 files changed, 19 insertions(+), 39 deletions(-) diff --git a/ipykernel/inprocess/socket.py b/ipykernel/inprocess/socket.py index 5a2e0008b..d14d08501 100644 --- a/ipykernel/inprocess/socket.py +++ b/ipykernel/inprocess/socket.py @@ -65,4 +65,8 @@ async def poll(self, timeout=0): return statistics.current_buffer_used != 0 def close(self): - pass + if self.is_shell: + self.in_send_stream.close() + self.in_receive_stream.close() + self.out_send_stream.close() + self.out_receive_stream.close() diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 193342129..02a0e22ac 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -16,14 +16,16 @@ from binascii import b2a_hex from collections import defaultdict, deque from io import StringIO, TextIOBase -from threading import Event, Thread, local +from threading import local from typing import Any, Callable import zmq import zmq_anyio -from anyio import create_task_group, run, sleep, to_thread +from anyio import sleep from jupyter_client.session import extract_header +from .thread import BaseThread + # ----------------------------------------------------------------------------- # Globals # ----------------------------------------------------------------------------- @@ -38,38 +40,6 @@ # ----------------------------------------------------------------------------- -class _IOPubThread(Thread): - """A thread for a IOPub.""" - - def __init__(self, tasks, **kwargs): - """Initialize the thread.""" - super().__init__(name="IOPub", **kwargs) - self._tasks = tasks - self.pydev_do_not_trace = True - self.is_pydev_daemon_thread = True - self.daemon = True - self.__stop = Event() - - def run(self): - """Run the thread.""" - self.name = "IOPub" - run(self._main) - - async def _main(self): - async with create_task_group() as self._task_group: - for task in self._tasks: - self._task_group.start_soon(task) - await to_thread.run_sync(self.__stop.wait) - self._task_group.cancel_scope.cancel() - - def stop(self): - """Stop the thread. - - This method is threadsafe. - """ - self.__stop.set() - - class IOPubThread: """An object for sending IOPub messages in a background thread @@ -109,7 +79,9 @@ def __init__(self, socket: zmq_anyio.Socket, pipe=False): tasks = [self._handle_event, self._run_event_pipe_gc, self.socket.start] if pipe: tasks.append(self._handle_pipe_msgs) - self.thread = _IOPubThread(tasks) + self.thread = BaseThread(name="IOPub", daemon=True) + for task in tasks: + self.thread.start_soon(task) def _setup_event_pipe(self): """Create the PULL socket listening for events that should fire in this thread.""" @@ -179,7 +151,7 @@ async def _handle_event(self): event_f = self._events.popleft() event_f() except Exception: - if self.thread.__stop.is_set(): + if self.thread.stopped.is_set(): return raise @@ -211,7 +183,7 @@ async def _handle_pipe_msgs(self): while True: await self._handle_pipe_msg() except Exception: - if self.thread.__stop.is_set(): + if self.thread.stopped.is_set(): return raise diff --git a/ipykernel/thread.py b/ipykernel/thread.py index df8fa4122..4c9edf86b 100644 --- a/ipykernel/thread.py +++ b/ipykernel/thread.py @@ -3,7 +3,7 @@ from collections.abc import Awaitable from queue import Queue -from threading import Thread +from threading import Event, Thread from typing import Callable from anyio import create_task_group, run, to_thread @@ -18,6 +18,8 @@ class BaseThread(Thread): def __init__(self, **kwargs): """Initialize the thread.""" super().__init__(**kwargs) + self.started = Event() + self.stopped = Event() self.pydev_do_not_trace = True self.is_pydev_daemon_thread = True self._tasks: Queue[Callable[[], Awaitable[None]] | None] = Queue() @@ -31,6 +33,7 @@ def run(self) -> None: async def _main(self) -> None: async with create_task_group() as tg: + self.started.set() while True: task = await to_thread.run_sync(self._tasks.get) if task is None: @@ -44,3 +47,4 @@ def stop(self) -> None: This method is threadsafe. """ self._tasks.put(None) + self.stopped.set() From 093d959f214836caa62418d5fff5961a65b68e66 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 15 Nov 2024 11:36:11 +0100 Subject: [PATCH 04/41] Fix tests --- pyproject.toml | 3 +++ tests/test_io.py | 8 ++------ 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 89b9ea32d..8d5ce3954 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -192,6 +192,9 @@ filterwarnings= [ # ignore unclosed sqlite in traits "ignore:unclosed database in .trigger_timeout' was never awaited", ] [tool.coverage.report] diff --git a/tests/test_io.py b/tests/test_io.py index 09add95f0..9d14e5f2d 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -67,7 +67,6 @@ async def test_io_isatty(iopub_thread): assert stream.isatty() -@pytest.mark.skip(reason="FIXME") async def test_io_thread(anyio_backend, iopub_thread): thread = iopub_thread thread._setup_pipe_in() @@ -80,8 +79,6 @@ async def test_io_thread(anyio_backend, iopub_thread): thread._really_send([b"hi"]) ctx1.destroy() thread.stop() - thread.close() - thread._really_send(None) async def test_background_socket(anyio_backend, iopub_thread): @@ -197,8 +194,7 @@ async def subprocess_test_echo_watch(): @pytest.mark.anyio() -# @pytest.mark.skipif(sys.platform.startswith("win"), reason="Windows") -@pytest.mark.skip(reason="FIXME") +@pytest.mark.skipif(sys.platform.startswith("win"), reason="Windows") async def test_echo_watch(ctx): """Test echo on underlying FD while capturing the same FD @@ -218,7 +214,7 @@ async def test_echo_watch(ctx): [ sys.executable, "-c", - f"import {__name__}; {__name__}.subprocess_test_echo_watch()", + f"import {__name__}, anyio; anyio.run({__name__}.subprocess_test_echo_watch)", ], env=env, capture_output=True, From b789ab68dd357475865e8e4e121c1d6aa819529f Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 15 Nov 2024 15:02:25 +0100 Subject: [PATCH 05/41] Allow testing trio --- ipykernel/thread.py | 5 ++++- pyproject.toml | 1 + tests/conftest.py | 10 +++++----- tests/test_async.py | 3 +-- tests/test_eventloop.py | 1 + tests/test_io.py | 23 ++++++++++++----------- 6 files changed, 24 insertions(+), 19 deletions(-) diff --git a/ipykernel/thread.py b/ipykernel/thread.py index 4c9edf86b..d853a2adb 100644 --- a/ipykernel/thread.py +++ b/ipykernel/thread.py @@ -29,7 +29,10 @@ def start_soon(self, task: Callable[[], Awaitable[None]] | None) -> None: def run(self) -> None: """Run the thread.""" - run(self._main) + try: + run(self._main) + except Exception: + pass async def _main(self) -> None: async with create_task_group() as tg: diff --git a/pyproject.toml b/pyproject.toml index 8d5ce3954..244f1155e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -195,6 +195,7 @@ filterwarnings= [ # ignore timeout cancel coroutine not awaited in zmq-anyio "ignore: coroutine 'Poller._apoll..trigger_timeout' was never awaited", + "ignore: Unclosed socket" ] [tool.coverage.report] diff --git a/tests/conftest.py b/tests/conftest.py index fc798e74d..db992b747 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,6 @@ import logging from math import inf +from threading import Event from typing import Any, Callable, no_type_check from unittest.mock import MagicMock @@ -21,11 +22,6 @@ resource = None # type:ignore -@pytest.fixture() -def anyio_backend(): - return "asyncio" - - pytestmark = pytest.mark.anyio @@ -159,6 +155,8 @@ class MockKernel(KernelMixin, Kernel): # type:ignore def __init__(self, *args, **kwargs): self._initialize() self.shell = MagicMock() + self.shell_stop = Event() + self.control_stop = Event() super().__init__(*args, **kwargs) def do_execute( @@ -180,6 +178,8 @@ def do_execute( class MockIPyKernel(KernelMixin, IPythonKernel): # type:ignore def __init__(self, *args, **kwargs): self._initialize() + self.shell_stop = Event() + self.control_stop = Event() super().__init__(*args, **kwargs) diff --git a/tests/test_async.py b/tests/test_async.py index f1d91c5eb..c2dd980b9 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -8,8 +8,6 @@ from .test_message_spec import validate_message from .utils import TIMEOUT, execute, flush_channels, start_new_kernel -pytestmark = pytest.mark.anyio - KC = KM = None @@ -33,6 +31,7 @@ def test_async_await(): @pytest.mark.skipif(os.name == "nt", reason="Cannot interrupt on Windows") +@pytest.mark.parametrize("anyio_backend", ["asyncio"]) # FIXME: %autoawait trio def test_async_interrupt(anyio_backend, request): assert KC is not None assert KM is not None diff --git a/tests/test_eventloop.py b/tests/test_eventloop.py index 62a7f8ba3..fcaa2bde7 100644 --- a/tests/test_eventloop.py +++ b/tests/test_eventloop.py @@ -85,6 +85,7 @@ def do_thing(): @windows_skip +@pytest.mark.parametrize("anyio_backend", ["asyncio"]) def test_asyncio_loop(kernel): def do_thing(): loop.call_later(0.01, loop.stop) diff --git a/tests/test_io.py b/tests/test_io.py index 9d14e5f2d..7b86a5d71 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -29,13 +29,16 @@ def ctx(): @pytest.fixture() async def iopub_thread(ctx): - async with zmq_anyio.Socket(ctx.socket(zmq.PUB)) as pub: - thread = IOPubThread(pub) - thread.start() + try: + async with zmq_anyio.Socket(ctx.socket(zmq.PUB)) as pub: + thread = IOPubThread(pub) + thread.start() - yield thread - thread.stop() - thread.close() + yield thread + thread.stop() + thread.close() + except Exception: + pass async def test_io_api(iopub_thread): @@ -67,7 +70,7 @@ async def test_io_isatty(iopub_thread): assert stream.isatty() -async def test_io_thread(anyio_backend, iopub_thread): +async def test_io_thread(iopub_thread): thread = iopub_thread thread._setup_pipe_in() msg = [thread._pipe_uuid, b"a"] @@ -81,7 +84,7 @@ async def test_io_thread(anyio_backend, iopub_thread): thread.stop() -async def test_background_socket(anyio_backend, iopub_thread): +async def test_background_socket(iopub_thread): sock = BackgroundSocket(iopub_thread) assert sock.__class__ == BackgroundSocket with warnings.catch_warnings(): @@ -92,7 +95,7 @@ async def test_background_socket(anyio_backend, iopub_thread): sock.send(b"hi") -async def test_outstream(anyio_backend, iopub_thread): +async def test_outstream(iopub_thread): session = Session() pub = iopub_thread.socket @@ -118,7 +121,6 @@ async def test_outstream(anyio_backend, iopub_thread): assert stream.writable() -@pytest.mark.anyio() async def test_event_pipe_gc(iopub_thread): session = Session(key=b"abc") stream = OutStream( @@ -193,7 +195,6 @@ async def subprocess_test_echo_watch(): iopub_thread.close() -@pytest.mark.anyio() @pytest.mark.skipif(sys.platform.startswith("win"), reason="Windows") async def test_echo_watch(ctx): """Test echo on underlying FD while capturing the same FD From 18a84673d030c47775b860520e7d8ac69cf3df97 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 20 Nov 2024 11:12:13 +0100 Subject: [PATCH 06/41] Remove pytest-asyncio from test dependencies --- pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 244f1155e..faaa9563b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,7 +63,6 @@ test = [ "pre-commit", "pytest-timeout", "trio", - "pytest-asyncio>=0.23.5", ] cov = [ "coverage[toml]", From 2d839167116603d7abe52b221d3d12401764d2dc Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 20 Nov 2024 11:30:46 +0100 Subject: [PATCH 07/41] Use selector thread from anyio --- .github/workflows/ci.yml | 6 +++++- pyproject.toml | 3 +++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 98deaf12d..31cd72d33 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -61,7 +61,11 @@ jobs: timeout-minutes: 15 if: ${{ startsWith(matrix.os, 'windows') }} run: | - hatch run cov:nowarn || hatch run test:nowarn --lf + hatch run test:pip install git+https://github.com/davidbrochart/zmq-anyio.git@anyio-selector-thread#egg=zmq_anyio --ignore-installed + hatch run test:pip install git+https://github.com/davidbrochart/anyio.git@selector-thread#egg=anyio --ignore-installed + hatch run test:pip list + hatch run test:python --version + hatch run test:pytest -v - name: Check Launcher run: | diff --git a/pyproject.toml b/pyproject.toml index faaa9563b..bbdde9aeb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -321,3 +321,6 @@ ignore = ["W002"] [tool.repo-review] ignore = ["PY007", "PP308", "GH102", "MY101"] + +[tool.hatch.metadata] +allow-direct-references = true From 19ebe89cebbd6f9d4b9c0083d8bd1301ab2927a4 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 20 Nov 2024 15:42:19 +0100 Subject: [PATCH 08/41] Test more Python versions on Windows --- .github/workflows/ci.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 31cd72d33..4e3290f14 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,6 +32,12 @@ jobs: python-version: "3.11" - os: ubuntu-latest python-version: "3.12" + - os: windows-latest + python-version: "3.10" + - os: windows-latest + python-version: "3.11" + - os: windows-latest + python-version: "3.12" steps: - name: Checkout uses: actions/checkout@v4 From 5bd6f365423264ac610bb6eac7a0fd68afe95680 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Sun, 1 Dec 2024 15:55:53 +0100 Subject: [PATCH 09/41] Use anyio's alternate selector thread --- .github/workflows/ci.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4e3290f14..4deab6cc0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -68,7 +68,8 @@ jobs: if: ${{ startsWith(matrix.os, 'windows') }} run: | hatch run test:pip install git+https://github.com/davidbrochart/zmq-anyio.git@anyio-selector-thread#egg=zmq_anyio --ignore-installed - hatch run test:pip install git+https://github.com/davidbrochart/anyio.git@selector-thread#egg=anyio --ignore-installed + # hatch run test:pip install git+https://github.com/davidbrochart/anyio.git@selector-thread#egg=anyio --ignore-installed + hatch run test:pip install git+https://github.com/agronholm/anyio.git@selector-thread-alternate#egg=anyio --ignore-installed hatch run test:pip list hatch run test:python --version hatch run test:pytest -v From d91abc7212ecbbac52b4d4e01ef298a92216dbd3 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Sun, 1 Dec 2024 19:56:40 +0100 Subject: [PATCH 10/41] - --- ipykernel/kernelbase.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index fcceef261..b837c2a52 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -382,7 +382,7 @@ async def shell_channel_thread_main(self): subshell_id ) assert socket is not None - socket.send_multipart(msg, copy=False) + await socket.asend_multipart(msg, copy=False) except Exception: self.log.error("Invalid message", exc_info=True) # noqa: G201 except BaseException: From ed4b68203d80c5a3198355683aaff9e2ea807a6c Mon Sep 17 00:00:00 2001 From: David Brochart Date: Sun, 1 Dec 2024 20:11:39 +0100 Subject: [PATCH 11/41] - --- ipykernel/kernelbase.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index b837c2a52..ed78fc21b 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -366,7 +366,7 @@ async def shell_channel_thread_main(self): assert self.shell_channel_thread is not None assert threading.current_thread() == self.shell_channel_thread - async with self.shell_socket: + async with self.shell_socket, create_task_group() as tg: try: while True: msg = await self.shell_socket.arecv_multipart(copy=False) @@ -382,6 +382,8 @@ async def shell_channel_thread_main(self): subshell_id ) assert socket is not None + if not socket.started.is_set(): + await tg.start(socket.start) await socket.asend_multipart(msg, copy=False) except Exception: self.log.error("Invalid message", exc_info=True) # noqa: G201 From 4b73bc197de7be78eb5dda754cc8236282cd8f14 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 5 Dec 2024 17:29:04 +0100 Subject: [PATCH 12/41] - --- .github/workflows/ci.yml | 3 --- pyproject.toml | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4deab6cc0..0cbf78416 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -67,9 +67,6 @@ jobs: timeout-minutes: 15 if: ${{ startsWith(matrix.os, 'windows') }} run: | - hatch run test:pip install git+https://github.com/davidbrochart/zmq-anyio.git@anyio-selector-thread#egg=zmq_anyio --ignore-installed - # hatch run test:pip install git+https://github.com/davidbrochart/anyio.git@selector-thread#egg=anyio --ignore-installed - hatch run test:pip install git+https://github.com/agronholm/anyio.git@selector-thread-alternate#egg=anyio --ignore-installed hatch run test:pip list hatch run test:python --version hatch run test:pytest -v diff --git a/pyproject.toml b/pyproject.toml index bbdde9aeb..4eee5fc7d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ dependencies = [ "psutil>=5.7", "packaging>=22", "anyio>=4.2.0", - "zmq-anyio >=0.2.3", + "zmq-anyio >=0.2.4", ] [project.urls] From 333f71ce1bd14d2aa51683a4ed5f0f614413deea Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 17 Dec 2024 09:54:16 +0100 Subject: [PATCH 13/41] - --- ipykernel/kernelbase.py | 2 +- ipykernel/subshell_manager.py | 14 ++++++-------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index ed78fc21b..3ac671303 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -556,7 +556,7 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: manager = self.shell_channel_thread.manager self.shell_channel_thread.start_soon(self.shell_channel_thread_main) self.shell_channel_thread.start_soon( - partial(manager.listen_from_control, self.shell_main, self.shell_channel_thread) + partial(manager.listen_from_control, self.shell_main) ) self.shell_channel_thread.start_soon(manager.listen_from_subshells) self.shell_channel_thread.start() diff --git a/ipykernel/subshell_manager.py b/ipykernel/subshell_manager.py index 505c2f408..2636d1575 100644 --- a/ipykernel/subshell_manager.py +++ b/ipykernel/subshell_manager.py @@ -125,20 +125,18 @@ def list_subshell(self) -> list[str]: with self._lock_cache: return list(self._cache) - async def listen_from_control(self, subshell_task: t.Any, thread: BaseThread) -> None: + async def listen_from_control(self, subshell_task: t.Any) -> None: """Listen for messages on the control inproc socket, handle those messages and return replies on the same socket. Runs in the shell channel thread. """ assert current_thread().name == SHELL_CHANNEL_THREAD_NAME - if not self._control_shell_channel_socket.started.is_set(): - thread.start_soon(self._control_shell_channel_socket.start) - await self._control_shell_channel_socket.started.wait() socket = self._control_shell_channel_socket - while True: - request = await socket.arecv_json() - reply = await self._process_control_request(request, subshell_task) - await socket.asend_json(reply) + async with socket: + while True: + request = await socket.arecv_json() + reply = await self._process_control_request(request, subshell_task) + await socket.asend_json(reply) async def listen_from_subshells(self) -> None: """Listen for reply messages on inproc sockets of all subshells and resend From fe2be51d138f1d05ac34df9621b3051bfc1a3632 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 17 Dec 2024 11:18:30 +0100 Subject: [PATCH 14/41] - --- ipykernel/subshell_manager.py | 2 +- ipykernel/thread.py | 34 +++++++++++++++++++++++++++++----- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/ipykernel/subshell_manager.py b/ipykernel/subshell_manager.py index 2636d1575..b9dea456d 100644 --- a/ipykernel/subshell_manager.py +++ b/ipykernel/subshell_manager.py @@ -91,7 +91,7 @@ def close(self) -> None: async def get_control_other_socket(self, thread: BaseThread) -> zmq_anyio.Socket: if not self._control_other_socket.started.is_set(): - thread.start_soon(self._control_other_socket.start) + thread.task_group.start_soon(self._control_other_socket.start) await self._control_other_socket.started.wait() return self._control_other_socket diff --git a/ipykernel/thread.py b/ipykernel/thread.py index d853a2adb..dc68bb3bf 100644 --- a/ipykernel/thread.py +++ b/ipykernel/thread.py @@ -4,9 +4,10 @@ from collections.abc import Awaitable from queue import Queue from threading import Event, Thread -from typing import Callable +from typing import Any, Callable from anyio import create_task_group, run, to_thread +from anyio.abc import TaskGroup CONTROL_THREAD_NAME = "Control" SHELL_CHANNEL_THREAD_NAME = "Shell channel" @@ -22,10 +23,23 @@ def __init__(self, **kwargs): self.stopped = Event() self.pydev_do_not_trace = True self.is_pydev_daemon_thread = True - self._tasks: Queue[Callable[[], Awaitable[None]] | None] = Queue() + self._tasks: Queue[tuple[str, Callable[[], Awaitable[Any]]] | None] = Queue() + self._result: Queue[Any] = Queue() - def start_soon(self, task: Callable[[], Awaitable[None]] | None) -> None: - self._tasks.put(task) + @property + def task_group(self) -> TaskGroup: + return self._task_group + + def start_soon(self, coro: Callable[[], Awaitable[Any]]) -> None: + self._tasks.put(("start_soon", coro)) + + def run_async(self, coro: Callable[[], Awaitable[Any]]) -> Any: + self._tasks.put(("run_async", coro)) + return self._result.get() + + def run_sync(self, func: Callable[..., Any]) -> Any: + self._tasks.put(("run_sync", func)) + return self._result.get() def run(self) -> None: """Run the thread.""" @@ -36,12 +50,22 @@ def run(self) -> None: async def _main(self) -> None: async with create_task_group() as tg: + self._task_group = tg self.started.set() while True: task = await to_thread.run_sync(self._tasks.get) if task is None: break - tg.start_soon(task) + func, arg = task + if func == "start_soon": + tg.start_soon(arg) + elif func == "run_async": + res = await arg + self._result.put(res) + else: # func == "run_sync" + res = arg() + self._result.put(res) + tg.cancel_scope.cancel() def stop(self) -> None: From e125d6b356ca43f6ec89970cd5b2c60e3402d1f2 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 17 Dec 2024 13:42:41 +0100 Subject: [PATCH 15/41] - --- ipykernel/subshell_manager.py | 40 ++++++++++++++++++++++++++++------- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/ipykernel/subshell_manager.py b/ipykernel/subshell_manager.py index b9dea456d..2120abe13 100644 --- a/ipykernel/subshell_manager.py +++ b/ipykernel/subshell_manager.py @@ -56,15 +56,39 @@ def __init__( # Inproc pair sockets for control channel and main shell (parent subshell). # Each inproc pair has a "shell_channel" socket used in the shell channel # thread, and an "other" socket used in the other thread. - self._control_shell_channel_socket = self._create_inproc_pair_socket("control", True) - self._control_other_socket = self._create_inproc_pair_socket("control", False) - self._parent_shell_channel_socket = self._create_inproc_pair_socket(None, True) - self._parent_other_socket = self._create_inproc_pair_socket(None, False) + self.__control_shell_channel_socket: zmq_anyio.Socket | None = None + self.__control_other_socket: zmq_anyio.Socket | None = None + self.__parent_shell_channel_socket: zmq_anyio.Socket | None = None + self.__parent_other_socket: zmq_anyio.Socket | None = None # anyio memory object stream for async queue-like communication between tasks. # Used by _create_subshell to tell listen_from_subshells to spawn a new task. self._send_stream, self._receive_stream = create_memory_object_stream[str]() + @property + def _control_shell_channel_socket(self) -> zmq_anyio.Socket: + if self.__control_shell_channel_socket is None: + self.__control_shell_channel_socket = self._create_inproc_pair_socket("control", True) + return self.__control_shell_channel_socket + + @property + def _control_other_socket(self) -> zmq_anyio.Socket: + if self.__control_other_socket is None: + self.__control_other_socket = self._create_inproc_pair_socket("control", False) + return self.__control_other_socket + + @property + def _parent_shell_channel_socket(self) -> zmq_anyio.Socket: + if self.__parent_shell_channel_socket is None: + self.__parent_shell_channel_socket = self._create_inproc_pair_socket(None, True) + return self.__parent_shell_channel_socket + + @property + def _parent_other_socket(self) -> zmq_anyio.Socket: + if self.__parent_other_socket is None: + self.__parent_other_socket = self._create_inproc_pair_socket(None, False) + return self.__parent_other_socket + def close(self) -> None: """Stop all subshells and close all resources.""" assert current_thread().name == SHELL_CHANNEL_THREAD_NAME @@ -73,10 +97,10 @@ def close(self) -> None: self._receive_stream.close() for socket in ( - self._control_shell_channel_socket, - self._control_other_socket, - self._parent_shell_channel_socket, - self._parent_other_socket, + self.__control_shell_channel_socket, + self.__control_other_socket, + self.__parent_shell_channel_socket, + self.__parent_other_socket, ): if socket is not None: socket.close() From 1a3f0f68e956c983608ed725d85484c41390ca57 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 17 Dec 2024 16:16:14 +0100 Subject: [PATCH 16/41] - --- tests/test_kernel.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_kernel.py b/tests/test_kernel.py index 2b379eb83..615acbed1 100644 --- a/tests/test_kernel.py +++ b/tests/test_kernel.py @@ -62,7 +62,7 @@ def test_simple_print(): def test_print_to_correct_cell_from_thread(): """should print to the cell that spawned the thread, not a subsequently run cell""" iterations = 5 - interval = 0.25 + interval = 1 code = f"""\ from threading import Thread from time import sleep @@ -94,7 +94,7 @@ def thread_target(): def test_print_to_correct_cell_from_child_thread(): """should print to the cell that spawned the thread, not a subsequently run cell""" iterations = 5 - interval = 0.25 + interval = 1 code = f"""\ from threading import Thread from time import sleep @@ -132,7 +132,7 @@ def parent_target(): def test_print_to_correct_cell_from_asyncio(): """should print to the cell that scheduled the task, not a subsequently run cell""" iterations = 5 - interval = 0.25 + interval = 1 code = f"""\ import asyncio From 30ee9f51a54c6dfd2d7d23bb2058ad515006adfb Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 17 Dec 2024 16:40:37 +0100 Subject: [PATCH 17/41] - --- tests/test_kernel.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/test_kernel.py b/tests/test_kernel.py index 615acbed1..d8cd7ac00 100644 --- a/tests/test_kernel.py +++ b/tests/test_kernel.py @@ -83,6 +83,8 @@ def thread_target(): msg = kc.get_iopub_msg(timeout=interval * 2) if msg["msg_type"] != "stream": continue + print(f"{thread_msg_id=}") + print(f"{msg=}") content = msg["content"] assert content["name"] == "stdout" assert content["text"] == str(received) @@ -121,6 +123,8 @@ def parent_target(): msg = kc.get_iopub_msg(timeout=interval * 2) if msg["msg_type"] != "stream": continue + print(f"{thread_msg_id=}") + print(f"{msg=}") content = msg["content"] assert content["name"] == "stdout" assert content["text"] == str(received) @@ -153,6 +157,8 @@ async def async_task(): msg = kc.get_iopub_msg(timeout=interval * 2) if msg["msg_type"] != "stream": continue + print(f"{thread_msg_id=}") + print(f"{msg=}") content = msg["content"] assert content["name"] == "stdout" assert content["text"] == str(received) From 7137e7a3adbb3837855cd93f9cefe5adab7978bd Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 17 Dec 2024 17:49:27 +0100 Subject: [PATCH 18/41] Enable tracemalloc --- .github/workflows/ci.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0cbf78416..172e8a6f9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,13 +55,13 @@ jobs: timeout-minutes: 15 if: ${{ !startsWith( matrix.python-version, 'pypy' ) && !startsWith(matrix.os, 'windows') }} run: | - hatch run cov:test --cov-fail-under 50 || hatch run test:test --lf + PYTHONTRACEMALLOC=20 hatch run cov:test --cov-fail-under 50 || PYTHONTRACEMALLOC=20 hatch run test:test --lf - name: Run the tests on pypy timeout-minutes: 15 if: ${{ startsWith( matrix.python-version, 'pypy' ) }} run: | - hatch run test:nowarn || hatch run test:nowarn --lf + PYTHONTRACEMALLOC=20 hatch run test:nowarn || PYTHONTRACEMALLOC=20 hatch run test:nowarn --lf - name: Run the tests on Windows timeout-minutes: 15 @@ -69,7 +69,7 @@ jobs: run: | hatch run test:pip list hatch run test:python --version - hatch run test:pytest -v + PYTHONTRACEMALLOC=20 hatch run test:pytest -v - name: Check Launcher run: | @@ -152,7 +152,7 @@ jobs: - name: Run the tests timeout-minutes: 15 - run: pytest -W default -vv || pytest --vv -W default --lf + run: PYTHONTRACEMALLOC=20 pytest -W default -vv || PYTHONTRACEMALLOC=20 pytest --vv -W default --lf test_miniumum_versions: name: Test Minimum Versions From 7b4abc2a01af9ed45b8452b7c5390d1b96746a46 Mon Sep 17 00:00:00 2001 From: M Bussonnier Date: Tue, 17 Dec 2024 20:47:13 +0100 Subject: [PATCH 19/41] Update .github/workflows/ci.yml --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 172e8a6f9..5ecd46101 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -69,6 +69,7 @@ jobs: run: | hatch run test:pip list hatch run test:python --version + hatch run test:python -m pip install pip install git+https://github.com/ipython/ipython@92dd9e47fe8862ee38770744c165b680cb5241b1 PYTHONTRACEMALLOC=20 hatch run test:pytest -v - name: Check Launcher From 142e033cf7cde2acc98c024b83b2fbe6ec88854d Mon Sep 17 00:00:00 2001 From: M Bussonnier Date: Tue, 17 Dec 2024 20:55:51 +0100 Subject: [PATCH 20/41] Update .github/workflows/ci.yml --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5ecd46101..1e7636d32 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -69,7 +69,7 @@ jobs: run: | hatch run test:pip list hatch run test:python --version - hatch run test:python -m pip install pip install git+https://github.com/ipython/ipython@92dd9e47fe8862ee38770744c165b680cb5241b1 + hatch run test:python -m pip install git+https://github.com/ipython/ipython@92dd9e47fe8862ee38770744c165b680cb5241b1 PYTHONTRACEMALLOC=20 hatch run test:pytest -v - name: Check Launcher From 529681a6eef32c8444e6d9b684d72b9cf23bd1f9 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 20 Dec 2024 10:13:43 +0100 Subject: [PATCH 21/41] - --- .github/workflows/ci.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1e7636d32..4bea26fc6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,13 +55,13 @@ jobs: timeout-minutes: 15 if: ${{ !startsWith( matrix.python-version, 'pypy' ) && !startsWith(matrix.os, 'windows') }} run: | - PYTHONTRACEMALLOC=20 hatch run cov:test --cov-fail-under 50 || PYTHONTRACEMALLOC=20 hatch run test:test --lf + PYTHONTRACEMALLOC=20 hatch run cov:test --cov-fail-under 50 -k test_print_to_correct_cell || PYTHONTRACEMALLOC=20 hatch run test:test --lf -k test_print_to_correct_cell - name: Run the tests on pypy timeout-minutes: 15 if: ${{ startsWith( matrix.python-version, 'pypy' ) }} run: | - PYTHONTRACEMALLOC=20 hatch run test:nowarn || PYTHONTRACEMALLOC=20 hatch run test:nowarn --lf + PYTHONTRACEMALLOC=20 hatch run test:nowarn -k test_print_to_correct_cell || PYTHONTRACEMALLOC=20 hatch run test:nowarn --lf -k test_print_to_correct_cell - name: Run the tests on Windows timeout-minutes: 15 @@ -69,8 +69,8 @@ jobs: run: | hatch run test:pip list hatch run test:python --version - hatch run test:python -m pip install git+https://github.com/ipython/ipython@92dd9e47fe8862ee38770744c165b680cb5241b1 - PYTHONTRACEMALLOC=20 hatch run test:pytest -v + #hatch run test:python -m pip install git+https://github.com/ipython/ipython@92dd9e47fe8862ee38770744c165b680cb5241b1 + PYTHONTRACEMALLOC=20 hatch run test:pytest -v -k test_print_to_correct_cell - name: Check Launcher run: | From 0f8725b2983b1fe67826ae94d4361ae7c178724d Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 20 Dec 2024 10:31:12 +0100 Subject: [PATCH 22/41] - --- .github/workflows/ci.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4bea26fc6..2c7aefbb4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,13 +55,13 @@ jobs: timeout-minutes: 15 if: ${{ !startsWith( matrix.python-version, 'pypy' ) && !startsWith(matrix.os, 'windows') }} run: | - PYTHONTRACEMALLOC=20 hatch run cov:test --cov-fail-under 50 -k test_print_to_correct_cell || PYTHONTRACEMALLOC=20 hatch run test:test --lf -k test_print_to_correct_cell + PYTHONTRACEMALLOC=20 hatch run cov:test --cov-fail-under 50 -k test_print_to_correct_cell_from_child_thread || PYTHONTRACEMALLOC=20 hatch run test:test --lf -k test_print_to_correct_cell_from_child_thread - name: Run the tests on pypy timeout-minutes: 15 if: ${{ startsWith( matrix.python-version, 'pypy' ) }} run: | - PYTHONTRACEMALLOC=20 hatch run test:nowarn -k test_print_to_correct_cell || PYTHONTRACEMALLOC=20 hatch run test:nowarn --lf -k test_print_to_correct_cell + PYTHONTRACEMALLOC=20 hatch run test:nowarn -k test_print_to_correct_cell_from_child_thread || PYTHONTRACEMALLOC=20 hatch run test:nowarn --lf -k test_print_to_correct_cell_from_child_thread - name: Run the tests on Windows timeout-minutes: 15 @@ -70,7 +70,7 @@ jobs: hatch run test:pip list hatch run test:python --version #hatch run test:python -m pip install git+https://github.com/ipython/ipython@92dd9e47fe8862ee38770744c165b680cb5241b1 - PYTHONTRACEMALLOC=20 hatch run test:pytest -v -k test_print_to_correct_cell + PYTHONTRACEMALLOC=20 hatch run test:pytest -v -k test_print_to_correct_cell_from_child_thread - name: Check Launcher run: | From 7d4206902082679996c31d554afdcc7d07e16cb0 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 20 Dec 2024 13:25:27 +0100 Subject: [PATCH 23/41] - --- .github/workflows/ci.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2c7aefbb4..6bde2ef77 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,13 +55,13 @@ jobs: timeout-minutes: 15 if: ${{ !startsWith( matrix.python-version, 'pypy' ) && !startsWith(matrix.os, 'windows') }} run: | - PYTHONTRACEMALLOC=20 hatch run cov:test --cov-fail-under 50 -k test_print_to_correct_cell_from_child_thread || PYTHONTRACEMALLOC=20 hatch run test:test --lf -k test_print_to_correct_cell_from_child_thread + PYTHONTRACEMALLOC=20 hatch run cov:test --cov-fail-under 50 || PYTHONTRACEMALLOC=20 hatch run test:test --lf - name: Run the tests on pypy timeout-minutes: 15 if: ${{ startsWith( matrix.python-version, 'pypy' ) }} run: | - PYTHONTRACEMALLOC=20 hatch run test:nowarn -k test_print_to_correct_cell_from_child_thread || PYTHONTRACEMALLOC=20 hatch run test:nowarn --lf -k test_print_to_correct_cell_from_child_thread + PYTHONTRACEMALLOC=20 hatch run test:nowarn || PYTHONTRACEMALLOC=20 hatch run test:nowarn --lf - name: Run the tests on Windows timeout-minutes: 15 @@ -70,7 +70,7 @@ jobs: hatch run test:pip list hatch run test:python --version #hatch run test:python -m pip install git+https://github.com/ipython/ipython@92dd9e47fe8862ee38770744c165b680cb5241b1 - PYTHONTRACEMALLOC=20 hatch run test:pytest -v -k test_print_to_correct_cell_from_child_thread + PYTHONTRACEMALLOC=20 hatch run test:pytest -v - name: Check Launcher run: | From 6d404fd413dd7cb279b018186afc56187897e953 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 20 Dec 2024 15:40:04 +0100 Subject: [PATCH 24/41] - --- tests/test_io.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_io.py b/tests/test_io.py index 7b86a5d71..9615538f5 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -35,8 +35,9 @@ async def iopub_thread(ctx): thread.start() yield thread - thread.stop() + thread.close() + thread.stop() except Exception: pass From 87afdccdb013b120bcdbe2a1f213a0bc3db5510c Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 20 Dec 2024 16:06:53 +0100 Subject: [PATCH 25/41] - --- tests/test_io.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/tests/test_io.py b/tests/test_io.py index 9615538f5..aca2694ee 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -29,17 +29,11 @@ def ctx(): @pytest.fixture() async def iopub_thread(ctx): - try: - async with zmq_anyio.Socket(ctx.socket(zmq.PUB)) as pub: - thread = IOPubThread(pub) - thread.start() - - yield thread + async with zmq_anyio.Socket(ctx.socket(zmq.PUB)) as pub: + thread = IOPubThread(pub) + thread.start() - thread.close() - thread.stop() - except Exception: - pass + yield thread async def test_io_api(iopub_thread): From b5d75420826744e694db4e2c92082ab17be410db Mon Sep 17 00:00:00 2001 From: David Brochart Date: Mon, 10 Feb 2025 09:02:17 +0100 Subject: [PATCH 26/41] - --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 4eee5fc7d..f1df518df 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,8 +33,9 @@ dependencies = [ "pyzmq>=26.0", "psutil>=5.7", "packaging>=22", - "anyio>=4.2.0", + "anyio>=4.8.0,<5.0.0", "zmq-anyio >=0.2.4", + "anyioutils >=0.5.0,<0.6.0", ] [project.urls] From 3a14e8365613e18623a188c20b21e858fc82c4a1 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Mon, 10 Feb 2025 09:45:28 +0100 Subject: [PATCH 27/41] - --- pyproject.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index f1df518df..26b50e58b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,8 +34,7 @@ dependencies = [ "psutil>=5.7", "packaging>=22", "anyio>=4.8.0,<5.0.0", - "zmq-anyio >=0.2.4", - "anyioutils >=0.5.0,<0.6.0", + "zmq-anyio >=0.2.5", ] [project.urls] From eefa5768345e7e7b4be72d11b6603951b1c27a89 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 11 Feb 2025 10:56:09 +0100 Subject: [PATCH 28/41] Use zmq-anyio v0.3.0 --- ipykernel/debugger.py | 4 ++-- ipykernel/inprocess/session.py | 2 +- ipykernel/iostream.py | 4 ++-- ipykernel/ipkernel.py | 2 +- ipykernel/kernelbase.py | 24 ++++++++++++------------ ipykernel/subshell_manager.py | 8 ++++---- pyproject.toml | 2 +- tests/test_io.py | 4 ++-- tests/test_kernel.py | 6 ------ 9 files changed, 25 insertions(+), 31 deletions(-) diff --git a/ipykernel/debugger.py b/ipykernel/debugger.py index 36aced05d..48d42cc7c 100644 --- a/ipykernel/debugger.py +++ b/ipykernel/debugger.py @@ -241,7 +241,7 @@ async def _send_request(self, msg): self.log.debug("DEBUGPYCLIENT:") self.log.debug(self.routing_id) self.log.debug(buf) - await self.debugpy_socket.asend_multipart((self.routing_id, buf)) + await self.debugpy_socket.asend_multipart((self.routing_id, buf)).wait() async def _wait_for_response(self): # Since events are never pushed to the message_queue @@ -437,7 +437,7 @@ async def start(self): (self.shell_socket.getsockopt(ROUTING_ID)), ) - msg = await self.shell_socket.arecv_multipart() + msg = await self.shell_socket.arecv_multipart().wait() ident, msg = self.session.feed_identities(msg, copy=True) try: msg = self.session.deserialize(msg, content=True, copy=True) diff --git a/ipykernel/inprocess/session.py b/ipykernel/inprocess/session.py index 70b135742..8f92f1326 100644 --- a/ipykernel/inprocess/session.py +++ b/ipykernel/inprocess/session.py @@ -3,7 +3,7 @@ class Session(_Session): async def recv(self, socket, copy=True): - return await socket.arecv_multipart() + return await socket.arecv_multipart().wait() def send( self, diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 02a0e22ac..398ea1a0e 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -143,7 +143,7 @@ async def _handle_event(self): async with pipe_in: try: while True: - await pipe_in.arecv() + await pipe_in.arecv().wait() # freeze event count so new writes don't extend the queue # while we are processing n_events = len(self._events) @@ -189,7 +189,7 @@ async def _handle_pipe_msgs(self): async def _handle_pipe_msg(self, msg=None): """handle a pipe message from a subprocess""" - msg = msg or await self._pipe_in1.arecv_multipart() + msg = msg or await self._pipe_in1.arecv_multipart().wait() if not self._pipe_flag or not self._is_main_process(): return if msg[0] != self._pipe_uuid: diff --git a/ipykernel/ipkernel.py b/ipykernel/ipkernel.py index d8d2ba5d5..ff421685e 100644 --- a/ipykernel/ipkernel.py +++ b/ipykernel/ipkernel.py @@ -236,7 +236,7 @@ async def receive_debugpy_message(self, msg=None): if msg is None: assert self.debugpy_socket is not None - msg = await self.debugpy_socket.arecv_multipart() + msg = await self.debugpy_socket.arecv_multipart().wait() # The first frame is the socket id, we can drop it frame = msg[1].decode("utf-8") self.log.debug("Debugpy received: %s", frame) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 3ac671303..cb03b4d4f 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -269,7 +269,7 @@ async def process_control_message(self, msg=None): assert self.session is not None assert self.control_thread is None or threading.current_thread() == self.control_thread - msg = msg or await self.control_socket.arecv_multipart() + msg = msg or await self.control_socket.arecv_multipart().wait() idents, msg = self.session.feed_identities(msg) try: msg = self.session.deserialize(msg, content=True) @@ -369,7 +369,7 @@ async def shell_channel_thread_main(self): async with self.shell_socket, create_task_group() as tg: try: while True: - msg = await self.shell_socket.arecv_multipart(copy=False) + msg = await self.shell_socket.arecv_multipart(copy=False).wait() # deserialize only the header to get subshell_id # Keep original message to send to subshell_id unmodified. _, msg2 = self.session.feed_identities(msg, copy=False) @@ -384,7 +384,7 @@ async def shell_channel_thread_main(self): assert socket is not None if not socket.started.is_set(): await tg.start(socket.start) - await socket.asend_multipart(msg, copy=False) + socket.asend_multipart(msg, copy=False) except Exception: self.log.error("Invalid message", exc_info=True) # noqa: G201 except BaseException: @@ -444,8 +444,8 @@ async def process_shell_message(self, msg=None, socket=None): assert socket is None socket = self.shell_socket - no_msg = msg is None if self._is_test else not await socket.apoll(0) - msg = msg or await socket.arecv_multipart(copy=False) + no_msg = msg is None if self._is_test else not await socket.apoll(0).wait() + msg = msg or await socket.arecv_multipart(copy=False).wait() received_time = time.monotonic() copy = not isinstance(msg[0], zmq.Message) @@ -499,7 +499,7 @@ async def process_shell_message(self, msg=None, socket=None): try: result = handler(socket, idents, msg) if inspect.isawaitable(result): - result = await result + await result except Exception: self.log.error("Exception in message handler:", exc_info=True) # noqa: G201 except KeyboardInterrupt: @@ -1090,8 +1090,8 @@ async def create_subshell_request(self, socket, ident, parent) -> None: other_socket = await self.shell_channel_thread.manager.get_control_other_socket( self.control_thread ) - await other_socket.asend_json({"type": "create"}) - reply = await other_socket.arecv_json() + await other_socket.asend_json({"type": "create"}).wait() + reply = await other_socket.arecv_json().wait() self.session.send(socket, "create_subshell_reply", reply, parent, ident) @@ -1114,8 +1114,8 @@ async def delete_subshell_request(self, socket, ident, parent) -> None: other_socket = await self.shell_channel_thread.manager.get_control_other_socket( self.control_thread ) - await other_socket.asend_json({"type": "delete", "subshell_id": subshell_id}) - reply = await other_socket.arecv_json() + await other_socket.asend_json({"type": "delete", "subshell_id": subshell_id}).wait() + reply = await other_socket.arecv_json().wait() self.session.send(socket, "delete_subshell_reply", reply, parent, ident) @@ -1131,8 +1131,8 @@ async def list_subshell_request(self, socket, ident, parent) -> None: other_socket = await self.shell_channel_thread.manager.get_control_other_socket( self.control_thread ) - await other_socket.asend_json({"type": "list"}) - reply = await other_socket.arecv_json() + await other_socket.asend_json({"type": "list"}).wait() + reply = await other_socket.arecv_json().wait() self.session.send(socket, "list_subshell_reply", reply, parent, ident) diff --git a/ipykernel/subshell_manager.py b/ipykernel/subshell_manager.py index 2120abe13..88c3e9b67 100644 --- a/ipykernel/subshell_manager.py +++ b/ipykernel/subshell_manager.py @@ -158,9 +158,9 @@ async def listen_from_control(self, subshell_task: t.Any) -> None: socket = self._control_shell_channel_socket async with socket: while True: - request = await socket.arecv_json() + request = await socket.arecv_json().wait() reply = await self._process_control_request(request, subshell_task) - await socket.asend_json(reply) + await socket.asend_json(reply).wait() async def listen_from_subshells(self) -> None: """Listen for reply messages on inproc sockets of all subshells and resend @@ -270,9 +270,9 @@ async def _listen_for_subshell_reply( await shell_channel_socket.started.wait() try: while True: - msg = await shell_channel_socket.arecv_multipart(copy=False) + msg = await shell_channel_socket.arecv_multipart(copy=False).wait() with self._lock_shell_socket: - await self._shell_socket.asend_multipart(msg) + await self._shell_socket.asend_multipart(msg).wait() except BaseException: if not self._is_subshell(subshell_id): # Subshell no longer exists so exit gracefully diff --git a/pyproject.toml b/pyproject.toml index 26b50e58b..27ea2b6da 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ dependencies = [ "psutil>=5.7", "packaging>=22", "anyio>=4.8.0,<5.0.0", - "zmq-anyio >=0.2.5", + "zmq-anyio >=0.3.0", ] [project.urls] diff --git a/tests/test_io.py b/tests/test_io.py index aca2694ee..c88911fa4 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -221,8 +221,8 @@ async def test_echo_watch(ctx): print(f"{p.stdout=}") print(f"{p.stderr}=", file=sys.stderr) assert p.returncode == 0 - while await s.apoll(timeout=100): - msg = await s.arecv_multipart() + while await s.apoll(timeout=100).wait(): + msg = await s.arecv_multipart().wait() ident, msg = session.feed_identities(msg, copy=True) msg = session.deserialize(msg, content=True, copy=True) assert msg is not None # for type narrowing diff --git a/tests/test_kernel.py b/tests/test_kernel.py index d8cd7ac00..615acbed1 100644 --- a/tests/test_kernel.py +++ b/tests/test_kernel.py @@ -83,8 +83,6 @@ def thread_target(): msg = kc.get_iopub_msg(timeout=interval * 2) if msg["msg_type"] != "stream": continue - print(f"{thread_msg_id=}") - print(f"{msg=}") content = msg["content"] assert content["name"] == "stdout" assert content["text"] == str(received) @@ -123,8 +121,6 @@ def parent_target(): msg = kc.get_iopub_msg(timeout=interval * 2) if msg["msg_type"] != "stream": continue - print(f"{thread_msg_id=}") - print(f"{msg=}") content = msg["content"] assert content["name"] == "stdout" assert content["text"] == str(received) @@ -157,8 +153,6 @@ async def async_task(): msg = kc.get_iopub_msg(timeout=interval * 2) if msg["msg_type"] != "stream": continue - print(f"{thread_msg_id=}") - print(f"{msg=}") content = msg["content"] assert content["name"] == "stdout" assert content["text"] == str(received) From b55589244926cbc88cecddd419094b3beaa0b188 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 12 Feb 2025 13:14:32 +0100 Subject: [PATCH 29/41] Workwaround for not suspending AnyIO socket selector thread --- ipykernel/kernelapp.py | 16 +++++++++++++--- ipykernel/kernelbase.py | 2 +- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index 055c4bf8e..e8c0a4805 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -357,7 +357,7 @@ def init_control(self, context): self.control_port = self._bind_socket(self.control_socket, self.control_port) self.log.debug("control ROUTER Channel on port: %i" % self.control_port) - self.debugpy_socket = zmq_anyio.Socket(context, zmq.STREAM) + self.debugpy_socket = zmq_anyio.Socket(context.socket(zmq.STREAM)) self.debugpy_socket.linger = 1000 self.debug_shell_socket = zmq_anyio.Socket(context.socket(zmq.DEALER)) @@ -701,14 +701,24 @@ def start(self) -> None: if self.poller is not None: self.poller.start() backend = "trio" if self.trio_loop else "asyncio" - run(self.main, backend=backend) + run(partial(self.main, backend), backend=backend) return async def _wait_to_enter_eventloop(self): await self.kernel._eventloop_set.wait() await self.kernel.enter_eventloop() - async def main(self): + async def main(self, backend: str): + if backend == "asyncio" and sys.platform == "win32": + import asyncio + + policy = asyncio.get_event_loop_policy() + if policy.__class__.__name__ == "WindowsProactorEventLoopPolicy": + from anyio._core._asyncio_selector_thread import get_selector + selector = get_selector() + selector._thread.pydev_do_not_trace = True + #selector._thread.is_pydev_daemon_thread = True + async with create_task_group() as tg: tg.start_soon(self._wait_to_enter_eventloop) tg.start_soon(self.kernel.start) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index cb03b4d4f..71925e4ec 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -384,7 +384,7 @@ async def shell_channel_thread_main(self): assert socket is not None if not socket.started.is_set(): await tg.start(socket.start) - socket.asend_multipart(msg, copy=False) + await socket.asend_multipart(msg, copy=False).wait() except Exception: self.log.error("Invalid message", exc_info=True) # noqa: G201 except BaseException: From c217f9d869558f714da1f286b632b7034f39e055 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 12 Feb 2025 12:17:23 +0000 Subject: [PATCH 30/41] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- ipykernel/kernelapp.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index e8c0a4805..743b9bc03 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -715,9 +715,10 @@ async def main(self, backend: str): policy = asyncio.get_event_loop_policy() if policy.__class__.__name__ == "WindowsProactorEventLoopPolicy": from anyio._core._asyncio_selector_thread import get_selector + selector = get_selector() selector._thread.pydev_do_not_trace = True - #selector._thread.is_pydev_daemon_thread = True + # selector._thread.is_pydev_daemon_thread = True async with create_task_group() as tg: tg.start_soon(self._wait_to_enter_eventloop) From 871da2b896a88bb60f02bd409f1a0b56553fd89c Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 12 Feb 2025 15:31:59 +0100 Subject: [PATCH 31/41] Update .github/workflows/ci.yml Co-authored-by: M Bussonnier --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6bde2ef77..1a55cc829 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -69,7 +69,7 @@ jobs: run: | hatch run test:pip list hatch run test:python --version - #hatch run test:python -m pip install git+https://github.com/ipython/ipython@92dd9e47fe8862ee38770744c165b680cb5241b1 + hatch run test:python -m pip install git+https://github.com/ipython/ipython@f9899071478fab03d9c7cf5c982141f2fa0af7d7 PYTHONTRACEMALLOC=20 hatch run test:pytest -v - name: Check Launcher From 70dd1077728b934905894278d42df2e200a4122d Mon Sep 17 00:00:00 2001 From: M Bussonnier Date: Wed, 12 Feb 2025 18:40:43 +0100 Subject: [PATCH 32/41] Update .github/workflows/ci.yml --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1a55cc829..f668c8c5c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -69,7 +69,7 @@ jobs: run: | hatch run test:pip list hatch run test:python --version - hatch run test:python -m pip install git+https://github.com/ipython/ipython@f9899071478fab03d9c7cf5c982141f2fa0af7d7 + hatch run test:python -m pip install git+https://github.com/ipython/ipython@ea303f8330fddbc786e1fcbf31e881feb7a606b3 PYTHONTRACEMALLOC=20 hatch run test:pytest -v - name: Check Launcher From 5f5fb4722e2653e8e9bd143321bbe396b03d5e92 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 13 Feb 2025 09:21:15 +0100 Subject: [PATCH 33/41] Fix iopub_thread fixture --- .github/workflows/ci.yml | 2 +- tests/test_io.py | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f668c8c5c..ddd9330f9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -69,7 +69,7 @@ jobs: run: | hatch run test:pip list hatch run test:python --version - hatch run test:python -m pip install git+https://github.com/ipython/ipython@ea303f8330fddbc786e1fcbf31e881feb7a606b3 + # hatch run test:python -m pip install git+https://github.com/ipython/ipython@ea303f8330fddbc786e1fcbf31e881feb7a606b3 PYTHONTRACEMALLOC=20 hatch run test:pytest -v - name: Check Launcher diff --git a/tests/test_io.py b/tests/test_io.py index c88911fa4..bedb368c3 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -10,6 +10,7 @@ from concurrent.futures import Future, ThreadPoolExecutor from unittest import mock +from anyio import create_task_group import pytest import zmq import zmq_anyio @@ -29,12 +30,18 @@ def ctx(): @pytest.fixture() async def iopub_thread(ctx): - async with zmq_anyio.Socket(ctx.socket(zmq.PUB)) as pub: + async with create_task_group() as tg: + pub = zmq_anyio.Socket(ctx.socket(zmq.PUB)) + await tg.start(pub.start) thread = IOPubThread(pub) thread.start() yield thread + await pub.stop() + thread.stop() + thread.close() + async def test_io_api(iopub_thread): """Test that wrapped stdout has the same API as a normal TextIO object""" From 9ab982f5e27f98817f8da5ee444c9698135d308b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 13 Feb 2025 08:22:12 +0000 Subject: [PATCH 34/41] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_io.py b/tests/test_io.py index bedb368c3..c900b383f 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -10,10 +10,10 @@ from concurrent.futures import Future, ThreadPoolExecutor from unittest import mock -from anyio import create_task_group import pytest import zmq import zmq_anyio +from anyio import create_task_group from jupyter_client.session import Session from ipykernel.iostream import _PARENT, BackgroundSocket, IOPubThread, OutStream From 92975d93bbc0c227bfd14ba5d2f63dedfcf94bc5 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 13 Feb 2025 09:35:14 +0100 Subject: [PATCH 35/41] - --- tests/test_io.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/test_io.py b/tests/test_io.py index c900b383f..7dc6be417 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -38,9 +38,12 @@ async def iopub_thread(ctx): yield thread - await pub.stop() - thread.stop() - thread.close() + try: + await pub.stop() + thread.stop() + thread.close() + except Exception: + pass async def test_io_api(iopub_thread): From 0919954cf972b0024650d76056cd6757f07698ee Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 13 Feb 2025 09:42:54 +0100 Subject: [PATCH 36/41] - --- tests/test_io.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/test_io.py b/tests/test_io.py index 7dc6be417..1c2b10695 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -30,20 +30,20 @@ def ctx(): @pytest.fixture() async def iopub_thread(ctx): - async with create_task_group() as tg: - pub = zmq_anyio.Socket(ctx.socket(zmq.PUB)) - await tg.start(pub.start) - thread = IOPubThread(pub) - thread.start() + try: + async with create_task_group() as tg: + pub = zmq_anyio.Socket(ctx.socket(zmq.PUB)) + await tg.start(pub.start) + thread = IOPubThread(pub) + thread.start() - yield thread + yield thread - try: await pub.stop() thread.stop() thread.close() - except Exception: - pass + except Exception: + pass async def test_io_api(iopub_thread): From 8b57cd154f5e79c3ef66cd733d9af2354a2f128b Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 13 Feb 2025 10:56:02 +0100 Subject: [PATCH 37/41] - --- tests/test_io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_io.py b/tests/test_io.py index 1c2b10695..a5b34f4aa 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -42,7 +42,7 @@ async def iopub_thread(ctx): await pub.stop() thread.stop() thread.close() - except Exception: + except BaseException: pass From 55e0f31d487b80b4a43a7be006fd7fb820a77698 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 13 Feb 2025 11:21:38 +0100 Subject: [PATCH 38/41] Remove timeout --- .github/workflows/ci.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ddd9330f9..6e06652ef 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -52,19 +52,19 @@ jobs: python -m pip install hatch - name: Run the tests - timeout-minutes: 15 + # timeout-minutes: 15 if: ${{ !startsWith( matrix.python-version, 'pypy' ) && !startsWith(matrix.os, 'windows') }} run: | PYTHONTRACEMALLOC=20 hatch run cov:test --cov-fail-under 50 || PYTHONTRACEMALLOC=20 hatch run test:test --lf - name: Run the tests on pypy - timeout-minutes: 15 + # timeout-minutes: 15 if: ${{ startsWith( matrix.python-version, 'pypy' ) }} run: | PYTHONTRACEMALLOC=20 hatch run test:nowarn || PYTHONTRACEMALLOC=20 hatch run test:nowarn --lf - name: Run the tests on Windows - timeout-minutes: 15 + # timeout-minutes: 15 if: ${{ startsWith(matrix.os, 'windows') }} run: | hatch run test:pip list @@ -152,7 +152,7 @@ jobs: pip freeze - name: Run the tests - timeout-minutes: 15 + # timeout-minutes: 15 run: PYTHONTRACEMALLOC=20 pytest -W default -vv || PYTHONTRACEMALLOC=20 pytest --vv -W default --lf test_miniumum_versions: From 3c3bc57373631127ab095027f7bb9d6c73f59045 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 13 Feb 2025 12:19:07 +0100 Subject: [PATCH 39/41] Fix test_embed_kernel --- tests/test_embed_kernel.py | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/tests/test_embed_kernel.py b/tests/test_embed_kernel.py index 685824071..800213baf 100644 --- a/tests/test_embed_kernel.py +++ b/tests/test_embed_kernel.py @@ -109,7 +109,10 @@ def test_embed_kernel_basic(): with setup_kernel(cmd) as client: # oinfo a (int) client.inspect("a") - msg = client.get_shell_msg(timeout=TIMEOUT) + while True: + msg = client.get_shell_msg(timeout=TIMEOUT) + if msg["msg_type"] == "inspect_reply": + break content = msg["content"] assert content["found"] @@ -145,7 +148,10 @@ def test_embed_kernel_namespace(): with setup_kernel(cmd) as client: # oinfo a (int) client.inspect("a") - msg = client.get_shell_msg(timeout=TIMEOUT) + while True: + msg = client.get_shell_msg(timeout=TIMEOUT) + if msg["msg_type"] == "inspect_reply": + break content = msg["content"] assert content["found"] text = content["data"]["text/plain"] @@ -153,7 +159,10 @@ def test_embed_kernel_namespace(): # oinfo b (str) client.inspect("b") - msg = client.get_shell_msg(timeout=TIMEOUT) + while True: + msg = client.get_shell_msg(timeout=TIMEOUT) + if msg["msg_type"] == "inspect_reply": + break content = msg["content"] assert content["found"] text = content["data"]["text/plain"] @@ -161,7 +170,10 @@ def test_embed_kernel_namespace(): # oinfo c (undefined) client.inspect("c") - msg = client.get_shell_msg(timeout=TIMEOUT) + while True: + msg = client.get_shell_msg(timeout=TIMEOUT) + if msg["msg_type"] == "inspect_reply": + break content = msg["content"] assert not content["found"] @@ -186,7 +198,10 @@ def test_embed_kernel_reentrant(): with setup_kernel(cmd) as client: for i in range(5): client.inspect("count") - msg = client.get_shell_msg(timeout=TIMEOUT) + while True: + msg = client.get_shell_msg(timeout=TIMEOUT) + if msg["msg_type"] == "inspect_reply": + break content = msg["content"] assert content["found"] text = content["data"]["text/plain"] From 1b1ec8028a8a11d93882d85053487ead7a83f2e1 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 13 Feb 2025 11:23:17 +0000 Subject: [PATCH 40/41] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- ipykernel/thread.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipykernel/thread.py b/ipykernel/thread.py index f90afb1cf..cf8532e3a 100644 --- a/ipykernel/thread.py +++ b/ipykernel/thread.py @@ -1,9 +1,9 @@ """Base class for threads.""" from __future__ import annotations +import typing as t from collections.abc import Awaitable from queue import Queue -import typing as t from threading import Event, Thread from anyio import create_task_group, run, to_thread From cefa3e9f9f1645bf132c64d770fb521d8614821e Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 13 Feb 2025 13:11:34 +0100 Subject: [PATCH 41/41] Fix test_start_kernel --- tests/test_start_kernel.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/tests/test_start_kernel.py b/tests/test_start_kernel.py index f2a632be0..baa96c1b2 100644 --- a/tests/test_start_kernel.py +++ b/tests/test_start_kernel.py @@ -24,7 +24,10 @@ def test_ipython_start_kernel_userns(): with setup_kernel(cmd) as client: client.inspect("custom") - msg = client.get_shell_msg(timeout=TIMEOUT) + while True: + msg = client.get_shell_msg(timeout=TIMEOUT) + if msg["msg_type"] == "inspect_reply": + break content = msg["content"] assert content["found"] text = content["data"]["text/plain"] @@ -36,7 +39,10 @@ def test_ipython_start_kernel_userns(): content = msg["content"] assert content["status"] == "ok" client.inspect("usermod") - msg = client.get_shell_msg(timeout=TIMEOUT) + while True: + msg = client.get_shell_msg(timeout=TIMEOUT) + if msg["msg_type"] == "inspect_reply": + break content = msg["content"] assert content["found"] text = content["data"]["text/plain"] @@ -60,7 +66,10 @@ def test_ipython_start_kernel_no_userns(): content = msg["content"] assert content["status"] == "ok" client.inspect("usermod") - msg = client.get_shell_msg(timeout=TIMEOUT) + while True: + msg = client.get_shell_msg(timeout=TIMEOUT) + if msg["msg_type"] == "inspect_reply": + break content = msg["content"] assert content["found"] text = content["data"]["text/plain"]