diff --git a/Cargo.lock b/Cargo.lock index a5a684967843..91c91bfe519c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2335,6 +2335,15 @@ dependencies = [ "itertools 0.10.5", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.6" @@ -4467,6 +4476,7 @@ dependencies = [ "chrono", "clap", "counter", + "crossbeam-channel", "crowd-funding", "derive_more", "dirs", @@ -4904,6 +4914,7 @@ dependencies = [ "convert_case", "counter", "criterion", + "crossbeam-channel", "crowd-funding", "current_platform", "ethereum-tracker", diff --git a/Cargo.toml b/Cargo.toml index 532c7fb80fd6..a78ee56b2354 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,6 +71,7 @@ colored = "2.1.0" comfy-table = "7.1.0" convert_case = "0.6.0" criterion = { version = "0.5.1", default-features = false } +crossbeam-channel = "0.5.14" custom_debug_derive = "0.6.1" dashmap = "5.5.3" derive_more = "1.0.0" diff --git a/linera-client/Cargo.toml b/linera-client/Cargo.toml index 8576cd030e85..0826d44413c8 100644 --- a/linera-client/Cargo.toml +++ b/linera-client/Cargo.toml @@ -13,7 +13,7 @@ version.workspace = true [features] test = ["linera-views/test", "linera-execution/test"] -benchmark = ["linera-base/test", "dep:linera-sdk", "dep:tokio-util"] +benchmark = ["linera-base/test", "dep:linera-sdk", "dep:tokio-util", "dep:crossbeam-channel"] wasmer = [ "linera-core/wasmer", "linera-execution/wasmer", @@ -55,6 +55,7 @@ bcs.workspace = true cfg-if.workspace = true chrono = { workspace = true, features = ["clock"] } clap.workspace = true +crossbeam-channel = { workspace = true, optional = true } derive_more = { workspace = true, features = ["deref", "deref_mut"] } dirs.workspace = true futures.workspace = true diff --git a/linera-client/src/client_context.rs b/linera-client/src/client_context.rs index cb69732d14a4..f5bdb2b47ecd 100644 --- a/linera-client/src/client_context.rs +++ b/linera-client/src/client_context.rs @@ -33,13 +33,12 @@ use { data_types::Amount, hashed::Hashed, identifiers::{AccountOwner, ApplicationId, Owner}, - listen_for_shutdown_signals, }, linera_chain::{ data_types::{BlockProposal, ExecutedBlock, ProposedBlock, SignatureAggregator, Vote}, types::{CertificateValue, ConfirmedBlock, GenericCertificate}, }, - linera_core::{client::ChainClientError, data_types::ChainInfoQuery, node::ValidatorNode}, + linera_core::{data_types::ChainInfoQuery, node::ValidatorNode}, linera_execution::{ committee::Epoch, system::{OpenChainConfig, Recipient, SystemOperation, OPEN_CHAIN_MESSAGE_INDEX}, @@ -49,7 +48,7 @@ use { std::{collections::HashMap, iter}, tokio::task, tokio_util::sync::CancellationToken, - tracing::{error, trace, warn}, + tracing::{error, trace}, }; #[cfg(feature = "fs")] use { @@ -585,28 +584,30 @@ where { #[allow(clippy::too_many_arguments)] pub async fn run_benchmark( - &mut self, + &self, bps: Option, - blocks_infos_iter: impl Iterator, AccountSecretKey)>, + chain_id: ChainId, + operations: Vec, + key_pair: AccountSecretKey, clients: Vec, - transactions_per_block: usize, epoch: Epoch, - chain_clients: HashMap>, + chain_client: ChainClient, + shutdown_notifier: CancellationToken, + sender: crossbeam_channel::Sender<()>, ) -> Result<(), Error> { - let shutdown_notifier = CancellationToken::new(); - tokio::spawn(listen_for_shutdown_signals(shutdown_notifier.clone())); - + info!( + "Starting benchmark at target BPS of {:?}, for chain {:?}", + bps, chain_id + ); let mut num_sent_proposals = 0; - let mut start = Instant::now(); - for (chain_id, operations, key_pair) in blocks_infos_iter { + loop { if shutdown_notifier.is_cancelled() { info!("Shutdown signal received, stopping benchmark"); break; } - let chain_client = &chain_clients[chain_id]; let block = ProposedBlock { epoch, - chain_id: *chain_id, + chain_id, incoming_bundles: Vec::new(), operations: operations.clone(), previous_block_hash: chain_client.block_hash(), @@ -617,7 +618,7 @@ where let executed_block = self.stage_block_execution(block.clone(), None).await?; let value = Hashed::new(ConfirmedBlock::new(executed_block)); let proposal = - BlockProposal::new_initial(linera_base::data_types::Round::Fast, block, key_pair); + BlockProposal::new_initial(linera_base::data_types::Round::Fast, block, &key_pair); let mut join_set = task::JoinSet::new(); for client in &clients { @@ -637,14 +638,8 @@ where }) .collect::>(); - let certificates = self.make_benchmark_certificates_from_votes(votes); - assert_eq!( - certificates.len(), - 1, - "Unable to build all the expected certificates from received votes" - ); + let certificate = self.make_benchmark_certificate_from_votes(votes); - let certificate = &certificates[0]; let mut join_set = task::JoinSet::new(); for client in &clients { let client = client.clone(); @@ -679,40 +674,20 @@ where num_sent_proposals += 1; if let Some(bps) = bps { if num_sent_proposals == bps { - let elapsed = start.elapsed(); - if elapsed > Duration::from_secs(1) { - warn!( - "Failed to achieve {} BPS/{} TPS, took {} ms", - bps, - bps * transactions_per_block, - elapsed.as_millis() - ); - } else { - tokio::time::sleep(Duration::from_secs(1) - elapsed).await; - info!( - "Achieved {} BPS/{} TPS in {} ms", - bps, - bps * transactions_per_block, - elapsed.as_millis() - ); - } - start = Instant::now(); + sender.send(())?; num_sent_proposals = 0; } + } else { + break; } } if bps.is_none() { - let elapsed = start.elapsed(); - let bps = num_sent_proposals as f64 / elapsed.as_secs_f64(); - info!( - "Achieved {} BPS/{} TPS", - bps, - bps * transactions_per_block as f64 - ); + sender.send(())?; } - self.close_benchmark_chains(chain_clients).await?; + self.close_benchmark_chain(chain_client).await?; + info!("Exiting task..."); Ok(()) } @@ -835,35 +810,20 @@ where Ok((benchmark_chains, chain_clients)) } - /// Closes the chains that were created for the benchmark. - pub async fn close_benchmark_chains( - &mut self, - chain_clients: HashMap>, + /// Closes the chain that was created for the benchmark. + pub async fn close_benchmark_chain( + &self, + chain_client: ChainClient, ) -> Result<(), Error> { - let num_chains = chain_clients.len(); - - let mut join_set = task::JoinSet::new(); - for chain_client in chain_clients.values() { - let chain_client = chain_client.clone(); - join_set.spawn(async move { - chain_client - .execute_operation(Operation::System(SystemOperation::CloseChain)) - .await? - .expect("Close chain operation should not fail!"); - Ok::<_, ChainClientError>(()) - }); - } - let start = Instant::now(); - join_set - .join_all() - .await - .into_iter() - .collect::, _>>()?; + chain_client + .execute_operation(Operation::System(SystemOperation::CloseChain)) + .await? + .expect("Close chain operation should not fail!"); info!( - "Closed {} chains in {} ms", - num_chains, + "Closed chain {:?} in {} ms", + chain_client.chain_id(), start.elapsed().as_millis() ); @@ -1025,24 +985,18 @@ where blocks_infos } - /// Tries to aggregate votes into certificates. - pub fn make_benchmark_certificates_from_votes( + /// Tries to aggregate votes into a certificate. + pub fn make_benchmark_certificate_from_votes( &self, votes: Vec>, - ) -> Vec> + ) -> GenericCertificate where T: std::fmt::Debug + CertificateValue, { let committee = self.wallet.genesis_config().create_committee(); let mut aggregators = HashMap::new(); - let mut certificates = Vec::new(); - let mut done_senders = HashSet::new(); for vote in votes { - // We aggregate votes indexed by sender. let chain_id = vote.value().inner().chain_id(); - if done_senders.contains(&chain_id) { - continue; - } trace!( "Processing vote on {:?}'s block by {:?}", chain_id, @@ -1058,8 +1012,7 @@ where match aggregator.append(vote.validator, vote.signature) { Ok(Some(certificate)) => { trace!("Found certificate: {:?}", certificate); - certificates.push(certificate); - done_senders.insert(chain_id); + return certificate; } Ok(None) => { trace!("Added one vote"); @@ -1069,7 +1022,8 @@ where } } } - certificates + + panic!("Could not build a certificate from votes"); } pub async fn update_wallet_from_certificate(&mut self, certificate: ConfirmedBlockCertificate) { diff --git a/linera-client/src/error.rs b/linera-client/src/error.rs index a7d5c961a463..69e82348ce6d 100644 --- a/linera-client/src/error.rs +++ b/linera-client/src/error.rs @@ -26,6 +26,9 @@ pub(crate) enum Inner { LocalNode(#[from] linera_core::local_node::LocalNodeError), #[error("remote node operation failed: {0}")] RemoteNode(#[from] linera_core::node::NodeError), + #[cfg(feature = "benchmark")] + #[error("failed to send message: {0}")] + SendError(#[from] crossbeam_channel::SendError<()>), } thiserror_context::impl_context!(Error(Inner)); diff --git a/linera-service/Cargo.toml b/linera-service/Cargo.toml index 1c706b2a7553..f734b28f1a76 100644 --- a/linera-service/Cargo.toml +++ b/linera-service/Cargo.toml @@ -26,6 +26,7 @@ benchmark = [ "linera-base/test", "linera-client/benchmark", "linera-chain/benchmark", + "dep:crossbeam-channel", ] wasmer = ["linera-client/wasmer", "linera-execution/wasmer", "linera-storage/wasmer"] wasmtime = [ @@ -80,6 +81,7 @@ current_platform = "0.2.0" fs-err = { workspace = true, features = ["tokio"] } fs_extra = { workspace = true, optional = true } futures.workspace = true +crossbeam-channel = { workspace = true, optional = true } hex.workspace = true http.workspace = true k8s-openapi = { workspace = true, optional = true } diff --git a/linera-service/src/linera/main.rs b/linera-service/src/linera/main.rs index 6a0ce36055ea..33cb706fca95 100644 --- a/linera-service/src/linera/main.rs +++ b/linera-service/src/linera/main.rs @@ -55,6 +55,12 @@ use linera_views::store::CommonStoreConfig; use serde_json::Value; use tokio::task::JoinSet; use tracing::{debug, error, info, warn, Instrument as _}; +#[cfg(feature = "benchmark")] +use { + linera_base::listen_for_shutdown_signals, + tokio::{runtime::Handle, time}, + tokio_util::sync::CancellationToken, +}; mod net_up_utils; @@ -748,6 +754,10 @@ impl Runnable for Job { ); if let Some(bps) = bps { assert!(bps > 0, "BPS must be greater than 0"); + assert!( + bps >= num_chains, + "BPS must be greater than or equal to the number of chains" + ); } let start = Instant::now(); @@ -755,12 +765,12 @@ impl Runnable for Job { .wallet .default_chain() .expect("should have default chain"); - let chain_client = context.make_chain_client(default_chain_id)?; + let default_chain_client = context.make_chain_client(default_chain_id)?; // Below all block proposals are supposed to succeed without retries, we // must make sure that all incoming payments have been accepted on-chain // and that no validator is missing user certificates. context - .process_inbox_without_updating_wallet(&chain_client) + .process_inbox_without_updating_wallet(&default_chain_client) .await?; info!( "Processed inbox and forced validator updates in {} ms", @@ -788,9 +798,9 @@ impl Runnable for Job { ); } - let chain_client = context.make_chain_client(default_chain_id)?; - let (epoch, committees) = - chain_client.epoch_and_committees(default_chain_id).await?; + let (epoch, committees) = default_chain_client + .epoch_and_committees(default_chain_id) + .await?; let epoch = epoch.expect("default chain should have an epoch"); let committee = committees .get(&epoch) @@ -806,31 +816,118 @@ impl Runnable for Job { .make_nodes(committee)? .map(|(_, node)| node) .collect::>(); - let blocks_infos_iter = blocks_infos.iter(); - if bps.is_some() { - let blocks_infos_iter = blocks_infos_iter.cycle(); - context - .run_benchmark( - bps, - blocks_infos_iter, - clients, - transactions_per_block, - epoch, - chain_clients, - ) - .await?; - } else { - context - .run_benchmark( - bps, - blocks_infos_iter, - clients, - transactions_per_block, - epoch, - chain_clients, + + let shutdown_notifier = CancellationToken::new(); + tokio::spawn(listen_for_shutdown_signals(shutdown_notifier.clone())); + + // The bps control task will control the BPS from the threads. `crossbeam_channel` is used + // for two reasons: + // 1. it allows bounded channels with zero sized buffers. + // 2. it blocks the current thread if the message can't be sent, which is exactly + // what we want to happen here. `tokio::sync::mpsc` doesn't do that. `std::sync::mpsc` + // does, but it is slower than `crossbeam_channel`. + // Number 1 is the main reason. `tokio::sync::mpsc` doesn't allow 0 sized buffers. + // With a channel with a buffer of size 1 or larger, even if we have already reached + // the desired BPS, the tasks would continue sending block proposals until the channel's + // buffer is filled, which would cause us to not properly control the BPS rate. + let (sender, receiver) = crossbeam_channel::bounded(0); + let bps_control_task = tokio::spawn(async move { + let mut recv_count = 0; + let mut start = time::Instant::now(); + while let Ok(()) = receiver.recv() { + recv_count += 1; + if recv_count == num_chains { + let elapsed = start.elapsed(); + if bps.is_some() { + if elapsed > time::Duration::from_secs(1) { + warn!( + "Failed to achieve {} BPS/{} TPS in {} ms", + bps.unwrap(), + bps.unwrap() * transactions_per_block, + elapsed.as_millis(), + ); + } else { + time::sleep(time::Duration::from_secs(1) - elapsed).await; + info!( + "Achieved {} BPS/{} TPS in {} ms", + bps.unwrap(), + bps.unwrap() * transactions_per_block, + elapsed.as_millis(), + ); + } + } else { + let achieved_bps = num_chains as f64 / elapsed.as_secs_f64(); + info!( + "Achieved {} BPS/{} TPS in {} ms", + achieved_bps, + achieved_bps * transactions_per_block as f64, + elapsed.as_millis(), + ); + } + + recv_count = 0; + start = time::Instant::now(); + } + } + + info!("Exiting logging task...",); + }); + + let mut bps_remainder = bps.unwrap_or_default() % num_chains; + let bps_share = bps.map(|bps| bps / num_chains); + + let context = Arc::new(context); + let mut join_set = JoinSet::>::new(); + for (chain_id, operations, key_pair) in blocks_infos { + let bps_share = if bps_remainder > 0 { + bps_remainder -= 1; + Some(bps_share.unwrap() + 1) + } else { + bps_share + }; + + let shutdown_notifier = shutdown_notifier.clone(); + let sender = sender.clone(); + let handle = Handle::current(); + let context = context.clone(); + let clients = clients.clone(); + let chain_client = chain_clients[&chain_id].clone(); + let short_chain_id = format!("{:?}", chain_id); + join_set.spawn_blocking(move || { + handle.block_on( + async move { + context + .run_benchmark( + bps_share, + chain_id, + operations, + key_pair, + clients, + epoch, + chain_client, + shutdown_notifier, + sender, + ) + .await?; + + Ok(()) + } + .instrument(tracing::info_span!( + "benchmark_chain_id", + chain_id = short_chain_id + )), ) - .await?; + }); } + + join_set + .join_all() + .await + .into_iter() + .collect::, _>>()?; + drop(sender); + info!("All benchmark tasks completed"); + bps_control_task.await?; } Watch { chain_id, raw } => {