Skip to content

Commit

Permalink
(nit) Rename NamedNode into RemoteNode (#2521)
Browse files Browse the repository at this point in the history
* named_node -> remote_node

* NamedNode -> RemoteNode
  • Loading branch information
ma2bd authored Sep 23, 2024
1 parent cb7a8c4 commit 3959fa9
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 88 deletions.
96 changes: 50 additions & 46 deletions linera-core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ use crate::{
data_types::{
BlockHeightRange, ChainInfo, ChainInfoQuery, ChainInfoResponse, ClientOutcome, RoundTimeout,
},
local_node::{LocalNodeClient, LocalNodeError, NamedNode},
local_node::{LocalNodeClient, LocalNodeError, RemoteNode},
node::{
CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
ValidatorNodeProvider,
Expand Down Expand Up @@ -293,7 +293,7 @@ where
{
async fn download_certificates(
&self,
nodes: &[NamedNode<P::Node>],
nodes: &[RemoteNode<P::Node>],
chain_id: ChainId,
height: BlockHeight,
) -> Result<Box<ChainInfo>, LocalNodeError> {
Expand All @@ -308,14 +308,14 @@ where

async fn try_process_certificates(
&self,
named_node: &NamedNode<P::Node>,
remote_node: &RemoteNode<P::Node>,
chain_id: ChainId,
certificates: Vec<Certificate>,
) -> Option<Box<ChainInfo>> {
let mut notifications = Vec::<Notification>::new();
let result = self
.local_node
.try_process_certificates(named_node, chain_id, certificates, &mut notifications)
.try_process_certificates(remote_node, chain_id, certificates, &mut notifications)
.await;
self.notifier.handle_notifications(&notifications);
result
Expand Down Expand Up @@ -830,18 +830,18 @@ where
}

#[tracing::instrument(level = "trace")]
fn make_nodes(&self, committee: &Committee) -> Result<Vec<NamedNode<P::Node>>, NodeError> {
fn make_nodes(&self, committee: &Committee) -> Result<Vec<RemoteNode<P::Node>>, NodeError> {
Ok(self
.client
.validator_node_provider
.make_nodes(committee)?
.map(|(name, node)| NamedNode { name, node })
.map(|(name, node)| RemoteNode { name, node })
.collect())
}

#[tracing::instrument(level = "trace")]
/// Obtains the validators trusted by the local chain.
async fn validator_nodes(&self) -> Result<Vec<NamedNode<P::Node>>, ChainClientError> {
async fn validator_nodes(&self) -> Result<Vec<RemoteNode<P::Node>>, ChainClientError> {
match self.local_committee().await {
Ok(committee) => Ok(self.make_nodes(&committee)?),
Err(LocalNodeError::InactiveChain(_)) => Ok(Vec::new()),
Expand Down Expand Up @@ -1011,9 +1011,9 @@ where
&nodes,
committee,
|_: &()| (),
|named_node| {
|remote_node| {
let mut updater = ValidatorUpdater {
named_node,
remote_node,
local_node: local_node.clone(),
};
Box::pin(async move {
Expand Down Expand Up @@ -1045,9 +1045,9 @@ where
&nodes,
committee,
|vote: &LiteVote| (vote.value.value_hash, vote.round),
|named_node| {
|remote_node| {
let mut updater = ValidatorUpdater {
named_node,
remote_node,
local_node: local_node.clone(),
};
let action = action.clone();
Expand Down Expand Up @@ -1159,12 +1159,12 @@ where
async fn synchronize_received_certificates_from_validator(
&self,
chain_id: ChainId,
named_node: &NamedNode<P::Node>,
remote_node: &RemoteNode<P::Node>,
) -> Result<(ValidatorName, u64, Vec<Certificate>), NodeError> {
let tracker = self
.state()
.received_certificate_trackers
.get(&named_node.name)
.get(&remote_node.name)
.copied()
.unwrap_or(0);
let (committees, max_epoch) = self
Expand All @@ -1173,7 +1173,7 @@ where
.map_err(|_| NodeError::InvalidChainInfoResponse)?;
// Retrieve newly received certificates from this validator.
let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_nth(tracker);
let info = named_node.handle_chain_info_query(query).await?;
let info = remote_node.handle_chain_info_query(query).await?;
let mut certificates = Vec::new();
let mut new_tracker = tracker;
for entry in info.requested_received_log {
Expand All @@ -1196,12 +1196,12 @@ where
continue;
}

let mut info = named_node.handle_chain_info_query(query).await?;
let mut info = remote_node.handle_chain_info_query(query).await?;
let Some(certificate_hash) = info.requested_sent_certificate_hashes.pop() else {
break;
};

let certificate = named_node
let certificate = remote_node
.node
.download_certificate(certificate_hash)
.await?;
Expand Down Expand Up @@ -1243,7 +1243,7 @@ where
}
}
}
Ok((named_node.name, new_tracker, certificates))
Ok((remote_node.name, new_tracker, certificates))
}

#[tracing::instrument(level = "trace", skip(tracker, certificates))]
Expand Down Expand Up @@ -1307,11 +1307,11 @@ where
&nodes,
&local_committee,
|_| (),
|named_node| {
|remote_node| {
let client = client.clone();
Box::pin(async move {
client
.synchronize_received_certificates_from_validator(chain_id, &named_node)
.synchronize_received_certificates_from_validator(chain_id, &remote_node)
.await
})
},
Expand Down Expand Up @@ -1465,19 +1465,19 @@ where
/// Downloads and processes any certificates we are missing for the given chain.
pub async fn synchronize_chain_state(
&self,
validators: &[NamedNode<P::Node>],
validators: &[RemoteNode<P::Node>],
chain_id: ChainId,
) -> Result<Box<ChainInfo>, ChainClientError> {
#[cfg(with_metrics)]
let _latency = metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency();

let mut futures = vec![];

for named_node in validators {
for remote_node in validators {
let client = self.clone();
futures.push(async move {
client
.try_synchronize_chain_state_from(named_node, chain_id)
.try_synchronize_chain_state_from(remote_node, chain_id)
.await
});
}
Expand All @@ -1495,12 +1495,12 @@ where
.map_err(Into::into)
}

#[tracing::instrument(level = "trace", skip(self, named_node, chain_id))]
#[tracing::instrument(level = "trace", skip(self, remote_node, chain_id))]
/// Downloads any certificates from the specified validator that we are missing for the given
/// chain, and processes them.
async fn try_synchronize_chain_state_from(
&self,
named_node: &NamedNode<P::Node>,
remote_node: &RemoteNode<P::Node>,
chain_id: ChainId,
) -> Result<(), ChainClientError> {
let local_info = self.client.local_node.local_chain_info(chain_id).await?;
Expand All @@ -1511,19 +1511,19 @@ where
let query = ChainInfoQuery::new(chain_id)
.with_sent_certificate_hashes_in_range(range)
.with_manager_values();
let info = named_node.handle_chain_info_query(query).await?;
let info = remote_node.handle_chain_info_query(query).await?;

let certificates = future::try_join_all(
info.requested_sent_certificate_hashes
.into_iter()
.map(move |hash| async move { named_node.node.download_certificate(hash).await }),
.map(move |hash| async move { remote_node.node.download_certificate(hash).await }),
)
.await?;

if !certificates.is_empty()
&& self
.client
.try_process_certificates(named_node, chain_id, certificates)
.try_process_certificates(remote_node, chain_id, certificates)
.await
.is_none()
{
Expand All @@ -1547,14 +1547,14 @@ where
_,
) = &**chain_error
{
self.update_local_node_with_blob_from(*blob_id, named_node)
self.update_local_node_with_blob_from(*blob_id, remote_node)
.await?;
continue; // We found the missing blob: retry.
}
}
warn!(
"Skipping proposal from {} and validator {}: {}",
owner, named_node.name, original_err
owner, remote_node.name, original_err
);
break;
}
Expand All @@ -1567,7 +1567,7 @@ where
if let LocalNodeError::WorkerError(WorkerError::BlobsNotFound(blob_ids)) =
&original_err
{
blobs = named_node
blobs = remote_node
.find_missing_blobs(blob_ids.clone(), chain_id)
.await?;

Expand All @@ -1577,7 +1577,7 @@ where
}
warn!(
"Skipping certificate {} from validator {}: {}",
hash, named_node.name, original_err
hash, remote_node.name, original_err
);
break;
}
Expand All @@ -1590,9 +1590,9 @@ where
async fn update_local_node_with_blob_from(
&self,
blob_id: BlobId,
named_node: &NamedNode<P::Node>,
remote_node: &RemoteNode<P::Node>,
) -> Result<(), ChainClientError> {
let certificate = named_node.download_certificate_for_blob(blob_id).await?;
let certificate = remote_node.download_certificate_for_blob(blob_id).await?;
// This will download all ancestors of the certificate and process all of them locally.
self.receive_certificate(certificate).await?;
Ok(())
Expand All @@ -1603,10 +1603,10 @@ where
async fn receive_certificate_for_blob(&self, blob_id: BlobId) -> Result<(), ChainClientError> {
let validators = self.validator_nodes().await?;
let mut tasks = FuturesUnordered::new();
for named_node in validators {
let named_node = named_node.clone();
for remote_node in validators {
let remote_node = remote_node.clone();
tasks.push(async move {
let cert = named_node
let cert = remote_node
.download_certificate_for_blob(blob_id)
.await
.ok()?;
Expand Down Expand Up @@ -2870,10 +2870,10 @@ where
Some(info.next_block_height)
}

#[tracing::instrument(level = "trace", skip(named_node, local_node, notification))]
#[tracing::instrument(level = "trace", skip(remote_node, local_node, notification))]
async fn process_notification(
&self,
named_node: NamedNode<P::Node>,
remote_node: RemoteNode<P::Node>,
mut local_node: LocalNodeClient<S>,
notification: Notification,
) {
Expand All @@ -2888,7 +2888,7 @@ where
return;
}
if let Err(error) = self
.find_received_certificates_from_validator(named_node)
.find_received_certificates_from_validator(remote_node)
.await
{
error!("Fail to process notification: {error}");
Expand All @@ -2913,7 +2913,7 @@ where
return;
}
if let Err(error) = self
.try_synchronize_chain_state_from(&named_node, chain_id)
.try_synchronize_chain_state_from(&remote_node, chain_id)
.await
{
error!("Fail to process notification: {error}");
Expand All @@ -2935,7 +2935,7 @@ where
}
}
if let Err(error) = self
.try_synchronize_chain_state_from(&named_node, chain_id)
.try_synchronize_chain_state_from(&remote_node, chain_id)
.await
{
error!("Fail to process notification: {error}");
Expand Down Expand Up @@ -3062,11 +3062,15 @@ where
};
let this = self.clone();
let local_node = local_node.clone();
let named_node = NamedNode { name, node };
let remote_node = RemoteNode { name, node };
validator_tasks.push(async move {
while let Some(notification) = stream.next().await {
this.process_notification(named_node.clone(), local_node.clone(), notification)
.await;
this.process_notification(
remote_node.clone(),
local_node.clone(),
notification,
)
.await;
}
});
entry.insert(abort);
Expand All @@ -3081,12 +3085,12 @@ where
/// We also don't try to synchronize the admin chain.
pub async fn find_received_certificates_from_validator(
&self,
named_node: NamedNode<P::Node>,
remote_node: RemoteNode<P::Node>,
) -> Result<(), ChainClientError> {
let chain_id = self.chain_id;
// Proceed to downloading received certificates.
let (name, tracker, certificates) = self
.synchronize_received_certificates_from_validator(chain_id, &named_node)
.synchronize_received_certificates_from_validator(chain_id, &remote_node)
.await?;
// Process received certificates. If the client state has changed during the
// network calls, we should still be fine.
Expand Down
Loading

0 comments on commit 3959fa9

Please sign in to comment.