Skip to content

Added functionlaity for logging information #241

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/simple-chat-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ thiserror = "2.0"
async-trait = "0.1"
futures = "0.3"
toml = "0.8"
rmcp = { workspace = true, features = [
rmcp = { path = "../../rmcp", features = [
"client",
"transport-child-process",
"transport-sse-client",
"reqwest"
], no-default-features = true }
], default-features = false }
clap = { version = "4.0", features = ["derive"] }
21 changes: 14 additions & 7 deletions examples/transport/src/http_upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,22 @@ async fn main() -> anyhow::Result<()> {
start_server().await?;
let client = http_client("127.0.0.1:8001").await?;
let tools = client.list_all_tools().await?;
tracing::info!("Available tools: {:#?}", tools);
client.cancel().await?;
tracing::info!("{:#?}", tools);
tracing::info!("Client terminated");
Ok(())
}

async fn http_server(req: Request<Incoming>) -> Result<hyper::Response<String>, hyper::Error> {
tokio::spawn(async move {
let upgraded = hyper::upgrade::on(req).await?;
let service = Calculator.serve(TokioIo::new(upgraded)).await?;
service.waiting().await?;
anyhow::Result::<()>::Ok(())
if let Err(e) = async {
let upgraded = hyper::upgrade::on(req).await?;
let service = Calculator.serve(TokioIo::new(upgraded)).await?;
service.waiting().await?;
Ok::<(), anyhow::Error>(())
}.await {
tracing::error!("Service error: {:?}", e);
}
});
let mut response = hyper::Response::new(String::new());
*response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
Expand All @@ -36,6 +41,7 @@ async fn http_server(req: Request<Incoming>) -> Result<hyper::Response<String>,
Ok(response)
}


async fn http_client(uri: &str) -> anyhow::Result<RunningService<RoleClient, ()>> {
let tcp_stream = tokio::net::TcpStream::connect(uri).await?;
let (mut s, c) =
Expand All @@ -52,12 +58,13 @@ async fn http_client(uri: &str) -> anyhow::Result<RunningService<RoleClient, ()>

async fn start_server() -> anyhow::Result<()> {
let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:8001").await?;
tracing::info!("Server listening on {}", tcp_listener.local_addr()?);
let service = hyper::service::service_fn(http_server);
tokio::spawn(async move {
while let Ok((stream, addr)) = tcp_listener.accept().await {
tracing::info!("accepted connection from: {}", addr);
tracing::info!("Accepted connection from: {}", addr);
let conn = hyper::server::conn::http1::Builder::new()
.serve_connection(TokioIo::new(stream), service)
.serve_connection(TokioIo::new(stream), service.clone())
.with_upgrades();
tokio::spawn(conn);
}
Expand Down
48 changes: 40 additions & 8 deletions examples/transport/src/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,63 @@
use common::calculator::Calculator;
use rmcp::{serve_client, serve_server};
use tracing::info;
use tracing_subscriber::EnvFilter;

mod common;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tokio::spawn(server());
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env().add_directive("info".parse()?))
.init();


let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(server(tx));
rx.await??;
client().await?;
Ok(())
}

async fn server() -> anyhow::Result<()> {
let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:8001").await?;
while let Ok((stream, _)) = tcp_listener.accept().await {
async fn server(ready_tx: tokio::sync::oneshot::Sender<anyhow::Result<()>>) {

let bind_result = tokio::net::TcpListener::bind("127.0.0.1:8001").await;
let listener = match bind_result {
Ok(l) => l,
Err(e) => {
let _ = ready_tx.send(Err(e.into()));
return;
}
};

info!("Server listening on {}", listener.local_addr().unwrap());
let _ = ready_tx.send(Ok(()));


while let Ok((stream, addr)) = listener.accept().await {
info!("Accepted connection from: {}", addr);

tokio::spawn(async move {
let server = serve_server(Calculator, stream).await?;
server.waiting().await?;
anyhow::Ok(())
match serve_server(Calculator, stream).await {
Ok(server) => {
if let Err(e) = server.waiting().await {
info!("Connection closed with error: {}", e);
}
}
Err(e) => {
info!("Failed to serve connection: {}", e);
}
};
});
}
Ok(())
}

async fn client() -> anyhow::Result<()> {
info!("Client connecting to server...");
let stream = tokio::net::TcpSocket::new_v4()?
.connect("127.0.0.1:8001".parse()?)
.await?;
let client = serve_client((), stream).await?;
info!("Client connected successfully");
let tools = client.peer().list_tools(Default::default()).await?;
println!("{:?}", tools);
Ok(())
Expand Down
Loading