Skip to content

Commit

Permalink
Fix dangling endpoint to chain worker actor that failed to start (#3298)
Browse files Browse the repository at this point in the history
Testnet is currently experiencing some panics when an attempt is made to
send a request to a chain worker actor. The panic message says that the
endpoint is closed. One possible cause is #3295, where a storage error
could cause an endpoint to be added to the chain worker cache but its
respective actor task wouldn't start.

Refactor the `ChainWorkerActor` so that it has a `run` method that loads
the chain state from storage and then handles the incoming requests. If
loading the state fails, then the error is reported to the next
requester, and loading will be reattempted while the actor is running
and receiving requests.

CI should catch any regressions caused by this refactor.

- These changes should be backported to the latest `devnet` branch, then
    - be released in a validator hotfix.
- These changes should be backported to the latest `testnet` branch,
then
    - be released in a validator hotfix.

This needs to be hotfixed because it may fix a bug that's in production.

- [reviewer
checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
- Closes #3295
  • Loading branch information
jvff committed Feb 12, 2025
1 parent 1566fa9 commit 00854a0
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 127 deletions.
8 changes: 4 additions & 4 deletions linera-client/src/client_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ where
// Try processing the inbox optimistically without waiting for validator notifications.
let (new_certificates, maybe_timeout) = {
chain_client.synchronize_from_validators().await?;
let result = chain_client.process_inbox_without_prepare().await;
let result = Box::pin(chain_client.process_inbox_without_prepare()).await;
self.update_wallet_from_client(chain_client).await?;
if result.is_err() {
self.save_wallet().await?;
Expand All @@ -314,7 +314,7 @@ where

loop {
let (new_certificates, maybe_timeout) = {
let result = chain_client.process_inbox().await;
let result = Box::pin(chain_client.process_inbox()).await;
self.update_wallet_from_client(chain_client).await?;
if result.is_err() {
self.save_wallet().await?;
Expand Down Expand Up @@ -591,7 +591,7 @@ where

for chain_id in key_pairs.keys() {
let child_client = self.make_chain_client(*chain_id)?;
child_client.process_inbox().await?;
Box::pin(child_client.process_inbox()).await?;
self.wallet.as_mut().update_from_state(&child_client).await;
self.save_wallet().await?;
}
Expand Down Expand Up @@ -650,7 +650,7 @@ where
async move {
for i in 0..5 {
linera_base::time::timer::sleep(Duration::from_secs(i)).await;
chain_client.process_inbox().await?;
Box::pin(chain_client.process_inbox()).await?;
let chain_state = chain_client.chain_state_view().await?;
if chain_state
.execution_state
Expand Down
103 changes: 98 additions & 5 deletions linera-core/src/chain_worker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use linera_execution::{
};
use linera_storage::Storage;
use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
use tracing::{instrument, trace, warn};
use tracing::{debug, instrument, trace, warn};

use super::{config::ChainWorkerConfig, state::ChainWorkerState, DeliveryNotifier};
use crate::{
Expand Down Expand Up @@ -141,9 +141,46 @@ impl<StorageClient> ChainWorkerActor<StorageClient>
where
StorageClient: Storage + Clone + Send + Sync + 'static,
{
/// Spawns a new task to run the [`ChainWorkerActor`], returning an endpoint for sending
/// requests to the worker.
#[tracing::instrument(level = "debug", skip_all, fields(?chain_id))]
/// Runs the [`ChainWorkerActor`], first by loading the chain state from `storage` then
/// handling all `incoming_requests` as they arrive.
///
/// If loading the chain state fails the next request will receive the error reported by the
/// `storage`, and the actor will then try again to load the state.
pub async fn run(
config: ChainWorkerConfig,
storage: StorageClient,
certificate_value_cache: Arc<ValueCache<CryptoHash, HashedCertificateValue>>,
tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
delivery_notifier: DeliveryNotifier,
chain_id: ChainId,
mut incoming_requests: mpsc::UnboundedReceiver<ChainWorkerRequest<StorageClient::Context>>,
) {
let actor = loop {
let load_result = Self::load(
config.clone(),
storage.clone(),
certificate_value_cache.clone(),
tracked_chains.clone(),
delivery_notifier.clone(),
chain_id,
)
.await
.inspect_err(|error| warn!("Failed to load chain state: {error:?}"));

match load_result {
Ok(actor) => break actor,
Err(error) => match incoming_requests.recv().await {
Some(request) => request.send_error(error),
None => return,
},
}
};

actor.handle_requests(incoming_requests).await;
}

/// Creates a [`ChainWorkerActor`], loading it with the chain state for the requested
/// [`ChainId`].
pub async fn load(
config: ChainWorkerConfig,
storage: StorageClient,
Expand Down Expand Up @@ -218,7 +255,7 @@ where
skip_all,
fields(chain_id = format!("{:.8}", self.worker.chain_id())),
)]
pub async fn run(
pub async fn handle_requests(
mut self,
mut incoming_requests: mpsc::UnboundedReceiver<ChainWorkerRequest<StorageClient::Context>>,
) {
Expand Down Expand Up @@ -455,3 +492,59 @@ where
}
}
}

impl<Context> ChainWorkerRequest<Context>
where
Context: linera_views::context::Context + Clone + Send + Sync + 'static,
{
/// Responds to this request with an `error`.
pub fn send_error(self, error: WorkerError) {
debug!("Immediately sending error to chain worker request {self:?}");

let responded = match self {
#[cfg(with_testing)]
ChainWorkerRequest::ReadCertificate { callback, .. } => {
callback.send(Err(error)).is_ok()
}
#[cfg(with_testing)]
ChainWorkerRequest::FindBundleInInbox { callback, .. } => {
callback.send(Err(error)).is_ok()
}
ChainWorkerRequest::GetChainStateView { callback } => callback.send(Err(error)).is_ok(),
ChainWorkerRequest::QueryApplication { callback, .. } => {
callback.send(Err(error)).is_ok()
}
ChainWorkerRequest::DescribeApplication { callback, .. } => {
callback.send(Err(error)).is_ok()
}
ChainWorkerRequest::StageBlockExecution { callback, .. } => {
callback.send(Err(error)).is_ok()
}
ChainWorkerRequest::ProcessTimeout { callback, .. } => {
callback.send(Err(error)).is_ok()
}
ChainWorkerRequest::HandleBlockProposal { callback, .. } => {
callback.send(Err(error)).is_ok()
}
ChainWorkerRequest::ProcessValidatedBlock { callback, .. } => {
callback.send(Err(error)).is_ok()
}
ChainWorkerRequest::ProcessConfirmedBlock { callback, .. } => {
callback.send(Err(error)).is_ok()
}
ChainWorkerRequest::ProcessCrossChainUpdate { callback, .. } => {
callback.send(Err(error)).is_ok()
}
ChainWorkerRequest::ConfirmUpdatedRecipient { callback, .. } => {
callback.send(Err(error)).is_ok()
}
ChainWorkerRequest::HandleChainInfoQuery { callback, .. } => {
callback.send(Err(error)).is_ok()
}
};

if !responded {
warn!("Callback for `ChainWorkerActor` was dropped before a response was sent");
}
}
}
Loading

0 comments on commit 00854a0

Please sign in to comment.