Skip to content

Commit db7cbb0

Browse files
authored
substream/fix: Allow empty payloads with 0-length frame (#395)
This PR ensures that zero-length frames are propagated to higher levels. An issue was noted with the compatibility between litep2p and smoldot, which resulted in smoldot not being accepted by substrate-based chains. The issue stems from the litep2p substream (high level) implementation, which did not handle zero-length frames. Smoldot sends during the notification handshake a zero-length frame (ie, `frame [0]`). However, full nodes (litep2p and libp2p) send the node role `Role::Full` (ie, `frame [1, 1]`). Substrate nodes expected the node `Role`, however zero-length frames were never punished. This behavior causes the zero-length frames to go unnoticed and further call into `poll_read`. In the case of smoldot, the connection is terminated after the handshake timeout of 10 seconds. ### Testing Done - Reproduced the issue locally with zombinet and papi (https://github.com/polkadot-api/polkadot-api/tree/main/integration-tests/zombie-tests) - The e2e tests were timing out, because litep2p rejected the `/transactions/1` substream with smoldot (however it accepted the /sync substream which sends a different payload) - Added a new notification test to illustrate this behavior with different payloads (including empty ones) Closes: smol-dot/smoldot#2128 Thanks @josepot and @tomaka for repro-case and details 🙏 --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
1 parent ff1419a commit db7cbb0

File tree

2 files changed

+281
-0
lines changed

2 files changed

+281
-0
lines changed

src/substream/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,13 @@ impl Stream for Substream {
645645
}
646646

647647
this.offset = 0;
648+
// Handle empty payloads detected as 0-length frame.
649+
// The offset must be cleared to 0 to not interfere
650+
// with next framing.
651+
if size == 0 {
652+
return Poll::Ready(Some(Ok(BytesMut::new())));
653+
}
654+
648655
this.current_frame_size = Some(size);
649656
this.read_buffer = BytesMut::zeroed(size);
650657
}

tests/protocol/notification.rs

Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2009,6 +2009,280 @@ async fn dialer_fallback_protocol_works(transport1: Transport, transport2: Trans
20092009
);
20102010
}
20112011

2012+
#[tokio::test]
2013+
async fn zero_byte_handshake_tcp() {
2014+
// Full node role.
2015+
zero_byte_handshake(
2016+
Transport::Tcp(TcpConfig {
2017+
listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
2018+
..Default::default()
2019+
}),
2020+
Transport::Tcp(TcpConfig {
2021+
listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
2022+
..Default::default()
2023+
}),
2024+
vec![1],
2025+
)
2026+
.await;
2027+
2028+
// Invalid role set as `ObservedRole::NONE`.
2029+
zero_byte_handshake(
2030+
Transport::Tcp(TcpConfig {
2031+
listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
2032+
..Default::default()
2033+
}),
2034+
Transport::Tcp(TcpConfig {
2035+
listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
2036+
..Default::default()
2037+
}),
2038+
vec![0],
2039+
)
2040+
.await;
2041+
2042+
// Light client role provided by smoldot.
2043+
zero_byte_handshake(
2044+
Transport::Tcp(TcpConfig {
2045+
listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
2046+
..Default::default()
2047+
}),
2048+
Transport::Tcp(TcpConfig {
2049+
listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
2050+
..Default::default()
2051+
}),
2052+
vec![],
2053+
)
2054+
.await;
2055+
}
2056+
2057+
#[cfg(feature = "quic")]
2058+
#[tokio::test]
2059+
async fn zero_byte_handshake_quic() {
2060+
zero_byte_handshake(
2061+
Transport::Quic(Default::default()),
2062+
Transport::Quic(Default::default()),
2063+
vec![1],
2064+
)
2065+
.await;
2066+
2067+
zero_byte_handshake(
2068+
Transport::Quic(Default::default()),
2069+
Transport::Quic(Default::default()),
2070+
vec![0],
2071+
)
2072+
.await;
2073+
2074+
zero_byte_handshake(
2075+
Transport::Quic(Default::default()),
2076+
Transport::Quic(Default::default()),
2077+
vec![],
2078+
)
2079+
.await;
2080+
}
2081+
2082+
#[cfg(feature = "websocket")]
2083+
#[tokio::test]
2084+
async fn zero_byte_handshake_websocket() {
2085+
// Full node role.
2086+
zero_byte_handshake(
2087+
Transport::WebSocket(WebSocketConfig {
2088+
listen_addresses: vec!["/ip4/127.0.0.1/tcp/0/ws".parse().unwrap()],
2089+
..Default::default()
2090+
}),
2091+
Transport::WebSocket(WebSocketConfig {
2092+
listen_addresses: vec!["/ip4/127.0.0.1/tcp/0/ws".parse().unwrap()],
2093+
..Default::default()
2094+
}),
2095+
vec![1],
2096+
)
2097+
.await;
2098+
2099+
// Invalid role set as `ObservedRole::NONE`.
2100+
zero_byte_handshake(
2101+
Transport::WebSocket(WebSocketConfig {
2102+
listen_addresses: vec!["/ip4/127.0.0.1/tcp/0/ws".parse().unwrap()],
2103+
..Default::default()
2104+
}),
2105+
Transport::WebSocket(WebSocketConfig {
2106+
listen_addresses: vec!["/ip4/127.0.0.1/tcp/0/ws".parse().unwrap()],
2107+
..Default::default()
2108+
}),
2109+
vec![0],
2110+
)
2111+
.await;
2112+
2113+
// Light client role provided by smoldot.
2114+
zero_byte_handshake(
2115+
Transport::WebSocket(WebSocketConfig {
2116+
listen_addresses: vec!["/ip4/127.0.0.1/tcp/0/ws".parse().unwrap()],
2117+
..Default::default()
2118+
}),
2119+
Transport::WebSocket(WebSocketConfig {
2120+
listen_addresses: vec!["/ip4/127.0.0.1/tcp/0/ws".parse().unwrap()],
2121+
..Default::default()
2122+
}),
2123+
vec![],
2124+
)
2125+
.await;
2126+
}
2127+
2128+
async fn zero_byte_handshake(transport1: Transport, transport2: Transport, handshake: Vec<u8>) {
2129+
let _ = tracing_subscriber::fmt()
2130+
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2131+
.try_init();
2132+
2133+
let (notif_config1, mut handle1) = ConfigBuilder::new(ProtocolName::from("/notif/1"))
2134+
.with_max_size(1024usize)
2135+
.with_handshake(handshake.clone())
2136+
.build();
2137+
2138+
let config1 = Litep2pConfigBuilder::new()
2139+
.with_keypair(Keypair::generate())
2140+
.with_notification_protocol(notif_config1);
2141+
2142+
let config1 = add_transport(config1, transport1).build();
2143+
2144+
let (notif_config2, mut handle2) = ConfigBuilder::new(ProtocolName::from("/notif/1"))
2145+
.with_max_size(1024usize)
2146+
.with_handshake(handshake.clone())
2147+
.build();
2148+
let config2 = Litep2pConfigBuilder::new()
2149+
.with_keypair(Keypair::generate())
2150+
.with_notification_protocol(notif_config2);
2151+
2152+
let config2 = add_transport(config2, transport2).build();
2153+
2154+
let mut litep2p1 = Litep2p::new(config1).unwrap();
2155+
let mut litep2p2 = Litep2p::new(config2).unwrap();
2156+
2157+
let peer1 = *litep2p1.local_peer_id();
2158+
let peer2 = *litep2p2.local_peer_id();
2159+
2160+
// wait until peers have connected and spawn the litep2p objects in the background
2161+
connect_peers(&mut litep2p1, &mut litep2p2).await;
2162+
tokio::spawn(async move {
2163+
loop {
2164+
tokio::select! {
2165+
_ = litep2p1.next_event() => {},
2166+
_ = litep2p2.next_event() => {},
2167+
}
2168+
}
2169+
});
2170+
2171+
// open substream for `peer2` and accept it
2172+
tracing::info!("Opening substream handle1 => handle2");
2173+
handle1.open_substream(peer2).await.unwrap();
2174+
2175+
tracing::info!("Expecting validate substream event...");
2176+
assert_eq!(
2177+
handle2.next().await.unwrap(),
2178+
NotificationEvent::ValidateSubstream {
2179+
protocol: ProtocolName::from("/notif/1"),
2180+
fallback: None,
2181+
peer: peer1,
2182+
handshake: handshake.clone(),
2183+
}
2184+
);
2185+
2186+
tracing::info!("Send validation result... peer2 => peer1");
2187+
handle2.send_validation_result(peer1, ValidationResult::Accept);
2188+
assert_eq!(
2189+
handle1.next().await.unwrap(),
2190+
NotificationEvent::ValidateSubstream {
2191+
protocol: ProtocolName::from("/notif/1"),
2192+
fallback: None,
2193+
peer: peer2,
2194+
handshake: handshake.clone(),
2195+
}
2196+
);
2197+
2198+
tracing::info!("Send validation result... peer1 => peer2");
2199+
handle1.send_validation_result(peer2, ValidationResult::Accept);
2200+
2201+
tracing::info!("Handle2 expecting notification stream opened event...");
2202+
assert_eq!(
2203+
handle2.next().await.unwrap(),
2204+
NotificationEvent::NotificationStreamOpened {
2205+
protocol: ProtocolName::from("/notif/1"),
2206+
fallback: None,
2207+
direction: Direction::Inbound,
2208+
peer: peer1,
2209+
handshake: handshake.clone(),
2210+
}
2211+
);
2212+
2213+
tracing::info!("Handle1 expecting notification stream opened event...");
2214+
assert_eq!(
2215+
handle1.next().await.unwrap(),
2216+
NotificationEvent::NotificationStreamOpened {
2217+
protocol: ProtocolName::from("/notif/1"),
2218+
fallback: None,
2219+
direction: Direction::Outbound,
2220+
peer: peer2,
2221+
handshake: handshake,
2222+
}
2223+
);
2224+
2225+
// This step ensures we have not messed with the notification frames.
2226+
tracing::info!("Send sync notification...");
2227+
handle1.send_sync_notification(peer2, vec![1, 3, 3, 7]).unwrap();
2228+
handle2.send_sync_notification(peer1, vec![1, 3, 3, 8]).unwrap();
2229+
2230+
assert_eq!(
2231+
handle2.next().await.unwrap(),
2232+
NotificationEvent::NotificationReceived {
2233+
peer: peer1,
2234+
notification: BytesMut::from(&[1, 3, 3, 7][..]),
2235+
}
2236+
);
2237+
assert_eq!(
2238+
handle1.next().await.unwrap(),
2239+
NotificationEvent::NotificationReceived {
2240+
peer: peer2,
2241+
notification: BytesMut::from(&[1, 3, 3, 8][..]),
2242+
}
2243+
);
2244+
2245+
// Ensure the handle can send empty notifications.
2246+
tracing::info!("Send empty sync notification...");
2247+
handle1.send_sync_notification(peer2, vec![]).unwrap();
2248+
handle2.send_sync_notification(peer1, vec![]).unwrap();
2249+
2250+
assert_eq!(
2251+
handle2.next().await.unwrap(),
2252+
NotificationEvent::NotificationReceived {
2253+
peer: peer1,
2254+
notification: BytesMut::from(&[][..]),
2255+
}
2256+
);
2257+
assert_eq!(
2258+
handle1.next().await.unwrap(),
2259+
NotificationEvent::NotificationReceived {
2260+
peer: peer2,
2261+
notification: BytesMut::from(&[][..]),
2262+
}
2263+
);
2264+
2265+
// Double check non-empty notifications.
2266+
tracing::info!("Send sync notification...");
2267+
handle1.send_sync_notification(peer2, vec![1, 3, 3, 9]).unwrap();
2268+
handle2.send_sync_notification(peer1, vec![1, 3, 3, 4]).unwrap();
2269+
2270+
assert_eq!(
2271+
handle2.next().await.unwrap(),
2272+
NotificationEvent::NotificationReceived {
2273+
peer: peer1,
2274+
notification: BytesMut::from(&[1, 3, 3, 9][..]),
2275+
}
2276+
);
2277+
assert_eq!(
2278+
handle1.next().await.unwrap(),
2279+
NotificationEvent::NotificationReceived {
2280+
peer: peer2,
2281+
notification: BytesMut::from(&[1, 3, 3, 4][..]),
2282+
}
2283+
);
2284+
}
2285+
20122286
#[tokio::test]
20132287
async fn listener_fallback_protocol_works_tcp() {
20142288
listener_fallback_protocol_works(

0 commit comments

Comments
 (0)