Skip to content

Commit

Permalink
Reconnect L1 WS stream if disconnected
Browse files Browse the repository at this point in the history
Ethers internally tries to reconnect once, but if that fails, it
gives up and shuts down the whole provider backend. This is not
good in the case of extended outages of the L1 server.

This change handles errors from the underlying RPC client indicating
that the connection is closed by spawning a task to reestablish the
connection. This task tries until it succceeds.
  • Loading branch information
jbearer committed Nov 18, 2024
1 parent 57f1493 commit 67d297f
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 19 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,8 @@ tracing = { workspace = true }
url = { workspace = true }
vbs = { workspace = true }

[dev-dependencies]
portpicker = { workspace = true }

[package.metadata.cargo-machete]
ignored = ["base64_bytes", "hotshot_testing"]
218 changes: 208 additions & 10 deletions types/src/v0/impls/l1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ use std::{
cmp::{min, Ordering},
fmt::Debug,
num::NonZeroUsize,
sync::Arc,
sync::{
atomic::{self, AtomicBool},
Arc,
},
time::Duration,
};

use anyhow::{bail, Context};
Expand All @@ -12,7 +16,7 @@ use committable::{Commitment, Committable, RawCommitmentBuilder};
use contract_bindings::fee_contract::FeeContract;
use ethers::{
prelude::{Address, BlockNumber, Middleware, Provider, H256, U256},
providers::{Http, JsonRpcClient, ProviderError, PubsubClient, Ws},
providers::{Http, JsonRpcClient, ProviderError, PubsubClient, Ws, WsClientError},
};
use futures::{
future::Future,
Expand All @@ -22,14 +26,14 @@ use lru::LruCache;
use serde::{de::DeserializeOwned, Serialize};
use tokio::{
spawn,
sync::{Mutex, MutexGuard},
sync::{Mutex, MutexGuard, RwLock},
time::sleep,
};
use tracing::Instrument;
use url::Url;

use super::{L1BlockInfo, L1State, L1UpdateTask, RpcClient};
use crate::{FeeInfo, L1Client, L1ClientOptions, L1Event, L1Snapshot};
use crate::{FeeInfo, L1Client, L1ClientOptions, L1Event, L1Snapshot, WsConn};

impl PartialOrd for L1BlockInfo {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Expand Down Expand Up @@ -77,6 +81,23 @@ impl L1BlockInfo {
}
}

impl RpcClient {
fn http(url: Url) -> Self {
Self::Http(Http::new(url))
}

async fn ws(url: Url, retry_delay: Duration) -> anyhow::Result<Self> {
Ok(Self::Ws {
conn: Arc::new(RwLock::new(WsConn {
inner: Ws::connect(url.clone()).await?,
resetting: AtomicBool::new(false),
})),
retry_delay,
url,
})
}
}

#[async_trait]
impl JsonRpcClient for RpcClient {
type Error = ProviderError;
Expand All @@ -88,7 +109,63 @@ impl JsonRpcClient for RpcClient {
{
let res = match self {
Self::Http(client) => client.request(method, params).await?,
Self::Ws(client) => client.request(method, params).await?,
Self::Ws {
conn,
url,
retry_delay,
} => {
let conn_guard = conn
.try_read()
// We only lock the connection exclusively when we are resetting it, so if it is
// locked that means it was closed and is still being reset. There is no point
// in trying a request with a closed connection.
.map_err(|_| {
ProviderError::CustomError("connection closed; reset in progress".into())
})?;
match conn_guard.inner.request(method, params).await {
Ok(res) => res,
Err(err @ WsClientError::UnexpectedClose) => {
// If the WebSocket connection is closed, try to reopen it.
if !conn_guard.resetting.swap(true, atomic::Ordering::SeqCst) {
// We are the first one to try and reset this connection, so let's do
// it. We spawn a separate task to do it, because resetting it might
// take a long time (especially if it was closed because of a provider
// outage), and it is not good to block indefinitely in this low-level
// request API.
let conn = conn.clone();
let url = url.clone();
let retry_delay = *retry_delay;
let span = tracing::warn_span!("ws resetter");
spawn(
async move {
tracing::warn!("ws connection closed, trying to reset");
let inner = loop {
match Ws::connect(url.clone()).await {
Ok(inner) => break inner,
Err(err) => {
tracing::warn!("failed to reconnect: {err:#}");
sleep(retry_delay).await;
}
}
};
let new_conn = WsConn {
inner,
resetting: AtomicBool::new(false),
};
*conn.write().await = new_conn;
tracing::info!("ws connection successfully reestablished");
}
.instrument(span),
);
} else {
// Otherwise, if we couldn't get a write lock, it is because someone
// else is already resetting this connection, so we have nothing to do.
}
Err(err)?
}
Err(err) => Err(err)?,
}
}
};
Ok(res)
}
Expand All @@ -105,7 +182,16 @@ impl PubsubClient for RpcClient {
Self::Http(_) => Err(ProviderError::CustomError(
"subscriptions not supported with HTTP client".into(),
)),
Self::Ws(client) => Ok(client.subscribe(id)?),
Self::Ws { conn, .. } => Ok(conn
.try_read()
// We only lock the connection exclusively when we are resetting it, so if it is
// locked that means it was closed and is still being reset. There is no point
// in trying to subscribe with a closed connection.
.map_err(|_| {
ProviderError::CustomError("connection closed; reset in progress".into())
})?
.inner
.subscribe(id)?),
}
}

Expand All @@ -117,7 +203,16 @@ impl PubsubClient for RpcClient {
Self::Http(_) => Err(ProviderError::CustomError(
"subscriptions not supported with HTTP client".into(),
)),
Self::Ws(client) => Ok(client.unsubscribe(id)?),
Self::Ws { conn, .. } => Ok(conn
.try_read()
// We only lock the connection exclusively when we are resetting it, so if it is
// locked that means it was closed and is still being reset. There is no point
// in doing anything with a closed connection.
.map_err(|_| {
ProviderError::CustomError("connection closed; reset in progress".into())
})?
.inner
.unsubscribe(id)?),
}
}
}
Expand Down Expand Up @@ -153,16 +248,17 @@ impl L1ClientOptions {
///
/// `url` must have a scheme `http` or `https`.
pub fn http(self, url: Url) -> L1Client {
L1Client::with_provider(self, Provider::new(RpcClient::Http(Http::new(url))))
L1Client::with_provider(self, Provider::new(RpcClient::http(url)))
}

/// Construct a new WebSockets client.
///
/// `url` must have a scheme `ws` or `wss`.
pub async fn ws(self, url: Url) -> anyhow::Result<L1Client> {
let retry_delay = self.l1_retry_delay;
Ok(L1Client::with_provider(
self,
Provider::new(RpcClient::Ws(Ws::connect(url).await?)),
Provider::new(RpcClient::ws(url, retry_delay).await?),
))
}
}
Expand Down Expand Up @@ -238,7 +334,7 @@ impl L1Client {
// Subscribe to new blocks. This task cannot fail; retry until we succeed.
let mut block_stream = loop {
let res = match (*rpc).as_ref() {
RpcClient::Ws(_) => rpc.subscribe_blocks().await.map(StreamExt::boxed),
RpcClient::Ws { .. } => rpc.subscribe_blocks().await.map(StreamExt::boxed),
RpcClient::Http(_) => rpc
.watch_blocks()
.await
Expand Down Expand Up @@ -656,6 +752,7 @@ mod test {
prelude::{LocalWallet, Signer, SignerMiddleware, H160, U64},
utils::{hex, parse_ether, Anvil, AnvilInstance},
};
use portpicker::pick_unused_port;
use sequencer_utils::test_utils::setup_test;
use std::time::Duration;
use time::OffsetDateTime;
Expand Down Expand Up @@ -948,4 +1045,105 @@ mod test {
async fn test_wait_for_block_http() {
test_wait_for_block_helper(false).await
}

#[tokio::test(flavor = "multi_thread")]
async fn test_l1_ws_reconnect_rpc_request() {
setup_test();

let port = pick_unused_port().unwrap();
let anvil = Anvil::new().block_time(1u32).port(port).spawn();
let provider = Provider::new(
RpcClient::ws(anvil.ws_endpoint().parse().unwrap(), Duration::from_secs(1))
.await
.unwrap(),
);

// Check the provider is working.
assert_eq!(provider.get_chainid().await.unwrap(), 31337.into());

// Disconnect the WebSocket and reconnect it. Technically this spawns a whole new Anvil
// chain, but for the purposes of this test it should look to the client like an L1 server
// closing a WebSocket connection.
drop(anvil);
let err = provider.get_chainid().await.unwrap_err();
tracing::info!("L1 request failed as expected with closed connection: {err:#}");

// Let the connection stay down for a little while: Ethers internally tries to reconnect,
// and starting up to fast again might hit that and cause a false positive. The problem is,
// Ethers doesn't try very hard, and if we wait a bit, we will test the worst possible case
// where the internal retry logic gives up and just kills the whole provider.
tracing::info!("sleep 5");
sleep(Duration::from_secs(5)).await;

// Once a connection is reestablished, the provider will eventually work again.
tracing::info!("restarting L1");
let _anvil = Anvil::new().block_time(1u32).port(port).spawn();
// Give a bit of time for the provider to reconnect.
for retry in 0..5 {
if let Ok(chain_id) = provider.get_chainid().await {
assert_eq!(chain_id, 31337.into());
return;
}
tracing::warn!(retry, "waiting for provider to reconnect");
sleep(Duration::from_secs(1)).await;
}
panic!("request never succeeded after reconnect");
}

#[tokio::test(flavor = "multi_thread")]
async fn test_l1_ws_reconnect_update_task() {
setup_test();

let port = pick_unused_port().unwrap();
let anvil = Anvil::new().block_time(1u32).port(port).spawn();
let client = new_l1_client(&anvil, true).await;

let initial_state = client.snapshot().await;
tracing::info!(?initial_state, "initial state");

// Check the state is updating.
let mut retry = 0;
let updated_state = loop {
assert!(retry < 5, "state did not update in time");

let updated_state = client.snapshot().await;
if updated_state.head > initial_state.head {
break updated_state;
}
tracing::info!(retry, "waiting for state update");
sleep(Duration::from_secs(1)).await;
retry += 1;
};
tracing::info!(?updated_state, "state updated");

// Disconnect the WebSocket and reconnect it. Technically this spawns a whole new Anvil
// chain, but for the purposes of this test it should look to the client like an L1 server
// closing a WebSocket connection.
drop(anvil);

// Let the connection stay down for a little while: Ethers internally tries to reconnect,
// and starting up to fast again might hit that and cause a false positive. The problem is,
// Ethers doesn't try very hard, and if we wait a bit, we will test the worst possible case
// where the internal retry logic gives up and just kills the whole provider.
tracing::info!("sleep 5");
sleep(Duration::from_secs(5)).await;

// Once a connection is reestablished, the state will eventually start to update again.
tracing::info!("restarting L1");
let _anvil = Anvil::new().block_time(1u32).port(port).spawn();

let mut retry = 0;
let final_state = loop {
assert!(retry < 5, "state did not update in time");

let final_state = client.snapshot().await;
if final_state.head > updated_state.head {
break final_state;
}
tracing::info!(retry, "waiting for state update");
sleep(Duration::from_secs(1)).await;
retry += 1;
};
tracing::info!(?final_state, "state updated");
}
}
2 changes: 1 addition & 1 deletion types/src/v0/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ reexport_unchanged_types!(
ViewBasedUpgrade,
BlockSize,
);
pub(crate) use v0_3::{L1Event, L1State, L1UpdateTask, RpcClient};
pub(crate) use v0_3::{L1Event, L1State, L1UpdateTask, RpcClient, WsConn};

#[derive(
Clone, Copy, Debug, Default, Hash, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize,
Expand Down
19 changes: 17 additions & 2 deletions types/src/v0/v0_1/l1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@ use ethers::{
};
use lru::LruCache;
use serde::{Deserialize, Serialize};
use tokio::{sync::Mutex, task::JoinHandle};
use std::sync::atomic::AtomicBool;
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use tokio::{
sync::{Mutex, RwLock},
task::JoinHandle,
};
use url::Url;

#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Hash, PartialEq, Eq)]
pub struct L1BlockInfo {
Expand Down Expand Up @@ -112,7 +117,17 @@ pub struct L1Client {
#[derive(Clone, Debug)]
pub(crate) enum RpcClient {
Http(Http),
Ws(Ws),
Ws {
conn: Arc<RwLock<WsConn>>,
url: Url,
retry_delay: Duration,
},
}

#[derive(Debug)]
pub(crate) struct WsConn {
pub(crate) inner: Ws,
pub(crate) resetting: AtomicBool,
}

/// In-memory view of the L1 state, updated asynchronously.
Expand Down
12 changes: 6 additions & 6 deletions types/src/v0/v0_3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ use vbs::version::Version;
pub use super::v0_1::{
AccountQueryData, BlockMerkleCommitment, BlockMerkleTree, BlockSize, BuilderSignature, ChainId,
Delta, FeeAccount, FeeAccountProof, FeeAmount, FeeInfo, FeeMerkleCommitment, FeeMerkleProof,
FeeMerkleTree, Index, Iter, L1BlockInfo, L1Client, L1ClientOptions, L1Snapshot, NamespaceId, NsIndex, NsIter,
NsPayload, NsPayloadBuilder, NsPayloadByteLen, NsPayloadOwned, NsPayloadRange, NsProof,
NsTable, NsTableBuilder, NsTableValidationError, NumNss, NumTxs, NumTxsRange, NumTxsUnchecked,
Payload, PayloadByteLen, TimeBasedUpgrade, Transaction, TxIndex, TxIter, TxPayload,
TxPayloadRange, TxProof, TxTableEntries, TxTableEntriesRange, Upgrade, UpgradeMode,
FeeMerkleTree, Index, Iter, L1BlockInfo, L1Client, L1ClientOptions, L1Snapshot, NamespaceId,
NsIndex, NsIter, NsPayload, NsPayloadBuilder, NsPayloadByteLen, NsPayloadOwned, NsPayloadRange,
NsProof, NsTable, NsTableBuilder, NsTableValidationError, NumNss, NumTxs, NumTxsRange,
NumTxsUnchecked, Payload, PayloadByteLen, TimeBasedUpgrade, Transaction, TxIndex, TxIter,
TxPayload, TxPayloadRange, TxProof, TxTableEntries, TxTableEntriesRange, Upgrade, UpgradeMode,
UpgradeType, ViewBasedUpgrade, BLOCK_MERKLE_TREE_HEIGHT, FEE_MERKLE_TREE_HEIGHT,
NS_ID_BYTE_LEN, NS_OFFSET_BYTE_LEN, NUM_NSS_BYTE_LEN, NUM_TXS_BYTE_LEN, TX_OFFSET_BYTE_LEN,
};
pub(crate) use super::v0_1::{L1Event, L1State, RpcClient, L1UpdateTask};
pub(crate) use super::v0_1::{L1Event, L1State, L1UpdateTask, RpcClient, WsConn};

pub const VERSION: Version = Version { major: 0, minor: 3 };

Expand Down

0 comments on commit 67d297f

Please sign in to comment.