Skip to content

Commit

Permalink
fix: Make the consensus manager API resemble a classic channel interf…
Browse files Browse the repository at this point in the history
…ace (#3233)

The MR brings the consensus manager API closer to a normal channel API

---------

Co-authored-by: Daniel Sharifi <40335219+DSharifi@users.noreply.github.com>
  • Loading branch information
rumenov and DSharifi authored Jan 13, 2025
1 parent d469ea2 commit cf2f2cc
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 180 deletions.
67 changes: 26 additions & 41 deletions rs/p2p/artifact_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ use std::{
time::Duration,
};
use tokio::{
sync::mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender},
sync::mpsc::{Sender, UnboundedReceiver},
time::timeout,
};
use tracing::instrument;

type ArtifactEventSender<Artifact> = UnboundedSender<UnvalidatedArtifactMutation<Artifact>>;

/// Metrics for a client artifact processor.
struct ArtifactProcessorMetrics {
/// The processing time histogram.
Expand Down Expand Up @@ -136,45 +134,33 @@ pub fn run_artifact_processor<Artifact: IdentifiableArtifact>(
time_source: Arc<dyn TimeSource>,
metrics_registry: MetricsRegistry,
client: Box<dyn ArtifactProcessor<Artifact>>,
send_advert: Sender<ArtifactTransmit<Artifact>>,
outbound_tx: Sender<ArtifactTransmit<Artifact>>,
inbound_rx: UnboundedReceiver<UnvalidatedArtifactMutation<Artifact>>,
initial_artifacts: Vec<Artifact>,
) -> (Box<dyn JoinGuard>, ArtifactEventSender<Artifact>) {
// Making this channel bounded can be problematic since we don't have true multiplexing
// of P2P messages.
// Possible scenario is - adverts+chunks arrive on the same channel, slow consensus
// will result on slow consuption of chunks. Slow consumption of chunks will in turn
// result in slower consumptions of adverts. Ideally adverts are consumed at rate
// independent of consensus.
#[allow(clippy::disallowed_methods)]
let (sender, receiver) = unbounded_channel();
) -> Box<dyn JoinGuard> {
let shutdown = Arc::new(AtomicBool::new(false));

// Spawn the processor thread
let shutdown_cl = shutdown.clone();
let handle = ThreadBuilder::new()
.name(format!("{}_Processor", Artifact::NAME))
.spawn(move || {
for artifact in initial_artifacts {
let _ = send_advert.blocking_send(ArtifactTransmit::Deliver(ArtifactWithOpt {
let _ = outbound_tx.blocking_send(ArtifactTransmit::Deliver(ArtifactWithOpt {
artifact,
is_latency_sensitive: false,
}));
}
process_messages(
time_source,
client,
send_advert,
receiver,
outbound_tx,
inbound_rx,
ArtifactProcessorMetrics::new(metrics_registry, Artifact::NAME.to_string()),
shutdown_cl,
);
})
.unwrap();

(
Box::new(ArtifactProcessorJoinGuard::new(handle, shutdown)),
sender,
)
Box::new(ArtifactProcessorJoinGuard::new(handle, shutdown))
}

// The artifact processor thread loop
Expand Down Expand Up @@ -243,7 +229,8 @@ const ARTIFACT_MANAGER_TIMER_DURATION_MSEC: u64 = 200;
pub fn create_ingress_handlers<
PoolIngress: MutablePool<SignedIngress> + Send + Sync + ValidatedPoolReader<SignedIngress> + 'static,
>(
send_advert: Sender<ArtifactTransmit<SignedIngress>>,
outbound_tx: Sender<ArtifactTransmit<SignedIngress>>,
inbound_rx: UnboundedReceiver<UnvalidatedArtifactMutation<SignedIngress>>,
time_source: Arc<dyn TimeSource>,
ingress_pool: Arc<RwLock<PoolIngress>>,
ingress_handler: Arc<
Expand All @@ -254,46 +241,41 @@ pub fn create_ingress_handlers<
+ Sync,
>,
metrics_registry: MetricsRegistry,
) -> (
UnboundedSender<UnvalidatedArtifactMutation<SignedIngress>>,
Box<dyn JoinGuard>,
) {
) -> Box<dyn JoinGuard> {
let client = IngressProcessor::new(ingress_pool.clone(), ingress_handler);
let (jh, sender) = run_artifact_processor(
run_artifact_processor(
time_source.clone(),
metrics_registry,
Box::new(client),
send_advert,
outbound_tx,
inbound_rx,
vec![],
);
(sender, jh)
)
}

/// Starts the event loop that pools consensus for updates on what needs to be replicated.
/// Starts the event loop that polls consensus for updates on what needs to be replicated.
pub fn create_artifact_handler<
Artifact: IdentifiableArtifact + Send + Sync + 'static,
Pool: MutablePool<Artifact> + Send + Sync + ValidatedPoolReader<Artifact> + 'static,
C: PoolMutationsProducer<Pool, Mutations = <Pool as MutablePool<Artifact>>::Mutations> + 'static,
>(
send_advert: Sender<ArtifactTransmit<Artifact>>,
outbound_tx: Sender<ArtifactTransmit<Artifact>>,
inbound_rx: UnboundedReceiver<UnvalidatedArtifactMutation<Artifact>>,
change_set_producer: C,
time_source: Arc<dyn TimeSource>,
pool: Arc<RwLock<Pool>>,
metrics_registry: MetricsRegistry,
) -> (
UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
Box<dyn JoinGuard>,
) {
) -> Box<dyn JoinGuard> {
let inital_artifacts: Vec<_> = pool.read().unwrap().get_all_for_broadcast().collect();
let client = Processor::new(pool, change_set_producer);
let (jh, sender) = run_artifact_processor(
run_artifact_processor(
time_source.clone(),
metrics_registry,
Box::new(client),
send_advert,
outbound_tx,
inbound_rx,
inital_artifacts,
);
(sender, jh)
)
}

// TODO: make it private, it is used only for tests outside of this crate
Expand Down Expand Up @@ -472,11 +454,14 @@ mod tests {

let time_source = Arc::new(SysTimeSource::new());
let (send_tx, mut send_rx) = tokio::sync::mpsc::channel(100);
#[allow(clippy::disallowed_methods)]
let (_, inbound_rx) = tokio::sync::mpsc::unbounded_channel();
run_artifact_processor::<DummyArtifact>(
time_source,
MetricsRegistry::default(),
Box::new(DummyProcessor),
send_tx,
inbound_rx,
(0..10).map(Into::into).collect(),
);

Expand Down
38 changes: 30 additions & 8 deletions rs/p2p/consensus_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use phantom_newtype::AmountOf;
use tokio::{
runtime::Handle,
sync::{
mpsc::{Receiver, UnboundedSender},
mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender},
watch,
},
};
Expand All @@ -28,15 +28,21 @@ mod sender;
type StartConsensusManagerFn =
Box<dyn FnOnce(Arc<dyn Transport>, watch::Receiver<SubnetTopology>) -> Vec<Shutdown>>;

pub struct ConsensusManagerBuilder {
/// Same order of magnitude as the number of active artifacts.
const MAX_OUTBOUND_CHANNEL_SIZE: usize = 100_000;

pub type AbortableBroadcastSender<T> = Sender<ArtifactTransmit<T>>;
pub type AbortableBroadcastReceiver<T> = UnboundedReceiver<UnvalidatedArtifactMutation<T>>;

pub struct AbortableBroadcastChannelBuilder {
log: ReplicaLogger,
metrics_registry: MetricsRegistry,
rt_handle: Handle,
clients: Vec<StartConsensusManagerFn>,
router: Option<Router>,
}

impl ConsensusManagerBuilder {
impl AbortableBroadcastChannelBuilder {
pub fn new(log: ReplicaLogger, rt_handle: Handle, metrics_registry: MetricsRegistry) -> Self {
Self {
log,
Expand All @@ -47,18 +53,32 @@ impl ConsensusManagerBuilder {
}
}

pub fn add_client<
/// Creates a channel for the corresponding artifact. The channel is used to broadcast artifacts within the subnet.
pub fn abortable_broadcast_channel<
Artifact: IdentifiableArtifact,
WireArtifact: PbArtifact,
F: FnOnce(Arc<dyn Transport>) -> D + 'static,
D: ArtifactAssembler<Artifact, WireArtifact>,
>(
&mut self,
outbound_artifacts_rx: Receiver<ArtifactTransmit<Artifact>>,
inbound_artifacts_tx: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
(assembler, assembler_router): (F, Router),
slot_limit: usize,
) -> (
AbortableBroadcastSender<Artifact>,
AbortableBroadcastReceiver<Artifact>,
// TODO: remove this by introducing a new channel from the http handler into the processor
UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
) {
let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(MAX_OUTBOUND_CHANNEL_SIZE);
// Making this channel bounded can be problematic since we don't have true multiplexing
// of P2P messages.
// Possible scenario is - adverts+chunks arrive on the same channel, slow consensus
// will result on slow consuption of chunks. Slow consumption of chunks will in turn
// result in slower consumptions of adverts. Ideally adverts are consumed at rate
// independent of consensus.
#[allow(clippy::disallowed_methods)]
let (inbound_tx, inbound_rx) = tokio::sync::mpsc::unbounded_channel();

assert!(uri_prefix::<WireArtifact>()
.chars()
.all(char::is_alphabetic));
Expand All @@ -68,14 +88,15 @@ impl ConsensusManagerBuilder {
let rt_handle = self.rt_handle.clone();
let metrics_registry = self.metrics_registry.clone();

let inbound_tx_c = inbound_tx.clone();
let builder = move |transport: Arc<dyn Transport>, topology_watcher| {
start_consensus_manager(
log,
&metrics_registry,
rt_handle,
outbound_artifacts_rx,
outbound_rx,
adverts_from_peers_rx,
inbound_artifacts_tx,
inbound_tx,
assembler(transport.clone()),
transport,
topology_watcher,
Expand All @@ -92,6 +113,7 @@ impl ConsensusManagerBuilder {
);

self.clients.push(Box::new(builder));
(outbound_tx, inbound_rx, inbound_tx_c)
}

pub fn router(&mut self) -> Router {
Expand Down
1 change: 1 addition & 0 deletions rs/p2p/consensus_manager/src/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ mod tests {

fn new() -> Self {
let (_, adverts_received) = tokio::sync::mpsc::channel(100);
#[allow(clippy::disallowed_methods)]
let (sender, unvalidated_artifact_receiver) = tokio::sync::mpsc::unbounded_channel();
let (_, topology_watcher) = watch::channel(SubnetTopology::default());
let artifact_assembler =
Expand Down
29 changes: 15 additions & 14 deletions rs/p2p/test_utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,30 +449,31 @@ pub fn start_consensus_manager(
processor: TestConsensus<U64Artifact>,
) -> (
Box<dyn JoinGuard>,
ic_consensus_manager::ConsensusManagerBuilder,
ic_consensus_manager::AbortableBroadcastChannelBuilder,
) {
let _enter = rt_handle.enter();
let pool = Arc::new(RwLock::new(processor));
let (artifact_processor_jh, artifact_manager_event_rx, artifact_sender) =
start_test_processor(pool.clone(), pool.clone().read().unwrap().clone());
let bouncer_factory = Arc::new(pool.clone().read().unwrap().clone());
let mut cm1 = ic_consensus_manager::ConsensusManagerBuilder::new(
let downloader = FetchArtifact::new(
log.clone(),
rt_handle.clone(),
pool.clone(),
bouncer_factory,
MetricsRegistry::default(),
);
let downloader = FetchArtifact::new(
log,
rt_handle,
pool,
bouncer_factory,

let mut cm1 = ic_consensus_manager::AbortableBroadcastChannelBuilder::new(
log.clone(),
rt_handle.clone(),
MetricsRegistry::default(),
);
cm1.add_client(
artifact_manager_event_rx,
artifact_sender,
downloader,
usize::MAX,
let (outbound_tx, inbound_rx, _) = cm1.abortable_broadcast_channel(downloader, usize::MAX);

let artifact_processor_jh = start_test_processor(
outbound_tx,
inbound_rx,
pool.clone(),
pool.clone().read().unwrap().clone(),
);
(artifact_processor_jh, cm1)
}
Expand Down
40 changes: 17 additions & 23 deletions rs/p2p/test_utils/src/turmoil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ pub fn add_transport_to_sim<F>(

async move {
let metrics_registry = MetricsRegistry::default();
let mut consensus_builder = ic_consensus_manager::ConsensusManagerBuilder::new(
let mut consensus_builder = ic_consensus_manager::AbortableBroadcastChannelBuilder::new(
log.clone(),
tokio::runtime::Handle::current(),
metrics_registry,
Expand All @@ -370,25 +370,22 @@ pub fn add_transport_to_sim<F>(
};

let _artifact_processor_jh = if let Some(consensus) = consensus_manager_clone {
let (artifact_processor_jh, artifact_manager_event_rx, artifact_sender) =
start_test_processor(
consensus.clone(),
consensus.clone().read().unwrap().clone(),
);
let bouncer_factory = Arc::new(consensus.clone().read().unwrap().clone());

let downloader = FetchArtifact::new(
log.clone(),
tokio::runtime::Handle::current(),
consensus,
consensus.clone(),
bouncer_factory,
MetricsRegistry::default(),
);
consensus_builder.add_client(
artifact_manager_event_rx,
artifact_sender,
downloader,
usize::MAX,
let (outbound_tx, inbound_tx, _) =
consensus_builder.abortable_broadcast_channel(downloader, usize::MAX);

let artifact_processor_jh = start_test_processor(
outbound_tx,
inbound_tx,
consensus.clone(),
consensus.clone().read().unwrap().clone(),
);
router = Some(router.unwrap_or_default().merge(consensus_builder.router()));

Expand Down Expand Up @@ -442,22 +439,19 @@ pub fn waiter_fut(

#[allow(clippy::type_complexity)]
pub fn start_test_processor(
outbound_tx: mpsc::Sender<ArtifactTransmit<U64Artifact>>,
inbound_rx: mpsc::UnboundedReceiver<UnvalidatedArtifactMutation<U64Artifact>>,
pool: Arc<RwLock<TestConsensus<U64Artifact>>>,
change_set_producer: TestConsensus<U64Artifact>,
) -> (
Box<dyn JoinGuard>,
mpsc::Receiver<ArtifactTransmit<U64Artifact>>,
mpsc::UnboundedSender<UnvalidatedArtifactMutation<U64Artifact>>,
) {
let (tx, rx) = tokio::sync::mpsc::channel(1000);
) -> Box<dyn JoinGuard> {
let time_source = Arc::new(SysTimeSource::new());
let client = ic_artifact_manager::Processor::new(pool, change_set_producer);
let (jh, sender) = run_artifact_processor(
run_artifact_processor(
time_source,
MetricsRegistry::default(),
Box::new(client),
tx,
outbound_tx,
inbound_rx,
vec![],
);
(jh, rx, sender)
)
}
Loading

0 comments on commit cf2f2cc

Please sign in to comment.