From 4a06fa5027c12feb09f422ec2c9f86f353f26d0b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 15 May 2025 16:45:54 +0000 Subject: [PATCH 1/5] tests: Add stability connection tests Signed-off-by: Alexandru Vasile --- src/lib.rs | 2 +- tests/connection/mod.rs | 2 + tests/connection/stability.rs | 344 ++++++++++++++++++++++++++++++++++ 3 files changed, 347 insertions(+), 1 deletion(-) create mode 100644 tests/connection/stability.rs diff --git a/src/lib.rs b/src/lib.rs index 66e03289..77dcdeed 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -80,7 +80,7 @@ pub mod yamux; mod bandwidth; mod multistream_select; -mod utils; +pub mod utils; #[cfg(test)] mod mock; diff --git a/tests/connection/mod.rs b/tests/connection/mod.rs index d47e19c5..ebe55169 100644 --- a/tests/connection/mod.rs +++ b/tests/connection/mod.rs @@ -44,6 +44,8 @@ use crate::common::{add_transport, Transport}; #[cfg(test)] mod protocol_dial_invalid_address; +#[cfg(test)] +mod stability; #[tokio::test] async fn two_litep2ps_work_tcp() { diff --git a/tests/connection/stability.rs b/tests/connection/stability.rs new file mode 100644 index 00000000..095309a4 --- /dev/null +++ b/tests/connection/stability.rs @@ -0,0 +1,344 @@ +// Copyright 2025 litep2p developers +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use litep2p::{ + codec::ProtocolCodec, + config::ConfigBuilder, + crypto::ed25519::Keypair, + protocol::{ + libp2p::ping::Config as PingConfig, Direction, TransportEvent, TransportService, + UserProtocol, + }, + substream::Substream, + transport::tcp::config::Config as TcpConfig, + types::protocol::ProtocolName, + utils::futures_stream::FuturesStream, + Litep2p, PeerId, +}; + +use futures::{future::BoxFuture, StreamExt}; + +use crate::common::{add_transport, Transport}; + +const PROTOCOL_NAME: &str = "/litep2p-stability/1.0.0"; + +const LOG_TARGET: &str = "litep2p::stability"; + +/// The stability protocol ensures a single transport connection +/// (either TCP or WebSocket) can sustain multiple received packets. +/// +/// The scenario puts stress on the internal buffers, ensuring that +/// each layer behave properly. +/// +/// ## Protocol Details +/// +/// The protocol opens 16 outbound substreams on the connection established event. +/// Therefore, it will handle 16 outbound substreams and 16 inbound substreams +/// (open by the remote). +/// +/// The outbound substreams will push a configurable number of packets, each of +/// size 128 bytes, to the remote peer. While the inbound substreams will read +/// the same number of packets from the remote peer. +pub struct StabilityProtocol { + /// The number of identical packets to send / receive on a substream. + total_packets: usize, + inbound: FuturesStream>>, + outbound: FuturesStream>>, + /// Peer Id for logging purposes. + peer_id: PeerId, + /// The sender to notify the test that the protocol finished. + tx: Option>, +} + +impl StabilityProtocol { + fn new(total_packets: usize, peer_id: PeerId) -> (Self, tokio::sync::oneshot::Receiver<()>) { + let (tx, rx) = tokio::sync::oneshot::channel(); + + ( + Self { + total_packets, + inbound: FuturesStream::new(), + outbound: FuturesStream::new(), + peer_id, + tx: Some(tx), + }, + rx, + ) + } + + fn handle_substream(&mut self, mut substream: Substream, direction: Direction) { + let mut total_packets = self.total_packets; + match direction { + Direction::Inbound => { + self.inbound.push(Box::pin(async move { + while total_packets > 0 { + let _payload = substream + .next() + .await + .ok_or_else(|| { + tracing::warn!(target: LOG_TARGET, "Failed to read None from substream"); + "Failed to read None from substream".to_string() + })? + .map_err(|err| { + tracing::warn!(target: LOG_TARGET, "Failed to read from substream {:?}", err); + "Failed to read from substream".to_string() + })?; + total_packets -= 1; + } + + Ok(()) + })); + } + Direction::Outbound { .. } => { + self.outbound.push(Box::pin(async move { + let mut frame = vec![0; 128]; + for i in 0..frame.len() { + frame[i] = i as u8; + } + + while total_packets > 0 { + substream.send_framed(frame.clone().into()).await.map_err(|err| { + tracing::warn!("Failed to send to substream {:?}", err); + "Failed to send to substream".to_string() + })?; + total_packets -= 1; + } + + Ok(()) + })); + } + } + } +} + +#[async_trait::async_trait] +impl UserProtocol for StabilityProtocol { + fn protocol(&self) -> ProtocolName { + PROTOCOL_NAME.into() + } + + fn codec(&self) -> ProtocolCodec { + // Similar to the identify payload size. + ProtocolCodec::UnsignedVarint(Some(4096)) + } + + async fn run(mut self: Box, mut service: TransportService) -> litep2p::Result<()> { + let num_substreams = 16; + let mut handled_substreams = 0; + + loop { + if handled_substreams == 2 * num_substreams { + tracing::info!( + target: LOG_TARGET, + handled_substreams, + peer_id = %self.peer_id, + "StabilityProtocol finished to handle packets", + ); + + self.tx.take().expect("Send happens only once; qed").send(()).unwrap(); + // If one of the stability protocols finishes, while the + // the other is still reading data from the stream, the test + // might race if the substream detects the connection as closed. + futures::future::pending::<()>().await; + } + + tokio::select! { + event = service.next() => match event { + Some(TransportEvent::ConnectionEstablished { peer, .. }) => { + for i in 0..num_substreams { + match service.open_substream(peer) { + Ok(_) => {}, + Err(e) => { + tracing::error!( + target: LOG_TARGET, + ?e, + i, + peer_id = %self.peer_id, + "Failed to open substream" + ); + // Drop the tx sender. + return Ok(()); + } + } + } + } + Some(TransportEvent::ConnectionClosed { peer }) => { + tracing::error!( + target: LOG_TARGET, + peer_id = %self.peer_id, + "Connection closed unexpectedly: {}", + peer + ); + + panic!("connection closed"); + } + Some(TransportEvent::SubstreamOpened { + substream, + direction, + .. + }) => { + self.handle_substream(substream, direction); + } + _ => {}, + }, + + inbound = self.inbound.next(), if !self.inbound.is_empty() => { + match inbound { + Some(Ok(())) => { + handled_substreams += 1; + } + Some(Err(err)) => { + tracing::error!( + target: LOG_TARGET, + peer_id = %self.peer_id, + "Inbound stream failed with error: {}", + err + ); + // Drop the tx sender. + return Ok(()); + } + None => { + tracing::error!( + target: LOG_TARGET, + peer_id = %self.peer_id, + "Inbound stream failed with None", + ); + panic!("Inbound stream failed"); + } + } + }, + + outbound = self.outbound.next(), if !self.outbound.is_empty() => { + match outbound { + Some(Ok(())) => { + handled_substreams += 1; + } + Some(Err(err)) => { + tracing::error!( + target: LOG_TARGET, + peer_id = %self.peer_id, + "Outbound stream failed with error: {}", + err + ); + // Drop the tx sender. + return Ok(()); + } + None => { + tracing::error!( + target: LOG_TARGET, + peer_id = %self.peer_id, + "Outbound stream failed with None", + ); + panic!("Outbound stream failed"); + } + } + }, + } + } + } +} + +async fn stability_litep2p_transport(transport1: Transport, transport2: Transport) { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let (ping_config1, _ping_event_stream1) = PingConfig::default(); + let keypair = Keypair::generate(); + let peer_id = keypair.public().to_peer_id(); + let (stability_protocol, mut exit1) = StabilityProtocol::new(1000, peer_id); + let config1 = ConfigBuilder::new() + .with_keypair(keypair) + .with_libp2p_ping(ping_config1) + .with_user_protocol(Box::new(stability_protocol)); + + let config1 = add_transport(config1, transport1).build(); + + let (ping_config2, _ping_event_stream2) = PingConfig::default(); + let keypair = Keypair::generate(); + let peer_id = keypair.public().to_peer_id(); + let (stability_protocol, mut exit2) = StabilityProtocol::new(1000, peer_id); + let config2 = ConfigBuilder::new() + .with_keypair(keypair) + .with_libp2p_ping(ping_config2) + .with_user_protocol(Box::new(stability_protocol)); + + let config2 = add_transport(config2, transport2).build(); + + let mut litep2p1 = Litep2p::new(config1).unwrap(); + let mut litep2p2 = Litep2p::new(config2).unwrap(); + + let address = litep2p2.listen_addresses().next().unwrap().clone(); + litep2p1.dial_address(address).await.unwrap(); + + let mut litep2p1_exit = false; + let mut litep2p2_exit = false; + loop { + if litep2p1_exit && litep2p2_exit { + break; + } + + tokio::select! { + // Wait for the stability protocols to finish, while keeping + // the peer connections alive. + event = &mut exit1, if !litep2p1_exit => { + if let Ok(()) = event { + litep2p1_exit = true; + } else { + panic!("StabilityProtocol 1 failed"); + } + }, + event = &mut exit2, if !litep2p2_exit => { + if let Ok(()) = event { + litep2p2_exit = true; + } else { + panic!("StabilityProtocol 2 failed"); + } + }, + + // Drive litep2p backends. + event = litep2p1.next_event() => { + tracing::info!(target: LOG_TARGET, "litep2p1 event: {:?}", event); + } + event = litep2p2.next_event() => { + tracing::info!(target: LOG_TARGET, "litep2p2 event: {:?}", event); + } + } + } +} + +#[tokio::test] +async fn stability_tcp() { + let transport1 = Transport::Tcp(TcpConfig::default()); + let transport2 = Transport::Tcp(TcpConfig::default()); + + stability_litep2p_transport(transport1, transport2).await; +} + +#[cfg(feature = "websocket")] +#[tokio::test] +async fn stability_websocket() { + use litep2p::transport::websocket::config::Config as WebSocketConfig; + + let transport1 = Transport::WebSocket(WebSocketConfig::default()); + let transport2 = Transport::WebSocket(WebSocketConfig::default()); + + stability_litep2p_transport(transport1, transport2).await; +} From edaf6438f9692bd028770ef8b91b2e45136c648a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 16 May 2025 11:41:11 +0000 Subject: [PATCH 2/5] websocket: Fix connection stability on decrypt messages Signed-off-by: Alexandru Vasile --- src/transport/websocket/stream.rs | 140 ++++++------------------------ 1 file changed, 26 insertions(+), 114 deletions(-) diff --git a/src/transport/websocket/stream.rs b/src/transport/websocket/stream.rs index cec8e10f..1e958cfe 100644 --- a/src/transport/websocket/stream.rs +++ b/src/transport/websocket/stream.rs @@ -21,7 +21,7 @@ //! Stream implementation for `tokio_tungstenite::WebSocketStream` that implements //! `AsyncRead + AsyncWrite` -use bytes::{Buf, Bytes, BytesMut}; +use bytes::{Buf, Bytes}; use futures::{SinkExt, StreamExt}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; @@ -31,27 +31,11 @@ use std::{ task::{Context, Poll}, }; -const DEFAULT_BUF_SIZE: usize = 8 * 1024; - -/// Send state. -#[derive(Debug)] -enum State { - /// State is poisoned. - Poisoned, - - /// Sink is accepting input. - ReadyToSend, - - /// Flush is pending for the sink. - FlushPending, -} +const LOG_TARGET: &str = "litep2p::transport::websocket::stream"; /// Buffered stream which implements `AsyncRead + AsyncWrite` #[derive(Debug)] pub(super) struct BufferedStream { - /// Write buffer. - write_buffer: BytesMut, - /// Read buffer. /// /// The buffer is taken directly from the WebSocket stream. @@ -59,19 +43,14 @@ pub(super) struct BufferedStream { /// Underlying WebSocket stream. stream: WebSocketStream, - - /// Read state. - state: State, } impl BufferedStream { /// Create new [`BufferedStream`]. pub(super) fn new(stream: WebSocketStream) -> Self { Self { - write_buffer: BytesMut::with_capacity(DEFAULT_BUF_SIZE), read_buffer: Bytes::new(), stream, - state: State::ReadyToSend, } } } @@ -79,73 +58,39 @@ impl BufferedStream { impl futures::AsyncWrite for BufferedStream { fn poll_write( mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, + cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - self.write_buffer.extend_from_slice(buf); - - Poll::Ready(Ok(buf.len())) - } + match futures::ready!(self.stream.poll_ready_unpin(cx)) { + Ok(()) => { + let message = Message::Binary(Bytes::copy_from_slice(buf)); - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.write_buffer.is_empty() { - return self - .stream - .poll_ready_unpin(cx) - .map_err(|_| std::io::ErrorKind::UnexpectedEof.into()); - } - - loop { - match std::mem::replace(&mut self.state, State::Poisoned) { - State::ReadyToSend => { - match self.stream.poll_ready_unpin(cx) { - Poll::Ready(Ok(())) => {} - Poll::Ready(Err(_error)) => - return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())), - Poll::Pending => { - self.state = State::ReadyToSend; - return Poll::Pending; - } - } - - let message = std::mem::take(&mut self.write_buffer); - match self.stream.start_send_unpin(Message::Binary(message.freeze())) { - Ok(()) => {} - Err(_error) => - return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())), - } - - // Transition to flush pending state. - self.state = State::FlushPending; - continue; + if let Err(err) = self.stream.start_send_unpin(message) { + tracing::debug!(target: LOG_TARGET, "Error during start send: {:?}", err); + return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())); } - State::FlushPending => { - match self.stream.poll_flush_unpin(cx) { - Poll::Ready(Ok(())) => {} - Poll::Ready(Err(_error)) => - return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())), - Poll::Pending => { - self.state = State::FlushPending; - return Poll::Pending; - } - } - - self.state = State::ReadyToSend; - self.write_buffer = BytesMut::with_capacity(DEFAULT_BUF_SIZE); - return Poll::Ready(Ok(())); - } - State::Poisoned => - return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())), + Poll::Ready(Ok(buf.len())) + } + Err(err) => { + tracing::debug!(target: LOG_TARGET, "Error during poll ready: {:?}", err); + return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())); } } } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.stream.poll_flush_unpin(cx).map_err(|err| { + tracing::debug!(target: LOG_TARGET, "Error during poll flush: {:?}", err); + std::io::ErrorKind::UnexpectedEof.into() + }) + } + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match futures::ready!(self.stream.poll_close_unpin(cx)) { - Ok(_) => Poll::Ready(Ok(())), - Err(_) => Poll::Ready(Err(std::io::ErrorKind::PermissionDenied.into())), - } + self.stream.poll_close_unpin(cx).map_err(|err| { + tracing::debug!(target: LOG_TARGET, "Error during poll close: {:?}", err); + std::io::ErrorKind::PermissionDenied.into() + }) } } @@ -183,7 +128,7 @@ impl futures::AsyncRead for BufferedStream #[cfg(test)] mod tests { use super::*; - use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + use futures::{AsyncRead, AsyncReadExt, AsyncWriteExt}; use tokio::io::DuplexStream; use tokio_tungstenite::{tungstenite::protocol::Role, WebSocketStream}; @@ -203,7 +148,6 @@ mod tests { let bytes_written = stream.write(data).await.unwrap(); assert_eq!(bytes_written, data.len()); - assert_eq!(&stream.write_buffer[..], data); } #[tokio::test] @@ -253,38 +197,6 @@ mod tests { }; } - #[tokio::test] - async fn test_poisoned_state() { - let (mut stream, server) = create_test_stream().await; - drop(server); - - stream.state = State::Poisoned; - - let mut buffer = [0u8; 10]; - let result = stream.read(&mut buffer).await; - match result { - Err(error) => if error.kind() == std::io::ErrorKind::UnexpectedEof {}, - state => panic!("Unexpected state {state:?}"), - }; - - let mut cx = std::task::Context::from_waker(futures::task::noop_waker_ref()); - let mut pin_stream = Pin::new(&mut stream); - - // Messages are buffered internally, the socket is not touched. - match pin_stream.as_mut().poll_write(&mut cx, &mut buffer) { - Poll::Ready(Ok(10)) => {} - state => panic!("Unexpected state {state:?}"), - } - // Socket is poisoned, the flush will fail. - match pin_stream.poll_flush(&mut cx) { - Poll::Ready(Err(error)) => - if error.kind() == std::io::ErrorKind::UnexpectedEof { - return; - }, - state => panic!("Unexpected state {state:?}"), - } - } - #[tokio::test] async fn test_read_poll_pending() { let (mut stream, mut _server) = create_test_stream().await; From 0b99e9339beaac33488549dde583cb74fa47ee28 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 16 May 2025 14:32:18 +0000 Subject: [PATCH 3/5] websocket: Fix clippy Signed-off-by: Alexandru Vasile --- src/transport/websocket/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transport/websocket/stream.rs b/src/transport/websocket/stream.rs index 1e958cfe..05846c9d 100644 --- a/src/transport/websocket/stream.rs +++ b/src/transport/websocket/stream.rs @@ -74,7 +74,7 @@ impl futures::AsyncWrite for BufferedStream { tracing::debug!(target: LOG_TARGET, "Error during poll ready: {:?}", err); - return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())); + Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())) } } } From 70140acb5f29f54437e9f7ce7dbd3f01011ce3f3 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 16 May 2025 14:33:40 +0000 Subject: [PATCH 4/5] lib: Allow large enum variants Signed-off-by: Alexandru Vasile --- src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib.rs b/src/lib.rs index 77dcdeed..32f61bf4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,6 +20,7 @@ #![allow(clippy::single_match)] #![allow(clippy::result_large_err)] +#![allow(clippy::large_enum_variant)] #![allow(clippy::redundant_pattern_matching)] #![allow(clippy::type_complexity)] #![allow(clippy::result_unit_err)] From 4049f4d5a59e2d098ce044b5ef450b9e74e171ab Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 16 May 2025 14:34:29 +0000 Subject: [PATCH 5/5] multistream: Fix clippy Signed-off-by: Alexandru Vasile --- src/multistream_select/negotiated.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/multistream_select/negotiated.rs b/src/multistream_select/negotiated.rs index 04757041..e4609de2 100644 --- a/src/multistream_select/negotiated.rs +++ b/src/multistream_select/negotiated.rs @@ -370,6 +370,6 @@ impl From for io::Error { if let NegotiationError::ProtocolError(e) = err { return e.into(); } - io::Error::new(io::ErrorKind::Other, err) + io::Error::other(err) } }