Store DataColumnInfo in database and various cleanups.
jimmygchen committed Mar 1, 2024 · 1 parent bb1b416 · commit 04c82a3
Showing 12 changed files with 166 additions and 27 deletions.
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/block_verification_types.rs
@@ -148,6 +148,7 @@ impl<E: EthSpec> RpcBlock<E> {
Self::new(Some(block_root), block, blobs)
}

#[allow(clippy::type_complexity)]
pub fn deconstruct(
self,
) -> (
10 changes: 10 additions & 0 deletions beacon_node/beacon_chain/src/builder.rs
@@ -407,6 +407,11 @@ where
.init_blob_info(genesis.beacon_block.slot())
.map_err(|e| format!("Failed to initialize genesis blob info: {:?}", e))?,
);
self.pending_io_batch.push(
store
.init_data_column_info(genesis.beacon_block.slot())
.map_err(|e| format!("Failed to initialize genesis data column info: {:?}", e))?,
);

let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &genesis)
.map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?;
@@ -564,6 +569,11 @@ where
.init_blob_info(weak_subj_block.slot())
.map_err(|e| format!("Failed to initialize blob info: {:?}", e))?,
);
self.pending_io_batch.push(
store
.init_data_column_info(weak_subj_block.slot())
.map_err(|e| format!("Failed to initialize data column info: {:?}", e))?,
);

// Store pruning checkpoint to prevent attempting to prune before the anchor state.
self.pending_io_batch
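Both hunks follow the established blob-info pattern: each `init_*_info` call returns a `KeyValueStoreOp`, and the builder queues these ops in `pending_io_batch` so they reach disk in a single atomic write at startup. A minimal sketch of that pattern; the op and builder shapes here are simplified stand-ins, not the actual Lighthouse definitions:

```rust
// Sketch of the pending_io_batch pattern with simplified stand-ins for
// KeyValueStoreOp and the builder; the real types live in the store crate.
enum KeyValueStoreOp {
    PutKeyValue(String, Vec<u8>),
}

#[derive(Default)]
struct Builder {
    pending_io_batch: Vec<KeyValueStoreOp>,
}

impl Builder {
    fn queue(&mut self, op: KeyValueStoreOp) {
        // Nothing touches disk yet; ops accumulate until the builder commits.
        self.pending_io_batch.push(op);
    }

    fn commit(&mut self) -> Vec<KeyValueStoreOp> {
        // All queued ops are handed to the DB as one atomic batch.
        std::mem::take(&mut self.pending_io_batch)
    }
}

fn main() {
    let mut builder = Builder::default();
    builder.queue(KeyValueStoreOp::PutKeyValue("blob_info".into(), vec![]));
    builder.queue(KeyValueStoreOp::PutKeyValue("data_column_info".into(), vec![]));
    assert_eq!(builder.commit().len(), 2);
}
```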
@@ -105,14 +105,13 @@ impl<T: EthSpec> PendingComponents<T> {
};
let verified_blobs = VariableList::new(verified_blobs)?;

- // TODO(das) Add check later - we don't expect data columns to be available until we transition to PeerDAS.
+ // TODO(das) Do we need a check here for number of expected custody columns?
let verified_data_columns = verified_data_columns
.into_iter()
.cloned()
- .map(|d| d.map(|d| d.to_data_column()))
- .take(T::number_of_columns())
- .collect::<Option<Vec<_>>>()
- .map(Into::into);
+ .filter_map(|d| d.map(|d| d.to_data_column()))
+ .collect::<Vec<_>>()
+ .into();

let executed_block = recover(diet_executed_block)?;

@@ -126,7 +125,7 @@ impl<T: EthSpec> PendingComponents<T> {
block_root,
block,
blobs: Some(verified_blobs),
- data_columns: verified_data_columns,
+ data_columns: Some(verified_data_columns),
};
Ok(Availability::Available(Box::new(
AvailableExecutedBlock::new(available_block, import_data, payload_verification_outcome),
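The collection change above is behavioral, not just stylistic: the old chain collected into `Option<Vec<_>>`, which yields `None` as soon as any column is missing, while the new chain keeps whichever verified columns are present, consistent with the TODO about not expecting full column availability yet. A minimal sketch of the two behaviors, using plain `Option<u32>` values in place of the real data-column types:

```rust
fn main() {
    let columns = vec![Some(1_u32), None, Some(3)];

    // Old behavior: all-or-nothing. A single missing column makes the
    // whole result `None`.
    let all_or_nothing: Option<Vec<u32>> = columns.iter().cloned().collect();
    assert_eq!(all_or_nothing, None);

    // New behavior: keep what is present and drop the gaps, like the
    // `filter_map` chain above.
    let available: Vec<u32> = columns.iter().cloned().flatten().collect();
    assert_eq!(available, vec![1, 3]);
}
```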
@@ -55,7 +55,7 @@ pub struct ProcessingComponents<E: EthSpec> {
/// `AvailabilityView`'s trait definition for more details.
pub blob_commitments: KzgCommitmentOpts<E>,
// TODO(das): `KzgCommitments` are available in every data column sidecar, hence it may not be useful to store them
- // again here and a `()` may be sufficient.
+ // again here and a `()` may be sufficient to indicate what we have.
pub data_column_opts: FixedVector<Option<()>, E::DataColumnCount>,
}

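The TODO argues that storing full `KzgCommitments` per column is redundant when a unit type would do: the vector only needs to record which columns have been seen. A sketch of that presence-vector idea, with a plain `Vec` standing in for `FixedVector<Option<()>, E::DataColumnCount>`:

```rust
fn main() {
    // Vec stands in for the fixed-length FixedVector keyed by column index.
    let mut seen: Vec<Option<()>> = vec![None; 4];

    // Marking a column as observed costs one unit value, not a commitment.
    seen[2] = Some(());

    let observed = seen.iter().filter(|c| c.is_some()).count();
    assert_eq!(observed, 1);
}
```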
18 changes: 17 additions & 1 deletion beacon_node/beacon_chain/src/historical_blocks.rs
@@ -9,6 +9,7 @@ use state_processing::{
use std::borrow::Cow;
use std::iter;
use std::time::Duration;
use store::metadata::DataColumnInfo;
use store::{chunked_vector::BlockRoots, AnchorInfo, BlobInfo, ChunkWriter, KeyValueStore};
use types::{Hash256, Slot};

@@ -66,6 +67,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.get_anchor_info()
.ok_or(HistoricalBlockError::NoAnchorInfo)?;
let blob_info = self.store.get_blob_info();
let data_column_info = self.store.get_data_column_info();

// Take all blocks with slots less than the oldest block slot.
let num_relevant = blocks.partition_point(|available_block| {
@@ -100,6 +102,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut chunk_writer =
ChunkWriter::<BlockRoots, _, _>::new(&self.store.cold_db, prev_block_slot.as_usize())?;
let mut new_oldest_blob_slot = blob_info.oldest_blob_slot;
let mut new_oldest_data_column_slot = data_column_info.oldest_data_column_slot;

let mut blob_batch = Vec::with_capacity(n_blobs_lists_to_import);
let mut cold_batch = Vec::with_capacity(blocks_to_import.len());
@@ -130,7 +133,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
// Store the data columns too
if let Some(data_columns) = maybe_data_columns {
- // new_oldest_data_column_slot = Some(block.slot());
+ new_oldest_data_column_slot = Some(block.slot());
self.store
.data_columns_as_kv_store_ops(&block_root, data_columns, &mut blob_batch);
}
@@ -227,6 +230,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

// Update the data column info.
if new_oldest_data_column_slot != data_column_info.oldest_data_column_slot {
if let Some(oldest_data_column_slot) = new_oldest_data_column_slot {
let new_data_column_info = DataColumnInfo {
oldest_data_column_slot: Some(oldest_data_column_slot),
};
anchor_and_blob_batch.push(
self.store
.compare_and_set_data_column_info(data_column_info, new_data_column_info)?,
);
}
}

// Update the anchor.
let new_anchor = AnchorInfo {
oldest_block_slot: prev_block_slot,
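Backfill walks blocks from newest to oldest, so each imported block that carries data columns pushes `new_oldest_data_column_slot` further into the past; the info is persisted only when the value actually moved, via the compare-and-set op appended to the batch. A sketch of that bookkeeping, with plain `u64` slots standing in for the store types:

```rust
// Sketch of the backfill bookkeeping: slots arrive newest-first, and the
// oldest-slot marker is rewritten only when the range actually extends.
fn backfill(current_oldest: Option<u64>, imported_slots_newest_first: &[u64]) -> Option<u64> {
    let mut new_oldest = current_oldest;
    for &slot in imported_slots_newest_first {
        // Each imported block with columns becomes the new oldest.
        new_oldest = Some(slot);
    }
    new_oldest
}

fn main() {
    let current = Some(200_u64);
    let updated = backfill(current, &[199, 198, 197]);
    // Mirror of the `!=` guard above: only persist on change.
    if updated != current {
        assert_eq!(updated, Some(197));
    }
}
```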
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/tests/store_tests.rs
@@ -2569,10 +2569,10 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
// signatures correctly. Regression test for https://github.com/sigp/lighthouse/pull/5120.
let mut batch_with_invalid_first_block = available_blocks.clone();
batch_with_invalid_first_block[0] = {
- let (block_root, block, blobs) = available_blocks[0].clone().deconstruct();
+ let (block_root, block, blobs, data_columns) = available_blocks[0].clone().deconstruct();
let mut corrupt_block = (*block).clone();
*corrupt_block.signature_mut() = Signature::empty();
- AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), blobs)
+ AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), blobs, data_columns)
};

// Importing the invalid batch should error.
3 changes: 1 addition & 2 deletions beacon_node/lighthouse_network/src/rpc/config.rs
@@ -104,8 +104,7 @@ impl RateLimiterConfig {
pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = Quota::n_every(768, 10);
pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
- // TODO(das): review quota
- pub const DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA: Quota = Quota::n_every(256, 10);
+ pub const DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
pub const DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA: Quota = Quota::one_every(10);
}

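With the TODO resolved, the data-columns-by-root quota now matches `DEFAULT_BLOCKS_BY_ROOT_QUOTA` and `DEFAULT_BLOBS_BY_ROOT_QUOTA` at 128 responses per 10 seconds. A hedged sketch of the token-bucket semantics assumed for `Quota::n_every`; the field names here are illustrative, not the crate's actual definition:

```rust
use std::time::Duration;

// Assumed shape of a token-bucket quota: `n_every(n, secs)` allows at most
// `n` requests per `secs`-second window before a peer is rate-limited.
struct Quota {
    max_tokens: u64,
    replenish_all_every: Duration,
}

impl Quota {
    const fn n_every(n: u64, seconds: u64) -> Self {
        Quota {
            max_tokens: n,
            replenish_all_every: Duration::from_secs(seconds),
        }
    }
}

fn main() {
    let q = Quota::n_every(128, 10);
    assert_eq!(q.max_tokens, 128);
    assert_eq!(q.replenish_all_every, Duration::from_secs(10));
}
```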
2 changes: 2 additions & 0 deletions beacon_node/store/src/errors.rs
@@ -27,6 +27,8 @@ pub enum Error {
AnchorInfoConcurrentMutation,
/// The store's `blob_info` was mutated concurrently, the latest modification wasn't applied.
BlobInfoConcurrentMutation,
/// The store's `data_column_info` was mutated concurrently, the latest modification wasn't applied.
DataColumnInfoConcurrentMutation,
/// The block or state is unavailable due to weak subjectivity sync.
HistoryUnavailable,
/// State reconstruction cannot commence because not all historic blocks are known.
110 changes: 98 additions & 12 deletions beacon_node/store/src/hot_cold_store.rs
@@ -12,9 +12,10 @@ use crate::leveldb_store::BytesKey;
use crate::leveldb_store::LevelDB;
use crate::memory_store::MemoryStore;
use crate::metadata::{
- AnchorInfo, BlobInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY,
- BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION,
- PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
+ AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, PruningCheckpoint, SchemaVersion,
+ ANCHOR_INFO_KEY, BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION,
+ DATA_COLUMN_INFO_KEY, PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY,
+ STATE_UPPER_LIMIT_NO_RETAIN,
};
use crate::metrics;
use crate::{
@@ -58,6 +59,8 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
anchor_info: RwLock<Option<AnchorInfo>>,
/// The starting slots for the range of blobs stored in the database.
blob_info: RwLock<BlobInfo>,
/// The starting slots for the range of data columns stored in the database.
data_column_info: RwLock<DataColumnInfo>,
pub(crate) config: StoreConfig,
/// Cold database containing compact historical data.
pub cold_db: Cold,
@@ -192,6 +195,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
split: RwLock::new(Split::default()),
anchor_info: RwLock::new(None),
blob_info: RwLock::new(BlobInfo::default()),
data_column_info: RwLock::new(DataColumnInfo::default()),
cold_db: MemoryStore::open(),
blobs_db: MemoryStore::open(),
hot_db: MemoryStore::open(),
@@ -229,6 +233,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
split: RwLock::new(Split::default()),
anchor_info: RwLock::new(None),
blob_info: RwLock::new(BlobInfo::default()),
data_column_info: RwLock::new(DataColumnInfo::default()),
cold_db: LevelDB::open(cold_path)?,
blobs_db: LevelDB::open(blobs_db_path)?,
hot_db: LevelDB::open(hot_path)?,
@@ -293,27 +298,48 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
// Always initialize `blobs_db` to true, we no longer support storing the blobs
// in the freezer DB, because the UX is strictly worse for relocating the DB.
let oldest_blob_slot = blob_info.oldest_blob_slot.or(deneb_fork_slot);
- let oldest_data_column_slot = blob_info.oldest_data_column_slot.or(deneb_fork_slot);
BlobInfo {
oldest_blob_slot,
- oldest_data_column_slot,
blobs_db: true,
}
}
// First start.
None => BlobInfo {
// Set the oldest blob slot to the Deneb fork slot if it is not yet set.
oldest_blob_slot: deneb_fork_slot,
- oldest_data_column_slot: deneb_fork_slot,
blobs_db: true,
},
};
db.compare_and_set_blob_info_with_write(<_>::default(), new_blob_info.clone())?;

+ let data_column_info = db.load_data_column_info()?;
+ let new_data_column_info = match &data_column_info {
+     // TODO(das): update to EIP-7594 fork
+     Some(data_column_info) => {
+         // Set the oldest data column slot to the Deneb fork slot if it is not yet set.
+         let oldest_data_column_slot =
+             data_column_info.oldest_data_column_slot.or(deneb_fork_slot);
+         DataColumnInfo {
+             oldest_data_column_slot,
+         }
+     }
+     // First start.
+     None => DataColumnInfo {
+         // Set the oldest data column slot to the Deneb fork slot if it is not yet set.
+         oldest_data_column_slot: deneb_fork_slot,
+     },
+ };
+ db.compare_and_set_data_column_info_with_write(
+     <_>::default(),
+     new_data_column_info.clone(),
+ )?;

info!(
db.log,
"Blob DB initialized";
"path" => ?blobs_db_path,
"oldest_blob_slot" => ?new_blob_info.oldest_blob_slot,
"oldest_data_column_slot" => ?new_data_column_info.oldest_data_column_slot,
);

// Ensure that the schema version of the on-disk database matches the software.
@@ -1732,13 +1758,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let oldest_blob_slot = self.spec.deneb_fork_epoch.map(|fork_epoch| {
std::cmp::max(anchor_slot, fork_epoch.start_slot(E::slots_per_epoch()))
});
- // TODO(das) update fork epoch
- let oldest_data_column_slot = self.spec.deneb_fork_epoch.map(|fork_epoch| {
-     std::cmp::max(anchor_slot, fork_epoch.start_slot(E::slots_per_epoch()))
- });
let blob_info = BlobInfo {
oldest_blob_slot,
- oldest_data_column_slot,
blobs_db: true,
};
self.compare_and_set_blob_info(self.get_blob_info(), blob_info)
@@ -1751,6 +1772,24 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.blob_info.read_recursive().clone()
}

/// Initialize the `DataColumnInfo` when starting from genesis or a checkpoint.
pub fn init_data_column_info(&self, anchor_slot: Slot) -> Result<KeyValueStoreOp, Error> {
let oldest_data_column_slot = self.spec.deneb_fork_epoch.map(|fork_epoch| {
std::cmp::max(anchor_slot, fork_epoch.start_slot(E::slots_per_epoch()))
});
let data_column_info = DataColumnInfo {
oldest_data_column_slot,
};
self.compare_and_set_data_column_info(self.get_data_column_info(), data_column_info)
}

/// Get a clone of the store's data column info.
///
/// To do mutations, use `compare_and_set_data_column_info`.
pub fn get_data_column_info(&self) -> DataColumnInfo {
self.data_column_info.read_recursive().clone()
}

/// Atomically update the blob info from `prev_value` to `new_value`.
///
/// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other
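`init_data_column_info` picks the later of the anchor slot and the fork activation slot, and leaves `None` when the fork epoch is unscheduled (still pegged to Deneb here, per the TODO about the EIP-7594 fork). A worked example of that computation with 32 slots per epoch and an illustrative fork epoch:

```rust
const SLOTS_PER_EPOCH: u64 = 32;

// Mirror of the slot computation in init_data_column_info, with plain u64
// slots; `None` means the fork epoch is not yet known.
fn oldest_data_column_slot(anchor_slot: u64, fork_epoch: Option<u64>) -> Option<u64> {
    fork_epoch.map(|epoch| std::cmp::max(anchor_slot, epoch * SLOTS_PER_EPOCH))
}

fn main() {
    // Syncing from genesis (anchor 0): columns begin at the fork slot.
    assert_eq!(oldest_data_column_slot(0, Some(100)), Some(3_200));
    // Checkpoint sync from beyond the fork: the anchor slot wins.
    assert_eq!(oldest_data_column_slot(5_000, Some(100)), Some(5_000));
    // Fork not scheduled yet: no oldest slot can be recorded.
    assert_eq!(oldest_data_column_slot(0, None), None);
}
```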
@@ -1796,6 +1835,54 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
blob_info.as_kv_store_op(BLOB_INFO_KEY)
}

/// Atomically update the data column info from `prev_value` to `new_value`.
///
/// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other
/// values.
///
/// Return a `DataColumnInfoConcurrentMutation` error if the `prev_value` provided
/// is not correct.
pub fn compare_and_set_data_column_info(
&self,
prev_value: DataColumnInfo,
new_value: DataColumnInfo,
) -> Result<KeyValueStoreOp, Error> {
let mut data_column_info = self.data_column_info.write();
if *data_column_info == prev_value {
let kv_op = self.store_data_column_info_in_batch(&new_value);
*data_column_info = new_value;
Ok(kv_op)
} else {
Err(Error::DataColumnInfoConcurrentMutation)
}
}

/// As for `compare_and_set_data_column_info`, but also writes the data column info to disk immediately.
pub fn compare_and_set_data_column_info_with_write(
&self,
prev_value: DataColumnInfo,
new_value: DataColumnInfo,
) -> Result<(), Error> {
let kv_store_op = self.compare_and_set_data_column_info(prev_value, new_value)?;
self.hot_db.do_atomically(vec![kv_store_op])
}

/// Load the data column info from disk, but do not set `self.data_column_info`.
fn load_data_column_info(&self) -> Result<Option<DataColumnInfo>, Error> {
self.hot_db.get(&DATA_COLUMN_INFO_KEY)
}

/// Store the given `data_column_info` to disk.
///
/// The argument is intended to be `self.data_column_info`, but is passed manually to avoid issues
/// with recursive locking.
fn store_data_column_info_in_batch(
&self,
data_column_info: &DataColumnInfo,
) -> KeyValueStoreOp {
data_column_info.as_kv_store_op(DATA_COLUMN_INFO_KEY)
}

/// Return the slot-window describing the available historic states.
///
/// Returns `(lower_limit, upper_limit)`.
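The compare-and-set pair above guards against racing writers: an update is accepted only if the caller's snapshot still matches the stored value; otherwise `DataColumnInfoConcurrentMutation` is returned and the caller must re-read and retry. A self-contained sketch of that discipline, with a `Mutex` standing in for the store's `RwLock` and no on-disk op:

```rust
use std::sync::Mutex;

#[derive(Clone, PartialEq, Debug, Default)]
struct DataColumnInfo {
    oldest_data_column_slot: Option<u64>,
}

#[derive(Debug)]
enum Error {
    DataColumnInfoConcurrentMutation,
}

struct Store {
    data_column_info: Mutex<DataColumnInfo>,
}

impl Store {
    fn compare_and_set(&self, prev: DataColumnInfo, new: DataColumnInfo) -> Result<(), Error> {
        let mut guard = self.data_column_info.lock().unwrap();
        // The update only applies if nothing mutated the value since the
        // caller read it; otherwise the caller must retry.
        if *guard == prev {
            *guard = new;
            Ok(())
        } else {
            Err(Error::DataColumnInfoConcurrentMutation)
        }
    }
}

fn main() {
    let store = Store {
        data_column_info: Mutex::new(DataColumnInfo::default()),
    };
    let prev = store.data_column_info.lock().unwrap().clone();
    let new = DataColumnInfo {
        oldest_data_column_slot: Some(42),
    };
    store.compare_and_set(prev, new).unwrap();
}
```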
Expand Down Expand Up @@ -2306,7 +2393,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let blob_lists_pruned = ops.len();
let new_blob_info = BlobInfo {
oldest_blob_slot: Some(end_slot + 1),
- oldest_data_column_slot: Some(end_slot + 1),
blobs_db: blob_info.blobs_db,
};
let update_blob_info = self.compare_and_set_blob_info(blob_info, new_blob_info)?;
29 changes: 28 additions & 1 deletion beacon_node/store/src/metadata.rs
@@ -16,6 +16,7 @@ pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);
pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5);
pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6);
pub const DATA_COLUMN_INFO_KEY: Hash256 = Hash256::repeat_byte(7);

/// State upper limit value used to indicate that a node is not storing historic states.
pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX);
@@ -135,7 +136,6 @@ pub struct BlobInfo {
/// If the `oldest_blob_slot` is `None` then this means that the Deneb fork epoch is not yet
/// known.
pub oldest_blob_slot: Option<Slot>,
- pub oldest_data_column_slot: Option<Slot>,
/// A separate blobs database is in use (deprecated, always `true`).
pub blobs_db: bool,
}
@@ -153,3 +153,30 @@ impl StoreItem for BlobInfo {
Ok(Self::from_ssz_bytes(bytes)?)
}
}

/// Database parameters relevant to data column sync.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
pub struct DataColumnInfo {
/// The slot after which data columns are or *will be* available (>=).
///
/// If this slot is in the future, then it is the first slot of the EIP-7594 fork, from which
/// data columns will be available.
///
/// If the `oldest_data_column_slot` is `None` then this means that the EIP-7594 fork epoch is
/// not yet known.
pub oldest_data_column_slot: Option<Slot>,
}

impl StoreItem for DataColumnInfo {
fn db_column() -> DBColumn {
DBColumn::BeaconMeta
}

fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}

fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
Ok(Self::from_ssz_bytes(bytes)?)
}
}
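`DataColumnInfo` is persisted in the `BeaconMeta` column under the new `DATA_COLUMN_INFO_KEY` as SSZ bytes. A round-trip sketch of that encoding, assuming the `ssz`/`ssz_derive` crates used elsewhere in the codebase, with `u64` in place of `Slot`:

```rust
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};

// Stand-in for the real struct, with u64 replacing Slot; the StoreItem impl
// feeds exactly these bytes through as_store_bytes / from_store_bytes.
#[derive(Debug, PartialEq, Encode, Decode)]
struct DataColumnInfo {
    oldest_data_column_slot: Option<u64>,
}

fn main() {
    let info = DataColumnInfo {
        oldest_data_column_slot: Some(1_234),
    };
    let bytes = info.as_ssz_bytes();
    let decoded = DataColumnInfo::from_ssz_bytes(&bytes).unwrap();
    assert_eq!(info, decoded);
}
```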