diff --git a/CLI.md b/CLI.md index eea5fc002c99..9fef8a6b36cf 100644 --- a/CLI.md +++ b/CLI.md @@ -256,11 +256,11 @@ Close an existing chain. A closed chain cannot execute operations or accept messages anymore. It can still reject incoming messages, so they bounce back to the sender. -**Usage:** `linera close-chain --from ` +**Usage:** `linera close-chain ` -###### **Options:** +###### **Arguments:** -* `--from ` — Chain ID (must be one of our chains) +* `` — Chain ID (must be one of our chains) @@ -704,11 +704,15 @@ Show the contents of the wallet Show the contents of the wallet -**Usage:** `linera wallet show [CHAIN_ID]` +**Usage:** `linera wallet show [OPTIONS] [CHAIN_ID]` ###### **Arguments:** -* `` +* `` — The chain to show the metadata + +###### **Options:** + +* `--short` — Only print a non-formatted list of the wallet's chain IDs diff --git a/linera-client/src/client_options.rs b/linera-client/src/client_options.rs index 3900deeb0689..36fc339e0f1e 100644 --- a/linera-client/src/client_options.rs +++ b/linera-client/src/client_options.rs @@ -369,7 +369,6 @@ pub enum ClientCommand { /// It can still reject incoming messages, so they bounce back to the sender. CloseChain { /// Chain ID (must be one of our chains) - #[arg(long = "from")] chain_id: ChainId, }, @@ -959,7 +958,13 @@ impl fmt::Display for ResourceControlPolicyConfig { #[derive(Clone, clap::Subcommand)] pub enum WalletCommand { /// Show the contents of the wallet. - Show { chain_id: Option }, + Show { + /// The chain to show the metadata. + chain_id: Option, + /// Only print a non-formatted list of the wallet's chain IDs. + #[arg(long)] + short: bool, + }, /// Change the wallet default chain. SetDefault { chain_id: ChainId }, diff --git a/linera-service/src/benchmark.rs b/linera-service/src/benchmark.rs index ad4ae23d2f14..0e10a6f09f73 100644 --- a/linera-service/src/benchmark.rs +++ b/linera-service/src/benchmark.rs @@ -15,7 +15,7 @@ use linera_base::{ use linera_sdk::abis::fungible::{self, FungibleTokenAbi, InitialState, Parameters}; use linera_service::cli_wrappers::{ local_net::{PathProvider, ProcessInbox}, - ApplicationWrapper, ClientWrapper, Faucet, FaucetOption, Network, + ApplicationWrapper, ClientWrapper, Faucet, FaucetOption, Network, OnClientDrop, }; use port_selector::random_free_tcp_port; use rand::{Rng as _, SeedableRng}; @@ -79,14 +79,26 @@ async fn benchmark_with_fungible( ) -> Result<()> { info!("Creating the clients and initializing the wallets"); let path_provider = PathProvider::create_temporary_directory().unwrap(); - let publisher = ClientWrapper::new(path_provider, Network::Grpc, None, num_wallets); + let publisher = ClientWrapper::new( + path_provider, + Network::Grpc, + None, + num_wallets, + OnClientDrop::CloseChains, + ); publisher .wallet_init(&[], FaucetOption::NewChain(&faucet)) .await?; let clients = (0..num_wallets) .map(|n| { let path_provider = PathProvider::create_temporary_directory().unwrap(); - Ok(ClientWrapper::new(path_provider, Network::Grpc, None, n)) + Ok(ClientWrapper::new( + path_provider, + Network::Grpc, + None, + n, + OnClientDrop::CloseChains, + )) }) .collect::, anyhow::Error>>()?; try_join_all( diff --git a/linera-service/src/cli_wrappers/local_kubernetes_net.rs b/linera-service/src/cli_wrappers/local_kubernetes_net.rs index dfcd377f5460..f5c922a331b6 100644 --- a/linera-service/src/cli_wrappers/local_kubernetes_net.rs +++ b/linera-service/src/cli_wrappers/local_kubernetes_net.rs @@ -28,7 +28,7 @@ use crate::cli_wrappers::{ kubectl::KubectlInstance, local_net::PathProvider, util::get_github_root, - ClientWrapper, LineraNet, LineraNetConfig, Network, + ClientWrapper, LineraNet, LineraNetConfig, Network, OnClientDrop, }; #[cfg(with_testing)] @@ -257,6 +257,7 @@ impl LineraNet for LocalKubernetesNet { self.network, self.testing_prng_seed, self.next_client_id, + OnClientDrop::LeakChains, ); if let Some(seed) = self.testing_prng_seed { self.testing_prng_seed = Some(seed + 1); diff --git a/linera-service/src/cli_wrappers/local_net.rs b/linera-service/src/cli_wrappers/local_net.rs index dcb42b1022a6..6c0f913910bd 100644 --- a/linera-service/src/cli_wrappers/local_net.rs +++ b/linera-service/src/cli_wrappers/local_net.rs @@ -34,7 +34,9 @@ use tonic_health::pb::{ use tracing::{info, warn}; use crate::{ - cli_wrappers::{ClientWrapper, LineraNet, LineraNetConfig, Network, NetworkConfig}, + cli_wrappers::{ + ClientWrapper, LineraNet, LineraNetConfig, Network, NetworkConfig, OnClientDrop, + }, util::ChildExt, }; @@ -317,6 +319,7 @@ impl LineraNet for LocalNet { self.network.external, self.testing_prng_seed, self.next_client_id, + OnClientDrop::LeakChains, ); if let Some(seed) = self.testing_prng_seed { self.testing_prng_seed = Some(seed + 1); diff --git a/linera-service/src/cli_wrappers/mod.rs b/linera-service/src/cli_wrappers/mod.rs index 513d7029ba55..0462aac4c96c 100644 --- a/linera-service/src/cli_wrappers/mod.rs +++ b/linera-service/src/cli_wrappers/mod.rs @@ -34,7 +34,9 @@ mod wallet; use anyhow::Result; use async_trait::async_trait; use linera_execution::ResourceControlPolicy; -pub use wallet::{ApplicationWrapper, ClientWrapper, Faucet, FaucetOption, NodeService}; +pub use wallet::{ + ApplicationWrapper, ClientWrapper, Faucet, FaucetOption, NodeService, OnClientDrop, +}; /// The information needed to start a Linera net of a particular kind. #[async_trait] diff --git a/linera-service/src/cli_wrappers/remote_net.rs b/linera-service/src/cli_wrappers/remote_net.rs index 61ae4c564eff..13b538e1d991 100644 --- a/linera-service/src/cli_wrappers/remote_net.rs +++ b/linera-service/src/cli_wrappers/remote_net.rs @@ -12,7 +12,7 @@ use tempfile::{tempdir, TempDir}; use super::{ local_net::PathProvider, ClientWrapper, Faucet, FaucetOption, LineraNet, LineraNetConfig, - Network, + Network, OnClientDrop, }; pub struct RemoteNetTestingConfig { @@ -97,6 +97,7 @@ impl LineraNet for RemoteNet { self.network, self.testing_prng_seed, self.next_client_id, + OnClientDrop::CloseChains, ); if let Some(seed) = self.testing_prng_seed { self.testing_prng_seed = Some(seed + 1); diff --git a/linera-service/src/cli_wrappers/wallet.rs b/linera-service/src/cli_wrappers/wallet.rs index 75e2b6a40907..b7758c9b9656 100644 --- a/linera-service/src/cli_wrappers/wallet.rs +++ b/linera-service/src/cli_wrappers/wallet.rs @@ -2,12 +2,14 @@ // SPDX-License-Identifier: Apache-2.0 use std::{ + borrow::Cow, collections::HashMap, env, marker::PhantomData, mem, path::{Path, PathBuf}, str::FromStr, + sync, time::Duration, }; @@ -30,7 +32,7 @@ use serde::{de::DeserializeOwned, ser::Serialize}; use serde_json::{json, Value}; use tempfile::TempDir; use tokio::process::{Child, Command}; -use tracing::{info, warn}; +use tracing::{error, info, warn}; use crate::{ cli_wrappers::{ @@ -54,12 +56,23 @@ fn reqwest_client() -> reqwest::Client { /// Wrapper to run a Linera client command. pub struct ClientWrapper { + binary_path: sync::Mutex>, testing_prng_seed: Option, storage: String, wallet: String, max_pending_message_bundles: usize, network: Network, pub path_provider: PathProvider, + on_drop: OnClientDrop, +} + +/// Action to perform when the [`ClientWrapper`] is dropped. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum OnClientDrop { + /// Close all the chains on the wallet. + CloseChains, + /// Do not close any chains, leaving them active. + LeakChains, } impl ClientWrapper { @@ -68,6 +81,7 @@ impl ClientWrapper { network: Network, testing_prng_seed: Option, id: usize, + on_drop: OnClientDrop, ) -> Self { let storage = format!( "rocksdb:{}/client_{}.db", @@ -76,12 +90,14 @@ impl ClientWrapper { ); let wallet = format!("wallet_{}.json", id); Self { + binary_path: sync::Mutex::new(None), testing_prng_seed, storage, wallet, max_pending_message_bundles: 10_000, network, path_provider, + on_drop, } } @@ -141,26 +157,76 @@ impl ClientWrapper { } async fn command(&self) -> Result { - let path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?; - let mut command = Command::new(path); - command - .current_dir(self.path_provider.path()) - .env( - "RUST_LOG", - std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")), - ) - .args(["--wallet", &self.wallet]) - .args(["--storage", &self.storage]) - .args([ - "--max-pending-message-bundles", - &self.max_pending_message_bundles.to_string(), - ]) - .args(["--send-timeout-ms", "500000"]) - .args(["--recv-timeout-ms", "500000"]) - .arg("--wait-for-outgoing-messages"); + let mut command = self.command_binary().await?; + command.current_dir(self.path_provider.path()).env( + "RUST_LOG", + std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")), + ); + for argument in self.command_arguments() { + command.arg(&*argument); + } Ok(command) } + /// Returns an iterator over the arguments that should be added to all command invocations. + fn command_arguments(&self) -> impl Iterator> + '_ { + [ + "--wallet".into(), + self.wallet.as_str().into(), + "--storage".into(), + self.storage.as_str().into(), + "--max-pending-message-bundles".into(), + self.max_pending_message_bundles.to_string().into(), + "--send-timeout-ms".into(), + "500000".into(), + "--recv-timeout-ms".into(), + "500000".into(), + "--wait-for-outgoing-messages".into(), + ] + .into_iter() + } + + /// Returns the [`Command`] instance configured to run the appropriate binary. + /// + /// The path is resolved once and cached inside `self` for subsequent usages. + async fn command_binary(&self) -> Result { + match self.command_with_cached_binary_path() { + Some(command) => Ok(command), + None => { + let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?; + let command = Command::new(&resolved_path); + + self.set_cached_binary_path(resolved_path); + + Ok(command) + } + } + } + + /// Returns a [`Command`] instance configured with the cached `binary_path`, if available. + fn command_with_cached_binary_path(&self) -> Option { + let binary_path = self.binary_path.lock().unwrap(); + + binary_path.as_ref().map(Command::new) + } + + /// Sets the cached `binary_path` with the `new_binary_path`. + /// + /// # Panics + /// + /// If the cache is already set to a different value. In theory the two threads calling + /// `command_binary` can race and resolve the binary path twice, but they should always be the + /// same path. + fn set_cached_binary_path(&self, new_binary_path: PathBuf) { + let mut binary_path = self.binary_path.lock().unwrap(); + + if binary_path.is_none() { + *binary_path = Some(new_binary_path); + } else { + assert_eq!(*binary_path, Some(new_binary_path)); + } + } + /// Runs `linera create-genesis-config`. pub async fn create_genesis_config( &self, @@ -878,6 +944,79 @@ impl ClientWrapper { } } +impl Drop for ClientWrapper { + fn drop(&mut self) { + use std::process::Command as SyncCommand; + + if self.on_drop != OnClientDrop::CloseChains { + return; + } + + let Ok(binary_path) = self.binary_path.lock() else { + error!("Failed to close chains because a thread panicked with a lock to `binary_path`"); + return; + }; + + let Some(binary_path) = binary_path.as_ref() else { + warn!( + "Assuming no chains need to be closed, because the command binary was never \ + resolved and therefore presumably never called" + ); + return; + }; + + let working_directory = self.path_provider.path(); + let mut wallet_show_command = SyncCommand::new(binary_path); + + for argument in self.command_arguments() { + wallet_show_command.arg(&*argument); + } + + let Ok(wallet_show_output) = wallet_show_command + .current_dir(working_directory) + .args(["wallet", "show", "--short"]) + .output() + else { + warn!("Failed to execute `wallet show --short` to list chains to close"); + return; + }; + + if !wallet_show_output.status.success() { + warn!("Failed to list chains in the wallet to close them"); + return; + } + + let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else { + warn!( + "Failed to close chains because `linera wallet show --short` \ + returned a non-UTF-8 output" + ); + return; + }; + + let chain_ids = chain_list_string + .split('\n') + .map(|line| line.trim()) + .filter(|line| !line.is_empty()); + + for chain_id in chain_ids { + let mut close_chain_command = SyncCommand::new(binary_path); + + for argument in self.command_arguments() { + close_chain_command.arg(&*argument); + } + + close_chain_command.current_dir(working_directory); + + match close_chain_command.args(["close-chain", chain_id]).status() { + Ok(status) if status.success() => (), + Ok(failure) => warn!("Failed to close chain {chain_id}: {failure}"), + Err(error) => warn!("Failed to close chain {chain_id}: {error}"), + } + } + } +} + /// Whether `wallet_init` should use a faucet. #[derive(Clone, Copy, Debug)] pub enum FaucetOption<'a> { diff --git a/linera-service/src/linera/main.rs b/linera-service/src/linera/main.rs index 5b03eb479974..4c6076d02c89 100644 --- a/linera-service/src/linera/main.rs +++ b/linera-service/src/linera/main.rs @@ -1534,8 +1534,14 @@ async fn run(options: &ClientOptions) -> anyhow::Result<()> { }, ClientCommand::Wallet(wallet_command) => match wallet_command { - WalletCommand::Show { chain_id } => { - wallet::pretty_print(&*options.wallet().await?, *chain_id); + WalletCommand::Show { chain_id, short } => { + if *short { + for chain_id in options.wallet().await?.chains.keys() { + println!("{chain_id}"); + } + } else { + wallet::pretty_print(&*options.wallet().await?, *chain_id); + } Ok(()) } diff --git a/linera-service/tests/linera_net_tests.rs b/linera-service/tests/linera_net_tests.rs index 69ace1ae1e70..f65a596e28ac 100644 --- a/linera-service/tests/linera_net_tests.rs +++ b/linera-service/tests/linera_net_tests.rs @@ -2959,6 +2959,11 @@ async fn test_end_to_end_fungible_client_benchmark(config: impl LineraNetConfig) #[cfg_attr(feature = "remote-net", test_case(RemoteNetTestingConfig::new(None) ; "remote_net_grpc"))] #[test_log::test(tokio::test)] async fn test_end_to_end_listen_for_new_rounds(config: impl LineraNetConfig) -> Result<()> { + use std::{ + sync::{Arc, Barrier}, + thread, + }; + let _guard = INTEGRATION_TEST_GUARD.lock().await; tracing::info!("Starting test {}", test_name!()); @@ -2987,20 +2992,51 @@ async fn test_end_to_end_listen_for_new_rounds(config: impl LineraNetConfig) -> client2.assign(client2_key, message_id).await?; client2.sync(chain2).await?; - let (mut tx1, mut rx) = mpsc::channel(8); - let mut tx2 = tx1.clone(); - let handle1: JoinHandle> = tokio::spawn(async move { - loop { - client1.transfer(Amount::ONE, chain2, chain1).await?; - tx1.send(()).await?; - } - }); - let handle2: JoinHandle> = tokio::spawn(async move { - loop { - client2.transfer(Amount::ONE, chain2, chain1).await?; - tx2.send(()).await?; + let (tx, mut rx) = mpsc::channel(8); + let drop_barrier = Arc::new(Barrier::new(3)); + let handle1 = tokio::spawn(run_client( + drop_barrier.clone(), + client1, + tx.clone(), + chain2, + chain1, + )); + let handle2 = tokio::spawn(run_client( + drop_barrier.clone(), + client2, + tx, + chain2, + chain1, + )); + + /// Runs the `client` in a task, so that it can race to produce blocks transferring tokens. + /// + /// Stops when transferring fails or the `notifier` channel is closed. When exiting, it will + /// drop the client in a separate thread so that the synchronous `Drop` implementation + /// can close the chains without blocking the asynchronous worker thread, which might be + /// shared with the other client's task. If the asynchronous thread is blocked, the + /// other client might have the round but not be able to execute and propose a block, + /// deadlocking the test. + async fn run_client( + drop_barrier: Arc, + client: ClientWrapper, + mut notifier: mpsc::Sender<()>, + source: ChainId, + target: ChainId, + ) -> Result>> { + let result = async { + loop { + client.transfer(Amount::ONE, source, target).await?; + notifier.send(()).await?; + } } - }); + .await; + thread::spawn(move || { + drop(client); + drop_barrier.wait(); + }); + result + } for _ in 0..8 { let () = rx.next().await.unwrap(); @@ -3011,6 +3047,7 @@ async fn test_end_to_end_listen_for_new_rounds(config: impl LineraNetConfig) -> assert!(result1?.is_err()); assert!(result2?.is_err()); + drop_barrier.wait(); net.ensure_is_running().await?; net.terminate().await?; diff --git a/linera-service/tests/local_net_tests.rs b/linera-service/tests/local_net_tests.rs index 9ef5e1807f66..b898043f47ec 100644 --- a/linera-service/tests/local_net_tests.rs +++ b/linera-service/tests/local_net_tests.rs @@ -23,7 +23,7 @@ use linera_base::{ use linera_service::{ cli_wrappers::{ local_net::{get_node_port, Database, LocalNetConfig, PathProvider, ProcessInbox}, - ClientWrapper, FaucetOption, LineraNet, LineraNetConfig, Network, + ClientWrapper, FaucetOption, LineraNet, LineraNetConfig, Network, OnClientDrop, }, test_name, }; @@ -417,7 +417,13 @@ async fn test_project_new() -> Result<()> { let _rustflags_override = override_disable_warnings_as_errors(); let path_provider = PathProvider::create_temporary_directory()?; let id = 0; - let client = ClientWrapper::new(path_provider, Network::Grpc, None, id); + let client = ClientWrapper::new( + path_provider, + Network::Grpc, + None, + id, + OnClientDrop::LeakChains, + ); let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); let linera_root = manifest_dir .parent() @@ -435,7 +441,13 @@ async fn test_project_new() -> Result<()> { async fn test_project_test() -> Result<()> { let path_provider = PathProvider::create_temporary_directory()?; let id = 0; - let client = ClientWrapper::new(path_provider, Network::Grpc, None, id); + let client = ClientWrapper::new( + path_provider, + Network::Grpc, + None, + id, + OnClientDrop::LeakChains, + ); client .project_test(&ClientWrapper::example_path("counter")?) .await?;