Skip to content

Commit c0fef8d

Browse files
lexnv and dmitry-markin
authored
tcp/websocket/quic: Fix cancel memory leak (#272)
This fixes a bug in the TCP and WebSocket transports that was leaking memory for: - `canceled: HashSet<ConnectionId>` - leak since the beginning of litep2p - `cancel_futures: HashMap<ConnectionId, AbortHandle>` added in unmerged #255 The memory leak happens in the following scenario: - T0: transport manager: dials K (parallelism factor = 8) addresses on TCP and WebSocket on ConnectionId=1 - T1: TCP: establishes a connection with the peer ConnectionId=1 - T2: WebSocket: establishes a connection with the peer ConnectionId=1 - T3: transport manager: receives TCP establishment event and cancels `WebSocket` dials The issue happens when T2 finishes before T3. In this situation, the WebSocket transport no longer has a future with a corresponding ConnectionId=1. The canceling method simply inserts ConnectionId=1 into a hashset. This leads to the hashset growing over time, without a way to clean up stale connection IDs. The fix relies on the changes added in #255: - `cancel_futures` maps a connection ID to an abort handle - the `cancel_futures` is guaranteed to contain a connection ID that corresponds to an unfinished `pending_raw_connections` future - the cancel method just aborts the in-flight future, if it exists - state clean-up of the `cancel_futures` is done when polling `pending_raw_connections` ### Testing Done I used a custom-patched version of litep2p to log the number of pending dials. After a few hours, the pending dials for both TCP and WebSocket connections stabilized at just a few. (Same as #271). 
``` 2024-10-22 17:37:56.252 INFO tokio-runtime-worker litep2p::tcp: status pending_dials=1 pending_inbound_connections=0 pending_connections=1 pending_raw_connections=0 opened_raw=0 cancel_futures=0 pending_open=0 2024-10-22 17:38:26.252 INFO tokio-runtime-worker litep2p::tcp: status pending_dials=0 pending_inbound_connections=0 pending_connections=0 pending_raw_connections=1 opened_raw=0 cancel_futures=1 pending_open=0 2024-10-22 17:38:56.253 INFO tokio-runtime-worker litep2p::tcp: status pending_dials=0 pending_inbound_connections=0 pending_connections=0 pending_raw_connections=0 opened_raw=0 cancel_futures=0 pending_open=0 2024-10-22 17:39:26.253 INFO tokio-runtime-worker litep2p::tcp: status pending_dials=0 pending_inbound_connections=0 pending_connections=0 pending_raw_connections=0 opened_raw=0 cancel_futures=0 pending_open=0 2024-10-22 17:39:56.252 INFO tokio-runtime-worker litep2p::tcp: status pending_dials=0 pending_inbound_connections=0 pending_connections=0 pending_raw_connections=0 opened_raw=0 cancel_futures=0 pending_open=0 2024-10-22 17:40:26.252 INFO tokio-runtime-worker litep2p::tcp: status pending_dials=0 pending_inbound_connections=0 pending_connections=0 pending_raw_connections=1 opened_raw=0 cancel_futures=1 pending_open=0 2024-10-22 17:40:56.252 INFO tokio-runtime-worker litep2p::tcp: status pending_dials=0 pending_inbound_connections=0 pending_connections=0 pending_raw_connections=0 opened_raw=0 cancel_futures=0 pending_open=0 2024-10-22 17:41:26.252 INFO tokio-runtime-worker litep2p::tcp: status pending_dials=0 pending_inbound_connections=0 pending_connections=0 pending_raw_connections=0 opened_raw=0 cancel_futures=0 pending_open=0 ``` Build on: #255 Closes: #270 --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> Co-authored-by: Dmitry Markin <dmitry@markin.tech>
1 parent d07c455 commit c0fef8d

File tree

3 files changed

+136
-42
lines changed

3 files changed

+136
-42
lines changed

src/transport/quic/mod.rs

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use multiaddr::{Multiaddr, Protocol};
4343
use quinn::{ClientConfig, Connecting, Connection, Endpoint, IdleTimeout};
4444

4545
use std::{
46-
collections::{HashMap, HashSet},
46+
collections::HashMap,
4747
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
4848
pin::Pin,
4949
sync::Arc,
@@ -120,9 +120,9 @@ pub(crate) struct QuicTransport {
120120
/// Opened raw connection, waiting for approval/rejection from `TransportManager`.
121121
opened_raw: HashMap<ConnectionId, (NegotiatedConnection, Multiaddr)>,
122122

123-
/// Canceled raw connections.
124-
canceled: HashSet<ConnectionId>,
125-
123+
/// Cancel raw connections futures.
124+
///
125+
/// This is cancelling `Self::pending_raw_connections`.
126126
cancel_futures: HashMap<ConnectionId, AbortHandle>,
127127
}
128128

@@ -235,7 +235,6 @@ impl TransportBuilder for QuicTransport {
235235
context,
236236
config,
237237
listener,
238-
canceled: HashSet::new(),
239238
opened_raw: HashMap::new(),
240239
pending_open: HashMap::new(),
241240
pending_dials: HashMap::new(),
@@ -477,8 +476,11 @@ impl Transport for QuicTransport {
477476

478477
/// Cancel opening connections.
479478
fn cancel(&mut self, connection_id: ConnectionId) {
480-
self.canceled.insert(connection_id);
481-
self.cancel_futures.remove(&connection_id).map(|handle| handle.abort());
479+
// Cancel the future if it exists.
480+
// State clean-up happens inside the `poll_next`.
481+
if let Some(handle) = self.cancel_futures.get(&connection_id) {
482+
handle.abort();
483+
}
482484
}
483485
}
484486

@@ -510,27 +512,57 @@ impl Stream for QuicTransport {
510512
connection_id,
511513
address,
512514
stream,
513-
} =>
514-
if !self.canceled.remove(&connection_id) {
515+
} => {
516+
let Some(handle) = self.cancel_futures.remove(&connection_id) else {
517+
tracing::warn!(
518+
target: LOG_TARGET,
519+
?connection_id,
520+
?address,
521+
"raw connection without a cancel handle",
522+
);
523+
continue;
524+
};
525+
526+
if !handle.is_aborted() {
515527
self.opened_raw.insert(connection_id, (stream, address.clone()));
516528

517529
return Poll::Ready(Some(TransportEvent::ConnectionOpened {
518530
connection_id,
519531
address,
520532
}));
521-
},
533+
}
534+
}
535+
522536
RawConnectionResult::Failed {
523537
connection_id,
524538
errors,
525-
} =>
526-
if !self.canceled.remove(&connection_id) {
539+
} => {
540+
let Some(handle) = self.cancel_futures.remove(&connection_id) else {
541+
tracing::warn!(
542+
target: LOG_TARGET,
543+
?connection_id,
544+
?errors,
545+
"raw connection without a cancel handle",
546+
);
547+
continue;
548+
};
549+
550+
if !handle.is_aborted() {
527551
return Poll::Ready(Some(TransportEvent::OpenFailure {
528552
connection_id,
529553
errors,
530554
}));
531-
},
555+
}
556+
}
557+
532558
RawConnectionResult::Canceled { connection_id } => {
533-
self.canceled.remove(&connection_id);
559+
if self.cancel_futures.remove(&connection_id).is_none() {
560+
tracing::warn!(
561+
target: LOG_TARGET,
562+
?connection_id,
563+
"raw cancelled connection without a cancel handle",
564+
);
565+
}
534566
}
535567
}
536568
}

src/transport/tcp/mod.rs

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use socket2::{Domain, Socket, Type};
4646
use tokio::net::TcpStream;
4747

4848
use std::{
49-
collections::{HashMap, HashSet},
49+
collections::HashMap,
5050
net::SocketAddr,
5151
pin::Pin,
5252
task::{Context, Poll},
@@ -121,9 +121,9 @@ pub(crate) struct TcpTransport {
121121
/// Opened raw connection, waiting for approval/rejection from `TransportManager`.
122122
opened_raw: HashMap<ConnectionId, (TcpStream, Multiaddr)>,
123123

124-
/// Canceled raw connections.
125-
canceled: HashSet<ConnectionId>,
126-
124+
/// Cancel raw connections futures.
125+
///
126+
/// This is cancelling `Self::pending_raw_connections`.
127127
cancel_futures: HashMap<ConnectionId, AbortHandle>,
128128

129129
/// Connections which have been opened and negotiated but are being validated by the
@@ -291,7 +291,6 @@ impl TransportBuilder for TcpTransport {
291291
config,
292292
context,
293293
dial_addresses,
294-
canceled: HashSet::new(),
295294
opened_raw: HashMap::new(),
296295
pending_open: HashMap::new(),
297296
pending_dials: HashMap::new(),
@@ -516,8 +515,11 @@ impl Transport for TcpTransport {
516515
}
517516

518517
fn cancel(&mut self, connection_id: ConnectionId) {
519-
self.canceled.insert(connection_id);
520-
self.cancel_futures.remove(&connection_id).map(|handle| handle.abort());
518+
// Cancel the future if it exists.
519+
// State clean-up happens inside the `poll_next`.
520+
if let Some(handle) = self.cancel_futures.get(&connection_id) {
521+
handle.abort();
522+
}
521523
}
522524
}
523525

@@ -560,27 +562,56 @@ impl Stream for TcpTransport {
560562
connection_id,
561563
address,
562564
stream,
563-
} =>
564-
if !self.canceled.remove(&connection_id) {
565+
} => {
566+
let Some(handle) = self.cancel_futures.remove(&connection_id) else {
567+
tracing::warn!(
568+
target: LOG_TARGET,
569+
?connection_id,
570+
?address,
571+
"raw connection without a cancel handle",
572+
);
573+
continue;
574+
};
575+
576+
if !handle.is_aborted() {
565577
self.opened_raw.insert(connection_id, (stream, address.clone()));
566578

567579
return Poll::Ready(Some(TransportEvent::ConnectionOpened {
568580
connection_id,
569581
address,
570582
}));
571-
},
583+
}
584+
}
585+
572586
RawConnectionResult::Failed {
573587
connection_id,
574588
errors,
575-
} =>
576-
if !self.canceled.remove(&connection_id) {
589+
} => {
590+
let Some(handle) = self.cancel_futures.remove(&connection_id) else {
591+
tracing::warn!(
592+
target: LOG_TARGET,
593+
?connection_id,
594+
?errors,
595+
"raw connection without a cancel handle",
596+
);
597+
continue;
598+
};
599+
600+
if !handle.is_aborted() {
577601
return Poll::Ready(Some(TransportEvent::OpenFailure {
578602
connection_id,
579603
errors,
580604
}));
581-
},
605+
}
606+
}
582607
RawConnectionResult::Canceled { connection_id } => {
583-
self.canceled.remove(&connection_id);
608+
if self.cancel_futures.remove(&connection_id).is_none() {
609+
tracing::warn!(
610+
target: LOG_TARGET,
611+
?connection_id,
612+
"raw cancelled connection without a cancel handle",
613+
);
614+
}
584615
}
585616
}
586617
}

src/transport/websocket/mod.rs

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
5050
use url::Url;
5151

5252
use std::{
53-
collections::{HashMap, HashSet},
53+
collections::HashMap,
5454
pin::Pin,
5555
task::{Context, Poll},
5656
time::Duration,
@@ -125,9 +125,9 @@ pub(crate) struct WebSocketTransport {
125125
/// Opened raw connection, waiting for approval/rejection from `TransportManager`.
126126
opened_raw: HashMap<ConnectionId, (WebSocketStream<MaybeTlsStream<TcpStream>>, Multiaddr)>,
127127

128-
/// Canceled raw connections.
129-
canceled: HashSet<ConnectionId>,
130-
128+
/// Cancel raw connections futures.
129+
///
130+
/// This is cancelling `Self::pending_raw_connections`.
131131
cancel_futures: HashMap<ConnectionId, AbortHandle>,
132132

133133
/// Negotiated connections waiting validation.
@@ -321,7 +321,6 @@ impl TransportBuilder for WebSocketTransport {
321321
config,
322322
context,
323323
dial_addresses,
324-
canceled: HashSet::new(),
325324
opened_raw: HashMap::new(),
326325
pending_open: HashMap::new(),
327326
pending_dials: HashMap::new(),
@@ -562,8 +561,11 @@ impl Transport for WebSocketTransport {
562561
}
563562

564563
fn cancel(&mut self, connection_id: ConnectionId) {
565-
self.canceled.insert(connection_id);
566-
self.cancel_futures.remove(&connection_id).map(|handle| handle.abort());
564+
// Cancel the future if it exists.
565+
// State clean-up happens inside the `poll_next`.
566+
if let Some(handle) = self.cancel_futures.get(&connection_id) {
567+
handle.abort();
568+
}
567569
}
568570
}
569571

@@ -600,27 +602,56 @@ impl Stream for WebSocketTransport {
600602
connection_id,
601603
address,
602604
stream,
603-
} =>
604-
if !self.canceled.remove(&connection_id) {
605+
} => {
606+
let Some(handle) = self.cancel_futures.remove(&connection_id) else {
607+
tracing::warn!(
608+
target: LOG_TARGET,
609+
?connection_id,
610+
?address,
611+
"raw connection without a cancel handle",
612+
);
613+
continue;
614+
};
615+
616+
if !handle.is_aborted() {
605617
self.opened_raw.insert(connection_id, (stream, address.clone()));
606618

607619
return Poll::Ready(Some(TransportEvent::ConnectionOpened {
608620
connection_id,
609621
address,
610622
}));
611-
},
623+
}
624+
}
625+
612626
RawConnectionResult::Failed {
613627
connection_id,
614628
errors,
615-
} =>
616-
if !self.canceled.remove(&connection_id) {
629+
} => {
630+
let Some(handle) = self.cancel_futures.remove(&connection_id) else {
631+
tracing::warn!(
632+
target: LOG_TARGET,
633+
?connection_id,
634+
?errors,
635+
"raw connection without a cancel handle",
636+
);
637+
continue;
638+
};
639+
640+
if !handle.is_aborted() {
617641
return Poll::Ready(Some(TransportEvent::OpenFailure {
618642
connection_id,
619643
errors,
620644
}));
621-
},
645+
}
646+
}
622647
RawConnectionResult::Canceled { connection_id } => {
623-
self.canceled.remove(&connection_id);
648+
if self.cancel_futures.remove(&connection_id).is_none() {
649+
tracing::warn!(
650+
target: LOG_TARGET,
651+
?connection_id,
652+
"raw cancelled connection without a cancel handle",
653+
);
654+
}
624655
}
625656
}
626657
}

0 commit comments

Comments
 (0)