Skip to content

Commit 897cbae

Browse files
committed
websocket/connection: Allow granular control wrt open timeouts
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
1 parent 04d3497 commit 897cbae

File tree

1 file changed

+38
-14
lines changed

1 file changed

+38
-14
lines changed

src/transport/websocket/connection.rs

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -195,18 +195,26 @@ impl WebSocketConnection {
195195
stream: S,
196196
role: &Role,
197197
protocols: Vec<&str>,
198+
substream_open_timeout: Duration,
198199
) -> Result<(Negotiated<S>, ProtocolName), NegotiationError> {
199200
tracing::trace!(target: LOG_TARGET, ?protocols, "negotiating protocols");
200201

201-
let (protocol, socket) = match role {
202-
Role::Dialer => dialer_select_proto(stream, protocols, Version::V1).await,
203-
Role::Listener => listener_select_proto(stream, protocols).await,
204-
}
205-
.map_err(NegotiationError::MultistreamSelectError)?;
206-
207-
tracing::trace!(target: LOG_TARGET, ?protocol, "protocol negotiated");
202+
match tokio::time::timeout(substream_open_timeout, async move {
203+
match role {
204+
Role::Dialer => dialer_select_proto(stream, protocols, Version::V1).await,
205+
Role::Listener => listener_select_proto(stream, protocols).await,
206+
}
207+
})
208+
.await
209+
{
210+
Err(_) => Err(NegotiationError::Timeout),
211+
Ok(Err(error)) => Err(NegotiationError::MultistreamSelectError(error)),
212+
Ok(Ok((protocol, socket))) => {
213+
tracing::trace!(target: LOG_TARGET, ?protocol, "protocol negotiated");
208214

209-
Ok((socket, ProtocolName::from(protocol.to_string())))
215+
Ok((socket, ProtocolName::from(protocol.to_string())))
216+
}
217+
}
210218
}
211219

212220
/// Open WebSocket connection.
@@ -220,6 +228,7 @@ impl WebSocketConnection {
220228
yamux_config: crate::yamux::Config,
221229
max_read_ahead_factor: usize,
222230
max_write_buffer_size: usize,
231+
substream_open_timeout: Duration,
223232
) -> Result<NegotiatedConnection, NegotiationError> {
224233
tracing::trace!(
225234
target: LOG_TARGET,
@@ -239,6 +248,7 @@ impl WebSocketConnection {
239248
yamux_config,
240249
max_read_ahead_factor,
241250
max_write_buffer_size,
251+
substream_open_timeout,
242252
)
243253
.await
244254
}
@@ -252,6 +262,7 @@ impl WebSocketConnection {
252262
yamux_config: crate::yamux::Config,
253263
max_read_ahead_factor: usize,
254264
max_write_buffer_size: usize,
265+
substream_open_timeout: Duration,
255266
) -> Result<NegotiatedConnection, NegotiationError> {
256267
let stream = MaybeTlsStream::Plain(stream);
257268

@@ -267,6 +278,7 @@ impl WebSocketConnection {
267278
yamux_config,
268279
max_read_ahead_factor,
269280
max_write_buffer_size,
281+
substream_open_timeout,
270282
)
271283
.await
272284
}
@@ -282,6 +294,7 @@ impl WebSocketConnection {
282294
yamux_config: crate::yamux::Config,
283295
max_read_ahead_factor: usize,
284296
max_write_buffer_size: usize,
297+
substream_open_timeout: Duration,
285298
) -> Result<NegotiatedConnection, NegotiationError> {
286299
tracing::trace!(
287300
target: LOG_TARGET,
@@ -294,7 +307,8 @@ impl WebSocketConnection {
294307
let stream = BufferedStream::new(stream);
295308

296309
// negotiate `noise`
297-
let (stream, _) = Self::negotiate_protocol(stream, &role, vec!["/noise"]).await?;
310+
let (stream, _) =
311+
Self::negotiate_protocol(stream, &role, vec!["/noise"], substream_open_timeout).await?;
298312

299313
tracing::trace!(
300314
target: LOG_TARGET,
@@ -308,6 +322,7 @@ impl WebSocketConnection {
308322
role,
309323
max_read_ahead_factor,
310324
max_write_buffer_size,
325+
substream_open_timeout,
311326
)
312327
.await?;
313328

@@ -321,7 +336,9 @@ impl WebSocketConnection {
321336
tracing::trace!(target: LOG_TARGET, "noise handshake done");
322337

323338
// negotiate `yamux`
324-
let (stream, _) = Self::negotiate_protocol(stream, &role, vec!["/yamux/1.0.0"]).await?;
339+
let (stream, _) =
340+
Self::negotiate_protocol(stream, &role, vec!["/yamux/1.0.0"], substream_open_timeout)
341+
.await?;
325342
tracing::trace!(target: LOG_TARGET, "`yamux` negotiated");
326343

327344
let connection = crate::yamux::Connection::new(stream.inner(), yamux_config, role.into());
@@ -349,6 +366,7 @@ impl WebSocketConnection {
349366
permit: Permit,
350367
substream_id: SubstreamId,
351368
protocols: Vec<ProtocolName>,
369+
substream_open_timeout: Duration,
352370
) -> Result<NegotiatedSubstream, NegotiationError> {
353371
tracing::trace!(
354372
target: LOG_TARGET,
@@ -357,7 +375,9 @@ impl WebSocketConnection {
357375
);
358376

359377
let protocols = protocols.iter().map(|protocol| &**protocol).collect::<Vec<&str>>();
360-
let (io, protocol) = Self::negotiate_protocol(stream, &Role::Listener, protocols).await?;
378+
let (io, protocol) =
379+
Self::negotiate_protocol(stream, &Role::Listener, protocols, substream_open_timeout)
380+
.await?;
361381

362382
tracing::trace!(
363383
target: LOG_TARGET,
@@ -381,6 +401,7 @@ impl WebSocketConnection {
381401
substream_id: SubstreamId,
382402
protocol: ProtocolName,
383403
fallback_names: Vec<ProtocolName>,
404+
substream_open_timeout: Duration,
384405
) -> Result<NegotiatedSubstream, SubstreamError> {
385406
tracing::debug!(target: LOG_TARGET, ?protocol, ?substream_id, "open substream");
386407

@@ -409,7 +430,9 @@ impl WebSocketConnection {
409430
.chain(fallback_names.iter().map(|protocol| &**protocol))
410431
.collect();
411432

412-
let (io, protocol) = Self::negotiate_protocol(stream, &Role::Dialer, protocols).await?;
433+
let (io, protocol) =
434+
Self::negotiate_protocol(stream, &Role::Dialer, protocols, substream_open_timeout)
435+
.await?;
413436

414437
Ok(NegotiatedSubstream {
415438
io: io.inner(),
@@ -438,7 +461,7 @@ impl WebSocketConnection {
438461
self.pending_substreams.push(Box::pin(async move {
439462
match tokio::time::timeout(
440463
substream_open_timeout,
441-
Self::accept_substream(stream, permit, substream, protocols),
464+
Self::accept_substream(stream, permit, substream, protocols, substream_open_timeout),
442465
)
443466
.await
444467
{
@@ -537,7 +560,8 @@ impl WebSocketConnection {
537560
permit,
538561
substream_id,
539562
protocol.clone(),
540-
fallback_names
563+
fallback_names,
564+
substream_open_timeout
541565
),
542566
)
543567
.await

0 commit comments

Comments
 (0)