Skip to content

fix/mcp_session_auto_close_when_Mcpworkbench_deleted #6497

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import builtins
import warnings
from typing import Any, List, Literal, Mapping
Expand Down Expand Up @@ -152,6 +153,7 @@ def __init__(self, server_params: McpServerParams) -> None:
self._server_params = server_params
# self._session: ClientSession | None = None
self._actor: McpSessionActor | None = None
self._actor_loop: asyncio.AbstractEventLoop | None = None
self._read = None
self._write = None

Expand Down Expand Up @@ -253,6 +255,7 @@ async def start(self) -> None:
if isinstance(self._server_params, (StdioServerParams, SseServerParams)):
self._actor = McpSessionActor(self._server_params)
await self._actor.initialize()
self._actor_loop = asyncio.get_event_loop()
else:
raise ValueError(f"Unsupported server params type: {type(self._server_params)}")

Expand Down Expand Up @@ -282,4 +285,10 @@ def _from_config(cls, config: McpWorkbenchConfig) -> Self:

def __del__(self) -> None:
# Ensure the actor is stopped when the workbench is deleted
pass
if self._actor and self._actor_loop:
loop = self._actor_loop
if loop.is_running() and not loop.is_closed():
loop.call_soon_threadsafe(lambda: asyncio.create_task(self.stop()))
else:
msg = "Cannot safely stop actor at [McpWorkbench.__del__]: loop is closed or not running"
warnings.warn(msg, RuntimeWarning, stacklevel=2)
55 changes: 55 additions & 0 deletions python/packages/autogen-ext/tests/tools/test_mcp_tools.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import asyncio
import logging
import os
import threading
from typing import cast
from unittest.mock import AsyncMock, MagicMock

import pytest
from _pytest.logging import LogCaptureFixture # type: ignore[import]
from autogen_core import CancellationToken
from autogen_core.tools import Workbench
from autogen_core.utils import schema_to_pydantic_model
from autogen_ext.tools.mcp import (
McpSessionActor,
McpWorkbench,
SseMcpToolAdapter,
SseServerParams,
Expand Down Expand Up @@ -594,4 +599,54 @@ async def test_lazy_init_and_finalize_cleanup() -> None:
assert workbench._actor is not None # type: ignore[reportPrivateUsage]
assert workbench._actor._active is True # type: ignore[reportPrivateUsage]

actor = workbench._actor # type: ignore[reportPrivateUsage]
del workbench
await asyncio.sleep(0.1)
assert actor._active is False


@pytest.mark.asyncio
async def test_del_to_new_event_loop_when_get_event_loop_fails() -> None:
params = StdioServerParams(
command="npx",
args=[
"-y",
"@modelcontextprotocol/server-filesystem",
".",
],
read_timeout_seconds=60,
)
workbench = McpWorkbench(server_params=params)

await workbench.list_tools()
assert workbench._actor is not None # type: ignore[reportPrivateUsage]
assert workbench._actor._active is True # type: ignore[reportPrivateUsage]

actor = workbench._actor # type: ignore[reportPrivateUsage]

def cleanup() -> None:
nonlocal workbench
del workbench

t = threading.Thread(target=cleanup)
t.start()
t.join()

await asyncio.sleep(0.1)
assert actor._active is False # type: ignore[reportPrivateUsage]


def test_del_raises_when_loop_closed() -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

params = StdioServerParams(command="echo", args=["ok"])
workbench = McpWorkbench(server_params=params)

workbench._actor_loop = loop # type: ignore[reportPrivateUsage]
workbench._actor = cast(McpSessionActor, object()) # type: ignore[reportPrivateUsage]

loop.close()

with pytest.warns(RuntimeWarning, match="loop is closed or not running"):
del workbench
Loading