diff --git a/src/codec/identity.rs b/src/codec/identity.rs index 266bff4b..f3e47716 100644 --- a/src/codec/identity.rs +++ b/src/codec/identity.rs @@ -102,7 +102,7 @@ mod tests { let bytes = [3u8; 64]; let mut bytes = BytesMut::from(&bytes[..]); - let decoded = codec.decode(&mut bytes); + assert!(codec.decode(&mut bytes).unwrap().is_none()); } #[test] diff --git a/src/crypto/noise/mod.rs b/src/crypto/noise/mod.rs index 2e4b6db0..775363e7 100644 --- a/src/crypto/noise/mod.rs +++ b/src/crypto/noise/mod.rs @@ -350,6 +350,8 @@ pub struct NoiseSocket { read_buffer: Vec, canonical_max_read: usize, decrypt_buffer: Option>, + peer: PeerId, + ty: HandshakeTransport, } impl NoiseSocket { @@ -358,6 +360,8 @@ impl NoiseSocket { noise: NoiseContext, max_read_ahead_factor: usize, max_write_buffer_size: usize, + peer: PeerId, + ty: HandshakeTransport, ) -> Self { Self { io, @@ -380,6 +384,8 @@ impl NoiseSocket { max_read: max_read_ahead_factor * MAX_NOISE_MSG_LEN, }, canonical_max_read: max_read_ahead_factor * MAX_NOISE_MSG_LEN, + peer, + ty, } } @@ -424,7 +430,13 @@ impl AsyncRead for NoiseSocket { }, }; - tracing::trace!(target: LOG_TARGET, ?nread, "read data from socket"); + tracing::trace!( + target: LOG_TARGET, + ?nread, + ty = ?this.ty, + peer = ?this.peer, + "read data from socket" + ); this.nread += nread; this.read_state = ReadState::ReadFrameLen; @@ -433,13 +445,25 @@ impl AsyncRead for NoiseSocket { let mut remaining = match this.nread.checked_sub(this.offset) { Some(remaining) => remaining, None => { - tracing::error!(target: LOG_TARGET, "offset is larger than the number of bytes read"); + tracing::error!( + target: LOG_TARGET, + ty = ?this.ty, + peer = ?this.peer, + nread = ?this.nread, + offset = ?this.offset, + "offset is larger than the number of bytes read" + ); return Poll::Ready(Err(io::ErrorKind::PermissionDenied.into())); } }; if remaining < 2 { - tracing::trace!(target: LOG_TARGET, "reset read buffer"); + tracing::trace!( + target: LOG_TARGET, + ty = ?this.ty, + peer = ?this.peer, + "reset read buffer" + ); this.reset_read_state(remaining); continue; } @@ -456,13 +480,20 @@ impl AsyncRead for NoiseSocket { } }; - tracing::trace!(target: LOG_TARGET, "current frame size = {frame_size}"); + tracing::trace!( + target: LOG_TARGET, + ty = ?this.ty, + peer = ?this.peer, + "current frame size = {frame_size}" + ); if remaining < frame_size { // `read_buffer` can fit the full frame size. if this.nread + frame_size < this.canonical_max_read { tracing::trace!( target: LOG_TARGET, + ty = ?this.ty, + peer = ?this.peer, max_size = ?this.canonical_max_read, next_frame_size = ?(this.nread + frame_size), "read buffer can fit the full frame", @@ -475,7 +506,12 @@ impl AsyncRead for NoiseSocket { continue; } - tracing::trace!(target: LOG_TARGET, "use auxiliary buffer extension"); + tracing::trace!( + target: LOG_TARGET, + ty = ?this.ty, + peer = ?this.peer, + "use auxiliary buffer extension" + ); // use the auxiliary memory at the end of the read buffer for reading the // frame @@ -489,6 +525,8 @@ impl AsyncRead for NoiseSocket { if frame_size <= NOISE_EXTRA_ENCRYPT_SPACE { tracing::error!( target: LOG_TARGET, + ty = ?this.ty, + peer = ?this.peer, ?frame_size, max_size = ?NOISE_EXTRA_ENCRYPT_SPACE, "invalid frame size", @@ -542,7 +580,16 @@ impl AsyncRead for NoiseSocket { buf, ) { Err(error) => { - tracing::error!(target: LOG_TARGET, ?error, "failed to decrypt message"); + tracing::error!( + target: LOG_TARGET, + ty = ?this.ty, + peer = ?this.peer, + buf_len = ?buf.len(), + frame_size = ?frame_size, + ?error, + "failed to decrypt message" + ); + return Poll::Ready(Err(io::ErrorKind::InvalidData.into())); } Ok(nread) => { @@ -560,7 +607,16 @@ impl AsyncRead for NoiseSocket { &mut buffer, ) { Err(error) => { - tracing::error!(target: LOG_TARGET, ?error, "failed to decrypt message"); + tracing::error!( + target: LOG_TARGET, + ty = ?this.ty, + peer = ?this.peer, + buf_len = ?buf.len(), + frame_size = ?frame_size, + ?error, + "failed to decrypt message for smaller buffer" + ); + return Poll::Ready(Err(io::ErrorKind::InvalidData.into())); } Ok(nread) => { @@ -605,7 +661,14 @@ impl AsyncWrite for NoiseSocket { match this.noise.write_message(chunk, &mut this.encrypt_buffer[offset + 2..]) { Err(error) => { - tracing::error!(target: LOG_TARGET, ?error, "failed to encrypt message"); + tracing::error!( + target: LOG_TARGET, + ?error, + ty = ?this.ty, + peer = ?this.peer, + "failed to encrypt message" + ); + return Poll::Ready(Err(io::ErrorKind::InvalidData.into())); } Ok(nwritten) => { @@ -701,6 +764,15 @@ fn parse_and_verify_peer_id( Ok(peer_id) } +/// The type of the transport used for the crypto/noise protocol. +/// +/// This is used for logging purposes. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum HandshakeTransport { + Tcp, + WebSocket, +} + /// Perform Noise handshake. pub async fn handshake( mut io: S, @@ -709,9 +781,10 @@ pub async fn handshake( max_read_ahead_factor: usize, max_write_buffer_size: usize, timeout: std::time::Duration, + ty: HandshakeTransport, ) -> Result<(NoiseSocket, PeerId), NegotiationError> { let handle_handshake = async move { - tracing::debug!(target: LOG_TARGET, ?role, "start noise handshake"); + tracing::debug!(target: LOG_TARGET, ?role, ?ty, "start noise handshake"); let mut noise = NoiseContext::new(keypair, role)?; let payload = match role { @@ -727,7 +800,7 @@ pub async fn handshake( let payload = handshake_schema::NoiseHandshakePayload::decode(message) .map_err(ParseError::from) .map_err(|err| { - tracing::error!(target: LOG_TARGET, ?err, "failed to decode remote identity message"); + tracing::error!(target: LOG_TARGET, ?err, ?ty, "failed to decode remote identity message"); err })?; @@ -764,6 +837,8 @@ pub async fn handshake( noise.into_transport()?, max_read_ahead_factor, max_write_buffer_size, + peer, + ty, ), peer, )) @@ -818,6 +893,7 @@ mod tests { MAX_READ_AHEAD_FACTOR, MAX_WRITE_BUFFER_SIZE, std::time::Duration::from_secs(10), + HandshakeTransport::Tcp, ), handshake( io2, @@ -826,6 +902,7 @@ mod tests { MAX_READ_AHEAD_FACTOR, MAX_WRITE_BUFFER_SIZE, std::time::Duration::from_secs(10), + HandshakeTransport::Tcp, ) ); let (mut res1, mut res2) = (res1.unwrap(), res2.unwrap()); diff --git a/src/protocol/libp2p/kademlia/bucket.rs b/src/protocol/libp2p/kademlia/bucket.rs index e1b115cf..4c999efc 100644 --- a/src/protocol/libp2p/kademlia/bucket.rs +++ b/src/protocol/libp2p/kademlia/bucket.rs @@ -128,7 +128,7 @@ mod tests { .collect::>(); let target = Key::from(PeerId::random()); - let mut iter = bucket.closest_iter(&target); + let iter = bucket.closest_iter(&target); let mut prev = None; for node in iter { @@ -173,7 +173,7 @@ mod tests { .collect::>(); let target = Key::from(PeerId::random()); - let mut iter = bucket.closest_iter(&target); + let iter = bucket.closest_iter(&target); let mut prev = None; let mut num_peers = 0usize; diff --git a/src/transport/tcp/connection.rs b/src/transport/tcp/connection.rs index 8a7806d5..c208af8d 100644 --- a/src/transport/tcp/connection.rs +++ b/src/transport/tcp/connection.rs @@ -438,6 +438,7 @@ impl TcpConnection { max_read_ahead_factor, max_write_buffer_size, substream_open_timeout, + noise::HandshakeTransport::Tcp, ) .await?; @@ -1154,6 +1155,7 @@ mod tests { 5, 2, std::time::Duration::from_secs(10), + noise::HandshakeTransport::Tcp, ) .await .unwrap(); @@ -1212,6 +1214,7 @@ mod tests { 5, 2, std::time::Duration::from_secs(10), + noise::HandshakeTransport::Tcp, ) .await .unwrap(); @@ -1286,6 +1289,7 @@ mod tests { 5, 2, std::time::Duration::from_secs(10), + noise::HandshakeTransport::Tcp, ) .await .unwrap(); @@ -1339,6 +1343,7 @@ mod tests { 5, 2, std::time::Duration::from_secs(10), + noise::HandshakeTransport::Tcp, ) .await .unwrap(); diff --git a/src/transport/websocket/connection.rs b/src/transport/websocket/connection.rs index 7c4dd56f..d6dc9268 100644 --- a/src/transport/websocket/connection.rs +++ b/src/transport/websocket/connection.rs @@ -323,6 +323,7 @@ impl WebSocketConnection { max_read_ahead_factor, max_write_buffer_size, substream_open_timeout, + noise::HandshakeTransport::WebSocket, ) .await?; @@ -887,6 +888,7 @@ mod tests { 5, 2, std::time::Duration::from_secs(10), + noise::HandshakeTransport::WebSocket, ) .await .unwrap(); @@ -1094,6 +1096,7 @@ mod tests { 5, 2, std::time::Duration::from_secs(10), + noise::HandshakeTransport::WebSocket, ) .await .unwrap(); @@ -1162,6 +1165,7 @@ mod tests { 5, 2, std::time::Duration::from_secs(10), + noise::HandshakeTransport::WebSocket, ) .await .unwrap(); @@ -1261,6 +1265,7 @@ mod tests { 5, 2, std::time::Duration::from_secs(10), + noise::HandshakeTransport::WebSocket, ) .await .unwrap(); @@ -1320,6 +1325,7 @@ mod tests { 5, 2, std::time::Duration::from_secs(10), + noise::HandshakeTransport::WebSocket, ) .await .unwrap(); diff --git a/tests/conformance/rust/kademlia.rs b/tests/conformance/rust/kademlia.rs index 8c9afbfa..84761914 100644 --- a/tests/conformance/rust/kademlia.rs +++ b/tests/conformance/rust/kademlia.rs @@ -24,7 +24,7 @@ use libp2p::{ identify, identity, kad::{ self, store::RecordStore, AddProviderOk, GetProvidersOk, InboundRequest, - KademliaEvent as Libp2pKademliaEvent, QueryResult, RecordKey as Libp2pRecordKey, + KademliaEvent as Libp2pKademliaEvent, QueryResult, }, swarm::{keep_alive, AddressScore, NetworkBehaviour, SwarmBuilder, SwarmEvent}, PeerId, Swarm,