diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs
index fc29259256f..263a6eab074 100644
--- a/beacon_node/beacon_chain/src/block_verification_types.rs
+++ b/beacon_node/beacon_chain/src/block_verification_types.rs
@@ -148,6 +148,7 @@ impl<E: EthSpec> RpcBlock<E> {
         Self::new(Some(block_root), block, blobs)
     }
 
+    #[allow(clippy::type_complexity)]
     pub fn deconstruct(
         self,
     ) -> (
diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs
index c75c3f695b3..a1d2706726b 100644
--- a/beacon_node/beacon_chain/src/builder.rs
+++ b/beacon_node/beacon_chain/src/builder.rs
@@ -407,6 +407,11 @@ where
                 .init_blob_info(genesis.beacon_block.slot())
                 .map_err(|e| format!("Failed to initialize genesis blob info: {:?}", e))?,
         );
+        self.pending_io_batch.push(
+            store
+                .init_data_column_info(genesis.beacon_block.slot())
+                .map_err(|e| format!("Failed to initialize genesis data column info: {:?}", e))?,
+        );
 
         let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &genesis)
             .map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?;
@@ -564,6 +569,11 @@ where
                 .init_blob_info(weak_subj_block.slot())
                 .map_err(|e| format!("Failed to initialize blob info: {:?}", e))?,
         );
+        self.pending_io_batch.push(
+            store
+                .init_data_column_info(weak_subj_block.slot())
+                .map_err(|e| format!("Failed to initialize data column info: {:?}", e))?,
+        );
 
         // Store pruning checkpoint to prevent attempting to prune before the anchor state.
         self.pending_io_batch
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
index 12c0a79f8d3..246daf9579d 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
@@ -105,14 +105,13 @@ impl<T: EthSpec> PendingComponents<T> {
         };
         let verified_blobs = VariableList::new(verified_blobs)?;
 
-        // TODO(das) Add check later - we don't expect data columns to be available until we transition to PeerDAS.
+        // TODO(das) Do we need a check here for number of expected custody columns?
        let verified_data_columns = verified_data_columns
             .into_iter()
             .cloned()
-            .map(|d| d.map(|d| d.to_data_column()))
-            .take(T::number_of_columns())
-            .collect::<Option<Vec<_>>>()
-            .map(Into::into);
+            .filter_map(|d| d.map(|d| d.to_data_column()))
+            .collect::<Vec<_>>()
+            .into();
 
         let executed_block = recover(diet_executed_block)?;
 
@@ -126,7 +125,7 @@
             block_root,
             block,
             blobs: Some(verified_blobs),
-            data_columns: verified_data_columns,
+            data_columns: Some(verified_data_columns),
         };
         Ok(Availability::Available(Box::new(
             AvailableExecutedBlock::new(available_block, import_data, payload_verification_outcome),
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs
index faa1114808b..7abbd700104 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs
@@ -55,7 +55,7 @@ pub struct ProcessingComponents<E: EthSpec> {
     /// `AvailabilityView`'s trait definition for more details.
     pub blob_commitments: KzgCommitmentOpts<E>,
     // TODO(das): `KzgCommitments` are available in every data column sidecar, hence it may not be useful to store them
-    // again here and a `()` may be sufficient.
+    // again here and a `()` may be sufficient to indicate what we have.
     pub data_column_opts: FixedVector<Option<KzgCommitment>, E::DataColumnCount>,
 }
 
diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs
index 13babcfe5bb..526027b998f 100644
--- a/beacon_node/beacon_chain/src/historical_blocks.rs
+++ b/beacon_node/beacon_chain/src/historical_blocks.rs
@@ -9,6 +9,7 @@ use state_processing::{
 use std::borrow::Cow;
 use std::iter;
 use std::time::Duration;
+use store::metadata::DataColumnInfo;
 use store::{chunked_vector::BlockRoots, AnchorInfo, BlobInfo, ChunkWriter, KeyValueStore};
 use types::{Hash256, Slot};
 
@@ -66,6 +67,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             .get_anchor_info()
             .ok_or(HistoricalBlockError::NoAnchorInfo)?;
         let blob_info = self.store.get_blob_info();
+        let data_column_info = self.store.get_data_column_info();
 
         // Take all blocks with slots less than the oldest block slot.
         let num_relevant = blocks.partition_point(|available_block| {
@@ -100,6 +102,7 @@
         let mut chunk_writer =
             ChunkWriter::<BlockRoots, _, _>::new(&self.store.cold_db, prev_block_slot.as_usize())?;
         let mut new_oldest_blob_slot = blob_info.oldest_blob_slot;
+        let mut new_oldest_data_column_slot = data_column_info.oldest_data_column_slot;
 
         let mut blob_batch = Vec::with_capacity(n_blobs_lists_to_import);
         let mut cold_batch = Vec::with_capacity(blocks_to_import.len());
@@ -130,7 +133,7 @@
             }
             // Store the data columns too
             if let Some(data_columns) = maybe_data_columns {
-                // new_oldest_data_column_slot = Some(block.slot());
+                new_oldest_data_column_slot = Some(block.slot());
                 self.store
                     .data_columns_as_kv_store_ops(&block_root, data_columns, &mut blob_batch);
             }
@@ -227,6 +230,19 @@
             }
         }
 
+        // Update the data column info.
+        if new_oldest_data_column_slot != data_column_info.oldest_data_column_slot {
+            if let Some(oldest_data_column_slot) = new_oldest_data_column_slot {
+                let new_data_column_info = DataColumnInfo {
+                    oldest_data_column_slot: Some(oldest_data_column_slot),
+                };
+                anchor_and_blob_batch.push(
+                    self.store
+                        .compare_and_set_data_column_info(data_column_info, new_data_column_info)?,
+                );
+            }
+        }
+
         // Update the anchor.
         let new_anchor = AnchorInfo {
             oldest_block_slot: prev_block_slot,
diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs
index ff201729821..cec5f22af5a 100644
--- a/beacon_node/beacon_chain/tests/store_tests.rs
+++ b/beacon_node/beacon_chain/tests/store_tests.rs
@@ -2569,10 +2569,10 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
     // signatures correctly. Regression test for https://github.com/sigp/lighthouse/pull/5120.
     let mut batch_with_invalid_first_block = available_blocks.clone();
     batch_with_invalid_first_block[0] = {
-        let (block_root, block, blobs) = available_blocks[0].clone().deconstruct();
+        let (block_root, block, blobs, data_columns) = available_blocks[0].clone().deconstruct();
         let mut corrupt_block = (*block).clone();
         *corrupt_block.signature_mut() = Signature::empty();
-        AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), blobs)
+        AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), blobs, data_columns)
     };
 
     // Importing the invalid batch should error.
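The backfill change in `historical_blocks.rs` above never writes `data_column_info` directly: it snapshots the current value, derives the new oldest slot, and pushes a compare-and-set op into the same batch as the column writes, so a concurrent mutation fails the batch instead of being silently overwritten. A minimal standalone sketch of that guard (not Lighthouse code; the types and bodies are simplified stand-ins, and the real method returns a `KeyValueStoreOp` for the disk batch rather than `()`):

```rust
use std::sync::RwLock;

#[derive(Clone, Debug, PartialEq, Eq, Default)]
struct DataColumnInfo {
    oldest_data_column_slot: Option<u64>,
}

#[derive(Debug, PartialEq)]
enum StoreError {
    DataColumnInfoConcurrentMutation,
}

struct Store {
    data_column_info: RwLock<DataColumnInfo>,
}

impl Store {
    fn get_data_column_info(&self) -> DataColumnInfo {
        self.data_column_info.read().unwrap().clone()
    }

    /// Accept `new_value` only if the current value still equals `prev_value`.
    fn compare_and_set_data_column_info(
        &self,
        prev_value: DataColumnInfo,
        new_value: DataColumnInfo,
    ) -> Result<(), StoreError> {
        let mut guard = self.data_column_info.write().unwrap();
        if *guard == prev_value {
            *guard = new_value;
            Ok(())
        } else {
            Err(StoreError::DataColumnInfoConcurrentMutation)
        }
    }
}

fn main() {
    let store = Store {
        data_column_info: RwLock::new(DataColumnInfo::default()),
    };
    // A writer snapshots the value, derives an update, then CAS-writes it.
    let prev = store.get_data_column_info();
    let next = DataColumnInfo {
        oldest_data_column_slot: Some(100),
    };
    assert!(store
        .compare_and_set_data_column_info(prev.clone(), next)
        .is_ok());
    // A second writer still holding the stale snapshot is rejected.
    assert_eq!(
        store.compare_and_set_data_column_info(prev, DataColumnInfo::default()),
        Err(StoreError::DataColumnInfoConcurrentMutation)
    );
}
```

Returning a serialized op, as the real `compare_and_set_data_column_info` does, additionally lets the caller commit the metadata update atomically with the data-column writes it describes.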
diff --git a/beacon_node/lighthouse_network/src/rpc/config.rs b/beacon_node/lighthouse_network/src/rpc/config.rs
index 0a8e06e6866..ea780e1dff7 100644
--- a/beacon_node/lighthouse_network/src/rpc/config.rs
+++ b/beacon_node/lighthouse_network/src/rpc/config.rs
@@ -104,8 +104,7 @@ impl RateLimiterConfig {
     pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
     pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = Quota::n_every(768, 10);
     pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
-    // TODO(das): review quota
-    pub const DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA: Quota = Quota::n_every(256, 10);
+    pub const DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
     pub const DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA: Quota = Quota::one_every(10);
 }
 
diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs
index 96e02b80ff8..7e5b539406c 100644
--- a/beacon_node/store/src/errors.rs
+++ b/beacon_node/store/src/errors.rs
@@ -27,6 +27,8 @@ pub enum Error {
     AnchorInfoConcurrentMutation,
     /// The store's `blob_info` was mutated concurrently, the latest modification wasn't applied.
     BlobInfoConcurrentMutation,
+    /// The store's `data_column_info` was mutated concurrently, the latest modification wasn't applied.
+    DataColumnInfoConcurrentMutation,
     /// The block or state is unavailable due to weak subjectivity sync.
     HistoryUnavailable,
     /// State reconstruction cannot commence because not all historic blocks are known.
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 5019c030aee..5acd8ff8445 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -12,9 +12,10 @@ use crate::leveldb_store::BytesKey;
 use crate::leveldb_store::LevelDB;
 use crate::memory_store::MemoryStore;
 use crate::metadata::{
-    AnchorInfo, BlobInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY,
-    BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION,
-    PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
+    AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, PruningCheckpoint, SchemaVersion,
+    ANCHOR_INFO_KEY, BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION,
+    DATA_COLUMN_INFO_KEY, PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY,
+    STATE_UPPER_LIMIT_NO_RETAIN,
 };
 use crate::metrics;
 use crate::{
@@ -58,6 +59,8 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
     anchor_info: RwLock<Option<AnchorInfo>>,
     /// The starting slots for the range of blobs stored in the database.
     blob_info: RwLock<BlobInfo>,
+    /// The starting slots for the range of data columns stored in the database.
+    data_column_info: RwLock<DataColumnInfo>,
     pub(crate) config: StoreConfig,
     /// Cold database containing compact historical data.
     pub cold_db: Cold,
@@ -192,6 +195,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
             split: RwLock::new(Split::default()),
             anchor_info: RwLock::new(None),
             blob_info: RwLock::new(BlobInfo::default()),
+            data_column_info: RwLock::new(DataColumnInfo::default()),
             cold_db: MemoryStore::open(),
             blobs_db: MemoryStore::open(),
             hot_db: MemoryStore::open(),
@@ -229,6 +233,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
             split: RwLock::new(Split::default()),
             anchor_info: RwLock::new(None),
             blob_info: RwLock::new(BlobInfo::default()),
+            data_column_info: RwLock::new(DataColumnInfo::default()),
             cold_db: LevelDB::open(cold_path)?,
             blobs_db: LevelDB::open(blobs_db_path)?,
             hot_db: LevelDB::open(hot_path)?,
@@ -293,10 +298,8 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
                 // Always initialize `blobs_db` to true, we no longer support storing the blobs
                 // in the freezer DB, because the UX is strictly worse for relocating the DB.
                 let oldest_blob_slot = blob_info.oldest_blob_slot.or(deneb_fork_slot);
-                let oldest_data_column_slot = blob_info.oldest_data_column_slot.or(deneb_fork_slot);
                 BlobInfo {
                     oldest_blob_slot,
-                    oldest_data_column_slot,
                     blobs_db: true,
                 }
             }
@@ -304,16 +307,39 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
             None => BlobInfo {
                 // Set the oldest blob slot to the Deneb fork slot if it is not yet set.
                 oldest_blob_slot: deneb_fork_slot,
-                oldest_data_column_slot: deneb_fork_slot,
                 blobs_db: true,
             },
         };
         db.compare_and_set_blob_info_with_write(<_>::default(), new_blob_info.clone())?;
+
+        let data_column_info = db.load_data_column_info()?;
+        let new_data_column_info = match &data_column_info {
+            // TODO(das): update to EIP-7594 fork
+            Some(data_column_info) => {
+                // Set the oldest data column slot to the Deneb fork slot if it is not yet set.
+                let oldest_data_column_slot =
+                    data_column_info.oldest_data_column_slot.or(deneb_fork_slot);
+                DataColumnInfo {
+                    oldest_data_column_slot,
+                }
+            }
+            // First start.
+            None => DataColumnInfo {
+                // Set the oldest data column slot to the Deneb fork slot if it is not yet set.
+                oldest_data_column_slot: deneb_fork_slot,
+            },
+        };
+        db.compare_and_set_data_column_info_with_write(
+            <_>::default(),
+            new_data_column_info.clone(),
+        )?;
+
         info!(
             db.log,
             "Blob DB initialized";
             "path" => ?blobs_db_path,
             "oldest_blob_slot" => ?new_blob_info.oldest_blob_slot,
+            "oldest_data_column_slot" => ?new_data_column_info.oldest_data_column_slot,
         );
 
         // Ensure that the schema version of the on-disk database matches the software.
@@ -1732,13 +1758,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         let oldest_blob_slot = self.spec.deneb_fork_epoch.map(|fork_epoch| {
             std::cmp::max(anchor_slot, fork_epoch.start_slot(E::slots_per_epoch()))
         });
-        // TODO(das) update fork epoch
-        let oldest_data_column_slot = self.spec.deneb_fork_epoch.map(|fork_epoch| {
-            std::cmp::max(anchor_slot, fork_epoch.start_slot(E::slots_per_epoch()))
-        });
         let blob_info = BlobInfo {
             oldest_blob_slot,
-            oldest_data_column_slot,
             blobs_db: true,
         };
         self.compare_and_set_blob_info(self.get_blob_info(), blob_info)
@@ -1751,6 +1772,24 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         self.blob_info.read_recursive().clone()
     }
 
+    /// Initialize the `DataColumnInfo` when starting from genesis or a checkpoint.
+    pub fn init_data_column_info(&self, anchor_slot: Slot) -> Result<KeyValueStoreOp, Error> {
+        let oldest_data_column_slot = self.spec.deneb_fork_epoch.map(|fork_epoch| {
+            std::cmp::max(anchor_slot, fork_epoch.start_slot(E::slots_per_epoch()))
+        });
+        let data_column_info = DataColumnInfo {
+            oldest_data_column_slot,
+        };
+        self.compare_and_set_data_column_info(self.get_data_column_info(), data_column_info)
+    }
+
+    /// Get a clone of the store's data column info.
+    ///
+    /// To do mutations, use `compare_and_set_data_column_info`.
+    pub fn get_data_column_info(&self) -> DataColumnInfo {
+        self.data_column_info.read_recursive().clone()
+    }
+
     /// Atomically update the blob info from `prev_value` to `new_value`.
     ///
     /// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other
@@ -1796,6 +1835,54 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         blob_info.as_kv_store_op(BLOB_INFO_KEY)
     }
 
+    /// Atomically update the data column info from `prev_value` to `new_value`.
+    ///
+    /// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other
+    /// values.
+    ///
+    /// Return a `DataColumnInfoConcurrentMutation` error if the `prev_value` provided
+    /// is not correct.
+    pub fn compare_and_set_data_column_info(
+        &self,
+        prev_value: DataColumnInfo,
+        new_value: DataColumnInfo,
+    ) -> Result<KeyValueStoreOp, Error> {
+        let mut data_column_info = self.data_column_info.write();
+        if *data_column_info == prev_value {
+            let kv_op = self.store_data_column_info_in_batch(&new_value);
+            *data_column_info = new_value;
+            Ok(kv_op)
+        } else {
+            Err(Error::DataColumnInfoConcurrentMutation)
+        }
+    }
+
+    /// As for `compare_and_set_data_column_info`, but also writes the data column info to disk
+    /// immediately.
+    pub fn compare_and_set_data_column_info_with_write(
+        &self,
+        prev_value: DataColumnInfo,
+        new_value: DataColumnInfo,
+    ) -> Result<(), Error> {
+        let kv_store_op = self.compare_and_set_data_column_info(prev_value, new_value)?;
+        self.hot_db.do_atomically(vec![kv_store_op])
+    }
+
+    /// Load the data column info from disk, but do not set `self.data_column_info`.
+    fn load_data_column_info(&self) -> Result<Option<DataColumnInfo>, Error> {
+        self.hot_db.get(&DATA_COLUMN_INFO_KEY)
+    }
+
+    /// Store the given `data_column_info` to disk.
+    ///
+    /// The argument is intended to be `self.data_column_info`, but is passed manually to avoid issues
+    /// with recursive locking.
+    fn store_data_column_info_in_batch(
+        &self,
+        data_column_info: &DataColumnInfo,
+    ) -> KeyValueStoreOp {
+        data_column_info.as_kv_store_op(DATA_COLUMN_INFO_KEY)
+    }
+
     /// Return the slot-window describing the available historic states.
     ///
     /// Returns `(lower_limit, upper_limit)`.
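The `init_data_column_info` addition above reduces to a small piece of slot arithmetic: the oldest available data column is the later of the anchor slot and the fork activation slot, and an unscheduled fork leaves it `None`. A minimal sketch under simplified assumptions (plain `u64` slots instead of the `Slot` type, and the diff's placeholder use of the Deneb epoch until the EIP-7594 epoch is defined):

```rust
const SLOTS_PER_EPOCH: u64 = 32;

/// Mirrors `oldest_data_column_slot: Option<Slot>`: `None` means the fork
/// epoch is not yet scheduled.
fn oldest_data_column_slot(anchor_slot: u64, fork_epoch: Option<u64>) -> Option<u64> {
    fork_epoch.map(|epoch| std::cmp::max(anchor_slot, epoch * SLOTS_PER_EPOCH))
}

fn main() {
    // Checkpoint sync from slot 1000 with the fork at epoch 10 (slot 320):
    // nothing older than the anchor is ever stored.
    assert_eq!(oldest_data_column_slot(1000, Some(10)), Some(1000));
    // Genesis sync (anchor 0): availability starts at the fork boundary.
    assert_eq!(oldest_data_column_slot(0, Some(10)), Some(320));
    // Fork not scheduled yet: the slot stays unknown.
    assert_eq!(oldest_data_column_slot(1000, None), None);
}
```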
@@ -2306,7 +2393,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         let blob_lists_pruned = ops.len();
         let new_blob_info = BlobInfo {
             oldest_blob_slot: Some(end_slot + 1),
-            oldest_data_column_slot: Some(end_slot + 1),
             blobs_db: blob_info.blobs_db,
         };
         let update_blob_info = self.compare_and_set_blob_info(blob_info, new_blob_info)?;
diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs
index ff850e17b52..5aada7c95a5 100644
--- a/beacon_node/store/src/metadata.rs
+++ b/beacon_node/store/src/metadata.rs
@@ -16,6 +16,7 @@ pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);
 pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
 pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5);
 pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6);
+pub const DATA_COLUMN_INFO_KEY: Hash256 = Hash256::repeat_byte(7);
 
 /// State upper limit value used to indicate that a node is not storing historic states.
 pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX);
@@ -135,7 +136,6 @@ pub struct BlobInfo {
     /// If the `oldest_blob_slot` is `None` then this means that the Deneb fork epoch is not yet
     /// known.
     pub oldest_blob_slot: Option<Slot>,
-    pub oldest_data_column_slot: Option<Slot>,
     /// A separate blobs database is in use (deprecated, always `true`).
     pub blobs_db: bool,
 }
@@ -153,3 +153,30 @@ impl StoreItem for BlobInfo {
     fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
         Ok(Self::from_ssz_bytes(bytes)?)
     }
 }
+
+/// Database parameters relevant to data column sync.
+#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
+pub struct DataColumnInfo {
+    /// The slot after which data columns are or *will be* available (>=).
+    ///
+    /// If this slot is in the future, then it is the first slot of the EIP-7594 fork, from which
+    /// data columns will be available.
+    ///
+    /// If the `oldest_data_column_slot` is `None` then this means that the EIP-7594 fork epoch is
+    /// not yet known.
+    pub oldest_data_column_slot: Option<Slot>,
+}
+
+impl StoreItem for DataColumnInfo {
+    fn db_column() -> DBColumn {
+        DBColumn::BeaconMeta
+    }
+
+    fn as_store_bytes(&self) -> Vec<u8> {
+        self.as_ssz_bytes()
+    }
+
+    fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
+        Ok(Self::from_ssz_bytes(bytes)?)
+    }
+}
diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs
index 5a607c8ec6b..e5f94bfe3ae 100644
--- a/consensus/types/src/chain_spec.rs
+++ b/consensus/types/src/chain_spec.rs
@@ -1291,7 +1291,6 @@ const fn default_max_request_blob_sidecars() -> u64 {
 }
 
 const fn default_max_request_data_column_sidecars() -> u64 {
-    // TODO(das) review
     16384
 }
diff --git a/consensus/types/src/data_column_sidecar.rs b/consensus/types/src/data_column_sidecar.rs
index a6d5b83083e..559da34327b 100644
--- a/consensus/types/src/data_column_sidecar.rs
+++ b/consensus/types/src/data_column_sidecar.rs
@@ -22,7 +22,7 @@ use tree_hash_derive::TreeHash;
 
 pub type ColumnIndex = u64;
 pub type Cell<E> = FixedVector<u8, <E as EthSpec>::FieldElementsPerCell>;
-pub type DataColumn<E> = VariableList<Cell<E>, <E as EthSpec>::MaxBlobsPerBlock>;
+pub type DataColumn<E> = VariableList<Cell<E>, <E as EthSpec>::MaxBlobCommitmentsPerBlock>;
 
 /// Container of the data that identifies an individual data column.
 #[derive(
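On the final hunk: an SSZ `VariableList`'s type-level maximum both rejects over-long inputs at construction and fixes the list's merkleization limit, so a column's cell list should carry the same bound as the block's KZG commitments list it corresponds to; the smaller `MaxBlobsPerBlock` was the wrong limit. An illustrative check using the `ssz_types` crate, where `U4` and `U8` are stand-ins for the two bounds (the real values differ):

```rust
use ssz_types::{
    typenum::{U4, U8},
    VariableList,
};

fn main() {
    let cells: Vec<u64> = (0..6).collect();
    // Under the smaller stand-in bound, six entries are rejected outright.
    assert!(VariableList::<u64, U4>::new(cells.clone()).is_err());
    // The larger stand-in bound admits the same data.
    assert!(VariableList::<u64, U8>::new(cells).is_ok());
}
```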