diff --git a/rs/p2p/artifact_manager/src/lib.rs b/rs/p2p/artifact_manager/src/lib.rs index 3fd2cd14d74..92949b68976 100644 --- a/rs/p2p/artifact_manager/src/lib.rs +++ b/rs/p2p/artifact_manager/src/lib.rs @@ -20,13 +20,11 @@ use std::{ time::Duration, }; use tokio::{ - sync::mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender}, + sync::mpsc::{Sender, UnboundedReceiver}, time::timeout, }; use tracing::instrument; -type ArtifactEventSender = UnboundedSender>; - /// Metrics for a client artifact processor. struct ArtifactProcessorMetrics { /// The processing time histogram. @@ -136,26 +134,18 @@ pub fn run_artifact_processor( time_source: Arc, metrics_registry: MetricsRegistry, client: Box>, - send_advert: Sender>, + outbound_tx: Sender>, + inbound_rx: UnboundedReceiver>, initial_artifacts: Vec, -) -> (Box, ArtifactEventSender) { - // Making this channel bounded can be problematic since we don't have true multiplexing - // of P2P messages. - // Possible scenario is - adverts+chunks arrive on the same channel, slow consensus - // will result on slow consuption of chunks. Slow consumption of chunks will in turn - // result in slower consumptions of adverts. Ideally adverts are consumed at rate - // independent of consensus. - #[allow(clippy::disallowed_methods)] - let (sender, receiver) = unbounded_channel(); +) -> Box { let shutdown = Arc::new(AtomicBool::new(false)); - // Spawn the processor thread let shutdown_cl = shutdown.clone(); let handle = ThreadBuilder::new() .name(format!("{}_Processor", Artifact::NAME)) .spawn(move || { for artifact in initial_artifacts { - let _ = send_advert.blocking_send(ArtifactTransmit::Deliver(ArtifactWithOpt { + let _ = outbound_tx.blocking_send(ArtifactTransmit::Deliver(ArtifactWithOpt { artifact, is_latency_sensitive: false, })); @@ -163,18 +153,14 @@ pub fn run_artifact_processor( process_messages( time_source, client, - send_advert, - receiver, + outbound_tx, + inbound_rx, ArtifactProcessorMetrics::new(metrics_registry, Artifact::NAME.to_string()), shutdown_cl, ); }) .unwrap(); - - ( - Box::new(ArtifactProcessorJoinGuard::new(handle, shutdown)), - sender, - ) + Box::new(ArtifactProcessorJoinGuard::new(handle, shutdown)) } // The artifact processor thread loop @@ -243,7 +229,8 @@ const ARTIFACT_MANAGER_TIMER_DURATION_MSEC: u64 = 200; pub fn create_ingress_handlers< PoolIngress: MutablePool + Send + Sync + ValidatedPoolReader + 'static, >( - send_advert: Sender>, + outbound_tx: Sender>, + inbound_rx: UnboundedReceiver>, time_source: Arc, ingress_pool: Arc>, ingress_handler: Arc< @@ -254,46 +241,41 @@ pub fn create_ingress_handlers< + Sync, >, metrics_registry: MetricsRegistry, -) -> ( - UnboundedSender>, - Box, -) { +) -> Box { let client = IngressProcessor::new(ingress_pool.clone(), ingress_handler); - let (jh, sender) = run_artifact_processor( + run_artifact_processor( time_source.clone(), metrics_registry, Box::new(client), - send_advert, + outbound_tx, + inbound_rx, vec![], - ); - (sender, jh) + ) } -/// Starts the event loop that pools consensus for updates on what needs to be replicated. +/// Starts the event loop that polls consensus for updates on what needs to be replicated. pub fn create_artifact_handler< Artifact: IdentifiableArtifact + Send + Sync + 'static, Pool: MutablePool + Send + Sync + ValidatedPoolReader + 'static, C: PoolMutationsProducer>::Mutations> + 'static, >( - send_advert: Sender>, + outbound_tx: Sender>, + inbound_rx: UnboundedReceiver>, change_set_producer: C, time_source: Arc, pool: Arc>, metrics_registry: MetricsRegistry, -) -> ( - UnboundedSender>, - Box, -) { +) -> Box { let inital_artifacts: Vec<_> = pool.read().unwrap().get_all_for_broadcast().collect(); let client = Processor::new(pool, change_set_producer); - let (jh, sender) = run_artifact_processor( + run_artifact_processor( time_source.clone(), metrics_registry, Box::new(client), - send_advert, + outbound_tx, + inbound_rx, inital_artifacts, - ); - (sender, jh) + ) } // TODO: make it private, it is used only for tests outside of this crate @@ -472,11 +454,14 @@ mod tests { let time_source = Arc::new(SysTimeSource::new()); let (send_tx, mut send_rx) = tokio::sync::mpsc::channel(100); + #[allow(clippy::disallowed_methods)] + let (_, inbound_rx) = tokio::sync::mpsc::unbounded_channel(); run_artifact_processor::( time_source, MetricsRegistry::default(), Box::new(DummyProcessor), send_tx, + inbound_rx, (0..10).map(Into::into).collect(), ); diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs index bb2ee7d8bcf..3b957c05fde 100644 --- a/rs/p2p/consensus_manager/src/lib.rs +++ b/rs/p2p/consensus_manager/src/lib.rs @@ -16,7 +16,7 @@ use phantom_newtype::AmountOf; use tokio::{ runtime::Handle, sync::{ - mpsc::{Receiver, UnboundedSender}, + mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender}, watch, }, }; @@ -28,7 +28,13 @@ mod sender; type StartConsensusManagerFn = Box, watch::Receiver) -> Vec>; -pub struct ConsensusManagerBuilder { +/// Same order of magnitude as the number of active artifacts. +const MAX_OUTBOUND_CHANNEL_SIZE: usize = 100_000; + +pub type AbortableBroadcastSender = Sender>; +pub type AbortableBroadcastReceiver = UnboundedReceiver>; + +pub struct AbortableBroadcastChannelBuilder { log: ReplicaLogger, metrics_registry: MetricsRegistry, rt_handle: Handle, @@ -36,7 +42,7 @@ pub struct ConsensusManagerBuilder { router: Option, } -impl ConsensusManagerBuilder { +impl AbortableBroadcastChannelBuilder { pub fn new(log: ReplicaLogger, rt_handle: Handle, metrics_registry: MetricsRegistry) -> Self { Self { log, @@ -47,18 +53,32 @@ impl ConsensusManagerBuilder { } } - pub fn add_client< + /// Creates a channel for the corresponding artifact. The channel is used to broadcast artifacts within the subnet. + pub fn abortable_broadcast_channel< Artifact: IdentifiableArtifact, WireArtifact: PbArtifact, F: FnOnce(Arc) -> D + 'static, D: ArtifactAssembler, >( &mut self, - outbound_artifacts_rx: Receiver>, - inbound_artifacts_tx: UnboundedSender>, (assembler, assembler_router): (F, Router), slot_limit: usize, + ) -> ( + AbortableBroadcastSender, + AbortableBroadcastReceiver, + // TODO: remove this by introducing a new channel from the http handler into the processor + UnboundedSender>, ) { + let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(MAX_OUTBOUND_CHANNEL_SIZE); + // Making this channel bounded can be problematic since we don't have true multiplexing + // of P2P messages. + // Possible scenario is - adverts+chunks arrive on the same channel, slow consensus + // will result on slow consuption of chunks. Slow consumption of chunks will in turn + // result in slower consumptions of adverts. Ideally adverts are consumed at rate + // independent of consensus. + #[allow(clippy::disallowed_methods)] + let (inbound_tx, inbound_rx) = tokio::sync::mpsc::unbounded_channel(); + assert!(uri_prefix::() .chars() .all(char::is_alphabetic)); @@ -68,14 +88,15 @@ impl ConsensusManagerBuilder { let rt_handle = self.rt_handle.clone(); let metrics_registry = self.metrics_registry.clone(); + let inbound_tx_c = inbound_tx.clone(); let builder = move |transport: Arc, topology_watcher| { start_consensus_manager( log, &metrics_registry, rt_handle, - outbound_artifacts_rx, + outbound_rx, adverts_from_peers_rx, - inbound_artifacts_tx, + inbound_tx, assembler(transport.clone()), transport, topology_watcher, @@ -92,6 +113,7 @@ impl ConsensusManagerBuilder { ); self.clients.push(Box::new(builder)); + (outbound_tx, inbound_rx, inbound_tx_c) } pub fn router(&mut self) -> Router { diff --git a/rs/p2p/consensus_manager/src/receiver.rs b/rs/p2p/consensus_manager/src/receiver.rs index a448158b3d5..2d1a7e31ae1 100644 --- a/rs/p2p/consensus_manager/src/receiver.rs +++ b/rs/p2p/consensus_manager/src/receiver.rs @@ -652,6 +652,7 @@ mod tests { fn new() -> Self { let (_, adverts_received) = tokio::sync::mpsc::channel(100); + #[allow(clippy::disallowed_methods)] let (sender, unvalidated_artifact_receiver) = tokio::sync::mpsc::unbounded_channel(); let (_, topology_watcher) = watch::channel(SubnetTopology::default()); let artifact_assembler = diff --git a/rs/p2p/test_utils/src/lib.rs b/rs/p2p/test_utils/src/lib.rs index 7d749c41c3d..4135223ca9c 100644 --- a/rs/p2p/test_utils/src/lib.rs +++ b/rs/p2p/test_utils/src/lib.rs @@ -449,30 +449,31 @@ pub fn start_consensus_manager( processor: TestConsensus, ) -> ( Box, - ic_consensus_manager::ConsensusManagerBuilder, + ic_consensus_manager::AbortableBroadcastChannelBuilder, ) { let _enter = rt_handle.enter(); let pool = Arc::new(RwLock::new(processor)); - let (artifact_processor_jh, artifact_manager_event_rx, artifact_sender) = - start_test_processor(pool.clone(), pool.clone().read().unwrap().clone()); let bouncer_factory = Arc::new(pool.clone().read().unwrap().clone()); - let mut cm1 = ic_consensus_manager::ConsensusManagerBuilder::new( + let downloader = FetchArtifact::new( log.clone(), rt_handle.clone(), + pool.clone(), + bouncer_factory, MetricsRegistry::default(), ); - let downloader = FetchArtifact::new( - log, - rt_handle, - pool, - bouncer_factory, + + let mut cm1 = ic_consensus_manager::AbortableBroadcastChannelBuilder::new( + log.clone(), + rt_handle.clone(), MetricsRegistry::default(), ); - cm1.add_client( - artifact_manager_event_rx, - artifact_sender, - downloader, - usize::MAX, + let (outbound_tx, inbound_rx, _) = cm1.abortable_broadcast_channel(downloader, usize::MAX); + + let artifact_processor_jh = start_test_processor( + outbound_tx, + inbound_rx, + pool.clone(), + pool.clone().read().unwrap().clone(), ); (artifact_processor_jh, cm1) } diff --git a/rs/p2p/test_utils/src/turmoil.rs b/rs/p2p/test_utils/src/turmoil.rs index 3096d4453ab..c484641fb89 100644 --- a/rs/p2p/test_utils/src/turmoil.rs +++ b/rs/p2p/test_utils/src/turmoil.rs @@ -346,7 +346,7 @@ pub fn add_transport_to_sim( async move { let metrics_registry = MetricsRegistry::default(); - let mut consensus_builder = ic_consensus_manager::ConsensusManagerBuilder::new( + let mut consensus_builder = ic_consensus_manager::AbortableBroadcastChannelBuilder::new( log.clone(), tokio::runtime::Handle::current(), metrics_registry, @@ -370,25 +370,22 @@ pub fn add_transport_to_sim( }; let _artifact_processor_jh = if let Some(consensus) = consensus_manager_clone { - let (artifact_processor_jh, artifact_manager_event_rx, artifact_sender) = - start_test_processor( - consensus.clone(), - consensus.clone().read().unwrap().clone(), - ); let bouncer_factory = Arc::new(consensus.clone().read().unwrap().clone()); - let downloader = FetchArtifact::new( log.clone(), tokio::runtime::Handle::current(), - consensus, + consensus.clone(), bouncer_factory, MetricsRegistry::default(), ); - consensus_builder.add_client( - artifact_manager_event_rx, - artifact_sender, - downloader, - usize::MAX, + let (outbound_tx, inbound_tx, _) = + consensus_builder.abortable_broadcast_channel(downloader, usize::MAX); + + let artifact_processor_jh = start_test_processor( + outbound_tx, + inbound_tx, + consensus.clone(), + consensus.clone().read().unwrap().clone(), ); router = Some(router.unwrap_or_default().merge(consensus_builder.router())); @@ -442,22 +439,19 @@ pub fn waiter_fut( #[allow(clippy::type_complexity)] pub fn start_test_processor( + outbound_tx: mpsc::Sender>, + inbound_rx: mpsc::UnboundedReceiver>, pool: Arc>>, change_set_producer: TestConsensus, -) -> ( - Box, - mpsc::Receiver>, - mpsc::UnboundedSender>, -) { - let (tx, rx) = tokio::sync::mpsc::channel(1000); +) -> Box { let time_source = Arc::new(SysTimeSource::new()); let client = ic_artifact_manager::Processor::new(pool, change_set_producer); - let (jh, sender) = run_artifact_processor( + run_artifact_processor( time_source, MetricsRegistry::default(), Box::new(client), - tx, + outbound_tx, + inbound_rx, vec![], - ); - (jh, rx, sender) + ) } diff --git a/rs/replica/setup_ic_network/src/lib.rs b/rs/replica/setup_ic_network/src/lib.rs index 982cddc301c..44824366df2 100644 --- a/rs/replica/setup_ic_network/src/lib.rs +++ b/rs/replica/setup_ic_network/src/lib.rs @@ -15,7 +15,8 @@ use ic_consensus::{ consensus::{ConsensusBouncer, ConsensusImpl}, idkg, }; -use ic_consensus_manager::ConsensusManagerBuilder; +use ic_consensus_dkg::DkgBouncer; +use ic_consensus_manager::AbortableBroadcastChannelBuilder; use ic_consensus_utils::{crypto::ConsensusCrypto, pool_reader::PoolReader}; use ic_crypto_interfaces_sig_verification::IngressSigVerifier; use ic_crypto_tls_interfaces::TlsConfig; @@ -67,7 +68,6 @@ use tower_http::trace::TraceLayer; /// advertising the blocks. const HASHES_IN_BLOCKS_FEATURE_ENABLED: bool = false; -pub const MAX_ADVERT_BUFFER: usize = 100_000; /// This limit is used to protect against a malicious peer advertising many ingress messages. /// If no malicious peers are present the ingress pools are bounded by a separate limit. const SLOT_TABLE_LIMIT_INGRESS: usize = 50_000; @@ -246,11 +246,11 @@ fn start_consensus( Arc>, UnboundedSender>, Vec>, - ConsensusManagerBuilder, + AbortableBroadcastChannelBuilder, ) { let time_source = Arc::new(SysTimeSource::new()); - let mut new_p2p_consensus: ic_consensus_manager::ConsensusManagerBuilder = - ic_consensus_manager::ConsensusManagerBuilder::new( + let mut new_p2p_consensus: ic_consensus_manager::AbortableBroadcastChannelBuilder = + ic_consensus_manager::AbortableBroadcastChannelBuilder::new( log.clone(), rt_handle.clone(), metrics_registry.clone(), @@ -305,13 +305,6 @@ fn start_consensus( &PoolReader::new(&*consensus_pool.read().unwrap()), ))); - let (consensus_tx, consensus_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); - let (certification_tx, certification_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); - let (dkg_tx, dkg_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); - let (ingress_tx, ingress_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); - let (idkg_tx, idkg_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); - let (http_outcalls_tx, http_outcalls_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); - { let consensus_impl = ConsensusImpl::new( replica_config.clone(), @@ -337,53 +330,43 @@ fn start_consensus( let consensus_pool = Arc::clone(&consensus_pool); - // Create the consensus client. - let (client, jh) = create_artifact_handler( - consensus_tx, - consensus_impl, - time_source.clone(), - consensus_pool.clone(), - metrics_registry.clone(), - ); - - join_handles.push(jh); - let bouncer = Arc::new(ConsensusBouncer::new(metrics_registry, message_router)); - if HASHES_IN_BLOCKS_FEATURE_ENABLED { + let (outbound_tx, inbound_rx, _) = if HASHES_IN_BLOCKS_FEATURE_ENABLED { let assembler = ic_artifact_downloader::FetchStrippedConsensusArtifact::new( log.clone(), rt_handle.clone(), - consensus_pool, + consensus_pool.clone(), artifact_pools.ingress_pool.clone(), bouncer, metrics_registry.clone(), node_id, ); - new_p2p_consensus.add_client(consensus_rx, client, assembler, SLOT_TABLE_NO_LIMIT); + new_p2p_consensus.abortable_broadcast_channel(assembler, SLOT_TABLE_NO_LIMIT) } else { let assembler = ic_artifact_downloader::FetchArtifact::new( log.clone(), rt_handle.clone(), - consensus_pool, + consensus_pool.clone(), bouncer, metrics_registry.clone(), ); - new_p2p_consensus.add_client(consensus_rx, client, assembler, SLOT_TABLE_NO_LIMIT); + new_p2p_consensus.abortable_broadcast_channel(assembler, SLOT_TABLE_NO_LIMIT) }; - }; - let ingress_sender = { - // Create the ingress client. - let (client, jh) = create_ingress_handlers( - ingress_tx, - Arc::clone(&time_source) as Arc<_>, - Arc::clone(&artifact_pools.ingress_pool), - ingress_manager, + // Create the consensus client. + let jh = create_artifact_handler( + outbound_tx, + inbound_rx, + consensus_impl, + time_source.clone(), + consensus_pool, metrics_registry.clone(), ); join_handles.push(jh); + }; + let ingress_sender = { let bouncer = Arc::new(IngressBouncer::new(time_source.clone())); let assembler = ic_artifact_downloader::FetchArtifact::new( log.clone(), @@ -393,13 +376,19 @@ fn start_consensus( metrics_registry.clone(), ); - new_p2p_consensus.add_client( - ingress_rx, - client.clone(), - assembler, - SLOT_TABLE_LIMIT_INGRESS, + let (outbound_tx, inbound_rx, inbound_tx) = + new_p2p_consensus.abortable_broadcast_channel(assembler, SLOT_TABLE_LIMIT_INGRESS); + // Create the ingress client. + let jh = create_ingress_handlers( + outbound_tx, + inbound_rx, + Arc::clone(&time_source) as Arc<_>, + Arc::clone(&artifact_pools.ingress_pool), + ingress_manager, + metrics_registry.clone(), ); - client + join_handles.push(jh); + inbound_tx }; { @@ -413,32 +402,45 @@ fn start_consensus( log.clone(), max_certified_height_tx, ); + let bouncer = CertifierBouncer::new(metrics_registry, Arc::clone(&consensus_pool_cache)); + let assembler = ic_artifact_downloader::FetchArtifact::new( + log.clone(), + rt_handle.clone(), + artifact_pools.certification_pool.clone(), + Arc::new(bouncer), + metrics_registry.clone(), + ); + let (outbound_tx, inbound_rx, _) = + new_p2p_consensus.abortable_broadcast_channel(assembler, SLOT_TABLE_NO_LIMIT); // Create the certification client. - let (client, jh) = create_artifact_handler( - certification_tx, + let jh = create_artifact_handler( + outbound_tx, + inbound_rx, certifier, Arc::clone(&time_source) as Arc<_>, - Arc::clone(&artifact_pools.certification_pool), + artifact_pools.certification_pool, metrics_registry.clone(), ); join_handles.push(jh); + }; - let bouncer = CertifierBouncer::new(metrics_registry, Arc::clone(&consensus_pool_cache)); + { + let bouncer = Arc::new(DkgBouncer::new(metrics_registry)); let assembler = ic_artifact_downloader::FetchArtifact::new( log.clone(), rt_handle.clone(), - artifact_pools.certification_pool, - Arc::new(bouncer), + artifact_pools.dkg_pool.clone(), + bouncer, metrics_registry.clone(), ); - new_p2p_consensus.add_client(certification_rx, client, assembler, SLOT_TABLE_NO_LIMIT); - }; - { + let (outbound_tx, inbound_rx, _) = + new_p2p_consensus.abortable_broadcast_channel(assembler, SLOT_TABLE_NO_LIMIT); // Create the DKG client. - let (client, jh) = create_artifact_handler( - dkg_tx, + let jh = create_artifact_handler( + outbound_tx, + inbound_rx, ic_consensus_dkg::DkgImpl::new( node_id, Arc::clone(&consensus_crypto), @@ -448,22 +450,11 @@ fn start_consensus( log.clone(), ), Arc::clone(&time_source) as Arc<_>, - Arc::clone(&artifact_pools.dkg_pool), - metrics_registry.clone(), - ); - join_handles.push(jh); - - let bouncer = Arc::new(ic_consensus_dkg::DkgBouncer::new(metrics_registry)); - let assembler = ic_artifact_downloader::FetchArtifact::new( - log.clone(), - rt_handle.clone(), artifact_pools.dkg_pool, - bouncer, metrics_registry.clone(), ); - new_p2p_consensus.add_client(dkg_rx, client, assembler, SLOT_TABLE_NO_LIMIT); + join_handles.push(jh); }; - { let finalized = consensus_pool_cache.finalized_block(); let chain_key_config = @@ -478,9 +469,26 @@ fn start_consensus( finalized.payload.as_ref().is_summary(), finalized.payload.as_ref().as_idkg().is_some(), ); + let bouncer = Arc::new(idkg::IDkgBouncer::new( + metrics_registry, + subnet_id, + consensus_pool.read().unwrap().get_block_cache(), + Arc::clone(&state_reader), + )); + let assembler = ic_artifact_downloader::FetchArtifact::new( + log.clone(), + rt_handle.clone(), + artifact_pools.idkg_pool.clone(), + bouncer, + metrics_registry.clone(), + ); + + let (outbound_tx, inbound_rx, _) = + new_p2p_consensus.abortable_broadcast_channel(assembler, SLOT_TABLE_NO_LIMIT); - let (client, jh) = create_artifact_handler( - idkg_tx, + let jh = create_artifact_handler( + outbound_tx, + inbound_rx, idkg::IDkgImpl::new( node_id, consensus_pool.read().unwrap().get_block_cache(), @@ -491,31 +499,32 @@ fn start_consensus( malicious_flags, ), Arc::clone(&time_source) as Arc<_>, - Arc::clone(&artifact_pools.idkg_pool), + artifact_pools.idkg_pool, metrics_registry.clone(), ); - join_handles.push(jh); + }; - let bouncer = Arc::new(idkg::IDkgBouncer::new( - metrics_registry, - subnet_id, - consensus_pool.read().unwrap().get_block_cache(), + { + let bouncer = Arc::new(CanisterHttpGossipImpl::new( + Arc::clone(&consensus_pool_cache), Arc::clone(&state_reader), + log.clone(), )); let assembler = ic_artifact_downloader::FetchArtifact::new( log.clone(), rt_handle.clone(), - artifact_pools.idkg_pool, + artifact_pools.canister_http_pool.clone(), bouncer, metrics_registry.clone(), ); - new_p2p_consensus.add_client(idkg_rx, client, assembler, SLOT_TABLE_NO_LIMIT); - }; - { - let (client, jh) = create_artifact_handler( - http_outcalls_tx, + let (outbound_tx, inbound_rx, _) = + new_p2p_consensus.abortable_broadcast_channel(assembler, SLOT_TABLE_NO_LIMIT); + + let jh = create_artifact_handler( + outbound_tx, + inbound_rx, CanisterHttpPoolManagerImpl::new( Arc::clone(&state_reader), Arc::new(Mutex::new(canister_http_adapter_client)), @@ -527,24 +536,10 @@ fn start_consensus( log.clone(), ), Arc::clone(&time_source) as Arc<_>, - Arc::clone(&artifact_pools.canister_http_pool), - metrics_registry.clone(), - ); - join_handles.push(jh); - - let bouncer = Arc::new(CanisterHttpGossipImpl::new( - Arc::clone(&consensus_pool_cache), - Arc::clone(&state_reader), - log.clone(), - )); - let assembler = ic_artifact_downloader::FetchArtifact::new( - log.clone(), - rt_handle.clone(), artifact_pools.canister_http_pool, - bouncer, metrics_registry.clone(), ); - new_p2p_consensus.add_client(http_outcalls_rx, client, assembler, SLOT_TABLE_NO_LIMIT); + join_handles.push(jh); }; (