From fd0602be12eca4f2dfcea59f4e2144f28a64e26c Mon Sep 17 00:00:00 2001
From: Andre da Silva
Date: Wed, 19 Feb 2025 15:20:56 -0300
Subject: [PATCH] Have one thread per chain on benchmark

---
 Cargo.lock                          |  10 +
 Cargo.toml                          |   1 +
 linera-client/Cargo.toml            |   8 +-
 linera-client/src/benchmark.rs      | 322 ++++++++++++++++++++++++
 linera-client/src/client_context.rs | 375 +++++-----------------------
 linera-client/src/error.rs          |   6 +
 linera-client/src/lib.rs            |   3 +
 linera-core/src/client/mod.rs       |  12 +-
 linera-service/src/linera/main.rs   |  97 ++-----
 9 files changed, 442 insertions(+), 392 deletions(-)
 create mode 100644 linera-client/src/benchmark.rs

diff --git a/Cargo.lock b/Cargo.lock
index f7535318beb9..5f04d68968a6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2335,6 +2335,15 @@ dependencies = [
  "itertools 0.10.5",
 ]
 
+[[package]]
+name = "crossbeam-channel"
+version = "0.5.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-deque"
 version = "0.8.6"
@@ -4468,6 +4477,7 @@ dependencies = [
  "chrono",
  "clap",
  "counter",
+ "crossbeam-channel",
  "crowd-funding",
  "derive_more",
  "dirs",
diff --git a/Cargo.toml b/Cargo.toml
index 532c7fb80fd6..a78ee56b2354 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -71,6 +71,7 @@ colored = "2.1.0"
 comfy-table = "7.1.0"
 convert_case = "0.6.0"
 criterion = { version = "0.5.1", default-features = false }
+crossbeam-channel = "0.5.14"
 custom_debug_derive = "0.6.1"
 dashmap = "5.5.3"
 derive_more = "1.0.0"
diff --git a/linera-client/Cargo.toml b/linera-client/Cargo.toml
index 8576cd030e85..2aaecbfd01a4 100644
--- a/linera-client/Cargo.toml
+++ b/linera-client/Cargo.toml
@@ -13,7 +13,12 @@ version.workspace = true
 
 [features]
 test = ["linera-views/test", "linera-execution/test"]
-benchmark = ["linera-base/test", "dep:linera-sdk", "dep:tokio-util"]
+benchmark = [
+    "linera-base/test",
+    "dep:linera-sdk",
+    "dep:tokio-util",
+    "dep:crossbeam-channel",
+]
 wasmer = [
     "linera-core/wasmer",
     "linera-execution/wasmer",
@@ -55,6 +60,7 @@ bcs.workspace = true
 cfg-if.workspace = true
 chrono = { workspace = true, features = ["clock"] }
 clap.workspace = true
+crossbeam-channel = { workspace = true, optional = true }
 derive_more = { workspace = true, features = ["deref", "deref_mut"] }
 dirs.workspace = true
 futures.workspace = true
diff --git a/linera-client/src/benchmark.rs b/linera-client/src/benchmark.rs
new file mode 100644
index 000000000000..eb5c3c3cd3d6
--- /dev/null
+++ b/linera-client/src/benchmark.rs
@@ -0,0 +1,322 @@
+// Copyright (c) Zefchain Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use std::{collections::HashMap, iter};
+
+use linera_base::{
+    crypto::{AccountPublicKey, AccountSecretKey},
+    data_types::{Amount, Timestamp},
+    hashed::Hashed,
+    identifiers::{AccountOwner, ApplicationId, ChainId, Owner},
+    listen_for_shutdown_signals,
+    time::Instant,
+};
+use linera_chain::{
+    data_types::{BlockProposal, ProposedBlock},
+    types::ConfirmedBlock,
+};
+use linera_core::{client::ChainClient, local_node::LocalNodeClient};
+use linera_execution::{
+    committee::{Committee, Epoch},
+    system::{Recipient, SystemOperation},
+    Operation,
+};
+use linera_rpc::node_provider::NodeProvider;
+use linera_sdk::abis::fungible;
+use linera_storage::Storage;
+use tokio::{runtime::Handle, task, time};
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, info, warn, Instrument as _};
+
+use crate::Error;
+
+pub struct Benchmark<Storage>
+where
+    Storage: linera_storage::Storage,
+{
+    _phantom: std::marker::PhantomData<Storage>,
+}
+
+impl<S> Benchmark<S>
+where
+    S: Storage + Clone + Send + Sync + 'static,
+{
+    #[allow(clippy::too_many_arguments)]
+    pub async fn run_benchmark(
+        num_chains: usize,
+        transactions_per_block: usize,
+        bps: Option<usize>,
+        chain_clients: HashMap<ChainId, ChainClient<NodeProvider, S>>,
+        epoch: Epoch,
+        blocks_infos: Vec<(ChainId, Vec<Operation>, AccountSecretKey)>,
+        committee: Committee,
+        local_node: LocalNodeClient<S>,
+    ) -> Result<(), Error> {
+        let shutdown_notifier = CancellationToken::new();
+        tokio::spawn(listen_for_shutdown_signals(shutdown_notifier.clone()));
+
+        // The BPS control task throttles the per-chain threads. `crossbeam_channel` is
+        // used for two reasons:
+        // 1. It allows bounded channels with zero-sized buffers.
+        // 2. It blocks the current thread if the message can't be sent, which is exactly
+        //    what we want here. `tokio::sync::mpsc` doesn't do that; `std::sync::mpsc`
+        //    does, but it is slower than `crossbeam_channel`.
+        // Reason 1 is the main one: `tokio::sync::mpsc` doesn't allow zero-sized buffers.
+        // With a buffer of size 1 or larger, even after the desired BPS has been reached,
+        // the tasks would keep sending block proposals until the channel's buffer filled
+        // up, so the BPS rate would not be controlled properly.
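+        // Because the channel has zero capacity, each `send(())` below blocks the sending
+        // chain task until the control task is ready to receive again; chains that have
+        // already hit their share of the BPS target therefore stall until the control
+        // task finishes sleeping out the current one-second window.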
+        let (sender, receiver) = crossbeam_channel::bounded(0);
+        let bps_control_task = tokio::spawn(async move {
+            let mut recv_count = 0;
+            let mut start = time::Instant::now();
+            while let Ok(()) = receiver.recv() {
+                recv_count += 1;
+                if recv_count == num_chains {
+                    let elapsed = start.elapsed();
+                    if let Some(bps) = bps {
+                        if elapsed > time::Duration::from_secs(1) {
+                            warn!(
+                                "Failed to achieve {} BPS/{} TPS in {} ms",
+                                bps,
+                                bps * transactions_per_block,
+                                elapsed.as_millis(),
+                            );
+                        } else {
+                            time::sleep(time::Duration::from_secs(1) - elapsed).await;
+                            info!(
+                                "Achieved {} BPS/{} TPS in {} ms",
+                                bps,
+                                bps * transactions_per_block,
+                                elapsed.as_millis(),
+                            );
+                        }
+                    } else {
+                        let achieved_bps = num_chains as f64 / elapsed.as_secs_f64();
+                        info!(
+                            "Achieved {} BPS/{} TPS in {} ms",
+                            achieved_bps,
+                            achieved_bps * transactions_per_block as f64,
+                            elapsed.as_millis(),
+                        );
+                    }
+
+                    recv_count = 0;
+                    start = time::Instant::now();
+                }
+            }
+
+            info!("Exiting logging task...");
+        });
+
+        let mut bps_remainder = bps.unwrap_or_default() % num_chains;
+        let bps_share = bps.map(|bps| bps / num_chains);
+
+        let mut join_set = task::JoinSet::<Result<(), Error>>::new();
+        for (chain_id, operations, key_pair) in blocks_infos {
+            let bps_share = if bps_remainder > 0 {
+                bps_remainder -= 1;
+                Some(bps_share.unwrap() + 1)
+            } else {
+                bps_share
+            };
+
+            let shutdown_notifier = shutdown_notifier.clone();
+            let sender = sender.clone();
+            let handle = Handle::current();
+            let committee = committee.clone();
+            let local_node = local_node.clone();
+            let chain_client = chain_clients[&chain_id].clone();
+            let short_chain_id = format!("{:?}", chain_id);
+            join_set.spawn_blocking(move || {
+                handle.block_on(
+                    async move {
+                        Self::run_benchmark_internal(
+                            bps_share,
+                            operations,
+                            key_pair,
+                            epoch,
+                            chain_client,
+                            shutdown_notifier,
+                            sender,
+                            committee,
+                            local_node,
+                        )
+                        .await?;
+
+                        Ok(())
+                    }
+                    .instrument(tracing::info_span!(
+                        "benchmark_chain_id",
+                        chain_id = short_chain_id
+                    )),
+                )
+            });
+        }
+
+        join_set
+            .join_all()
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()?;
+        drop(sender);
+        info!("All benchmark tasks completed");
+        bps_control_task.await?;
+
+        Ok(())
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    async fn run_benchmark_internal(
+        bps: Option<usize>,
+        operations: Vec<Operation>,
+        key_pair: AccountSecretKey,
+        epoch: Epoch,
+        chain_client: ChainClient<NodeProvider, S>,
+        shutdown_notifier: CancellationToken,
+        sender: crossbeam_channel::Sender<()>,
+        committee: Committee,
+        local_node: LocalNodeClient<S>,
+    ) -> Result<(), Error> {
+        let chain_id = chain_client.chain_id();
+        info!(
+            "Starting benchmark at target BPS of {:?} for chain {:?}",
+            bps, chain_id
+        );
+        let cross_chain_message_delivery = chain_client.options().cross_chain_message_delivery;
+        let mut num_sent_proposals = 0;
+        loop {
+            if shutdown_notifier.is_cancelled() {
+                info!("Shutdown signal received, stopping benchmark");
+                break;
+            }
+            let block = ProposedBlock {
+                epoch,
+                chain_id,
+                incoming_bundles: Vec::new(),
+                operations: operations.clone(),
+                previous_block_hash: chain_client.block_hash(),
+                height: chain_client.next_block_height(),
+                authenticated_signer: Some(Owner::from(key_pair.public())),
+                timestamp: chain_client.timestamp().max(Timestamp::now()),
+            };
+            let executed_block = local_node
+                .stage_block_execution(block.clone(), None)
+                .await?
+                .0;
+
+            let value = Hashed::new(ConfirmedBlock::new(executed_block));
+            let proposal =
+                BlockProposal::new_initial(linera_base::data_types::Round::Fast, block, &key_pair);
+
+            chain_client
+                .submit_block_proposal(&committee, Box::new(proposal), value)
+                .await?;
+            let next_block_height = chain_client.next_block_height();
+            // We assume the committee will not change during the benchmark.
+            chain_client
+                .communicate_chain_updates(
+                    &committee,
+                    chain_id,
+                    next_block_height,
+                    cross_chain_message_delivery,
+                )
+                .await?;
+
+            num_sent_proposals += 1;
+            if let Some(bps) = bps {
+                if num_sent_proposals == bps {
+                    sender.send(())?;
+                    num_sent_proposals = 0;
+                }
+            } else {
+                sender.send(())?;
+                break;
+            }
+        }
+
+        Self::close_benchmark_chain(chain_client).await?;
+        info!("Exiting task...");
+        Ok(())
+    }
+
+    /// Closes the chain that was created for the benchmark.
+    async fn close_benchmark_chain(
+        chain_client: ChainClient<NodeProvider, S>,
+    ) -> Result<(), Error> {
+        let start = Instant::now();
+        chain_client
+            .execute_operation(Operation::System(SystemOperation::CloseChain))
+            .await?
+            .expect("Close chain operation should not fail!");
+
+        debug!(
+            "Closed chain {:?} in {} ms",
+            chain_client.chain_id(),
+            start.elapsed().as_millis()
+        );
+
+        Ok(())
+    }
+
+    /// Generates the information needed to propose one block per chain in `key_pairs`.
+    pub fn make_benchmark_block_info(
+        key_pairs: HashMap<ChainId, AccountSecretKey>,
+        transactions_per_block: usize,
+        fungible_application_id: Option<ApplicationId>,
+    ) -> Vec<(ChainId, Vec<Operation>, AccountSecretKey)> {
+        let mut blocks_infos = Vec::new();
+        let mut previous_chain_id = *key_pairs
+            .iter()
+            .last()
+            .expect("There should be a last element")
+            .0;
+        let amount = Amount::from(1);
+        for (chain_id, key_pair) in key_pairs {
+            let public_key = key_pair.public();
+            let operation = match fungible_application_id {
+                Some(application_id) => Self::fungible_transfer(
+                    application_id,
+                    previous_chain_id,
+                    public_key,
+                    public_key,
+                    amount,
+                ),
+                None => Operation::System(SystemOperation::Transfer {
+                    owner: None,
+                    recipient: Recipient::chain(previous_chain_id),
+                    amount,
+                }),
+            };
+            let operations = iter::repeat(operation)
+                .take(transactions_per_block)
+                .collect();
+            blocks_infos.push((chain_id, operations, key_pair));
+            previous_chain_id = chain_id;
+        }
+        blocks_infos
+    }
+
+    /// Creates a fungible token transfer operation.
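+    ///
+    /// The operation debits `sender`'s account on the chain that executes it and credits
+    /// `receiver`'s account on the chain `chain_id`.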
+    pub fn fungible_transfer(
+        application_id: ApplicationId,
+        chain_id: ChainId,
+        sender: AccountPublicKey,
+        receiver: AccountPublicKey,
+        amount: Amount,
+    ) -> Operation {
+        let target_account = fungible::Account {
+            chain_id,
+            owner: AccountOwner::User(Owner::from(receiver)),
+        };
+        let bytes = bcs::to_bytes(&fungible::Operation::Transfer {
+            owner: AccountOwner::User(Owner::from(sender)),
+            amount,
+            target_account,
+        })
+        .expect("should serialize fungible token operation");
+        Operation::User {
+            application_id,
+            bytes,
+        }
+    }
+}
diff --git a/linera-client/src/client_context.rs b/linera-client/src/client_context.rs
index 78dda4b5b0b1..60e1472d2109 100644
--- a/linera-client/src/client_context.rs
+++ b/linera-client/src/client_context.rs
@@ -28,28 +28,15 @@ use thiserror_context::Context;
 use tracing::{debug, info};
 #[cfg(feature = "benchmark")]
 use {
-    linera_base::{
-        crypto::AccountPublicKey,
-        data_types::Amount,
-        hashed::Hashed,
-        identifiers::{AccountOwner, ApplicationId, Owner},
-        listen_for_shutdown_signals,
-    },
-    linera_chain::{
-        data_types::{BlockProposal, ExecutedBlock, ProposedBlock, SignatureAggregator, Vote},
-        types::{CertificateValue, ConfirmedBlock, GenericCertificate},
-    },
-    linera_core::{client::ChainClientError, data_types::ChainInfoQuery, node::ValidatorNode},
+    crate::benchmark::Benchmark,
+    linera_base::{data_types::Amount, identifiers::ApplicationId},
     linera_execution::{
-        committee::Epoch,
-        system::{OpenChainConfig, Recipient, SystemOperation, OPEN_CHAIN_MESSAGE_INDEX},
+        committee::{Committee, Epoch},
+        system::{OpenChainConfig, SystemOperation, OPEN_CHAIN_MESSAGE_INDEX},
         Operation,
     },
-    linera_sdk::abis::fungible,
     std::{collections::HashMap, iter},
     tokio::task,
-    tokio_util::sync::CancellationToken,
-    tracing::{error, trace, warn},
 };
 #[cfg(feature = "fs")]
 use {
@@ -583,135 +570,73 @@ where
     S: Storage + Clone + Send + Sync + 'static,
     W: Persist<Target = Wallet>,
 {
-    #[allow(clippy::too_many_arguments)]
-    pub async fn run_benchmark(
+    pub async fn prepare_for_benchmark(
         &mut self,
-        bps: Option<usize>,
-        blocks_infos_iter: impl Iterator<Item = &(ChainId, Vec<Operation>, AccountSecretKey)>,
-        clients: Vec<linera_rpc::Client>,
+        num_chains: usize,
         transactions_per_block: usize,
-        epoch: Epoch,
-        chain_clients: HashMap<ChainId, ChainClient<NodeProvider, S>>,
-    ) -> Result<(), Error> {
-        let shutdown_notifier = CancellationToken::new();
-        tokio::spawn(listen_for_shutdown_signals(shutdown_notifier.clone()));
-
-        let mut num_sent_proposals = 0;
-        let mut start = Instant::now();
-        for (chain_id, operations, key_pair) in blocks_infos_iter {
-            if shutdown_notifier.is_cancelled() {
-                info!("Shutdown signal received, stopping benchmark");
-                break;
-            }
-            let chain_client = &chain_clients[chain_id];
-            let block = ProposedBlock {
-                epoch,
-                chain_id: *chain_id,
-                incoming_bundles: Vec::new(),
-                operations: operations.clone(),
-                previous_block_hash: chain_client.block_hash(),
-                height: chain_client.next_block_height(),
-                authenticated_signer: Some(Owner::from(key_pair.public())),
-                timestamp: chain_client.timestamp().max(Timestamp::now()),
-            };
-            let executed_block = self.stage_block_execution(block.clone(), None).await?;
-            let value = Hashed::new(ConfirmedBlock::new(executed_block));
-            let proposal =
-                BlockProposal::new_initial(linera_base::data_types::Round::Fast, block, key_pair);
-
-            let mut join_set = task::JoinSet::new();
-            for client in &clients {
-                let client = client.clone();
-                let proposal = proposal.clone();
-                join_set.spawn(async move { client.handle_block_proposal(proposal).await });
-            }
-            let votes = join_set
-                .join_all()
-                .await
-                .into_iter()
-                .collect::<Result<Vec<_>, _>>()?
-                .into_iter()
-                .filter_map(|response| {
-                    let vote = response.info.manager.pending?;
-                    vote.clone().with_value(value.clone())
-                })
-                .collect::<Vec<_>>();
-
-            let certificates = self.make_benchmark_certificates_from_votes(votes);
-            assert_eq!(
-                certificates.len(),
-                1,
-                "Unable to build all the expected certificates from received votes"
-            );
-
-            let certificate = &certificates[0];
-            let mut join_set = task::JoinSet::new();
-            for client in &clients {
-                let client = client.clone();
-                let certificate = certificate.clone();
-                join_set.spawn(async move {
-                    client
-                        .handle_confirmed_certificate(
-                            certificate,
-                            CrossChainMessageDelivery::NonBlocking,
-                        )
-                        .await
-                });
-            }
+        tokens_per_chain: Amount,
+        fungible_application_id: Option<ApplicationId>,
+    ) -> Result<
+        (
+            HashMap<ChainId, ChainClient<NodeProvider, S>>,
+            Epoch,
+            Vec<(ChainId, Vec<Operation>, AccountSecretKey)>,
+            Committee,
+        ),
+        Error,
+    > {
+        let start = Instant::now();
+        // Below, all block proposals are supposed to succeed without retries, so we
+        // must make sure that all incoming payments have been accepted on-chain
+        // and that no validator is missing user certificates.
+        self.process_inboxes_and_force_validator_updates().await;
+        info!(
+            "Processed inboxes and forced validator updates in {} ms",
+            start.elapsed().as_millis()
+        );
 
-            join_set
-                .join_all()
-                .await
-                .into_iter()
-                .collect::<Result<Vec<_>, _>>()?;
+        let start = Instant::now();
+        let (key_pairs, chain_clients) = self
+            .make_benchmark_chains(num_chains, tokens_per_chain)
+            .await?;
+        info!(
+            "Got {} chains in {} ms",
+            key_pairs.len(),
+            start.elapsed().as_millis()
+        );
 
-            // Replay the certificate locally.
-            // No required certificates from other chains: This is only used with benchmark.
-            chain_client
-                .process_certificate(certificate.clone())
+        if let Some(id) = fungible_application_id {
+            let start = Instant::now();
+            self.supply_fungible_tokens(&key_pairs, id, &chain_clients)
                 .await?;
-
-            num_sent_proposals += 1;
-            if let Some(bps) = bps {
-                if num_sent_proposals == bps {
-                    let elapsed = start.elapsed();
-                    if elapsed > Duration::from_secs(1) {
-                        warn!(
-                            "Failed to achieve {} BPS/{} TPS, took {} ms",
-                            bps,
-                            bps * transactions_per_block,
-                            elapsed.as_millis()
-                        );
-                    } else {
-                        tokio::time::sleep(Duration::from_secs(1) - elapsed).await;
-                        info!(
-                            "Achieved {} BPS/{} TPS in {} ms",
-                            bps,
-                            bps * transactions_per_block,
-                            elapsed.as_millis()
-                        );
-                    }
-                    start = Instant::now();
-                    num_sent_proposals = 0;
-                }
-            }
-        }
-
-        if bps.is_none() {
-            let elapsed = start.elapsed();
-            let bps = num_sent_proposals as f64 / elapsed.as_secs_f64();
             info!(
-                "Achieved {} BPS/{} TPS",
-                bps,
-                bps * transactions_per_block as f64
+                "Supplied fungible tokens in {} ms",
+                start.elapsed().as_millis()
             );
         }
 
-        self.close_benchmark_chains(chain_clients).await?;
-        Ok(())
+        let default_chain_id = self
+            .wallet
+            .default_chain()
+            .expect("should have default chain");
+        let default_chain_client = self.make_chain_client(default_chain_id)?;
+        let (epoch, committees) = default_chain_client
+            .epoch_and_committees(default_chain_id)
+            .await?;
+        let epoch = epoch.expect("default chain should have an epoch");
+        let committee = committees
+            .get(&epoch)
+            .expect("current epoch should have a committee");
+        let blocks_infos = Benchmark::<S>::make_benchmark_block_info(
+            key_pairs,
+            transactions_per_block,
+            fungible_application_id,
+        );
+
+        Ok((chain_clients, epoch, blocks_infos, committee.clone()))
     }
 
-    pub async fn process_inboxes_and_force_validator_updates(&mut self) {
+    async fn process_inboxes_and_force_validator_updates(&mut self) {
         let chain_clients = self
             .wallet
             .owned_chain_ids()
@@ -754,7 +679,7 @@ where
 
     /// Creates chains, and returns a map of exactly `num_chains` chain IDs
     /// with key pairs, as well as a map of the chain clients.
-    pub async fn make_benchmark_chains(
+    async fn make_benchmark_chains(
         &mut self,
         num_chains: usize,
         balance: Amount,
@@ -838,41 +763,6 @@ where
         Ok((benchmark_chains, chain_clients))
     }
 
-    /// Closes the chains that were created for the benchmark.
-    pub async fn close_benchmark_chains(
-        &mut self,
-        chain_clients: HashMap<ChainId, ChainClient<NodeProvider, S>>,
-    ) -> Result<(), Error> {
-        let num_chains = chain_clients.len();
-
-        let mut join_set = task::JoinSet::new();
-        for chain_client in chain_clients.values() {
-            let chain_client = chain_client.clone();
-            join_set.spawn(async move {
-                chain_client
-                    .execute_operation(Operation::System(SystemOperation::CloseChain))
-                    .await?
-                    .expect("Close chain operation should not fail!");
-                Ok::<_, ChainClientError>(())
-            });
-        }
-
-        let start = Instant::now();
-        join_set
-            .join_all()
-            .await
-            .into_iter()
-            .collect::<Result<Vec<_>, _>>()?;
-
-        info!(
-            "Closed {} chains in {} ms",
-            num_chains,
-            start.elapsed().as_millis()
-        );
-
-        Ok(())
-    }
-
     async fn execute_open_chains_operations(
         num_new_chains: usize,
         chain_client: &ChainClient<NodeProvider, S>,
@@ -902,7 +792,7 @@ where
     }
 
     /// Supplies fungible tokens to the chains.
-    pub async fn supply_fungible_tokens(
+    async fn supply_fungible_tokens(
        &mut self,
         key_pairs: &HashMap<ChainId, AccountSecretKey>,
         application_id: ApplicationId,
@@ -924,7 +814,7 @@ where
         let operations: Vec<_> = key_pairs
             .iter()
             .map(|(chain_id, key_pair)| {
-                Self::fungible_transfer(
+                Benchmark::<S>::fungible_transfer(
                     application_id,
                     *chain_id,
                     default_key,
@@ -988,143 +878,4 @@ where
         Ok(())
     }
-
-    /// Generates information related to one block per chain, up to `num_chains` blocks.
-    pub fn make_benchmark_block_info(
-        &mut self,
-        key_pairs: HashMap<ChainId, AccountSecretKey>,
-        transactions_per_block: usize,
-        fungible_application_id: Option<ApplicationId>,
-    ) -> Vec<(ChainId, Vec<Operation>, AccountSecretKey)> {
-        let mut blocks_infos = Vec::new();
-        let mut previous_chain_id = *key_pairs
-            .iter()
-            .last()
-            .expect("There should be a last element")
-            .0;
-        let amount = Amount::from(1);
-        for (chain_id, key_pair) in key_pairs {
-            let public_key = key_pair.public();
-            let operation = match fungible_application_id {
-                Some(application_id) => Self::fungible_transfer(
-                    application_id,
-                    previous_chain_id,
-                    public_key,
-                    public_key,
-                    amount,
-                ),
-                None => Operation::System(SystemOperation::Transfer {
-                    owner: None,
-                    recipient: Recipient::chain(previous_chain_id),
-                    amount,
-                }),
-            };
-            let operations = iter::repeat(operation)
-                .take(transactions_per_block)
-                .collect();
-            blocks_infos.push((chain_id, operations, key_pair));
-            previous_chain_id = chain_id;
-        }
-        blocks_infos
-    }
-
-    /// Tries to aggregate votes into certificates.
-    pub fn make_benchmark_certificates_from_votes<T>(
-        &self,
-        votes: Vec<Vote<T>>,
-    ) -> Vec<GenericCertificate<T>>
-    where
-        T: std::fmt::Debug + CertificateValue,
-    {
-        let committee = self.wallet.genesis_config().create_committee();
-        let mut aggregators = HashMap::new();
-        let mut certificates = Vec::new();
-        let mut done_senders = HashSet::new();
-        for vote in votes {
-            // We aggregate votes indexed by sender.
-            let chain_id = vote.value().inner().chain_id();
-            if done_senders.contains(&chain_id) {
-                continue;
-            }
-            trace!(
-                "Processing vote on {:?}'s block by {:?}",
-                chain_id,
-                vote.public_key,
-            );
-            let aggregator = aggregators.entry(chain_id).or_insert_with(|| {
-                SignatureAggregator::new(
-                    vote.value,
-                    linera_base::data_types::Round::Fast,
-                    &committee,
-                )
-            });
-            match aggregator.append(vote.public_key, vote.signature) {
-                Ok(Some(certificate)) => {
-                    trace!("Found certificate: {:?}", certificate);
-                    certificates.push(certificate);
-                    done_senders.insert(chain_id);
-                }
-                Ok(None) => {
-                    trace!("Added one vote");
-                }
-                Err(error) => {
-                    error!("Failed to aggregate vote: {}", error);
-                }
-            }
-        }
-        certificates
-    }
-
-    pub async fn update_wallet_from_certificate(&mut self, certificate: ConfirmedBlockCertificate) {
-        let node = self.client.local_node().clone();
-        // Replay the certificate locally.
-        // No required certificates from other chains: This is only used with benchmark.
-        node.handle_certificate(certificate, &()).await.unwrap();
-        // Last update the wallet.
-        for chain in self.wallet.as_mut().chains_mut() {
-            let query = ChainInfoQuery::new(chain.chain_id);
-            let info = node.handle_chain_info_query(query).await.unwrap().info;
-            // We don't have private keys but that's ok.
-            chain.block_hash = info.block_hash;
-            chain.next_block_height = info.next_block_height;
-        }
-    }
-
-    /// Creates a fungible token transfer operation.
-    fn fungible_transfer(
-        application_id: ApplicationId,
-        chain_id: ChainId,
-        sender: AccountPublicKey,
-        receiver: AccountPublicKey,
-        amount: Amount,
-    ) -> Operation {
-        let target_account = fungible::Account {
-            chain_id,
-            owner: AccountOwner::User(Owner::from(receiver)),
-        };
-        let bytes = bcs::to_bytes(&fungible::Operation::Transfer {
-            owner: AccountOwner::User(Owner::from(sender)),
-            amount,
-            target_account,
-        })
-        .expect("should serialize fungible token operation");
-        Operation::User {
-            application_id,
-            bytes,
-        }
-    }
-
-    /// Stages the execution of a block proposal.
-    pub async fn stage_block_execution(
-        &self,
-        block: ProposedBlock,
-        round: Option<u32>,
-    ) -> Result<ExecutedBlock, Error> {
-        Ok(self
-            .client
-            .local_node()
-            .stage_block_execution(block, round)
-            .await?
-            .0)
-    }
 }
diff --git a/linera-client/src/error.rs b/linera-client/src/error.rs
index a7d5c961a463..cd073f81d801 100644
--- a/linera-client/src/error.rs
+++ b/linera-client/src/error.rs
@@ -26,6 +26,12 @@ pub(crate) enum Inner {
     LocalNode(#[from] linera_core::local_node::LocalNodeError),
     #[error("remote node operation failed: {0}")]
     RemoteNode(#[from] linera_core::node::NodeError),
+    #[cfg(feature = "benchmark")]
+    #[error("failed to send message: {0}")]
+    SendError(#[from] crossbeam_channel::SendError<()>),
+    #[cfg(feature = "benchmark")]
+    #[error("failed to join task: {0}")]
+    JoinError(#[from] tokio::task::JoinError),
 }
 
 thiserror_context::impl_context!(Error(Inner));
diff --git a/linera-client/src/lib.rs b/linera-client/src/lib.rs
index 68f6be402718..47133cd586cb 100644
--- a/linera-client/src/lib.rs
+++ b/linera-client/src/lib.rs
@@ -16,6 +16,9 @@ pub mod storage;
 pub mod util;
 pub mod wallet;
 
+#[cfg(feature = "benchmark")]
+pub mod benchmark;
+
 #[cfg(test)]
 mod unit_tests;
 
diff --git a/linera-core/src/client/mod.rs b/linera-core/src/client/mod.rs
index 00fbe23e3c29..8180914635b6 100644
--- a/linera-core/src/client/mod.rs
+++ b/linera-core/src/client/mod.rs
@@ -684,12 +684,18 @@ impl<P, S> ChainClient<P, S> {
         )
     }
 
-    /// Gets the per-`ChainClient` options.
+    /// Gets a mutable reference to the per-`ChainClient` options.
     #[instrument(level = "trace", skip(self))]
     pub fn options_mut(&mut self) -> &mut ChainClientOptions {
         &mut self.options
     }
 
+    /// Gets a reference to the per-`ChainClient` options.
+    #[instrument(level = "trace", skip(self))]
+    pub fn options(&self) -> &ChainClientOptions {
+        &self.options
+    }
+
     /// Gets the ID of the associated chain.
     #[instrument(level = "trace", skip(self))]
     pub fn chain_id(&self) -> ChainId {
@@ -1070,7 +1076,7 @@ where
 
     /// Submits a block proposal to the validators.
     #[instrument(level = "trace", skip(committee, proposal, value))]
-    async fn submit_block_proposal(
+    pub async fn submit_block_proposal(
         &self,
         committee: &Committee,
         proposal: Box<BlockProposal>,
@@ -1123,7 +1129,7 @@ where
 
     /// Broadcasts certified blocks to validators.
     #[instrument(level = "trace", skip(committee, delivery))]
-    async fn communicate_chain_updates(
+    pub async fn communicate_chain_updates(
         &self,
         committee: &Committee,
         chain_id: ChainId,
diff --git a/linera-service/src/linera/main.rs b/linera-service/src/linera/main.rs
index 1184d04f6500..15a0ed529d80 100644
--- a/linera-service/src/linera/main.rs
+++ b/linera-service/src/linera/main.rs
@@ -749,87 +749,32 @@ impl Runnable for Job {
                 );
                 if let Some(bps) = bps {
                     assert!(bps > 0, "BPS must be greater than 0");
-                }
-
-                let start = Instant::now();
-                // Below all block proposals are supposed to succeed without retries, we
-                // must make sure that all incoming payments have been accepted on-chain
-                // and that no validator is missing user certificates.
-                context.process_inboxes_and_force_validator_updates().await;
-                info!(
-                    "Processed inboxes and forced validator updates in {} ms",
-                    start.elapsed().as_millis()
-                );
-
-                let start = Instant::now();
-                let (key_pairs, chain_clients) = context
-                    .make_benchmark_chains(num_chains, tokens_per_chain)
-                    .await?;
-                info!(
-                    "Got {} chains in {} ms",
-                    key_pairs.len(),
-                    start.elapsed().as_millis()
-                );
-
-                if let Some(id) = fungible_application_id {
-                    let start = Instant::now();
-                    context
-                        .supply_fungible_tokens(&key_pairs, id, &chain_clients)
-                        .await?;
-                    info!(
-                        "Supplied fungible tokens in {} ms",
-                        start.elapsed().as_millis()
+                    assert!(
+                        bps >= num_chains,
+                        "BPS must be greater than or equal to the number of chains"
                     );
                 }
 
-                let default_chain_id = context
-                    .wallet
-                    .default_chain()
-                    .expect("should have default chain");
-                let default_chain_client = context.make_chain_client(default_chain_id)?;
-                let (epoch, committees) = default_chain_client
-                    .epoch_and_committees(default_chain_id)
+                let (chain_clients, epoch, blocks_infos, committee) = context
+                    .prepare_for_benchmark(
+                        num_chains,
+                        transactions_per_block,
+                        tokens_per_chain,
+                        fungible_application_id,
+                    )
                     .await?;
-                let epoch = epoch.expect("default chain should have an epoch");
-                let committee = committees
-                    .get(&epoch)
-                    .expect("current epoch should have a committee");
-                let blocks_infos = context.make_benchmark_block_info(
-                    key_pairs,
-                    transactions_per_block,
-                    fungible_application_id,
-                );
-                let clients = context
-                    .make_node_provider()
-                    .make_nodes(committee)?
-                    .map(|(_, node)| node)
-                    .collect::<Vec<_>>();
-                let blocks_infos_iter = blocks_infos.iter();
-                if bps.is_some() {
-                    let blocks_infos_iter = blocks_infos_iter.cycle();
-                    context
-                        .run_benchmark(
-                            bps,
-                            blocks_infos_iter,
-                            clients,
-                            transactions_per_block,
-                            epoch,
-                            chain_clients,
-                        )
-                        .await?;
-                } else {
-                    context
-                        .run_benchmark(
-                            bps,
-                            blocks_infos_iter,
-                            clients,
-                            transactions_per_block,
-                            epoch,
-                            chain_clients,
-                        )
-                        .await?;
-                }
+                linera_client::benchmark::Benchmark::<S>::run_benchmark(
+                    num_chains,
+                    transactions_per_block,
+                    bps,
+                    chain_clients,
+                    epoch,
+                    blocks_infos,
+                    committee,
+                    context.client.local_node().clone(),
+                )
+                .await?;
             }
 
             Watch { chain_id, raw } => {