diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index f5f78c45327..e4f90ea43d3 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -2,7 +2,7 @@ use crate::config::{OnDiskStoreConfig, StoreConfig}; use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator}; use crate::hdiff::{HDiff, HDiffBuffer, HierarchyModuli, StorageStrategy}; use crate::historic_state_cache::HistoricStateCache; -use crate::impls::beacon_state::{get_full_state, store_full_state}; +use crate::impls::beacon_state::store_full_state; use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator}; use crate::leveldb_store::{BytesKey, LevelDB}; use crate::memory_store::MemoryStore; @@ -164,7 +164,7 @@ pub enum HotColdDBError { MissingEpochBoundaryState(Hash256), MissingPrevState(Hash256), MissingSplitState(Hash256, Slot), - MissingStateDiff(Hash256), + MissingHotHDiff(Hash256), MissingHDiff(Slot), MissingExecutionPayload(Hash256), MissingFullBlockExecutionPayloadPruned(Hash256, Slot), @@ -1118,36 +1118,6 @@ impl, Cold: ItemStore> HotColdDB ) } - /// Load an epoch boundary state by using the hot state summary look-up. - /// - /// Will fall back to the cold DB if a hot state summary is not found. - pub fn load_epoch_boundary_state( - &self, - state_root: &Hash256, - ) -> Result>, Error> { - if let Some(HotStateSummary { - epoch_boundary_state_root, - .. - }) = self.load_hot_state_summary(state_root)? - { - // NOTE: minor inefficiency here because we load an unnecessary hot state summary - let (state, _) = self.load_hot_state(&epoch_boundary_state_root)?.ok_or( - HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root), - )?; - Ok(Some(state)) - } else { - // Try the cold DB - match self.load_cold_state_slot(state_root)? { - Some(state_slot) => { - let epoch_boundary_slot = - state_slot / E::slots_per_epoch() * E::slots_per_epoch(); - self.load_cold_state_by_slot(epoch_boundary_slot).map(Some) - } - None => Ok(None), - } - } - } - pub fn put_item(&self, key: &Hash256, item: &I) -> Result<(), Error> { self.hot_db.put(key, item) } @@ -1483,7 +1453,7 @@ impl, Cold: ItemStore> HotColdDB // Store a summary of the state. // We store one even for the epoch boundary states, as we may need their slots // when doing a look up by state root. - let hot_state_summary = HotStateSummary::new(state_root, state)?; + let hot_state_summary = HotStateSummary::new(state_root, state, &self.hierarchy_hot)?; other_ops.push(hot_state_summary.as_kv_store_op(*state_root)); // On the epoch boundary, store the full state. @@ -1541,10 +1511,7 @@ impl, Cold: ItemStore> HotColdDB let target_buffer = HDiffBuffer::from_state(state.clone()); let diff = HDiff::compute(&base_buffer, &target_buffer, &self.config)?; let diff_bytes = diff.as_ssz_bytes(); - let key = get_key_for_col( - DBColumn::BeaconStateHotDiff.into(), - &state_root.as_slice().to_vec(), - ); + let key = get_key_for_col(DBColumn::BeaconStateHotDiff.into(), state_root.as_slice()); ops.push(KeyValueStoreOp::PutKeyValue(key, diff_bytes)); Ok(()) } @@ -1585,13 +1552,9 @@ impl, Cold: ItemStore> HotColdDB } fn load_hot_hdiff_buffer(&self, state_root: Hash256) -> Result { - // TODO: Add cache of hot hdiff buffers - + // FIXME(tree-states): Add cache of hot hdiff buffers let Some(HotStateSummary { - slot, - latest_block_root, - storage_strategy, - .. + storage_strategy, .. }) = self.load_hot_state_summary(&state_root)? else { return Err(Error::MissingState(state_root)); @@ -1599,14 +1562,16 @@ impl, Cold: ItemStore> HotColdDB match storage_strategy { StorageStrategyHot::Snapshot(_) => { - let state = get_full_state(&self.hot_db, &state_root, &self.spec)? - .ok_or(HotColdDBError::MissingEpochBoundaryState(state_root.into()))?; - let buffer = HDiffBuffer::from_state(state.clone()); + // FIXME(tree-states): rename error + let state = self + .load_hot_state_as_snapshot(state_root)? + .ok_or(HotColdDBError::MissingEpochBoundaryState(state_root))?; + let buffer = HDiffBuffer::from_state(state); Ok(buffer) } StorageStrategyHot::DiffFrom(from) => { let mut buffer = self.load_hot_hdiff_buffer(from)?; - let diff = self.load_hot_diff(state_root)?; + let diff = self.load_hot_hdiff(state_root)?; diff.apply(&mut buffer, &self.config)?; Ok(buffer) } @@ -1614,8 +1579,18 @@ impl, Cold: ItemStore> HotColdDB } } - fn load_hot_diff(&self, _state_root: Hash256) -> Result { - todo!(); + fn load_hot_hdiff(&self, state_root: Hash256) -> Result { + let bytes = { + let _t = metrics::start_timer(&metrics::BEACON_HDIFF_READ_TIMES); + self.hot_db + .get_bytes(DBColumn::BeaconStateHotDiff.into(), state_root.as_slice())? + .ok_or(HotColdDBError::MissingHotHDiff(state_root))? + }; + let hdiff = { + let _t = metrics::start_timer(&metrics::BEACON_HDIFF_DECODE_TIMES); + HDiff::from_ssz_bytes(&bytes)? + }; + Ok(hdiff) } /// Load a post-finalization state from the hot database. @@ -1642,65 +1617,25 @@ impl, Cold: ItemStore> HotColdDB storage_strategy, }) = self.load_hot_state_summary(state_root)? { - match storage_strategy { - StorageStrategyHot::Snapshot | StorageStrategy::DiffFrom(_) => { - let buffer = self.load_hot_hdiff_buffer(state_root)?; + let mut state = match storage_strategy { + StorageStrategyHot::Snapshot(_) | StorageStrategyHot::DiffFrom(_) => { + let buffer = self.load_hot_hdiff_buffer(*state_root)?; + buffer.as_state(&self.spec)? } StorageStrategyHot::ReplayFrom(from) => { - let state = self.load_hot_state_something(state_root)?; + // FIXME(tree-states): better error + let (mut base_state, _) = self + .load_hot_state(&from)? + .ok_or(HotColdDBError::MissingHotStateSummary(from))?; + + // Immediately rebase the state from disk on the finalized state so that we can + // reuse parts of the tree for state root calculation in `replay_blocks`. + self.state_cache + .lock() + .rebase_on_finalized(&mut base_state, &self.spec)?; + + self.load_hot_state_using_replay(base_state, slot, latest_block_root)? } - } - let mut boundary_state = - get_full_state(&self.hot_db, &epoch_boundary_state_root, &self.spec)?.ok_or( - HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root), - )?; - - // Immediately rebase the state from disk on the finalized state so that we can reuse - // parts of the tree for state root calculation in `replay_blocks`. - self.state_cache - .lock() - .rebase_on_finalized(&mut boundary_state, &self.spec)?; - - // Optimization to avoid even *thinking* about replaying blocks if we're already - // on an epoch boundary. - // TODO: review this condition - let mut state = if slot % E::slots_per_epoch() == 0 { - boundary_state - } else { - // Cache ALL intermediate states that are reached during block replay. We may want - // to restrict this in future to only cache epoch boundary states. At worst we will - // cache up to 32 states for each state loaded, which should not flush out the cache - // entirely. - let state_cache_hook = |state_root, state: &mut BeaconState| { - // Ensure all caches are built before attempting to cache. - state.update_tree_hash_cache()?; - state.build_all_caches(&self.spec)?; - - let latest_block_root = state.get_latest_block_root(state_root); - if let PutStateOutcome::New = - self.state_cache - .lock() - .put_state(state_root, latest_block_root, state)? - { - debug!( - self.log, - "Cached ancestor state"; - "state_root" => ?state_root, - "slot" => slot, - ); - } - Ok(()) - }; - let blocks = - self.load_blocks_to_replay(boundary_state.slot(), slot, latest_block_root)?; - let _t = metrics::start_timer(&metrics::STORE_BEACON_REPLAY_HOT_BLOCKS_TIME); - self.replay_blocks( - boundary_state, - blocks, - slot, - no_state_root_iter(), - Some(Box::new(state_cache_hook)), - )? }; state.apply_pending_mutations()?; @@ -1710,6 +1645,22 @@ impl, Cold: ItemStore> HotColdDB } } + fn load_hot_state_using_replay( + &self, + base_state: BeaconState, + slot: Slot, + latest_block_root: Hash256, + ) -> Result, Error> { + if base_state.slot() == slot { + return Ok(base_state); + } + + let blocks = self.load_blocks_to_replay(base_state.slot(), slot, latest_block_root)?; + let _t = metrics::start_timer(&metrics::STORE_BEACON_REPLAY_HOT_BLOCKS_TIME); + // FIXME(tree-states): reconsider caching + self.replay_blocks(base_state, blocks, slot, no_state_root_iter(), None) + } + pub fn store_cold_state_summary( &self, state_root: &Hash256, @@ -1816,6 +1767,29 @@ impl, Cold: ItemStore> HotColdDB } } + fn load_hot_state_bytes_as_snapshot( + &self, + state_root: Hash256, + ) -> Result>, Error> { + match self.hot_db.get_bytes( + DBColumn::BeaconStateHotSnapshot.into(), + state_root.as_slice(), + )? { + Some(bytes) => { + let _timer = + metrics::start_timer(&metrics::STORE_BEACON_STATE_FREEZER_DECOMPRESS_TIME); + let mut ssz_bytes = + Vec::with_capacity(self.config.estimate_decompressed_size(bytes.len())); + let mut decoder = Decoder::new(&*bytes).map_err(Error::Compression)?; + decoder + .read_to_end(&mut ssz_bytes) + .map_err(Error::Compression)?; + Ok(Some(ssz_bytes)) + } + None => Ok(None), + } + } + fn load_cold_state_as_snapshot(&self, slot: Slot) -> Result>, Error> { Ok(self .load_cold_state_bytes_as_snapshot(slot)? @@ -1823,6 +1797,16 @@ impl, Cold: ItemStore> HotColdDB .transpose()?) } + fn load_hot_state_as_snapshot( + &self, + state_root: Hash256, + ) -> Result>, Error> { + Ok(self + .load_hot_state_bytes_as_snapshot(state_root)? + .map(|bytes| BeaconState::from_ssz_bytes(&bytes, &self.spec)) + .transpose()?) + } + pub fn store_cold_state_as_diff( &self, state: &BeaconState, @@ -3329,7 +3313,11 @@ fn no_state_root_iter() -> Option(state_root: &Hash256, state: &BeaconState) -> Result { + pub fn new( + state_root: &Hash256, + state: &BeaconState, + hdiff_moduli: &HierarchyModuli, + ) -> Result { // Fill in the state root on the latest block header if necessary (this happens on all // slots where there isn't a skip). let latest_block_root = state.get_latest_block_root(*state_root); - let epoch_boundary_slot = state.slot() / E::slots_per_epoch() * E::slots_per_epoch(); - let epoch_boundary_state_root = if epoch_boundary_slot == state.slot() { - *state_root - } else { - *state - .get_state_root(epoch_boundary_slot) - .map_err(HotColdDBError::HotStateSummaryError)? + + let get_state_root = |slot| { + if slot == state.slot() { + Ok(*state_root) + } else { + state + .get_state_root(slot) + .copied() + .map_err(HotColdDBError::HotStateSummaryError) + } + }; + let storage_strategy = match hdiff_moduli.storage_strategy(state.slot())? { + StorageStrategy::ReplayFrom(slot) => { + StorageStrategyHot::ReplayFrom(get_state_root(slot)?) + } + StorageStrategy::DiffFrom(slot) => StorageStrategyHot::DiffFrom(get_state_root(slot)?), + StorageStrategy::Snapshot => StorageStrategyHot::Snapshot(0), }; Ok(HotStateSummary { slot: state.slot(), latest_block_root, - storage_strategy: todo!(), + storage_strategy, }) } } diff --git a/beacon_node/store/src/impls/beacon_state.rs b/beacon_node/store/src/impls/beacon_state.rs index 1f970b04dc9..e007224f868 100644 --- a/beacon_node/store/src/impls/beacon_state.rs +++ b/beacon_node/store/src/impls/beacon_state.rs @@ -18,6 +18,8 @@ pub fn store_full_state( Ok(()) } +// FIXME(tree-states): delete/move to migration +#[allow(dead_code)] pub fn get_full_state, E: EthSpec>( db: &KV, state_root: &Hash256, diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 2a7b7ac6dec..033008c09f1 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -262,11 +262,20 @@ pub enum DBColumn { #[strum(serialize = "bdc")] BeaconDataColumn, /// For full `BeaconState`s in the hot database (finalized or fork-boundary states). + /// + /// DEPRECATED. #[strum(serialize = "ste")] BeaconState, - /// For compact `BeaconStateDiff`'s in the hot DB - #[strum(serialize = "bhd")] + /// For compact `BeaconStateDiff`'s in the hot DB. + /// + /// hsd = Hot State Diff. + #[strum(serialize = "hsd")] BeaconStateHotDiff, + /// For beacon state snapshots in the hot DB. + /// + /// hsn = Hot Snapshot. + #[strum(serialize = "hsn")] + BeaconStateHotSnapshot, /// For beacon state snapshots in the freezer DB. #[strum(serialize = "bsn")] BeaconStateSnapshot, @@ -381,6 +390,7 @@ impl DBColumn { | Self::BeaconBlob | Self::BeaconStateSummary | Self::BeaconStateHotDiff + | Self::BeaconStateHotSnapshot | Self::BeaconColdStateSummary | Self::BeaconStateTemporary | Self::ExecPayload