From ea9855d2d4222e4e455afbff232dfb503faa3ece Mon Sep 17 00:00:00 2001 From: Adesoji Alu Date: Mon, 2 Jun 2025 11:12:06 +0100 Subject: [PATCH 1/6] feat: improve HTTP upgrade handling Added server startup log with bound address Cloned service handler for proper connection handling Improved connection logging with client addresses --- examples/transport/src/http_upgrade.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/examples/transport/src/http_upgrade.rs b/examples/transport/src/http_upgrade.rs index 1c0cf6ad..74379799 100644 --- a/examples/transport/src/http_upgrade.rs +++ b/examples/transport/src/http_upgrade.rs @@ -16,17 +16,22 @@ async fn main() -> anyhow::Result<()> { start_server().await?; let client = http_client("127.0.0.1:8001").await?; let tools = client.list_all_tools().await?; + tracing::info!("Available tools: {:#?}", tools); client.cancel().await?; - tracing::info!("{:#?}", tools); + tracing::info!("Client terminated"); Ok(()) } async fn http_server(req: Request) -> Result, hyper::Error> { tokio::spawn(async move { + if let Err(e) = async { let upgraded = hyper::upgrade::on(req).await?; let service = Calculator.serve(TokioIo::new(upgraded)).await?; service.waiting().await?; - anyhow::Result::<()>::Ok(()) + Ok::<(), anyhow::Error>(()) + }.await { + tracing::error!("Service error: {:?}", e); + } }); let mut response = hyper::Response::new(String::new()); *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS; @@ -46,18 +51,19 @@ async fn http_client(uri: &str) -> anyhow::Result .insert(UPGRADE, HeaderValue::from_static("mcp")); let response = s.send_request(req).await?; let upgraded = hyper::upgrade::on(response).await?; - let client = ().serve(TokioIo::new(upgraded)).await?; + let client = RoleClient.serve(TokioIo::new(upgraded)).await?; Ok(client) } async fn start_server() -> anyhow::Result<()> { let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:8001").await?; + tracing::info!("Server listening on {}", tcp_listener.local_addr()?); let service = hyper::service::service_fn(http_server); tokio::spawn(async move { while let Ok((stream, addr)) = tcp_listener.accept().await { - tracing::info!("accepted connection from: {}", addr); + tracing::info!("Accepted connection from: {}", addr); let conn = hyper::server::conn::http1::Builder::new() - .serve_connection(TokioIo::new(stream), service) + .serve_connection(TokioIo::new(stream), service.clone()) .with_upgrades(); tokio::spawn(conn); } From a3bf97c5b9edb3bcd7ce94d1afc5ee4909492e93 Mon Sep 17 00:00:00 2001 From: Adesoji Alu Date: Mon, 2 Jun 2025 11:25:49 +0100 Subject: [PATCH 2/6] refactor: optimize TCP connection handling --- examples/transport/src/tcp.rs | 48 +++++++++++++++++++++++++++++------ 1 file changed, 40 insertions(+), 8 deletions(-) diff --git a/examples/transport/src/tcp.rs b/examples/transport/src/tcp.rs index 72428fe6..46849952 100644 --- a/examples/transport/src/tcp.rs +++ b/examples/transport/src/tcp.rs @@ -1,31 +1,63 @@ use common::calculator::Calculator; use rmcp::{serve_client, serve_server}; +use tracing::info; +use tracing_subscriber::EnvFilter; mod common; #[tokio::main] async fn main() -> anyhow::Result<()> { - tokio::spawn(server()); + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env().add_directive("info".parse()?)) + .init(); + + + let (tx, rx) = tokio::sync::oneshot::channel(); + tokio::spawn(server(tx)); + rx.await??; client().await?; Ok(()) } -async fn server() -> anyhow::Result<()> { - let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:8001").await?; - while let Ok((stream, _)) = tcp_listener.accept().await { +async fn server(ready_tx: tokio::sync::oneshot::Sender>) { + + let bind_result = tokio::net::TcpListener::bind("127.0.0.1:8001").await; + let listener = match bind_result { + Ok(l) => l, + Err(e) => { + let _ = ready_tx.send(Err(e.into())); + return; + } + }; + + info!("Server listening on {}", listener.local_addr().unwrap()); + let _ = ready_tx.send(Ok(())); + + + while let Ok((stream, addr)) = listener.accept().await { + info!("Accepted connection from: {}", addr); + tokio::spawn(async move { - let server = serve_server(Calculator, stream).await?; - server.waiting().await?; - anyhow::Ok(()) + match serve_server(Calculator, stream).await { + Ok(server) => { + if let Err(e) = server.waiting().await { + info!("Connection closed with error: {}", e); + } + } + Err(e) => { + info!("Failed to serve connection: {}", e); + } + }; }); } - Ok(()) } async fn client() -> anyhow::Result<()> { + info!("Client connecting to server..."); let stream = tokio::net::TcpSocket::new_v4()? .connect("127.0.0.1:8001".parse()?) .await?; let client = serve_client((), stream).await?; + info!("Client connected successfully"); let tools = client.peer().list_tools(Default::default()).await?; println!("{:?}", tools); Ok(()) From 8eac97aeb6d97e550b314b4f0abb18b981e65fe8 Mon Sep 17 00:00:00 2001 From: Adesoji Alu Date: Mon, 2 Jun 2025 11:29:12 +0100 Subject: [PATCH 3/6] chore: update Cargo.toml dependencies --- examples/transport/Cargo.toml | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/examples/transport/Cargo.toml b/examples/transport/Cargo.toml index 01306034..ba84515a 100644 --- a/examples/transport/Cargo.toml +++ b/examples/transport/Cargo.toml @@ -16,15 +16,16 @@ all-features = true [dependencies] rmcp = { path = "../../crates/rmcp", features = ["server", "client"] } -tokio = { version = "1", features = [ - "macros", - "rt", - "rt-multi-thread", - "io-std", - "net", - "fs", - "time", -] } +tokio = { version = "1", features = ["full"] } +# tokio = { version = "1", features = [ +# "macros", +# "rt", +# "rt-multi-thread", +# "io-std", +# "net", +# "fs", +# "time", +# ] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" anyhow = "1.0" From 2e26c427c22bb23efd31f3a2dbb2f61213f0d69b Mon Sep 17 00:00:00 2001 From: Adesoji Alu Date: Tue, 3 Jun 2025 10:13:39 +0100 Subject: [PATCH 4/6] chore: update hyper dependency in Cargo.toml --- examples/transport/Cargo.toml | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/examples/transport/Cargo.toml b/examples/transport/Cargo.toml index ba84515a..01306034 100644 --- a/examples/transport/Cargo.toml +++ b/examples/transport/Cargo.toml @@ -16,16 +16,15 @@ all-features = true [dependencies] rmcp = { path = "../../crates/rmcp", features = ["server", "client"] } -tokio = { version = "1", features = ["full"] } -# tokio = { version = "1", features = [ -# "macros", -# "rt", -# "rt-multi-thread", -# "io-std", -# "net", -# "fs", -# "time", -# ] } +tokio = { version = "1", features = [ + "macros", + "rt", + "rt-multi-thread", + "io-std", + "net", + "fs", + "time", +] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" anyhow = "1.0" From ee4198fdcfde85e5e104a44a59ad5e35d1b1f6a6 Mon Sep 17 00:00:00 2001 From: Adesoji1 Date: Wed, 4 Jun 2025 09:46:09 +0100 Subject: [PATCH 5/6] fix: resolve clippy linting issues --- examples/transport/src/http_upgrade.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/examples/transport/src/http_upgrade.rs b/examples/transport/src/http_upgrade.rs index 74379799..1b2f1b9d 100644 --- a/examples/transport/src/http_upgrade.rs +++ b/examples/transport/src/http_upgrade.rs @@ -25,10 +25,10 @@ async fn main() -> anyhow::Result<()> { async fn http_server(req: Request) -> Result, hyper::Error> { tokio::spawn(async move { if let Err(e) = async { - let upgraded = hyper::upgrade::on(req).await?; - let service = Calculator.serve(TokioIo::new(upgraded)).await?; - service.waiting().await?; - Ok::<(), anyhow::Error>(()) + let upgraded = hyper::upgrade::on(req).await?; + let service = Calculator.serve(TokioIo::new(upgraded)).await?; + service.waiting().await?; + Ok::<(), anyhow::Error>(()) }.await { tracing::error!("Service error: {:?}", e); } @@ -41,6 +41,7 @@ async fn http_server(req: Request) -> Result, Ok(response) } + async fn http_client(uri: &str) -> anyhow::Result> { let tcp_stream = tokio::net::TcpStream::connect(uri).await?; let (mut s, c) = @@ -51,7 +52,7 @@ async fn http_client(uri: &str) -> anyhow::Result .insert(UPGRADE, HeaderValue::from_static("mcp")); let response = s.send_request(req).await?; let upgraded = hyper::upgrade::on(response).await?; - let client = RoleClient.serve(TokioIo::new(upgraded)).await?; + let client = ().serve(TokioIo::new(upgraded)).await?; Ok(client) } From c52de81006f02235b1962f60af98d6b3e2fde013 Mon Sep 17 00:00:00 2001 From: Adesoji Alu Date: Wed, 4 Jun 2025 12:41:21 +0100 Subject: [PATCH 6/6] Update Cargo.toml --- examples/simple-chat-client/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/simple-chat-client/Cargo.toml b/examples/simple-chat-client/Cargo.toml index 8d79222d..2a6ab8d9 100644 --- a/examples/simple-chat-client/Cargo.toml +++ b/examples/simple-chat-client/Cargo.toml @@ -13,10 +13,10 @@ thiserror = "2.0" async-trait = "0.1" futures = "0.3" toml = "0.8" -rmcp = { workspace = true, features = [ +rmcp = { path = "../../rmcp", features = [ "client", "transport-child-process", "transport-sse-client", "reqwest" -], no-default-features = true } +], default-features = false } clap = { version = "4.0", features = ["derive"] }