Skip to content

Commit

Permalink
Faucet has a chain listener (#2410) (#2529)
Browse files Browse the repository at this point in the history
* Faucet has a chain listener

* Decouple ChainListener from ChainClient for notifications

* Faucet uses ChainClients

* Updated tests

* Formatting

* ChainClients created from Iterator instead of Context

* Update CLI.md
  • Loading branch information
christos-h authored Sep 24, 2024
1 parent b0d6c07 commit 085e7ad
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 53 deletions.
7 changes: 7 additions & 0 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,13 @@ Run a GraphQL service that exposes a faucet where users can claim tokens. This g
Default value: `8080`
* `--amount <AMOUNT>` — The number of tokens to send to each new chain
* `--limit-rate-until <LIMIT_RATE_UNTIL>` — The end timestamp: The faucet will rate-limit the token supply so it runs out of money no earlier than this
* `--listener-skip-process-inbox` — Do not create blocks automatically to receive incoming messages. Instead, wait for an explicit mutation `processInbox`
* `--listener-delay-before-ms <DELAY_BEFORE_MS>` — Wait before processing any notification (useful for testing)

Default value: `0`
* `--listener-delay-after-ms <DELAY_AFTER_MS>` — Wait after processing any notification (useful for rate limiting)

Default value: `0`



Expand Down
45 changes: 35 additions & 10 deletions linera-client/src/chain_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use linera_base::identifiers::ChainId;
use linera_core::client::ChainClient;
use linera_storage::Storage;

use crate::error::{self, Error};
use crate::{
chain_listener::ClientContext,
error::{self, Error},
};

pub type ClientMapInner<P, S> = BTreeMap<ChainId, ChainClient<P, S>>;
pub struct ChainClients<P, S>(pub Arc<Mutex<ClientMapInner<P, S>>>)
Expand All @@ -24,18 +27,10 @@ where
}
}

impl<P, S> Default for ChainClients<P, S>
where
S: Storage,
{
fn default() -> Self {
Self(Arc::new(Mutex::new(BTreeMap::new())))
}
}

impl<P, S> ChainClients<P, S>
where
S: Storage,
P: 'static,
{
async fn client(&self, chain_id: &ChainId) -> Option<ChainClient<P, S>> {
Some(self.0.lock().await.get(chain_id)?.clone())
Expand All @@ -54,4 +49,34 @@ where
pub async fn map_lock(&self) -> MutexGuard<ClientMapInner<P, S>> {
self.0.lock().await
}

pub async fn add_client(&self, client: ChainClient<P, S>) {
self.0.lock().await.insert(client.chain_id(), client);
}

pub async fn request_client(
&self,
chain_id: ChainId,
context: Arc<Mutex<impl ClientContext<ValidatorNodeProvider = P, Storage = S>>>,
) -> ChainClient<P, S> {
let mut guard = self.0.lock().await;
match guard.get(&chain_id) {
Some(client) => client.clone(),
None => {
let context = context.lock().await;
let client = context.make_chain_client(chain_id);
guard.insert(chain_id, client.clone());
client
}
}
}

pub async fn from_clients(chains: impl IntoIterator<Item = ChainClient<P, S>>) -> Self {
let chain_clients = Self(Default::default());
for chain_client in chains {
let mut map_guard = chain_clients.map_lock().await;
map_guard.insert(chain_client.chain_id(), chain_client);
}
chain_clients
}
}
48 changes: 32 additions & 16 deletions linera-client/src/chain_listener.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::btree_map, sync::Arc, time::Duration};
use std::{collections::HashSet, sync::Arc, time::Duration};

use async_trait::async_trait;
use futures::{
Expand Down Expand Up @@ -80,6 +80,14 @@ pub trait ClientContext {
&mut self,
client: &ChainClient<Self::ValidatorNodeProvider, Self::Storage>,
);

fn clients(&self) -> Vec<ChainClient<Self::ValidatorNodeProvider, Self::Storage>> {
let mut clients = vec![];
for chain_id in &self.wallet().chain_ids() {
clients.push(self.make_chain_client(*chain_id));
}
clients
}
}

/// A `ChainListener` is a process that listens to notifications from validators and reacts
Expand All @@ -90,6 +98,7 @@ where
{
config: ChainListenerConfig,
clients: ChainClients<P, S>,
listening: Arc<Mutex<HashSet<ChainId>>>,
}

impl<P, S> ChainListener<P, S>
Expand All @@ -100,7 +109,11 @@ where
{
/// Creates a new chain listener given client chains.
pub fn new(config: ChainListenerConfig, clients: ChainClients<P, S>) -> Self {
Self { config, clients }
Self {
config,
clients,
listening: Default::default(),
}
}

/// Runs the chain listener.
Expand All @@ -116,6 +129,7 @@ where
context.clone(),
storage.clone(),
self.config.clone(),
self.listening.clone(),
);
}
}
Expand All @@ -127,13 +141,15 @@ where
context: Arc<Mutex<C>>,
storage: S,
config: ChainListenerConfig,
listening: Arc<Mutex<HashSet<ChainId>>>,
) where
C: ClientContext<ValidatorNodeProvider = P, Storage = S> + Send + 'static,
{
let _handle = linera_base::task::spawn(
async move {
if let Err(err) =
Self::run_client_stream(chain_id, clients, context, storage, config).await
Self::run_client_stream(chain_id, clients, context, storage, config, listening)
.await
{
error!("Stream for chain {} failed: {}", chain_id, err);
}
Expand All @@ -149,24 +165,23 @@ where
context: Arc<Mutex<C>>,
storage: S,
config: ChainListenerConfig,
listening: Arc<Mutex<HashSet<ChainId>>>,
) -> Result<(), Error>
where
C: ClientContext<ValidatorNodeProvider = P, Storage = S> + Send + 'static,
{
let client = {
let mut map_guard = clients.map_lock().await;
let context_guard = context.lock().await;
let btree_map::Entry::Vacant(entry) = map_guard.entry(chain_id) else {
// For every entry in the client map we are already listening to notifications, so
// there's nothing to do. This can happen if we download a child before the parent
// chain, and then process the OpenChain message in the parent.
return Ok(());
};
let client = context_guard.make_chain_client(chain_id);
entry.insert(client.clone());
client
};
let mut guard = listening.lock().await;
if guard.contains(&chain_id) {
// If we are already listening to notifications, there's nothing to do.
// This can happen if we download a child before the parent
// chain, and then process the OpenChain message in the parent.
return Ok(());
}
// If the client is not present, we can request it.
let client = clients.request_client(chain_id, context.clone()).await;
let (listener, _listen_handle, mut local_stream) = client.listen().await?;
guard.insert(chain_id);
drop(guard);
client.synchronize_from_validators().await?;
drop(linera_base::task::spawn(listener.in_current_span()));
let mut timeout = storage.clock().current_time();
Expand Down Expand Up @@ -265,6 +280,7 @@ where
context.clone(),
storage.clone(),
config.clone(),
listening.clone(),
);
}
}
Expand Down
4 changes: 4 additions & 0 deletions linera-client/src/client_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,10 @@ pub enum ClientCommand {
/// no earlier than this.
#[arg(long)]
limit_rate_until: Option<DateTime<Utc>>,

/// Configuration for the faucet chain listener.
#[command(flatten)]
config: ChainListenerConfig,
},

/// Publish bytecode.
Expand Down
5 changes: 3 additions & 2 deletions linera-client/src/unit_tests/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use linera_views::memory::MemoryStore;
use rand::SeedableRng as _;

use crate::{
chain_listener::{self, ChainListener, ChainListenerConfig, ClientContext as _},
chain_listener::{self, ChainClients, ChainListener, ChainListenerConfig, ClientContext as _},
config::{CommitteeConfig, GenesisConfig, ValidatorConfig},
wallet::{UserChain, Wallet},
};
Expand Down Expand Up @@ -159,8 +159,9 @@ async fn test_chain_listener() -> anyhow::Result<()> {
let key_pair = KeyPair::generate_from(&mut rng);
let public_key = key_pair.public();
context.update_wallet_for_new_chain(chain_id0, Some(key_pair), clock.current_time());
let chain_clients = ChainClients::from_clients(context.clients()).await;
let context = Arc::new(Mutex::new(context));
let listener = ChainListener::new(config, Default::default());
let listener = ChainListener::new(config, chain_clients);
listener.run(context, storage).await;

// Transfer ownership of chain 0 to the chain listener and some other key. The listener will
Expand Down
59 changes: 45 additions & 14 deletions linera-service/src/faucet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@ use linera_base::{
identifiers::{ChainId, MessageId},
ownership::ChainOwnership,
};
use linera_client::{chain_listener::ClientContext, config::GenesisConfig};
use linera_core::{client::ChainClient, data_types::ClientOutcome, node::ValidatorNodeProvider};
use linera_client::{
chain_clients::ChainClients,
chain_listener::{ChainListener, ChainListenerConfig, ClientContext},
config::GenesisConfig,
};
use linera_core::{
data_types::ClientOutcome,
node::{ValidatorNode, ValidatorNodeProvider},
};
use linera_execution::committee::ValidatorName;
use linera_storage::{Clock as _, Storage};
use serde::Deserialize;
Expand All @@ -33,15 +40,17 @@ where
S: Storage,
{
genesis_config: Arc<GenesisConfig>,
client: Arc<Mutex<ChainClient<P, S>>>,
clients: ChainClients<P, S>,
chain_id: ChainId,
}

/// The root GraphQL mutation type.
pub struct MutationRoot<P, S, C>
where
S: Storage,
{
client: Arc<Mutex<ChainClient<P, S>>>,
clients: ChainClients<P, S>,
chain_id: ChainId,
context: Arc<Mutex<C>>,
amount: Amount,
end_timestamp: Timestamp,
Expand Down Expand Up @@ -84,7 +93,7 @@ where

/// Returns the current committee's validators.
async fn current_validators(&self) -> Result<Vec<Validator>, Error> {
let client = self.client.lock().await;
let client = self.clients.try_client_lock(&self.chain_id).await?;
let committee = client.local_committee().await?;
Ok(committee
.validators()
Expand Down Expand Up @@ -119,7 +128,7 @@ where
C: ClientContext<ValidatorNodeProvider = P, Storage = S> + Send + 'static,
{
async fn do_claim(&self, public_key: PublicKey) -> Result<ClaimOutcome, Error> {
let client = self.client.lock().await;
let client = self.clients.try_client_lock(&self.chain_id).await?;

if self.start_timestamp < self.end_timestamp {
let local_time = client.storage_client().clock().current_time();
Expand Down Expand Up @@ -149,7 +158,7 @@ where
let result = client
.open_chain(ownership, ApplicationPermissions::default(), self.amount)
.await;
self.context.lock().await.update_wallet(&*client).await;
self.context.lock().await.update_wallet(&client).await;
let (message_id, certificate) = match result? {
ClientOutcome::Committed(result) => result,
ClientOutcome::WaitForTimeout(timeout) => {
Expand Down Expand Up @@ -189,9 +198,12 @@ pub struct FaucetService<P, S, C>
where
S: Storage,
{
client: Arc<Mutex<ChainClient<P, S>>>,
clients: ChainClients<P, S>,
chain_id: ChainId,
context: Arc<Mutex<C>>,
genesis_config: Arc<GenesisConfig>,
config: ChainListenerConfig,
storage: S,
port: NonZeroU16,
amount: Amount,
end_timestamp: Timestamp,
Expand All @@ -205,9 +217,12 @@ where
{
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
clients: self.clients.clone(),
chain_id: self.chain_id,
context: self.context.clone(),
genesis_config: self.genesis_config.clone(),
config: self.config.clone(),
storage: self.storage.clone(),
port: self.port,
amount: self.amount,
end_timestamp: self.end_timestamp,
Expand All @@ -220,25 +235,35 @@ where
impl<P, S, C> FaucetService<P, S, C>
where
P: ValidatorNodeProvider + Send + Sync + Clone + 'static,
<<P as ValidatorNodeProvider>::Node as ValidatorNode>::NotificationStream: Send,
S: Storage + Clone + Send + Sync + 'static,
C: ClientContext<ValidatorNodeProvider = P, Storage = S> + Send + 'static,
{
/// Creates a new instance of the faucet service.
#[allow(clippy::too_many_arguments)]
pub async fn new(
port: NonZeroU16,
client: ChainClient<P, S>,
chain_id: ChainId,
context: C,
amount: Amount,
end_timestamp: Timestamp,
genesis_config: Arc<GenesisConfig>,
config: ChainListenerConfig,
storage: S,
) -> anyhow::Result<Self> {
let clients = ChainClients::<P, S>::from_clients(context.clients()).await;
let context = Arc::new(Mutex::new(context));
let client = clients.try_client_lock(&chain_id).await?;
let start_timestamp = client.storage_client().clock().current_time();
client.process_inbox().await?;
let start_balance = client.local_balance().await?;
Ok(Self {
client: Arc::new(Mutex::new(client)),
context: Arc::new(Mutex::new(context)),
clients,
chain_id,
context,
genesis_config,
config,
storage,
port,
amount,
end_timestamp,
Expand All @@ -249,7 +274,8 @@ where

pub fn schema(&self) -> Schema<QueryRoot<P, S>, MutationRoot<P, S, C>, EmptySubscription> {
let mutation_root = MutationRoot {
client: self.client.clone(),
clients: self.clients.clone(),
chain_id: self.chain_id,
context: self.context.clone(),
amount: self.amount,
end_timestamp: self.end_timestamp,
Expand All @@ -258,7 +284,8 @@ where
};
let query_root = QueryRoot {
genesis_config: self.genesis_config.clone(),
client: self.client.clone(),
clients: self.clients.clone(),
chain_id: self.chain_id,
};
Schema::build(query_root, mutation_root, EmptySubscription).finish()
}
Expand All @@ -277,6 +304,10 @@ where

info!("GraphiQL IDE: http://localhost:{}", port);

ChainListener::new(self.config.clone(), self.clients.clone())
.run(self.context.clone(), self.storage.clone())
.await;

axum::serve(
tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?,
app,
Expand Down
Loading

0 comments on commit 085e7ad

Please sign in to comment.