Skip to content

Commit 314a2e9

Browse files
lexnvdmitry-markin
andauthored
transport_service: Improve connection stability by downgrading connections on substream inactivity (#260)
This PR advances the keep-alive timeout of the transport service. Previously, the keep-alive timeout was triggered 5 seconds after the connection was reported to the transport service regardless of substream activity. - (0secs) T0: connection established; keep-alive timeout set to 5seconds in the future - (4secs) T1: substream A, B, C opened - (5secs) T2: keep-alive timeout triggered and the connection is downgraded. T1 was not taken into account, otherwise, the keep-alive timeout should be triggered at second 9 (T1 at 4 seconds + keepalive 5 seconds) - (6secs) T3: substreams A, B, C closed -> connection closes - (7secs) T4: cannot open new substreams even if we expected the connection to be kept alive for longer In this PR: - `KeepAliveTracker` structure to forward the keep-alive timeout of connections. - Connection ID is forwarded to `SubstreamOpened` events to identify properly substream Ids. This is needed because the `ConnectionContext` contains up to two connections (primary and secondary) ### Testing Done - test to ensure keepalive downgrades the connection after 5 seconds - test to ensure keepalive is forwarded on substream activity - test to ensure a downgraded connection with dropped substreams is closed Closes #253. --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> Co-authored-by: Dmitry Markin <dmitry@markin.tech>
1 parent c0fef8d commit 314a2e9

File tree

9 files changed

+790
-38
lines changed

9 files changed

+790
-38
lines changed

src/protocol/connection.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,15 @@ impl ConnectionHandle {
9494
}
9595
}
9696

97+
/// Try to upgrade the connection to active state.
98+
pub fn try_upgrade(&mut self) {
99+
if let ConnectionType::Inactive(inactive) = &self.connection {
100+
if let Some(active) = inactive.upgrade() {
101+
self.connection = ConnectionType::Active(active);
102+
}
103+
}
104+
}
105+
97106
/// Attempt to acquire permit which will keep the connection open for indefinite time.
98107
pub fn try_get_permit(&self) -> Option<Permit> {
99108
match &self.connection {
@@ -120,6 +129,7 @@ impl ConnectionHandle {
120129
protocol: protocol.clone(),
121130
fallback_names,
122131
substream_id,
132+
connection_id: self.connection_id.clone(),
123133
permit,
124134
})
125135
.map_err(|error| match error {
@@ -141,10 +151,15 @@ impl ConnectionHandle {
141151
TrySendError::Closed(_) => Error::ConnectionClosed,
142152
})
143153
}
154+
155+
/// Check if the connection is active.
156+
pub fn is_active(&self) -> bool {
157+
matches!(self.connection, ConnectionType::Active(_))
158+
}
144159
}
145160

146161
/// Type which allows the connection to be kept open.
147-
#[derive(Debug)]
162+
#[derive(Debug, Clone)]
148163
pub struct Permit {
149164
/// Active connection.
150165
_connection: Sender<ProtocolCommand>,

src/protocol/notification/tests/notification.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ async fn remote_opens_multiple_inbound_substreams() {
475475
SubstreamId::from(0usize),
476476
Box::new(DummySubstream::new()),
477477
),
478+
connection_id: ConnectionId::from(0usize),
478479
})
479480
.await
480481
.unwrap();
@@ -511,6 +512,7 @@ async fn remote_opens_multiple_inbound_substreams() {
511512
SubstreamId::from(0usize),
512513
Box::new(substream),
513514
),
515+
connection_id: ConnectionId::from(0usize),
514516
})
515517
.await
516518
.unwrap();

src/protocol/protocol_set.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,9 @@ pub enum InnerTransportEvent {
122122
/// distinguish between different outbound substreams.
123123
direction: Direction,
124124

125+
/// Connection ID.
126+
connection_id: ConnectionId,
127+
125128
/// Substream.
126129
substream: Substream,
127130
},
@@ -149,6 +152,7 @@ impl From<InnerTransportEvent> for TransportEvent {
149152
fallback,
150153
direction,
151154
substream,
155+
..
152156
} => TransportEvent::SubstreamOpened {
153157
peer,
154158
protocol,
@@ -164,7 +168,7 @@ impl From<InnerTransportEvent> for TransportEvent {
164168
}
165169

166170
/// Events emitted by the installed protocols to transport.
167-
#[derive(Debug)]
171+
#[derive(Debug, Clone)]
168172
pub enum ProtocolCommand {
169173
/// Open substream.
170174
OpenSubstream {
@@ -192,6 +196,9 @@ pub enum ProtocolCommand {
192196
/// and associate incoming substreams with whatever logic it has.
193197
substream_id: SubstreamId,
194198

199+
/// Connection ID.
200+
connection_id: ConnectionId,
201+
195202
/// Connection permit.
196203
///
197204
/// `Permit` allows the connection to be kept open while the permit is held and it is given
@@ -300,6 +307,7 @@ impl ProtocolSet {
300307
fallback,
301308
direction,
302309
substream,
310+
connection_id: self.connection.connection_id().clone(),
303311
};
304312

305313
protocol_context

0 commit comments

Comments
 (0)