Skip to content

Commit

Permalink
Have one thread per chain on benchmark (#3374)
Browse files Browse the repository at this point in the history
## Motivation

Right now on `linera benchmark` we do everything from a single thread. That doesn't scale much.

## Proposal

Spawn one task per chain, and split the desired BPS across the number of chains. For example, if the desired BPS is 10, and you have 10 chains, each task will need to reach 1 BPS.

Every task just tries to send as many block proposals as fast as possible. We don't control the BPS in the tasks, but we have a BPS control task for that. Once a task has sent the BPS amount of blocks successfully, it sends a message to the BPS control task. It uses a `crossbeam` channel for that, as it allows bounded channels with buffer size of 0.
If we have a buffer size of 0, that means that if the BPS control task is sleeping or processing another message, the sender task will be blocked. This is exactly what we want to properly control the BPS behavior. These are faster than `std` channels and comparable speed of `tokio` channels, but `tokio` channels don't allow a buffer size of 0, and also don't block the sender task as sending is async.
The BPS control task will have a timer going. Once it receives `num_chains` messages, that means that the total number of blocks to be sent was reached. If more than a second elapsed, we failed to achieve the desired BPS. Otherwise, we achieved the desired BPS.

This all runs sharing the same wallet and `ClientContext`. The only resource the different threads could race for is the wallet, but since we don't save the benchmark chains to the wallet anymore, that's no longer a concern. So using the same wallet for multiple threads is feasible, because the wallet is only used to create the chains at the start using its default chain, then the wallet isn't used by the threads anymore.

Shutdown signals also continue to work (all threads are gracefully shutdown), and we continue to close all the chains on exit.

## Test Plan

Ran with a few different BPS/TPB variations, seems to work. Shutdown also seems to work correctly.

## Release Plan

- Nothing to do / These changes follow the usual release cycle.
  • Loading branch information
ndr-ds authored Feb 21, 2025
1 parent 4f3e9e8 commit cfefa52
Show file tree
Hide file tree
Showing 9 changed files with 442 additions and 392 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ colored = "2.1.0"
comfy-table = "7.1.0"
convert_case = "0.6.0"
criterion = { version = "0.5.1", default-features = false }
crossbeam-channel = "0.5.14"
custom_debug_derive = "0.6.1"
dashmap = "5.5.3"
derive_more = "1.0.0"
Expand Down
8 changes: 7 additions & 1 deletion linera-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ version.workspace = true

[features]
test = ["linera-views/test", "linera-execution/test"]
benchmark = ["linera-base/test", "dep:linera-sdk", "dep:tokio-util"]
benchmark = [
"linera-base/test",
"dep:linera-sdk",
"dep:tokio-util",
"dep:crossbeam-channel",
]
wasmer = [
"linera-core/wasmer",
"linera-execution/wasmer",
Expand Down Expand Up @@ -55,6 +60,7 @@ bcs.workspace = true
cfg-if.workspace = true
chrono = { workspace = true, features = ["clock"] }
clap.workspace = true
crossbeam-channel = { workspace = true, optional = true }
derive_more = { workspace = true, features = ["deref", "deref_mut"] }
dirs.workspace = true
futures.workspace = true
Expand Down
322 changes: 322 additions & 0 deletions linera-client/src/benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,322 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::HashMap, iter};

use linera_base::{
crypto::{AccountPublicKey, AccountSecretKey},
data_types::{Amount, Timestamp},
hashed::Hashed,
identifiers::{AccountOwner, ApplicationId, ChainId, Owner},
listen_for_shutdown_signals,
time::Instant,
};
use linera_chain::{
data_types::{BlockProposal, ProposedBlock},
types::ConfirmedBlock,
};
use linera_core::{client::ChainClient, local_node::LocalNodeClient};
use linera_execution::{
committee::{Committee, Epoch},
system::{Recipient, SystemOperation},
Operation,
};
use linera_rpc::node_provider::NodeProvider;
use linera_sdk::abis::fungible;
use linera_storage::Storage;
use tokio::{runtime::Handle, task, time};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn, Instrument as _};

use crate::Error;

pub struct Benchmark<Storage>
where
Storage: linera_storage::Storage,
{
_phantom: std::marker::PhantomData<Storage>,
}

impl<S> Benchmark<S>
where
S: Storage + Clone + Send + Sync + 'static,
{
#[allow(clippy::too_many_arguments)]
pub async fn run_benchmark(
num_chains: usize,
transactions_per_block: usize,
bps: Option<usize>,
chain_clients: HashMap<ChainId, ChainClient<NodeProvider, S>>,
epoch: Epoch,
blocks_infos: Vec<(ChainId, Vec<Operation>, AccountSecretKey)>,
committee: Committee,
local_node: LocalNodeClient<S>,
) -> Result<(), Error> {
let shutdown_notifier = CancellationToken::new();
tokio::spawn(listen_for_shutdown_signals(shutdown_notifier.clone()));

// The bps control task will control the BPS from the threads. `crossbeam_channel` is used
// for two reasons:
// 1. it allows bounded channels with zero sized buffers.
// 2. it blocks the current thread if the message can't be sent, which is exactly
// what we want to happen here. `tokio::sync::mpsc` doesn't do that. `std::sync::mpsc`
// does, but it is slower than `crossbeam_channel`.
// Number 1 is the main reason. `tokio::sync::mpsc` doesn't allow 0 sized buffers.
// With a channel with a buffer of size 1 or larger, even if we have already reached
// the desired BPS, the tasks would continue sending block proposals until the channel's
// buffer is filled, which would cause us to not properly control the BPS rate.
let (sender, receiver) = crossbeam_channel::bounded(0);
let bps_control_task = tokio::spawn(async move {
let mut recv_count = 0;
let mut start = time::Instant::now();
while let Ok(()) = receiver.recv() {
recv_count += 1;
if recv_count == num_chains {
let elapsed = start.elapsed();
if let Some(bps) = bps {
if elapsed > time::Duration::from_secs(1) {
warn!(
"Failed to achieve {} BPS/{} TPS in {} ms",
bps,
bps * transactions_per_block,
elapsed.as_millis(),
);
} else {
time::sleep(time::Duration::from_secs(1) - elapsed).await;
info!(
"Achieved {} BPS/{} TPS in {} ms",
bps,
bps * transactions_per_block,
elapsed.as_millis(),
);
}
} else {
let achieved_bps = num_chains as f64 / elapsed.as_secs_f64();
info!(
"Achieved {} BPS/{} TPS in {} ms",
achieved_bps,
achieved_bps * transactions_per_block as f64,
elapsed.as_millis(),
);
}

recv_count = 0;
start = time::Instant::now();
}
}

info!("Exiting logging task...");
});

let mut bps_remainder = bps.unwrap_or_default() % num_chains;
let bps_share = bps.map(|bps| bps / num_chains);

let mut join_set = task::JoinSet::<Result<(), Error>>::new();
for (chain_id, operations, key_pair) in blocks_infos {
let bps_share = if bps_remainder > 0 {
bps_remainder -= 1;
Some(bps_share.unwrap() + 1)
} else {
bps_share
};

let shutdown_notifier = shutdown_notifier.clone();
let sender = sender.clone();
let handle = Handle::current();
let committee = committee.clone();
let local_node = local_node.clone();
let chain_client = chain_clients[&chain_id].clone();
let short_chain_id = format!("{:?}", chain_id);
join_set.spawn_blocking(move || {
handle.block_on(
async move {
Self::run_benchmark_internal(
bps_share,
operations,
key_pair,
epoch,
chain_client,
shutdown_notifier,
sender,
committee,
local_node,
)
.await?;

Ok(())
}
.instrument(tracing::info_span!(
"benchmark_chain_id",
chain_id = short_chain_id
)),
)
});
}

join_set
.join_all()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
drop(sender);
info!("All benchmark tasks completed");
bps_control_task.await?;

Ok(())
}

#[allow(clippy::too_many_arguments)]
async fn run_benchmark_internal(
bps: Option<usize>,
operations: Vec<Operation>,
key_pair: AccountSecretKey,
epoch: Epoch,
chain_client: ChainClient<NodeProvider, S>,
shutdown_notifier: CancellationToken,
sender: crossbeam_channel::Sender<()>,
committee: Committee,
local_node: LocalNodeClient<S>,
) -> Result<(), Error> {
let chain_id = chain_client.chain_id();
info!(
"Starting benchmark at target BPS of {:?}, for chain {:?}",
bps, chain_id
);
let cross_chain_message_delivery = chain_client.options().cross_chain_message_delivery;
let mut num_sent_proposals = 0;
loop {
if shutdown_notifier.is_cancelled() {
info!("Shutdown signal received, stopping benchmark");
break;
}
let block = ProposedBlock {
epoch,
chain_id,
incoming_bundles: Vec::new(),
operations: operations.clone(),
previous_block_hash: chain_client.block_hash(),
height: chain_client.next_block_height(),
authenticated_signer: Some(Owner::from(key_pair.public())),
timestamp: chain_client.timestamp().max(Timestamp::now()),
};
let executed_block = local_node
.stage_block_execution(block.clone(), None)
.await?
.0;

let value = Hashed::new(ConfirmedBlock::new(executed_block));
let proposal =
BlockProposal::new_initial(linera_base::data_types::Round::Fast, block, &key_pair);

chain_client
.submit_block_proposal(&committee, Box::new(proposal), value)
.await?;
let next_block_height = chain_client.next_block_height();
// We assume the committee will not change during the benchmark.
chain_client
.communicate_chain_updates(
&committee,
chain_id,
next_block_height,
cross_chain_message_delivery,
)
.await?;

num_sent_proposals += 1;
if let Some(bps) = bps {
if num_sent_proposals == bps {
sender.send(())?;
num_sent_proposals = 0;
}
} else {
sender.send(())?;
break;
}
}

Self::close_benchmark_chain(chain_client).await?;
info!("Exiting task...");
Ok(())
}

/// Closes the chain that was created for the benchmark.
async fn close_benchmark_chain(
chain_client: ChainClient<NodeProvider, S>,
) -> Result<(), Error> {
let start = Instant::now();
chain_client
.execute_operation(Operation::System(SystemOperation::CloseChain))
.await?
.expect("Close chain operation should not fail!");

debug!(
"Closed chain {:?} in {} ms",
chain_client.chain_id(),
start.elapsed().as_millis()
);

Ok(())
}

/// Generates information related to one block per chain, up to `num_chains` blocks.
pub fn make_benchmark_block_info(
key_pairs: HashMap<ChainId, AccountSecretKey>,
transactions_per_block: usize,
fungible_application_id: Option<ApplicationId>,
) -> Vec<(ChainId, Vec<Operation>, AccountSecretKey)> {
let mut blocks_infos = Vec::new();
let mut previous_chain_id = *key_pairs
.iter()
.last()
.expect("There should be a last element")
.0;
let amount = Amount::from(1);
for (chain_id, key_pair) in key_pairs {
let public_key = key_pair.public();
let operation = match fungible_application_id {
Some(application_id) => Self::fungible_transfer(
application_id,
previous_chain_id,
public_key,
public_key,
amount,
),
None => Operation::System(SystemOperation::Transfer {
owner: None,
recipient: Recipient::chain(previous_chain_id),
amount,
}),
};
let operations = iter::repeat(operation)
.take(transactions_per_block)
.collect();
blocks_infos.push((chain_id, operations, key_pair));
previous_chain_id = chain_id;
}
blocks_infos
}

/// Creates a fungible token transfer operation.
pub fn fungible_transfer(
application_id: ApplicationId,
chain_id: ChainId,
sender: AccountPublicKey,
receiver: AccountPublicKey,
amount: Amount,
) -> Operation {
let target_account = fungible::Account {
chain_id,
owner: AccountOwner::User(Owner::from(receiver)),
};
let bytes = bcs::to_bytes(&fungible::Operation::Transfer {
owner: AccountOwner::User(Owner::from(sender)),
amount,
target_account,
})
.expect("should serialize fungible token operation");
Operation::User {
application_id,
bytes,
}
}
}
Loading

0 comments on commit cfefa52

Please sign in to comment.