diff --git a/python/packages/autogen-ext/src/autogen_ext/tools/mcp/_workbench.py b/python/packages/autogen-ext/src/autogen_ext/tools/mcp/_workbench.py index 419ae34bfe2e..fd43ff852f3d 100644 --- a/python/packages/autogen-ext/src/autogen_ext/tools/mcp/_workbench.py +++ b/python/packages/autogen-ext/src/autogen_ext/tools/mcp/_workbench.py @@ -1,3 +1,4 @@ +import asyncio import builtins import warnings from typing import Any, List, Literal, Mapping @@ -152,6 +153,7 @@ def __init__(self, server_params: McpServerParams) -> None: self._server_params = server_params # self._session: ClientSession | None = None self._actor: McpSessionActor | None = None + self._actor_loop: asyncio.AbstractEventLoop | None = None self._read = None self._write = None @@ -253,6 +255,7 @@ async def start(self) -> None: if isinstance(self._server_params, (StdioServerParams, SseServerParams)): self._actor = McpSessionActor(self._server_params) await self._actor.initialize() + self._actor_loop = asyncio.get_event_loop() else: raise ValueError(f"Unsupported server params type: {type(self._server_params)}") @@ -282,4 +285,10 @@ def _from_config(cls, config: McpWorkbenchConfig) -> Self: def __del__(self) -> None: # Ensure the actor is stopped when the workbench is deleted - pass + if self._actor and self._actor_loop: + loop = self._actor_loop + if loop.is_running() and not loop.is_closed(): + loop.call_soon_threadsafe(lambda: asyncio.create_task(self.stop())) + else: + msg = "Cannot safely stop actor at [McpWorkbench.__del__]: loop is closed or not running" + warnings.warn(msg, RuntimeWarning, stacklevel=2) diff --git a/python/packages/autogen-ext/tests/tools/test_mcp_tools.py b/python/packages/autogen-ext/tests/tools/test_mcp_tools.py index de9fd5cb513c..1230395a77f3 100644 --- a/python/packages/autogen-ext/tests/tools/test_mcp_tools.py +++ b/python/packages/autogen-ext/tests/tools/test_mcp_tools.py @@ -1,12 +1,17 @@ +import asyncio import logging import os +import threading +from typing import cast from unittest.mock import AsyncMock, MagicMock import pytest +from _pytest.logging import LogCaptureFixture # type: ignore[import] from autogen_core import CancellationToken from autogen_core.tools import Workbench from autogen_core.utils import schema_to_pydantic_model from autogen_ext.tools.mcp import ( + McpSessionActor, McpWorkbench, SseMcpToolAdapter, SseServerParams, @@ -594,4 +599,54 @@ async def test_lazy_init_and_finalize_cleanup() -> None: assert workbench._actor is not None # type: ignore[reportPrivateUsage] assert workbench._actor._active is True # type: ignore[reportPrivateUsage] + actor = workbench._actor # type: ignore[reportPrivateUsage] del workbench + await asyncio.sleep(0.1) + assert actor._active is False + + +@pytest.mark.asyncio +async def test_del_to_new_event_loop_when_get_event_loop_fails() -> None: + params = StdioServerParams( + command="npx", + args=[ + "-y", + "@modelcontextprotocol/server-filesystem", + ".", + ], + read_timeout_seconds=60, + ) + workbench = McpWorkbench(server_params=params) + + await workbench.list_tools() + assert workbench._actor is not None # type: ignore[reportPrivateUsage] + assert workbench._actor._active is True # type: ignore[reportPrivateUsage] + + actor = workbench._actor # type: ignore[reportPrivateUsage] + + def cleanup() -> None: + nonlocal workbench + del workbench + + t = threading.Thread(target=cleanup) + t.start() + t.join() + + await asyncio.sleep(0.1) + assert actor._active is False # type: ignore[reportPrivateUsage] + + +def test_del_raises_when_loop_closed() -> None: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + params = StdioServerParams(command="echo", args=["ok"]) + workbench = McpWorkbench(server_params=params) + + workbench._actor_loop = loop # type: ignore[reportPrivateUsage] + workbench._actor = cast(McpSessionActor, object()) # type: ignore[reportPrivateUsage] + + loop.close() + + with pytest.warns(RuntimeWarning, match="loop is closed or not running"): + del workbench