diff --git a/linera-core/src/client.rs b/linera-core/src/client.rs index 999eabf540a0..4f75fd8a3339 100644 --- a/linera-core/src/client.rs +++ b/linera-core/src/client.rs @@ -66,7 +66,7 @@ use crate::{ data_types::{ BlockHeightRange, ChainInfo, ChainInfoQuery, ChainInfoResponse, ClientOutcome, RoundTimeout, }, - local_node::{LocalNodeClient, LocalNodeError, NamedNode}, + local_node::{LocalNodeClient, LocalNodeError, RemoteNode}, node::{ CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode, ValidatorNodeProvider, @@ -293,7 +293,7 @@ where { async fn download_certificates( &self, - nodes: &[NamedNode], + nodes: &[RemoteNode], chain_id: ChainId, height: BlockHeight, ) -> Result, LocalNodeError> { @@ -308,14 +308,14 @@ where async fn try_process_certificates( &self, - named_node: &NamedNode, + remote_node: &RemoteNode, chain_id: ChainId, certificates: Vec, ) -> Option> { let mut notifications = Vec::::new(); let result = self .local_node - .try_process_certificates(named_node, chain_id, certificates, &mut notifications) + .try_process_certificates(remote_node, chain_id, certificates, &mut notifications) .await; self.notifier.handle_notifications(¬ifications); result @@ -830,18 +830,18 @@ where } #[tracing::instrument(level = "trace")] - fn make_nodes(&self, committee: &Committee) -> Result>, NodeError> { + fn make_nodes(&self, committee: &Committee) -> Result>, NodeError> { Ok(self .client .validator_node_provider .make_nodes(committee)? - .map(|(name, node)| NamedNode { name, node }) + .map(|(name, node)| RemoteNode { name, node }) .collect()) } #[tracing::instrument(level = "trace")] /// Obtains the validators trusted by the local chain. - async fn validator_nodes(&self) -> Result>, ChainClientError> { + async fn validator_nodes(&self) -> Result>, ChainClientError> { match self.local_committee().await { Ok(committee) => Ok(self.make_nodes(&committee)?), Err(LocalNodeError::InactiveChain(_)) => Ok(Vec::new()), @@ -1011,9 +1011,9 @@ where &nodes, committee, |_: &()| (), - |named_node| { + |remote_node| { let mut updater = ValidatorUpdater { - named_node, + remote_node, local_node: local_node.clone(), }; Box::pin(async move { @@ -1045,9 +1045,9 @@ where &nodes, committee, |vote: &LiteVote| (vote.value.value_hash, vote.round), - |named_node| { + |remote_node| { let mut updater = ValidatorUpdater { - named_node, + remote_node, local_node: local_node.clone(), }; let action = action.clone(); @@ -1159,12 +1159,12 @@ where async fn synchronize_received_certificates_from_validator( &self, chain_id: ChainId, - named_node: &NamedNode, + remote_node: &RemoteNode, ) -> Result<(ValidatorName, u64, Vec), NodeError> { let tracker = self .state() .received_certificate_trackers - .get(&named_node.name) + .get(&remote_node.name) .copied() .unwrap_or(0); let (committees, max_epoch) = self @@ -1173,7 +1173,7 @@ where .map_err(|_| NodeError::InvalidChainInfoResponse)?; // Retrieve newly received certificates from this validator. let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_nth(tracker); - let info = named_node.handle_chain_info_query(query).await?; + let info = remote_node.handle_chain_info_query(query).await?; let mut certificates = Vec::new(); let mut new_tracker = tracker; for entry in info.requested_received_log { @@ -1196,12 +1196,12 @@ where continue; } - let mut info = named_node.handle_chain_info_query(query).await?; + let mut info = remote_node.handle_chain_info_query(query).await?; let Some(certificate_hash) = info.requested_sent_certificate_hashes.pop() else { break; }; - let certificate = named_node + let certificate = remote_node .node .download_certificate(certificate_hash) .await?; @@ -1243,7 +1243,7 @@ where } } } - Ok((named_node.name, new_tracker, certificates)) + Ok((remote_node.name, new_tracker, certificates)) } #[tracing::instrument(level = "trace", skip(tracker, certificates))] @@ -1307,11 +1307,11 @@ where &nodes, &local_committee, |_| (), - |named_node| { + |remote_node| { let client = client.clone(); Box::pin(async move { client - .synchronize_received_certificates_from_validator(chain_id, &named_node) + .synchronize_received_certificates_from_validator(chain_id, &remote_node) .await }) }, @@ -1465,7 +1465,7 @@ where /// Downloads and processes any certificates we are missing for the given chain. pub async fn synchronize_chain_state( &self, - validators: &[NamedNode], + validators: &[RemoteNode], chain_id: ChainId, ) -> Result, ChainClientError> { #[cfg(with_metrics)] @@ -1473,11 +1473,11 @@ where let mut futures = vec![]; - for named_node in validators { + for remote_node in validators { let client = self.clone(); futures.push(async move { client - .try_synchronize_chain_state_from(named_node, chain_id) + .try_synchronize_chain_state_from(remote_node, chain_id) .await }); } @@ -1495,12 +1495,12 @@ where .map_err(Into::into) } - #[tracing::instrument(level = "trace", skip(self, named_node, chain_id))] + #[tracing::instrument(level = "trace", skip(self, remote_node, chain_id))] /// Downloads any certificates from the specified validator that we are missing for the given /// chain, and processes them. async fn try_synchronize_chain_state_from( &self, - named_node: &NamedNode, + remote_node: &RemoteNode, chain_id: ChainId, ) -> Result<(), ChainClientError> { let local_info = self.client.local_node.local_chain_info(chain_id).await?; @@ -1511,19 +1511,19 @@ where let query = ChainInfoQuery::new(chain_id) .with_sent_certificate_hashes_in_range(range) .with_manager_values(); - let info = named_node.handle_chain_info_query(query).await?; + let info = remote_node.handle_chain_info_query(query).await?; let certificates = future::try_join_all( info.requested_sent_certificate_hashes .into_iter() - .map(move |hash| async move { named_node.node.download_certificate(hash).await }), + .map(move |hash| async move { remote_node.node.download_certificate(hash).await }), ) .await?; if !certificates.is_empty() && self .client - .try_process_certificates(named_node, chain_id, certificates) + .try_process_certificates(remote_node, chain_id, certificates) .await .is_none() { @@ -1547,14 +1547,14 @@ where _, ) = &**chain_error { - self.update_local_node_with_blob_from(*blob_id, named_node) + self.update_local_node_with_blob_from(*blob_id, remote_node) .await?; continue; // We found the missing blob: retry. } } warn!( "Skipping proposal from {} and validator {}: {}", - owner, named_node.name, original_err + owner, remote_node.name, original_err ); break; } @@ -1567,7 +1567,7 @@ where if let LocalNodeError::WorkerError(WorkerError::BlobsNotFound(blob_ids)) = &original_err { - blobs = named_node + blobs = remote_node .find_missing_blobs(blob_ids.clone(), chain_id) .await?; @@ -1577,7 +1577,7 @@ where } warn!( "Skipping certificate {} from validator {}: {}", - hash, named_node.name, original_err + hash, remote_node.name, original_err ); break; } @@ -1590,9 +1590,9 @@ where async fn update_local_node_with_blob_from( &self, blob_id: BlobId, - named_node: &NamedNode, + remote_node: &RemoteNode, ) -> Result<(), ChainClientError> { - let certificate = named_node.download_certificate_for_blob(blob_id).await?; + let certificate = remote_node.download_certificate_for_blob(blob_id).await?; // This will download all ancestors of the certificate and process all of them locally. self.receive_certificate(certificate).await?; Ok(()) @@ -1603,10 +1603,10 @@ where async fn receive_certificate_for_blob(&self, blob_id: BlobId) -> Result<(), ChainClientError> { let validators = self.validator_nodes().await?; let mut tasks = FuturesUnordered::new(); - for named_node in validators { - let named_node = named_node.clone(); + for remote_node in validators { + let remote_node = remote_node.clone(); tasks.push(async move { - let cert = named_node + let cert = remote_node .download_certificate_for_blob(blob_id) .await .ok()?; @@ -2870,10 +2870,10 @@ where Some(info.next_block_height) } - #[tracing::instrument(level = "trace", skip(named_node, local_node, notification))] + #[tracing::instrument(level = "trace", skip(remote_node, local_node, notification))] async fn process_notification( &self, - named_node: NamedNode, + remote_node: RemoteNode, mut local_node: LocalNodeClient, notification: Notification, ) { @@ -2888,7 +2888,7 @@ where return; } if let Err(error) = self - .find_received_certificates_from_validator(named_node) + .find_received_certificates_from_validator(remote_node) .await { error!("Fail to process notification: {error}"); @@ -2913,7 +2913,7 @@ where return; } if let Err(error) = self - .try_synchronize_chain_state_from(&named_node, chain_id) + .try_synchronize_chain_state_from(&remote_node, chain_id) .await { error!("Fail to process notification: {error}"); @@ -2935,7 +2935,7 @@ where } } if let Err(error) = self - .try_synchronize_chain_state_from(&named_node, chain_id) + .try_synchronize_chain_state_from(&remote_node, chain_id) .await { error!("Fail to process notification: {error}"); @@ -3062,11 +3062,15 @@ where }; let this = self.clone(); let local_node = local_node.clone(); - let named_node = NamedNode { name, node }; + let remote_node = RemoteNode { name, node }; validator_tasks.push(async move { while let Some(notification) = stream.next().await { - this.process_notification(named_node.clone(), local_node.clone(), notification) - .await; + this.process_notification( + remote_node.clone(), + local_node.clone(), + notification, + ) + .await; } }); entry.insert(abort); @@ -3081,12 +3085,12 @@ where /// We also don't try to synchronize the admin chain. pub async fn find_received_certificates_from_validator( &self, - named_node: NamedNode, + remote_node: RemoteNode, ) -> Result<(), ChainClientError> { let chain_id = self.chain_id; // Proceed to downloading received certificates. let (name, tracker, certificates) = self - .synchronize_received_certificates_from_validator(chain_id, &named_node) + .synchronize_received_certificates_from_validator(chain_id, &remote_node) .await?; // Process received certificates. If the client state has changed during the // network calls, we should still be fine. diff --git a/linera-core/src/local_node.rs b/linera-core/src/local_node.rs index 7ca3857749ab..4f007b4cb8c4 100644 --- a/linera-core/src/local_node.rs +++ b/linera-core/src/local_node.rs @@ -256,7 +256,7 @@ where #[tracing::instrument(level = "trace", skip_all)] pub async fn try_process_certificates( &self, - named_node: &NamedNode, + remote_node: &RemoteNode, chain_id: ChainId, certificates: Vec, notifications: &mut impl Extend, @@ -275,7 +275,7 @@ where result = match &result { Err(LocalNodeError::WorkerError(WorkerError::BlobsNotFound(blob_ids))) => { - let blobs = named_node.try_download_blobs(blob_ids).await; + let blobs = remote_node.try_download_blobs(blob_ids).await; if blobs.len() != blob_ids.len() { result } else { @@ -364,7 +364,7 @@ where /// Downloads and processes all certificates up to (excluding) the specified height. pub async fn download_certificates( &self, - validators: &[NamedNode], + validators: &[RemoteNode], chain_id: ChainId, target_next_block_height: BlockHeight, notifications: &mut impl Extend, @@ -372,13 +372,13 @@ where // Sequentially try each validator in random order. let mut validators: Vec<_> = validators.iter().collect(); validators.shuffle(&mut rand::thread_rng()); - for named_node in validators { + for remote_node in validators { let info = self.local_chain_info(chain_id).await?; if target_next_block_height <= info.next_block_height { return Ok(info); } self.try_download_certificates_from( - named_node, + remote_node, chain_id, info.next_block_height, target_next_block_height, @@ -424,7 +424,7 @@ where /// given validator. async fn try_download_certificates_from( &self, - named_node: &NamedNode, + remote_node: &RemoteNode, chain_id: ChainId, mut start: BlockHeight, stop: BlockHeight, @@ -437,13 +437,13 @@ where .ok_or(ArithmeticError::Overflow)? .min(1000); let Some(certificates) = self - .try_query_certificates_from(named_node, chain_id, start, limit) + .try_query_certificates_from(remote_node, chain_id, start, limit) .await? else { break; }; let Some(info) = self - .try_process_certificates(named_node, chain_id, certificates, notifications) + .try_process_certificates(remote_node, chain_id, certificates, notifications) .await else { break; @@ -457,22 +457,22 @@ where #[tracing::instrument(level = "trace", skip_all)] async fn try_query_certificates_from( &self, - named_node: &NamedNode, + remote_node: &RemoteNode, chain_id: ChainId, start: BlockHeight, limit: u64, ) -> Result>, LocalNodeError> { - tracing::debug!(name = ?named_node.name, ?chain_id, ?start, ?limit, "Querying certificates"); + tracing::debug!(name = ?remote_node.name, ?chain_id, ?start, ?limit, "Querying certificates"); let range = BlockHeightRange { start, limit: Some(limit), }; let query = ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_in_range(range); - if let Ok(info) = named_node.handle_chain_info_query(query).await { + if let Ok(info) = remote_node.handle_chain_info_query(query).await { let certificates = future::try_join_all( info.requested_sent_certificate_hashes .into_iter() - .map(|hash| named_node.node.download_certificate(hash)), + .map(|hash| remote_node.node.download_certificate(hash)), ) .await?; Ok(Some(certificates)) @@ -483,14 +483,14 @@ where #[tracing::instrument(level = "trace", skip(validators))] async fn download_blob( - validators: &[NamedNode], + validators: &[RemoteNode], blob_id: BlobId, ) -> Option { // Sequentially try each validator in random order. let mut validators: Vec<_> = validators.iter().collect(); validators.shuffle(&mut rand::thread_rng()); - for named_node in validators { - if let Some(blob) = named_node.try_download_blob(blob_id).await { + for remote_node in validators { + if let Some(blob) = remote_node.try_download_blob(blob_id).await { return Some(blob); } } @@ -500,7 +500,7 @@ where #[tracing::instrument(level = "trace", skip(nodes))] pub async fn download_blobs( blob_ids: &[BlobId], - nodes: &[NamedNode], + nodes: &[RemoteNode], ) -> Vec { future::join_all( blob_ids @@ -535,21 +535,21 @@ where /// A validator node together with the validator's name. #[derive(Clone)] -pub struct NamedNode { +pub struct RemoteNode { pub name: ValidatorName, pub node: N, } -impl fmt::Debug for NamedNode { +impl fmt::Debug for RemoteNode { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("NamedNode") + f.debug_struct("RemoteNode") .field("name", &self.name) .finish_non_exhaustive() } } #[allow(clippy::result_large_err)] -impl NamedNode { +impl RemoteNode { pub async fn handle_chain_info_query( &self, query: ChainInfoQuery, diff --git a/linera-core/src/updater.rs b/linera-core/src/updater.rs index 9d0af5a620e9..ae3da99f3426 100644 --- a/linera-core/src/updater.rs +++ b/linera-core/src/updater.rs @@ -25,7 +25,7 @@ use tracing::{error, warn}; use crate::{ data_types::{ChainInfo, ChainInfoQuery}, - local_node::{LocalNodeClient, NamedNode}, + local_node::{LocalNodeClient, RemoteNode}, node::{CrossChainMessageDelivery, NodeError, ValidatorNode}, }; @@ -68,7 +68,7 @@ pub struct ValidatorUpdater where S: Storage, { - pub named_node: NamedNode, + pub remote_node: RemoteNode, pub local_node: LocalNodeClient, } @@ -95,14 +95,14 @@ pub enum CommunicationError { /// are given this much additional time to contribute to the result, as a fraction of how long it /// took to reach the quorum. pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>( - validator_clients: &'a [NamedNode], + validator_clients: &'a [RemoteNode], committee: &Committee, group_by: G, execute: F, ) -> Result<(K, Vec), CommunicationError> where A: ValidatorNode + Clone + 'static, - F: Clone + Fn(NamedNode) -> R, + F: Clone + Fn(RemoteNode) -> R, R: Future> + 'a, G: Fn(&V) -> K, K: Hash + PartialEq + Eq + Clone + 'static, @@ -110,15 +110,15 @@ where { let mut responses: futures::stream::FuturesUnordered<_> = validator_clients .iter() - .filter_map(|named_node| { - if committee.weight(&named_node.name) == 0 { + .filter_map(|remote_node| { + if committee.weight(&remote_node.name) == 0 { // This should not happen but better prevent it because certificates // are not allowed to include votes with weight 0. return None; } let execute = execute.clone(); - let named_node = named_node.clone(); - Some(async move { (named_node.name, execute(named_node).await) }) + let remote_node = remote_node.clone(); + Some(async move { (remote_node.name, execute(remote_node).await) }) }) .collect(); @@ -200,16 +200,16 @@ where certificate: &Certificate, delivery: CrossChainMessageDelivery, ) -> Result, NodeError> { - if certificate.is_signed_by(&self.named_node.name) { + if certificate.is_signed_by(&self.remote_node.name) { let result = self - .named_node + .remote_node .handle_lite_certificate(certificate.lite_certificate(), delivery) .await; match result { Err(NodeError::MissingCertificateValue) => { warn!( "Validator {} forgot a certificate value that they signed before", - self.named_node.name + self.remote_node.name ); } _ => { @@ -217,7 +217,7 @@ where } } } - self.named_node + self.remote_node .handle_certificate(certificate.clone(), vec![], delivery) .await } @@ -238,7 +238,7 @@ where .find_missing_blobs(&certificate, blob_ids, certificate.value().chain_id()) .await?; ensure!(blobs.len() == blob_ids.len(), original_err.clone()); - self.named_node + self.remote_node .handle_certificate(certificate, blobs, delivery) .await } @@ -258,7 +258,7 @@ where } loop { match self - .named_node + .remote_node .handle_block_proposal(proposal.clone()) .await { @@ -279,7 +279,7 @@ where // certificates that last used the blobs to the validator node should be enough. let missing_blob_ids = stream::iter(mem::take(&mut blob_ids)) .filter(|blob_id| { - let node = self.named_node.node.clone(); + let node = self.remote_node.node.clone(); let blob_id = *blob_id; async move { node.blob_last_used_by(blob_id).await.is_err() } }) @@ -314,11 +314,11 @@ where ) -> Result<(), NodeError> { // Figure out which certificates this validator is missing. let query = ChainInfoQuery::new(chain_id); - let initial_block_height = match self.named_node.handle_chain_info_query(query).await { + let initial_block_height = match self.remote_node.handle_chain_info_query(query).await { Ok(info) => info.next_block_height, Err(error) => { error!( - name = ?self.named_node.name, ?chain_id, %error, + name = ?self.remote_node.name, ?chain_id, %error, "Failed to query validator about missing blocks" ); return Err(error); @@ -410,12 +410,12 @@ where } CommunicateAction::RequestTimeout { .. } => { let query = ChainInfoQuery::new(chain_id).with_timeout(); - let info = self.named_node.handle_chain_info_query(query).await?; + let info = self.remote_node.handle_chain_info_query(query).await?; info.manager.timeout_vote } }; match vote { - Some(vote) if vote.validator == self.named_node.name => { + Some(vote) if vote.validator == self.remote_node.name => { vote.check()?; Ok(vote) } diff --git a/linera-service/src/linera/main.rs b/linera-service/src/linera/main.rs index b077a616f679..877f50937377 100644 --- a/linera-service/src/linera/main.rs +++ b/linera-service/src/linera/main.rs @@ -33,7 +33,7 @@ use linera_client::{ }; use linera_core::{ data_types::{ChainInfoQuery, ClientOutcome}, - local_node::{LocalNodeClient, NamedNode}, + local_node::{LocalNodeClient, RemoteNode}, node::ValidatorNodeProvider, worker::{Reason, WorkerState}, JoinSetExt as _, @@ -1107,7 +1107,7 @@ impl Job { context .make_node_provider() .make_nodes_from_list(validators)? - .map(|(name, node)| NamedNode { name, node }) + .map(|(name, node)| RemoteNode { name, node }) .collect() } else { let info = node_client.handle_chain_info_query(query).await?; @@ -1117,7 +1117,7 @@ impl Job { context .make_node_provider() .make_nodes(committee)? - .map(|(name, node)| NamedNode { name, node }) + .map(|(name, node)| RemoteNode { name, node }) .collect() };