diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 84ec75cf03d..68c693d3196 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -23,6 +23,7 @@ use crate::chain_config::ChainConfig; use crate::data_availability_checker::{ Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker, }; +use crate::data_column_custody_tracker::DataColumnCustodyTracker; use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use crate::early_attester_cache::EarlyAttesterCache; use crate::errors::{BeaconChainError as Error, BlockProductionError}; @@ -471,6 +472,8 @@ pub struct BeaconChain { pub light_client_server_cache: LightClientServerCache, /// Sender to signal the light_client server to produce new updates pub light_client_server_tx: Option>>, + /// Used to keep track of column custody requirements for a given epoch + pub data_column_custody_tracker: Arc, /// Sender given to tasks, so that if they encounter a state in which execution cannot /// continue they can request that everything shuts down. pub shutdown_sender: Sender, diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index dace8f6bfb4..b9d0194f252 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -974,6 +974,7 @@ where early_attester_cache: <_>::default(), light_client_server_cache: LightClientServerCache::new(), light_client_server_tx: self.light_client_server_tx, + data_column_custody_tracker: <_>::default(), shutdown_sender: self .shutdown_sender .ok_or("Cannot build without a shutdown sender.")?, diff --git a/beacon_node/beacon_chain/src/data_column_custody_tracker.rs b/beacon_node/beacon_chain/src/data_column_custody_tracker.rs new file mode 100644 index 00000000000..ba5de6aac96 --- /dev/null +++ b/beacon_node/beacon_chain/src/data_column_custody_tracker.rs @@ -0,0 +1,21 @@ +use parking_lot::RwLock; + +/// Maintains a list of data column custody requirements for a given epoch. +/// +/// Each time the node transitions to a new epoch, `register_epoch` must be called to populate +/// custody requirements for the new epoch. +#[derive(Default, Debug)] +pub struct DataColumnCustodyTracker { + data_column_ids: RwLock>, +} + +impl DataColumnCustodyTracker { + pub fn set_custody_requirements(&self, data_column_ids: Vec) { + let mut write_guard = self.data_column_ids.write(); + *write_guard = data_column_ids; + } + + pub fn get_custody_requirements(&self) -> Vec { + self.data_column_ids.read().clone() + } +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 67ccdeaacbf..7cbf87f8440 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -18,6 +18,7 @@ pub mod canonical_head; pub mod capella_readiness; pub mod chain_config; pub mod data_availability_checker; +pub mod data_column_custody_tracker; pub mod data_column_verification; pub mod deneb_readiness; mod early_attester_cache; diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 226c5f7bade..ead5a6de107 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -6,7 +6,7 @@ use crate::router::{Router, RouterMessage}; use crate::subnet_service::SyncCommitteeService; use crate::{error, metrics}; use crate::{ - subnet_service::{AttestationService, SubnetServiceMessage}, + subnet_service::{AttestationService, DataColumnService, SubnetServiceMessage}, NetworkConfig, }; use beacon_chain::{BeaconChain, BeaconChainTypes}; @@ -173,6 +173,8 @@ pub struct NetworkService { attestation_service: AttestationService, /// A sync committeee subnet manager service. sync_committee_service: SyncCommitteeService, + /// A data column subnet manager service. + _data_column_service: DataColumnService, /// The receiver channel for lighthouse to communicate with the network service. network_recv: mpsc::UnboundedReceiver>, /// The receiver channel for lighthouse to send validator subscription requests. @@ -326,10 +328,15 @@ impl NetworkService { config, &network_log, ); + // sync committee subnet service let sync_committee_service = SyncCommitteeService::new(beacon_chain.clone(), config, &network_log); + // data column subnet service + let data_column_service = + DataColumnService::new(beacon_chain.clone(), network_globals.clone(), &network_log); + // create a timer for updating network metrics let metrics_update = tokio::time::interval(Duration::from_secs(METRIC_UPDATE_INTERVAL)); @@ -348,6 +355,7 @@ impl NetworkService { libp2p, attestation_service, sync_committee_service, + _data_column_service: data_column_service, network_recv, validator_subscription_recv, router_send, diff --git a/beacon_node/network/src/subnet_service/data_column_subnets.rs b/beacon_node/network/src/subnet_service/data_column_subnets.rs new file mode 100644 index 00000000000..b85fcd68787 --- /dev/null +++ b/beacon_node/network/src/subnet_service/data_column_subnets.rs @@ -0,0 +1,55 @@ +//! This service keeps track of which data column subnets the beacon node should be subscribed to at any +//! given time. It schedules subscriptions to data column subnets and requests peer discoveries. + +use itertools::Itertools; +use std::sync::Arc; + +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use lighthouse_network::{discovery::peer_id_to_node_id, NetworkGlobals}; +use slog::o; +use types::DataColumnSubnetId; + +pub struct DataColumnService { + /// A reference to the beacon chain to process data columns. + pub(crate) _beacon_chain: Arc>, + + /// A reference to the nodes network globals + _network_globals: Arc>, + + /// The logger for the data column service. + _log: slog::Logger, +} + +impl DataColumnService { + pub fn new( + beacon_chain: Arc>, + network_globals: Arc>, + log: &slog::Logger, + ) -> Self { + let log = log.new(o!("service" => "data_column_service")); + let peer_id = network_globals.local_peer_id(); + + // TODO(das) temporary logic so we can have data column ids avail on the beacon chain + // future iteration of the data column subnet service will introduce data column rotation + // and other relevant logic. + if let Ok(node_id) = peer_id_to_node_id(&peer_id) { + let mut data_column_subnet_ids = DataColumnSubnetId::compute_subnets_for_data_column::< + T::EthSpec, + >(node_id.raw().into(), &beacon_chain.spec); + + beacon_chain + .data_column_custody_tracker + .set_custody_requirements( + data_column_subnet_ids + .by_ref() + .map(|data_column| *data_column) + .collect_vec(), + ); + } + Self { + _beacon_chain: beacon_chain, + _network_globals: network_globals, + _log: log, + } + } +} diff --git a/beacon_node/network/src/subnet_service/mod.rs b/beacon_node/network/src/subnet_service/mod.rs index 6450fc72eee..3040f919eb2 100644 --- a/beacon_node/network/src/subnet_service/mod.rs +++ b/beacon_node/network/src/subnet_service/mod.rs @@ -1,9 +1,11 @@ pub mod attestation_subnets; +pub mod data_column_subnets; pub mod sync_subnets; use lighthouse_network::{Subnet, SubnetDiscovery}; pub use attestation_subnets::AttestationService; +pub use data_column_subnets::DataColumnService; pub use sync_subnets::SyncCommitteeService; #[cfg(test)]