From 916e516a64219d758fde6a7587d39bc36ec52a3b Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Wed, 18 Dec 2024 09:59:50 +0000 Subject: [PATCH 01/15] . --- rs/p2p/consensus_manager/src/lib.rs | 4 ++-- rs/p2p/test_utils/src/lib.rs | 4 ++-- rs/p2p/test_utils/src/turmoil.rs | 2 +- rs/replica/setup_ic_network/src/lib.rs | 22 ++++++++++------------ 4 files changed, 15 insertions(+), 17 deletions(-) diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs index bb2ee7d8bcf..1cab6d75f93 100644 --- a/rs/p2p/consensus_manager/src/lib.rs +++ b/rs/p2p/consensus_manager/src/lib.rs @@ -28,7 +28,7 @@ mod sender; type StartConsensusManagerFn = Box, watch::Receiver) -> Vec>; -pub struct ConsensusManagerBuilder { +pub struct AbortableBroadcastChannelBuilder { log: ReplicaLogger, metrics_registry: MetricsRegistry, rt_handle: Handle, @@ -36,7 +36,7 @@ pub struct ConsensusManagerBuilder { router: Option, } -impl ConsensusManagerBuilder { +impl AbortableBroadcastChannelBuilder { pub fn new(log: ReplicaLogger, rt_handle: Handle, metrics_registry: MetricsRegistry) -> Self { Self { log, diff --git a/rs/p2p/test_utils/src/lib.rs b/rs/p2p/test_utils/src/lib.rs index 7d749c41c3d..880e795bc28 100644 --- a/rs/p2p/test_utils/src/lib.rs +++ b/rs/p2p/test_utils/src/lib.rs @@ -449,14 +449,14 @@ pub fn start_consensus_manager( processor: TestConsensus, ) -> ( Box, - ic_consensus_manager::ConsensusManagerBuilder, + ic_consensus_manager::AbortableBroadcastChannelBuilder, ) { let _enter = rt_handle.enter(); let pool = Arc::new(RwLock::new(processor)); let (artifact_processor_jh, artifact_manager_event_rx, artifact_sender) = start_test_processor(pool.clone(), pool.clone().read().unwrap().clone()); let bouncer_factory = Arc::new(pool.clone().read().unwrap().clone()); - let mut cm1 = ic_consensus_manager::ConsensusManagerBuilder::new( + let mut cm1 = ic_consensus_manager::AbortableBroadcastChannelBuilder::new( log.clone(), rt_handle.clone(), MetricsRegistry::default(), diff --git a/rs/p2p/test_utils/src/turmoil.rs b/rs/p2p/test_utils/src/turmoil.rs index 3096d4453ab..2017d250b6b 100644 --- a/rs/p2p/test_utils/src/turmoil.rs +++ b/rs/p2p/test_utils/src/turmoil.rs @@ -346,7 +346,7 @@ pub fn add_transport_to_sim( async move { let metrics_registry = MetricsRegistry::default(); - let mut consensus_builder = ic_consensus_manager::ConsensusManagerBuilder::new( + let mut consensus_builder = ic_consensus_manager::AbortableBroadcastChannelBuilder::new( log.clone(), tokio::runtime::Handle::current(), metrics_registry, diff --git a/rs/replica/setup_ic_network/src/lib.rs b/rs/replica/setup_ic_network/src/lib.rs index 832bc7c5741..5226fe9faf4 100644 --- a/rs/replica/setup_ic_network/src/lib.rs +++ b/rs/replica/setup_ic_network/src/lib.rs @@ -15,7 +15,7 @@ use ic_consensus::{ consensus::{dkg_key_manager::DkgKeyManager, ConsensusBouncer, ConsensusImpl}, dkg, idkg, }; -use ic_consensus_manager::ConsensusManagerBuilder; +use ic_consensus_manager::AbortableBroadcastChannelBuilder; use ic_consensus_utils::{crypto::ConsensusCrypto, pool_reader::PoolReader}; use ic_crypto_interfaces_sig_verification::IngressSigVerifier; use ic_crypto_tls_interfaces::TlsConfig; @@ -246,11 +246,11 @@ fn start_consensus( Arc>, UnboundedSender>, Vec>, - ConsensusManagerBuilder, + AbortableBroadcastChannelBuilder, ) { let time_source = Arc::new(SysTimeSource::new()); - let mut new_p2p_consensus: ic_consensus_manager::ConsensusManagerBuilder = - ic_consensus_manager::ConsensusManagerBuilder::new( + let mut new_p2p_consensus: ic_consensus_manager::AbortableBroadcastChannelBuilder = + ic_consensus_manager::AbortableBroadcastChannelBuilder::new( log.clone(), rt_handle.clone(), metrics_registry.clone(), @@ -305,13 +305,6 @@ fn start_consensus( &PoolReader::new(&*consensus_pool.read().unwrap()), ))); - let (consensus_tx, consensus_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); - let (certification_tx, certification_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); - let (dkg_tx, dkg_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); - let (ingress_tx, ingress_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); - let (idkg_tx, idkg_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); - let (http_outcalls_tx, http_outcalls_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); - { let consensus_impl = ConsensusImpl::new( replica_config.clone(), @@ -337,6 +330,7 @@ fn start_consensus( let consensus_pool = Arc::clone(&consensus_pool); + let (consensus_tx, consensus_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); // Create the consensus client. let (client, jh) = create_artifact_handler( consensus_tx, @@ -373,6 +367,7 @@ fn start_consensus( }; let ingress_sender = { + let (ingress_tx, ingress_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); // Create the ingress client. let (client, jh) = create_ingress_handlers( ingress_tx, @@ -413,7 +408,7 @@ fn start_consensus( log.clone(), max_certified_height_tx, ); - + let (certification_tx, certification_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); // Create the certification client. let (client, jh) = create_artifact_handler( certification_tx, @@ -436,6 +431,7 @@ fn start_consensus( }; { + let (dkg_tx, dkg_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); // Create the DKG client. let (client, jh) = create_artifact_handler( dkg_tx, @@ -478,6 +474,7 @@ fn start_consensus( finalized.payload.as_ref().is_summary(), finalized.payload.as_ref().as_idkg().is_some(), ); + let (idkg_tx, idkg_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); let (client, jh) = create_artifact_handler( idkg_tx, @@ -514,6 +511,7 @@ fn start_consensus( }; { + let (http_outcalls_tx, http_outcalls_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); let (client, jh) = create_artifact_handler( http_outcalls_tx, CanisterHttpPoolManagerImpl::new( From 3f552e3cbb8339dc72de20215ab9236bcfc56d06 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Wed, 18 Dec 2024 11:18:35 +0000 Subject: [PATCH 02/15] . --- rs/p2p/artifact_manager/src/lib.rs | 58 +++++--------- rs/p2p/consensus_manager/src/lib.rs | 17 ++-- rs/p2p/test_utils/src/lib.rs | 22 +++--- rs/p2p/test_utils/src/turmoil.rs | 37 ++++----- rs/replica/setup_ic_network/src/lib.rs | 103 +++++++++++++------------ 5 files changed, 114 insertions(+), 123 deletions(-) diff --git a/rs/p2p/artifact_manager/src/lib.rs b/rs/p2p/artifact_manager/src/lib.rs index 3fd2cd14d74..177619f570b 100644 --- a/rs/p2p/artifact_manager/src/lib.rs +++ b/rs/p2p/artifact_manager/src/lib.rs @@ -136,26 +136,18 @@ pub fn run_artifact_processor( time_source: Arc, metrics_registry: MetricsRegistry, client: Box>, - send_advert: Sender>, + outbound_tx: Sender>, + inbound_tx: UnboundedReceiver>, initial_artifacts: Vec, -) -> (Box, ArtifactEventSender) { - // Making this channel bounded can be problematic since we don't have true multiplexing - // of P2P messages. - // Possible scenario is - adverts+chunks arrive on the same channel, slow consensus - // will result on slow consuption of chunks. Slow consumption of chunks will in turn - // result in slower consumptions of adverts. Ideally adverts are consumed at rate - // independent of consensus. - #[allow(clippy::disallowed_methods)] - let (sender, receiver) = unbounded_channel(); +) -> Box { let shutdown = Arc::new(AtomicBool::new(false)); - // Spawn the processor thread let shutdown_cl = shutdown.clone(); let handle = ThreadBuilder::new() .name(format!("{}_Processor", Artifact::NAME)) .spawn(move || { for artifact in initial_artifacts { - let _ = send_advert.blocking_send(ArtifactTransmit::Deliver(ArtifactWithOpt { + let _ = outbound_tx.blocking_send(ArtifactTransmit::Deliver(ArtifactWithOpt { artifact, is_latency_sensitive: false, })); @@ -163,18 +155,14 @@ pub fn run_artifact_processor( process_messages( time_source, client, - send_advert, - receiver, + outbound_tx, + inbound_tx, ArtifactProcessorMetrics::new(metrics_registry, Artifact::NAME.to_string()), shutdown_cl, ); }) .unwrap(); - - ( - Box::new(ArtifactProcessorJoinGuard::new(handle, shutdown)), - sender, - ) + Box::new(ArtifactProcessorJoinGuard::new(handle, shutdown)) } // The artifact processor thread loop @@ -243,7 +231,8 @@ const ARTIFACT_MANAGER_TIMER_DURATION_MSEC: u64 = 200; pub fn create_ingress_handlers< PoolIngress: MutablePool + Send + Sync + ValidatedPoolReader + 'static, >( - send_advert: Sender>, + outbound_tx: Sender>, + inbound_tx: UnboundedReceiver>, time_source: Arc, ingress_pool: Arc>, ingress_handler: Arc< @@ -254,19 +243,16 @@ pub fn create_ingress_handlers< + Sync, >, metrics_registry: MetricsRegistry, -) -> ( - UnboundedSender>, - Box, -) { +) -> Box { let client = IngressProcessor::new(ingress_pool.clone(), ingress_handler); - let (jh, sender) = run_artifact_processor( + run_artifact_processor( time_source.clone(), metrics_registry, Box::new(client), - send_advert, + outbound_tx, + inbound_tx, vec![], - ); - (sender, jh) + ) } /// Starts the event loop that pools consensus for updates on what needs to be replicated. @@ -275,25 +261,23 @@ pub fn create_artifact_handler< Pool: MutablePool + Send + Sync + ValidatedPoolReader + 'static, C: PoolMutationsProducer>::Mutations> + 'static, >( - send_advert: Sender>, + outbound_tx: Sender>, + inbound_rx: UnboundedReceiver>, change_set_producer: C, time_source: Arc, pool: Arc>, metrics_registry: MetricsRegistry, -) -> ( - UnboundedSender>, - Box, -) { +) -> Box { let inital_artifacts: Vec<_> = pool.read().unwrap().get_all_for_broadcast().collect(); let client = Processor::new(pool, change_set_producer); - let (jh, sender) = run_artifact_processor( + run_artifact_processor( time_source.clone(), metrics_registry, Box::new(client), - send_advert, + outbound_tx, + inbound_rx, inital_artifacts, - ); - (sender, jh) + ) } // TODO: make it private, it is used only for tests outside of this crate diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs index 1cab6d75f93..f85b80b0d0d 100644 --- a/rs/p2p/consensus_manager/src/lib.rs +++ b/rs/p2p/consensus_manager/src/lib.rs @@ -16,7 +16,7 @@ use phantom_newtype::AmountOf; use tokio::{ runtime::Handle, sync::{ - mpsc::{Receiver, UnboundedSender}, + mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender}, watch, }, }; @@ -28,6 +28,8 @@ mod sender; type StartConsensusManagerFn = Box, watch::Receiver) -> Vec>; +pub const MAX_OUTBOUND_CHANNEL_SIZE: usize = 100_000; + pub struct AbortableBroadcastChannelBuilder { log: ReplicaLogger, metrics_registry: MetricsRegistry, @@ -54,11 +56,15 @@ impl AbortableBroadcastChannelBuilder { D: ArtifactAssembler, >( &mut self, - outbound_artifacts_rx: Receiver>, - inbound_artifacts_tx: UnboundedSender>, (assembler, assembler_router): (F, Router), slot_limit: usize, + ) -> ( + Sender>, + UnboundedReceiver>, ) { + let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(MAX_OUTBOUND_CHANNEL_SIZE); + let (inbound_tx, inbound_rx) = tokio::sync::mpsc::unbounded_channel(); + assert!(uri_prefix::() .chars() .all(char::is_alphabetic)); @@ -73,9 +79,9 @@ impl AbortableBroadcastChannelBuilder { log, &metrics_registry, rt_handle, - outbound_artifacts_rx, + outbound_rx, adverts_from_peers_rx, - inbound_artifacts_tx, + inbound_tx, assembler(transport.clone()), transport, topology_watcher, @@ -92,6 +98,7 @@ impl AbortableBroadcastChannelBuilder { ); self.clients.push(Box::new(builder)); + (outbound_tx, inbound_rx) } pub fn router(&mut self) -> Router { diff --git a/rs/p2p/test_utils/src/lib.rs b/rs/p2p/test_utils/src/lib.rs index 880e795bc28..4d3cc8714ca 100644 --- a/rs/p2p/test_utils/src/lib.rs +++ b/rs/p2p/test_utils/src/lib.rs @@ -453,27 +453,27 @@ pub fn start_consensus_manager( ) { let _enter = rt_handle.enter(); let pool = Arc::new(RwLock::new(processor)); - let (artifact_processor_jh, artifact_manager_event_rx, artifact_sender) = - start_test_processor(pool.clone(), pool.clone().read().unwrap().clone()); let bouncer_factory = Arc::new(pool.clone().read().unwrap().clone()); - let mut cm1 = ic_consensus_manager::AbortableBroadcastChannelBuilder::new( + let downloader = FetchArtifact::new( log.clone(), rt_handle.clone(), + pool.clone(), + bouncer_factory, MetricsRegistry::default(), ); - let downloader = FetchArtifact::new( - log, - rt_handle, - pool, - bouncer_factory, + + let mut cm1 = ic_consensus_manager::AbortableBroadcastChannelBuilder::new( + log.clone(), + rt_handle.clone(), MetricsRegistry::default(), ); - cm1.add_client( - artifact_manager_event_rx, - artifact_sender, + let (outbound_tx, inbound_rx) = cm1.add_client( downloader, usize::MAX, ); + + let artifact_processor_jh = + start_test_processor(outbound_tx, inbound_rx, pool.clone(), pool.clone().read().unwrap().clone()); (artifact_processor_jh, cm1) } diff --git a/rs/p2p/test_utils/src/turmoil.rs b/rs/p2p/test_utils/src/turmoil.rs index 2017d250b6b..70811ad390c 100644 --- a/rs/p2p/test_utils/src/turmoil.rs +++ b/rs/p2p/test_utils/src/turmoil.rs @@ -370,26 +370,26 @@ pub fn add_transport_to_sim( }; let _artifact_processor_jh = if let Some(consensus) = consensus_manager_clone { - let (artifact_processor_jh, artifact_manager_event_rx, artifact_sender) = - start_test_processor( - consensus.clone(), - consensus.clone().read().unwrap().clone(), - ); let bouncer_factory = Arc::new(consensus.clone().read().unwrap().clone()); - let downloader = FetchArtifact::new( log.clone(), tokio::runtime::Handle::current(), - consensus, + consensus.clone(), bouncer_factory, MetricsRegistry::default(), ); - consensus_builder.add_client( - artifact_manager_event_rx, - artifact_sender, + let (outbound_tx, inbound_tx) = consensus_builder.add_client( downloader, usize::MAX, ); + + let artifact_processor_jh = + start_test_processor( + outbound_tx, + inbound_tx, + consensus.clone(), + consensus.clone().read().unwrap().clone(), + ); router = Some(router.unwrap_or_default().merge(consensus_builder.router())); Some(artifact_processor_jh) @@ -442,22 +442,19 @@ pub fn waiter_fut( #[allow(clippy::type_complexity)] pub fn start_test_processor( + outbound_tx: mpsc::Sender>, + inbound_rx: mpsc::UnboundedReceiver>, pool: Arc>>, change_set_producer: TestConsensus, -) -> ( - Box, - mpsc::Receiver>, - mpsc::UnboundedSender>, -) { - let (tx, rx) = tokio::sync::mpsc::channel(1000); +) -> Box { let time_source = Arc::new(SysTimeSource::new()); let client = ic_artifact_manager::Processor::new(pool, change_set_producer); - let (jh, sender) = run_artifact_processor( + run_artifact_processor( time_source, MetricsRegistry::default(), Box::new(client), - tx, + outbound_tx, + inbound_rx, vec![], - ); - (jh, rx, sender) + ) } diff --git a/rs/replica/setup_ic_network/src/lib.rs b/rs/replica/setup_ic_network/src/lib.rs index 5226fe9faf4..18686c4570e 100644 --- a/rs/replica/setup_ic_network/src/lib.rs +++ b/rs/replica/setup_ic_network/src/lib.rs @@ -67,7 +67,6 @@ use tower_http::trace::TraceLayer; /// advertising the blocks. const HASHES_IN_BLOCKS_FEATURE_ENABLED: bool = true; -pub const MAX_ADVERT_BUFFER: usize = 100_000; /// This limit is used to protect against a malicious peer advertising many ingress messages. /// If no malicious peers are present the ingress pools are bounded by a separate limit. const SLOT_TABLE_LIMIT_INGRESS: usize = 50_000; @@ -367,6 +366,15 @@ fn start_consensus( }; let ingress_sender = { + let bouncer = Arc::new(IngressBouncer::new(time_source.clone())); + let assembler = ic_artifact_downloader::FetchArtifact::new( + log.clone(), + rt_handle.clone(), + artifact_pools.ingress_pool.clone(), + bouncer, + metrics_registry.clone(), + ); + let (ingress_tx, ingress_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); // Create the ingress client. let (client, jh) = create_ingress_handlers( @@ -379,15 +387,6 @@ fn start_consensus( join_handles.push(jh); - let bouncer = Arc::new(IngressBouncer::new(time_source.clone())); - let assembler = ic_artifact_downloader::FetchArtifact::new( - log.clone(), - rt_handle.clone(), - artifact_pools.ingress_pool.clone(), - bouncer, - metrics_registry.clone(), - ); - new_p2p_consensus.add_client( ingress_rx, client.clone(), @@ -408,29 +407,38 @@ fn start_consensus( log.clone(), max_certified_height_tx, ); + let bouncer = CertifierBouncer::new(metrics_registry, Arc::clone(&consensus_pool_cache)); + let assembler = ic_artifact_downloader::FetchArtifact::new( + log.clone(), + rt_handle.clone(), + artifact_pools.certification_pool.clone(), + Arc::new(bouncer), + metrics_registry.clone(), + ); + let (certification_tx, certification_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); // Create the certification client. let (client, jh) = create_artifact_handler( certification_tx, certifier, Arc::clone(&time_source) as Arc<_>, - Arc::clone(&artifact_pools.certification_pool), + artifact_pools.certification_pool, metrics_registry.clone(), ); join_handles.push(jh); + new_p2p_consensus.add_client(certification_rx, client, assembler, SLOT_TABLE_NO_LIMIT); + }; - let bouncer = CertifierBouncer::new(metrics_registry, Arc::clone(&consensus_pool_cache)); + { + let bouncer = Arc::new(dkg::DkgBouncer::new(metrics_registry)); let assembler = ic_artifact_downloader::FetchArtifact::new( log.clone(), rt_handle.clone(), - artifact_pools.certification_pool, - Arc::new(bouncer), + artifact_pools.dkg_pool.clone(), + bouncer, metrics_registry.clone(), ); - new_p2p_consensus.add_client(certification_rx, client, assembler, SLOT_TABLE_NO_LIMIT); - }; - { let (dkg_tx, dkg_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); // Create the DKG client. let (client, jh) = create_artifact_handler( @@ -444,19 +452,11 @@ fn start_consensus( log.clone(), ), Arc::clone(&time_source) as Arc<_>, - Arc::clone(&artifact_pools.dkg_pool), + artifact_pools.dkg_pool, metrics_registry.clone(), ); join_handles.push(jh); - let bouncer = Arc::new(dkg::DkgBouncer::new(metrics_registry)); - let assembler = ic_artifact_downloader::FetchArtifact::new( - log.clone(), - rt_handle.clone(), - artifact_pools.dkg_pool, - bouncer, - metrics_registry.clone(), - ); new_p2p_consensus.add_client(dkg_rx, client, assembler, SLOT_TABLE_NO_LIMIT); }; @@ -474,6 +474,20 @@ fn start_consensus( finalized.payload.as_ref().is_summary(), finalized.payload.as_ref().as_idkg().is_some(), ); + let bouncer = Arc::new(idkg::IDkgBouncer::new( + metrics_registry, + subnet_id, + consensus_pool.read().unwrap().get_block_cache(), + Arc::clone(&state_reader), + )); + let assembler = ic_artifact_downloader::FetchArtifact::new( + log.clone(), + rt_handle.clone(), + artifact_pools.idkg_pool.clone(), + bouncer, + metrics_registry.clone(), + ); + let (idkg_tx, idkg_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); let (client, jh) = create_artifact_handler( @@ -488,32 +502,34 @@ fn start_consensus( malicious_flags, ), Arc::clone(&time_source) as Arc<_>, - Arc::clone(&artifact_pools.idkg_pool), + artifact_pools.idkg_pool, metrics_registry.clone(), ); join_handles.push(jh); - let bouncer = Arc::new(idkg::IDkgBouncer::new( - metrics_registry, - subnet_id, - consensus_pool.read().unwrap().get_block_cache(), + new_p2p_consensus.add_client(idkg_rx, client, assembler, SLOT_TABLE_NO_LIMIT); + }; + + { + let bouncer = Arc::new(CanisterHttpGossipImpl::new( + Arc::clone(&consensus_pool_cache), Arc::clone(&state_reader), + log.clone(), )); let assembler = ic_artifact_downloader::FetchArtifact::new( log.clone(), rt_handle.clone(), - artifact_pools.idkg_pool, + artifact_pools.canister_http_pool.clone(), bouncer, metrics_registry.clone(), ); - new_p2p_consensus.add_client(idkg_rx, client, assembler, SLOT_TABLE_NO_LIMIT); - }; - { - let (http_outcalls_tx, http_outcalls_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); + let (outbound_tx, inbound_rx) = new_p2p_consensus.add_client( assembler, SLOT_TABLE_NO_LIMIT); + let (client, jh) = create_artifact_handler( http_outcalls_tx, + inbound_rx, CanisterHttpPoolManagerImpl::new( Arc::clone(&state_reader), Arc::new(Mutex::new(canister_http_adapter_client)), @@ -525,24 +541,11 @@ fn start_consensus( log.clone(), ), Arc::clone(&time_source) as Arc<_>, - Arc::clone(&artifact_pools.canister_http_pool), + artifact_pools.canister_http_pool, metrics_registry.clone(), ); join_handles.push(jh); - let bouncer = Arc::new(CanisterHttpGossipImpl::new( - Arc::clone(&consensus_pool_cache), - Arc::clone(&state_reader), - log.clone(), - )); - let assembler = ic_artifact_downloader::FetchArtifact::new( - log.clone(), - rt_handle.clone(), - artifact_pools.canister_http_pool, - bouncer, - metrics_registry.clone(), - ); - new_p2p_consensus.add_client(http_outcalls_rx, client, assembler, SLOT_TABLE_NO_LIMIT); }; ( From 911abfee6621629e00342b185390bcc41f837539 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Wed, 18 Dec 2024 11:27:03 +0000 Subject: [PATCH 03/15] . --- rs/replica/setup_ic_network/src/lib.rs | 87 ++++++++++++-------------- 1 file changed, 40 insertions(+), 47 deletions(-) diff --git a/rs/replica/setup_ic_network/src/lib.rs b/rs/replica/setup_ic_network/src/lib.rs index 18686c4570e..a28a0d48586 100644 --- a/rs/replica/setup_ic_network/src/lib.rs +++ b/rs/replica/setup_ic_network/src/lib.rs @@ -329,20 +329,8 @@ fn start_consensus( let consensus_pool = Arc::clone(&consensus_pool); - let (consensus_tx, consensus_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); - // Create the consensus client. - let (client, jh) = create_artifact_handler( - consensus_tx, - consensus_impl, - time_source.clone(), - consensus_pool.clone(), - metrics_registry.clone(), - ); - - join_handles.push(jh); - let bouncer = Arc::new(ConsensusBouncer::new(metrics_registry, message_router)); - if HASHES_IN_BLOCKS_FEATURE_ENABLED { + let (outbound_tx, inbound_rx) = if HASHES_IN_BLOCKS_FEATURE_ENABLED { let assembler = ic_artifact_downloader::FetchStrippedConsensusArtifact::new( log.clone(), rt_handle.clone(), @@ -352,7 +340,7 @@ fn start_consensus( metrics_registry.clone(), node_id, ); - new_p2p_consensus.add_client(consensus_rx, client, assembler, SLOT_TABLE_NO_LIMIT); + new_p2p_consensus.add_client(assembler, SLOT_TABLE_NO_LIMIT) } else { let assembler = ic_artifact_downloader::FetchArtifact::new( log.clone(), @@ -361,8 +349,20 @@ fn start_consensus( bouncer, metrics_registry.clone(), ); - new_p2p_consensus.add_client(consensus_rx, client, assembler, SLOT_TABLE_NO_LIMIT); + new_p2p_consensus.add_client(assembler, SLOT_TABLE_NO_LIMIT) }; + + // Create the consensus client. + let jh = create_artifact_handler( + outbound_tx, + inbound_rx, + consensus_impl, + time_source.clone(), + consensus_pool.clone(), + metrics_registry.clone(), + ); + + join_handles.push(jh); }; let ingress_sender = { @@ -375,25 +375,19 @@ fn start_consensus( metrics_registry.clone(), ); - let (ingress_tx, ingress_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); + let (outbound_tx, inbound_rx) = + new_p2p_consensus.add_client(assembler, SLOT_TABLE_LIMIT_INGRESS); // Create the ingress client. - let (client, jh) = create_ingress_handlers( - ingress_tx, + let jh = create_ingress_handlers( + outbound_tx.clone(), + inbound_rx, Arc::clone(&time_source) as Arc<_>, Arc::clone(&artifact_pools.ingress_pool), ingress_manager, metrics_registry.clone(), ); - join_handles.push(jh); - - new_p2p_consensus.add_client( - ingress_rx, - client.clone(), - assembler, - SLOT_TABLE_LIMIT_INGRESS, - ); - client + outbound_tx }; { @@ -416,17 +410,18 @@ fn start_consensus( metrics_registry.clone(), ); - let (certification_tx, certification_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); + let (outbound_tx, inbound_rx) = + new_p2p_consensus.add_client(assembler, SLOT_TABLE_NO_LIMIT); // Create the certification client. - let (client, jh) = create_artifact_handler( - certification_tx, + let jh = create_artifact_handler( + outbound_tx, + inbound_rx, certifier, Arc::clone(&time_source) as Arc<_>, artifact_pools.certification_pool, metrics_registry.clone(), ); join_handles.push(jh); - new_p2p_consensus.add_client(certification_rx, client, assembler, SLOT_TABLE_NO_LIMIT); }; { @@ -439,10 +434,12 @@ fn start_consensus( metrics_registry.clone(), ); - let (dkg_tx, dkg_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); + let (outbound_tx, inbound_rx) = + new_p2p_consensus.add_client(assembler, SLOT_TABLE_NO_LIMIT); // Create the DKG client. - let (client, jh) = create_artifact_handler( - dkg_tx, + let jh = create_artifact_handler( + outbound_tx, + inbound_rx, dkg::DkgImpl::new( node_id, Arc::clone(&consensus_crypto), @@ -456,10 +453,7 @@ fn start_consensus( metrics_registry.clone(), ); join_handles.push(jh); - - new_p2p_consensus.add_client(dkg_rx, client, assembler, SLOT_TABLE_NO_LIMIT); }; - { let finalized = consensus_pool_cache.finalized_block(); let chain_key_config = @@ -488,10 +482,12 @@ fn start_consensus( metrics_registry.clone(), ); - let (idkg_tx, idkg_rx) = tokio::sync::mpsc::channel(MAX_ADVERT_BUFFER); + let (outbound_tx, inbound_rx) = + new_p2p_consensus.add_client(assembler, SLOT_TABLE_NO_LIMIT); - let (client, jh) = create_artifact_handler( - idkg_tx, + let jh = create_artifact_handler( + outbound_tx, + inbound_rx, idkg::IDkgImpl::new( node_id, consensus_pool.read().unwrap().get_block_cache(), @@ -505,10 +501,7 @@ fn start_consensus( artifact_pools.idkg_pool, metrics_registry.clone(), ); - join_handles.push(jh); - - new_p2p_consensus.add_client(idkg_rx, client, assembler, SLOT_TABLE_NO_LIMIT); }; { @@ -525,10 +518,11 @@ fn start_consensus( metrics_registry.clone(), ); - let (outbound_tx, inbound_rx) = new_p2p_consensus.add_client( assembler, SLOT_TABLE_NO_LIMIT); + let (outbound_tx, inbound_rx) = + new_p2p_consensus.add_client(assembler, SLOT_TABLE_NO_LIMIT); - let (client, jh) = create_artifact_handler( - http_outcalls_tx, + let jh = create_artifact_handler( + outbound_tx, inbound_rx, CanisterHttpPoolManagerImpl::new( Arc::clone(&state_reader), @@ -545,7 +539,6 @@ fn start_consensus( metrics_registry.clone(), ); join_handles.push(jh); - }; ( From af780e1b89b80408d3dcf3bccc9f6ec786783c85 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Wed, 18 Dec 2024 12:22:15 +0000 Subject: [PATCH 04/15] . --- rs/p2p/consensus_manager/src/lib.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs index f85b80b0d0d..63ca840966b 100644 --- a/rs/p2p/consensus_manager/src/lib.rs +++ b/rs/p2p/consensus_manager/src/lib.rs @@ -61,6 +61,8 @@ impl AbortableBroadcastChannelBuilder { ) -> ( Sender>, UnboundedReceiver>, + // TODO: remove this by introducing a new channel from the http handler into the processor + UnboundedSender>, ) { let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(MAX_OUTBOUND_CHANNEL_SIZE); let (inbound_tx, inbound_rx) = tokio::sync::mpsc::unbounded_channel(); @@ -81,7 +83,7 @@ impl AbortableBroadcastChannelBuilder { rt_handle, outbound_rx, adverts_from_peers_rx, - inbound_tx, + inbound_tx.clone(), assembler(transport.clone()), transport, topology_watcher, @@ -98,7 +100,7 @@ impl AbortableBroadcastChannelBuilder { ); self.clients.push(Box::new(builder)); - (outbound_tx, inbound_rx) + (outbound_tx, inbound_rx, inbound_tx) } pub fn router(&mut self) -> Router { From 5fc618f5e59287081eaf5c87b383e5ddafebc538 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Wed, 18 Dec 2024 12:26:56 +0000 Subject: [PATCH 05/15] . --- rs/p2p/consensus_manager/src/lib.rs | 5 +++-- rs/p2p/test_utils/src/lib.rs | 13 +++++++------ rs/p2p/test_utils/src/turmoil.rs | 21 +++++++++------------ rs/replica/setup_ic_network/src/lib.rs | 14 +++++++------- 4 files changed, 26 insertions(+), 27 deletions(-) diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs index 63ca840966b..f9275af6369 100644 --- a/rs/p2p/consensus_manager/src/lib.rs +++ b/rs/p2p/consensus_manager/src/lib.rs @@ -76,6 +76,7 @@ impl AbortableBroadcastChannelBuilder { let rt_handle = self.rt_handle.clone(); let metrics_registry = self.metrics_registry.clone(); + let inbound_tx_c = inbound_tx.clone(); let builder = move |transport: Arc, topology_watcher| { start_consensus_manager( log, @@ -83,7 +84,7 @@ impl AbortableBroadcastChannelBuilder { rt_handle, outbound_rx, adverts_from_peers_rx, - inbound_tx.clone(), + inbound_tx, assembler(transport.clone()), transport, topology_watcher, @@ -100,7 +101,7 @@ impl AbortableBroadcastChannelBuilder { ); self.clients.push(Box::new(builder)); - (outbound_tx, inbound_rx, inbound_tx) + (outbound_tx, inbound_rx, inbound_tx_c) } pub fn router(&mut self) -> Router { diff --git a/rs/p2p/test_utils/src/lib.rs b/rs/p2p/test_utils/src/lib.rs index 4d3cc8714ca..53e0930e262 100644 --- a/rs/p2p/test_utils/src/lib.rs +++ b/rs/p2p/test_utils/src/lib.rs @@ -467,13 +467,14 @@ pub fn start_consensus_manager( rt_handle.clone(), MetricsRegistry::default(), ); - let (outbound_tx, inbound_rx) = cm1.add_client( - downloader, - usize::MAX, - ); + let (outbound_tx, inbound_rx, _) = cm1.add_client(downloader, usize::MAX); - let artifact_processor_jh = - start_test_processor(outbound_tx, inbound_rx, pool.clone(), pool.clone().read().unwrap().clone()); + let artifact_processor_jh = start_test_processor( + outbound_tx, + inbound_rx, + pool.clone(), + pool.clone().read().unwrap().clone(), + ); (artifact_processor_jh, cm1) } diff --git a/rs/p2p/test_utils/src/turmoil.rs b/rs/p2p/test_utils/src/turmoil.rs index 70811ad390c..a834825c37f 100644 --- a/rs/p2p/test_utils/src/turmoil.rs +++ b/rs/p2p/test_utils/src/turmoil.rs @@ -378,18 +378,15 @@ pub fn add_transport_to_sim( bouncer_factory, MetricsRegistry::default(), ); - let (outbound_tx, inbound_tx) = consensus_builder.add_client( - downloader, - usize::MAX, - ); + let (outbound_tx, inbound_tx, _) = + consensus_builder.add_client(downloader, usize::MAX); - let artifact_processor_jh = - start_test_processor( - outbound_tx, - inbound_tx, - consensus.clone(), - consensus.clone().read().unwrap().clone(), - ); + let artifact_processor_jh = start_test_processor( + outbound_tx, + inbound_tx, + consensus.clone(), + consensus.clone().read().unwrap().clone(), + ); router = Some(router.unwrap_or_default().merge(consensus_builder.router())); Some(artifact_processor_jh) @@ -442,7 +439,7 @@ pub fn waiter_fut( #[allow(clippy::type_complexity)] pub fn start_test_processor( - outbound_tx: mpsc::Sender>, + outbound_tx: mpsc::Sender>, inbound_rx: mpsc::UnboundedReceiver>, pool: Arc>>, change_set_producer: TestConsensus, diff --git a/rs/replica/setup_ic_network/src/lib.rs b/rs/replica/setup_ic_network/src/lib.rs index a28a0d48586..b92e4dc7518 100644 --- a/rs/replica/setup_ic_network/src/lib.rs +++ b/rs/replica/setup_ic_network/src/lib.rs @@ -330,7 +330,7 @@ fn start_consensus( let consensus_pool = Arc::clone(&consensus_pool); let bouncer = Arc::new(ConsensusBouncer::new(metrics_registry, message_router)); - let (outbound_tx, inbound_rx) = if HASHES_IN_BLOCKS_FEATURE_ENABLED { + let (outbound_tx, inbound_rx, _) = if HASHES_IN_BLOCKS_FEATURE_ENABLED { let assembler = ic_artifact_downloader::FetchStrippedConsensusArtifact::new( log.clone(), rt_handle.clone(), @@ -375,11 +375,11 @@ fn start_consensus( metrics_registry.clone(), ); - let (outbound_tx, inbound_rx) = + let (outbound_tx, inbound_rx, inbound_tx) = new_p2p_consensus.add_client(assembler, SLOT_TABLE_LIMIT_INGRESS); // Create the ingress client. let jh = create_ingress_handlers( - outbound_tx.clone(), + outbound_tx, inbound_rx, Arc::clone(&time_source) as Arc<_>, Arc::clone(&artifact_pools.ingress_pool), @@ -387,7 +387,7 @@ fn start_consensus( metrics_registry.clone(), ); join_handles.push(jh); - outbound_tx + inbound_tx }; { @@ -410,7 +410,7 @@ fn start_consensus( metrics_registry.clone(), ); - let (outbound_tx, inbound_rx) = + let (outbound_tx, inbound_rx, _) = new_p2p_consensus.add_client(assembler, SLOT_TABLE_NO_LIMIT); // Create the certification client. let jh = create_artifact_handler( @@ -434,7 +434,7 @@ fn start_consensus( metrics_registry.clone(), ); - let (outbound_tx, inbound_rx) = + let (outbound_tx, inbound_rx, _) = new_p2p_consensus.add_client(assembler, SLOT_TABLE_NO_LIMIT); // Create the DKG client. let jh = create_artifact_handler( @@ -518,7 +518,7 @@ fn start_consensus( metrics_registry.clone(), ); - let (outbound_tx, inbound_rx) = + let (outbound_tx, inbound_rx, _) = new_p2p_consensus.add_client(assembler, SLOT_TABLE_NO_LIMIT); let jh = create_artifact_handler( From b65761c07e00185c55eedf61ce7b0d7b13d6ed45 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Wed, 18 Dec 2024 12:35:18 +0000 Subject: [PATCH 06/15] . --- rs/p2p/artifact_manager/src/lib.rs | 8 ++++---- rs/replica/setup_ic_network/src/lib.rs | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/rs/p2p/artifact_manager/src/lib.rs b/rs/p2p/artifact_manager/src/lib.rs index 177619f570b..d666d98092f 100644 --- a/rs/p2p/artifact_manager/src/lib.rs +++ b/rs/p2p/artifact_manager/src/lib.rs @@ -20,13 +20,11 @@ use std::{ time::Duration, }; use tokio::{ - sync::mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender}, + sync::mpsc::{Sender, UnboundedReceiver}, time::timeout, }; use tracing::instrument; -type ArtifactEventSender = UnboundedSender>; - /// Metrics for a client artifact processor. struct ArtifactProcessorMetrics { /// The processing time histogram. @@ -243,7 +241,7 @@ pub fn create_ingress_handlers< + Sync, >, metrics_registry: MetricsRegistry, -) -> Box { +) -> Box { let client = IngressProcessor::new(ingress_pool.clone(), ingress_handler); run_artifact_processor( time_source.clone(), @@ -456,11 +454,13 @@ mod tests { let time_source = Arc::new(SysTimeSource::new()); let (send_tx, mut send_rx) = tokio::sync::mpsc::channel(100); + let (_, inbound_rx) = tokio::sync::mpsc::unbounded_channel(); run_artifact_processor::( time_source, MetricsRegistry::default(), Box::new(DummyProcessor), send_tx, + inbound_rx, (0..10).map(Into::into).collect(), ); diff --git a/rs/replica/setup_ic_network/src/lib.rs b/rs/replica/setup_ic_network/src/lib.rs index b92e4dc7518..eb6178b7b3e 100644 --- a/rs/replica/setup_ic_network/src/lib.rs +++ b/rs/replica/setup_ic_network/src/lib.rs @@ -334,7 +334,7 @@ fn start_consensus( let assembler = ic_artifact_downloader::FetchStrippedConsensusArtifact::new( log.clone(), rt_handle.clone(), - consensus_pool, + consensus_pool.clone(), artifact_pools.ingress_pool.clone(), bouncer, metrics_registry.clone(), @@ -345,7 +345,7 @@ fn start_consensus( let assembler = ic_artifact_downloader::FetchArtifact::new( log.clone(), rt_handle.clone(), - consensus_pool, + consensus_pool.clone(), bouncer, metrics_registry.clone(), ); @@ -358,7 +358,7 @@ fn start_consensus( inbound_rx, consensus_impl, time_source.clone(), - consensus_pool.clone(), + consensus_pool, metrics_registry.clone(), ); @@ -482,7 +482,7 @@ fn start_consensus( metrics_registry.clone(), ); - let (outbound_tx, inbound_rx) = + let (outbound_tx, inbound_rx, _) = new_p2p_consensus.add_client(assembler, SLOT_TABLE_NO_LIMIT); let jh = create_artifact_handler( From 960fbd48cb12133750d8dc86df2f356d68c7cf28 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Wed, 18 Dec 2024 12:36:24 +0000 Subject: [PATCH 07/15] . --- rs/p2p/test_utils/src/lib.rs | 2 +- rs/p2p/test_utils/src/turmoil.rs | 2 +- rs/replica/setup_ic_network/src/lib.rs | 14 +++++++------- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/rs/p2p/test_utils/src/lib.rs b/rs/p2p/test_utils/src/lib.rs index 53e0930e262..4135223ca9c 100644 --- a/rs/p2p/test_utils/src/lib.rs +++ b/rs/p2p/test_utils/src/lib.rs @@ -467,7 +467,7 @@ pub fn start_consensus_manager( rt_handle.clone(), MetricsRegistry::default(), ); - let (outbound_tx, inbound_rx, _) = cm1.add_client(downloader, usize::MAX); + let (outbound_tx, inbound_rx, _) = cm1.abortable_broadcast_channel(downloader, usize::MAX); let artifact_processor_jh = start_test_processor( outbound_tx, diff --git a/rs/p2p/test_utils/src/turmoil.rs b/rs/p2p/test_utils/src/turmoil.rs index a834825c37f..c484641fb89 100644 --- a/rs/p2p/test_utils/src/turmoil.rs +++ b/rs/p2p/test_utils/src/turmoil.rs @@ -379,7 +379,7 @@ pub fn add_transport_to_sim( MetricsRegistry::default(), ); let (outbound_tx, inbound_tx, _) = - consensus_builder.add_client(downloader, usize::MAX); + consensus_builder.abortable_broadcast_channel(downloader, usize::MAX); let artifact_processor_jh = start_test_processor( outbound_tx, diff --git a/rs/replica/setup_ic_network/src/lib.rs b/rs/replica/setup_ic_network/src/lib.rs index eb6178b7b3e..d67872ee996 100644 --- a/rs/replica/setup_ic_network/src/lib.rs +++ b/rs/replica/setup_ic_network/src/lib.rs @@ -340,7 +340,7 @@ fn start_consensus( metrics_registry.clone(), node_id, ); - new_p2p_consensus.add_client(assembler, SLOT_TABLE_NO_LIMIT) + new_p2p_consensus.abortable_broadcast_channel(assembler, SLOT_TABLE_NO_LIMIT) } else { let assembler = ic_artifact_downloader::FetchArtifact::new( log.clone(), @@ -349,7 +349,7 @@ fn start_consensus( bouncer, metrics_registry.clone(), ); - new_p2p_consensus.add_client(assembler, SLOT_TABLE_NO_LIMIT) + new_p2p_consensus.abortable_broadcast_channel(assembler, SLOT_TABLE_NO_LIMIT) }; // Create the consensus client. @@ -376,7 +376,7 @@ fn start_consensus( ); let (outbound_tx, inbound_rx, inbound_tx) = - new_p2p_consensus.add_client(assembler, SLOT_TABLE_LIMIT_INGRESS); + new_p2p_consensus.abortable_broadcast_channel(assembler, SLOT_TABLE_LIMIT_INGRESS); // Create the ingress client. let jh = create_ingress_handlers( outbound_tx, @@ -411,7 +411,7 @@ fn start_consensus( ); let (outbound_tx, inbound_rx, _) = - new_p2p_consensus.add_client(assembler, SLOT_TABLE_NO_LIMIT); + new_p2p_consensus.abortable_broadcast_channel(assembler, SLOT_TABLE_NO_LIMIT); // Create the certification client. let jh = create_artifact_handler( outbound_tx, @@ -435,7 +435,7 @@ fn start_consensus( ); let (outbound_tx, inbound_rx, _) = - new_p2p_consensus.add_client(assembler, SLOT_TABLE_NO_LIMIT); + new_p2p_consensus.abortable_broadcast_channel(assembler, SLOT_TABLE_NO_LIMIT); // Create the DKG client. let jh = create_artifact_handler( outbound_tx, @@ -483,7 +483,7 @@ fn start_consensus( ); let (outbound_tx, inbound_rx, _) = - new_p2p_consensus.add_client(assembler, SLOT_TABLE_NO_LIMIT); + new_p2p_consensus.abortable_broadcast_channel(assembler, SLOT_TABLE_NO_LIMIT); let jh = create_artifact_handler( outbound_tx, @@ -519,7 +519,7 @@ fn start_consensus( ); let (outbound_tx, inbound_rx, _) = - new_p2p_consensus.add_client(assembler, SLOT_TABLE_NO_LIMIT); + new_p2p_consensus.abortable_broadcast_channel(assembler, SLOT_TABLE_NO_LIMIT); let jh = create_artifact_handler( outbound_tx, From 05a45f6dd76e622ad439ae05a88a51a7646771eb Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Wed, 18 Dec 2024 12:38:51 +0000 Subject: [PATCH 08/15] . --- rs/p2p/artifact_manager/src/lib.rs | 1 + rs/p2p/consensus_manager/src/lib.rs | 7 +++++++ rs/p2p/consensus_manager/src/receiver.rs | 1 + 3 files changed, 9 insertions(+) diff --git a/rs/p2p/artifact_manager/src/lib.rs b/rs/p2p/artifact_manager/src/lib.rs index d666d98092f..a378880184d 100644 --- a/rs/p2p/artifact_manager/src/lib.rs +++ b/rs/p2p/artifact_manager/src/lib.rs @@ -454,6 +454,7 @@ mod tests { let time_source = Arc::new(SysTimeSource::new()); let (send_tx, mut send_rx) = tokio::sync::mpsc::channel(100); + #[allow(clippy::disallowed_methods)] let (_, inbound_rx) = tokio::sync::mpsc::unbounded_channel(); run_artifact_processor::( time_source, diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs index f9275af6369..3fcae612122 100644 --- a/rs/p2p/consensus_manager/src/lib.rs +++ b/rs/p2p/consensus_manager/src/lib.rs @@ -65,6 +65,13 @@ impl AbortableBroadcastChannelBuilder { UnboundedSender>, ) { let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(MAX_OUTBOUND_CHANNEL_SIZE); + // Making this channel bounded can be problematic since we don't have true multiplexing + // of P2P messages. + // Possible scenario is - adverts+chunks arrive on the same channel, slow consensus + // will result on slow consuption of chunks. Slow consumption of chunks will in turn + // result in slower consumptions of adverts. Ideally adverts are consumed at rate + // independent of consensus. + #[allow(clippy::disallowed_methods)] let (inbound_tx, inbound_rx) = tokio::sync::mpsc::unbounded_channel(); assert!(uri_prefix::() diff --git a/rs/p2p/consensus_manager/src/receiver.rs b/rs/p2p/consensus_manager/src/receiver.rs index a448158b3d5..2d1a7e31ae1 100644 --- a/rs/p2p/consensus_manager/src/receiver.rs +++ b/rs/p2p/consensus_manager/src/receiver.rs @@ -652,6 +652,7 @@ mod tests { fn new() -> Self { let (_, adverts_received) = tokio::sync::mpsc::channel(100); + #[allow(clippy::disallowed_methods)] let (sender, unvalidated_artifact_receiver) = tokio::sync::mpsc::unbounded_channel(); let (_, topology_watcher) = watch::channel(SubnetTopology::default()); let artifact_assembler = From 4c2c66e24bdd27669f52e5698937e057bb2dd766 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Wed, 18 Dec 2024 12:54:16 +0000 Subject: [PATCH 09/15] . --- rs/p2p/consensus_manager/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs index 3fcae612122..0853f66c7d8 100644 --- a/rs/p2p/consensus_manager/src/lib.rs +++ b/rs/p2p/consensus_manager/src/lib.rs @@ -28,7 +28,7 @@ mod sender; type StartConsensusManagerFn = Box, watch::Receiver) -> Vec>; -pub const MAX_OUTBOUND_CHANNEL_SIZE: usize = 100_000; +const MAX_OUTBOUND_CHANNEL_SIZE: usize = 100_000; pub struct AbortableBroadcastChannelBuilder { log: ReplicaLogger, From b2424c56eeb654b2bdf50745baf07c0607ba99e5 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Wed, 18 Dec 2024 12:58:20 +0000 Subject: [PATCH 10/15] . --- rs/p2p/consensus_manager/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs index 0853f66c7d8..0d1d238538a 100644 --- a/rs/p2p/consensus_manager/src/lib.rs +++ b/rs/p2p/consensus_manager/src/lib.rs @@ -49,7 +49,8 @@ impl AbortableBroadcastChannelBuilder { } } - pub fn add_client< + #[allow(clippy::type_complexity)] + pub fn abortable_broadcast_channel< Artifact: IdentifiableArtifact, WireArtifact: PbArtifact, F: FnOnce(Arc) -> D + 'static, From 23fac7ca09a9f919f69a672c3b28e26c3c3def84 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Wed, 18 Dec 2024 16:43:11 +0000 Subject: [PATCH 11/15] . --- rs/p2p/consensus_manager/src/lib.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs index 0d1d238538a..c25a953afda 100644 --- a/rs/p2p/consensus_manager/src/lib.rs +++ b/rs/p2p/consensus_manager/src/lib.rs @@ -30,6 +30,9 @@ type StartConsensusManagerFn = const MAX_OUTBOUND_CHANNEL_SIZE: usize = 100_000; +pub type AbortableBroadcastSender = Sender>; +pub type AbortableBroadcastReceiver = UnboundedReceiver>; + pub struct AbortableBroadcastChannelBuilder { log: ReplicaLogger, metrics_registry: MetricsRegistry, @@ -49,7 +52,6 @@ impl AbortableBroadcastChannelBuilder { } } - #[allow(clippy::type_complexity)] pub fn abortable_broadcast_channel< Artifact: IdentifiableArtifact, WireArtifact: PbArtifact, @@ -60,8 +62,8 @@ impl AbortableBroadcastChannelBuilder { (assembler, assembler_router): (F, Router), slot_limit: usize, ) -> ( - Sender>, - UnboundedReceiver>, + AbortableBroadcastSender, + AbortableBroadcastReceiver, // TODO: remove this by introducing a new channel from the http handler into the processor UnboundedSender>, ) { From 0ca0bd6a969ad943fef84811b6ed8c9356742a12 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Fri, 20 Dec 2024 12:06:53 +0000 Subject: [PATCH 12/15] . --- rs/p2p/consensus_manager/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs index c25a953afda..92e06e6de91 100644 --- a/rs/p2p/consensus_manager/src/lib.rs +++ b/rs/p2p/consensus_manager/src/lib.rs @@ -52,6 +52,7 @@ impl AbortableBroadcastChannelBuilder { } } + /// Creates a channel for the corresponding artifact. Tha channel is used to broadcast artifacts within the subnet. pub fn abortable_broadcast_channel< Artifact: IdentifiableArtifact, WireArtifact: PbArtifact, From 35cd331eca70cecd75ea759f5bc41541e7f25b50 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Mon, 13 Jan 2025 11:09:24 +0100 Subject: [PATCH 13/15] Update rs/p2p/consensus_manager/src/lib.rs Co-authored-by: Daniel Sharifi <40335219+DSharifi@users.noreply.github.com> --- rs/p2p/consensus_manager/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs index 92e06e6de91..cd1e9bbe5e1 100644 --- a/rs/p2p/consensus_manager/src/lib.rs +++ b/rs/p2p/consensus_manager/src/lib.rs @@ -52,7 +52,7 @@ impl AbortableBroadcastChannelBuilder { } } - /// Creates a channel for the corresponding artifact. Tha channel is used to broadcast artifacts within the subnet. + /// Creates a channel for the corresponding artifact. The channel is used to broadcast artifacts within the subnet. pub fn abortable_broadcast_channel< Artifact: IdentifiableArtifact, WireArtifact: PbArtifact, From 1231415e51da0977152c2d10a98b5c0509ea4fda Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Mon, 13 Jan 2025 10:15:21 +0000 Subject: [PATCH 14/15] . --- rs/p2p/artifact_manager/src/lib.rs | 10 +++++----- rs/p2p/consensus_manager/src/lib.rs | 1 + 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/rs/p2p/artifact_manager/src/lib.rs b/rs/p2p/artifact_manager/src/lib.rs index a378880184d..92949b68976 100644 --- a/rs/p2p/artifact_manager/src/lib.rs +++ b/rs/p2p/artifact_manager/src/lib.rs @@ -135,7 +135,7 @@ pub fn run_artifact_processor( metrics_registry: MetricsRegistry, client: Box>, outbound_tx: Sender>, - inbound_tx: UnboundedReceiver>, + inbound_rx: UnboundedReceiver>, initial_artifacts: Vec, ) -> Box { let shutdown = Arc::new(AtomicBool::new(false)); @@ -154,7 +154,7 @@ pub fn run_artifact_processor( time_source, client, outbound_tx, - inbound_tx, + inbound_rx, ArtifactProcessorMetrics::new(metrics_registry, Artifact::NAME.to_string()), shutdown_cl, ); @@ -230,7 +230,7 @@ pub fn create_ingress_handlers< PoolIngress: MutablePool + Send + Sync + ValidatedPoolReader + 'static, >( outbound_tx: Sender>, - inbound_tx: UnboundedReceiver>, + inbound_rx: UnboundedReceiver>, time_source: Arc, ingress_pool: Arc>, ingress_handler: Arc< @@ -248,12 +248,12 @@ pub fn create_ingress_handlers< metrics_registry, Box::new(client), outbound_tx, - inbound_tx, + inbound_rx, vec![], ) } -/// Starts the event loop that pools consensus for updates on what needs to be replicated. +/// Starts the event loop that polls consensus for updates on what needs to be replicated. pub fn create_artifact_handler< Artifact: IdentifiableArtifact + Send + Sync + 'static, Pool: MutablePool + Send + Sync + ValidatedPoolReader + 'static, diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs index 92e06e6de91..47c86c4b532 100644 --- a/rs/p2p/consensus_manager/src/lib.rs +++ b/rs/p2p/consensus_manager/src/lib.rs @@ -28,6 +28,7 @@ mod sender; type StartConsensusManagerFn = Box, watch::Receiver) -> Vec>; +/// Same order of magnitude as the number of active artifacts. const MAX_OUTBOUND_CHANNEL_SIZE: usize = 100_000; pub type AbortableBroadcastSender = Sender>; From 2ab2afb76e6108290cc0ebbc4e49a2d823a00ea5 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Mon, 13 Jan 2025 12:03:32 +0000 Subject: [PATCH 15/15] . --- rs/p2p/consensus_manager/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs index 8b3399f8a44..3b957c05fde 100644 --- a/rs/p2p/consensus_manager/src/lib.rs +++ b/rs/p2p/consensus_manager/src/lib.rs @@ -28,7 +28,7 @@ mod sender; type StartConsensusManagerFn = Box, watch::Receiver) -> Vec>; -/// Same order of magnitude as the number of active artifacts. +/// Same order of magnitude as the number of active artifacts. const MAX_OUTBOUND_CHANNEL_SIZE: usize = 100_000; pub type AbortableBroadcastSender = Sender>;