Skip to content

Commit

Permalink
Some progress
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsproul committed Nov 21, 2024
1 parent aa854ec commit d2ed2a8
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 116 deletions.
230 changes: 116 additions & 114 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::config::{OnDiskStoreConfig, StoreConfig};
use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator};
use crate::hdiff::{HDiff, HDiffBuffer, HierarchyModuli, StorageStrategy};
use crate::historic_state_cache::HistoricStateCache;
use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::impls::beacon_state::store_full_state;
use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator};
use crate::leveldb_store::{BytesKey, LevelDB};
use crate::memory_store::MemoryStore;
Expand Down Expand Up @@ -164,7 +164,7 @@ pub enum HotColdDBError {
MissingEpochBoundaryState(Hash256),
MissingPrevState(Hash256),
MissingSplitState(Hash256, Slot),
MissingStateDiff(Hash256),
MissingHotHDiff(Hash256),
MissingHDiff(Slot),
MissingExecutionPayload(Hash256),
MissingFullBlockExecutionPayloadPruned(Hash256, Slot),
Expand Down Expand Up @@ -1118,36 +1118,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
)
}

/// Fetch the epoch boundary state that corresponds to `state_root`.
///
/// The hot state summary is consulted first and its `epoch_boundary_state_root` is loaded
/// from the hot DB. When no hot summary exists the cold DB is tried instead: the slot of
/// `state_root` is rounded down to the start of its epoch and the cold state at that slot
/// is loaded.
///
/// Returns `Ok(None)` when neither a hot summary nor a cold slot is known for `state_root`.
///
/// # Errors
///
/// `HotColdDBError::MissingEpochBoundaryState` if a hot summary exists but the boundary
/// state it points at cannot be loaded; any error from the underlying look-ups is propagated.
pub fn load_epoch_boundary_state(
    &self,
    state_root: &Hash256,
) -> Result<Option<BeaconState<E>>, Error> {
    let Some(HotStateSummary {
        epoch_boundary_state_root: boundary_root,
        ..
    }) = self.load_hot_state_summary(state_root)?
    else {
        // Try the cold DB
        let Some(cold_slot) = self.load_cold_state_slot(state_root)? else {
            return Ok(None);
        };
        // Integer division then multiplication rounds down to the first slot of the epoch.
        let boundary_slot = cold_slot / E::slots_per_epoch() * E::slots_per_epoch();
        return self.load_cold_state_by_slot(boundary_slot).map(Some);
    };

    // NOTE: minor inefficiency here because we load an unnecessary hot state summary
    match self.load_hot_state(&boundary_root)? {
        Some((boundary_state, _)) => Ok(Some(boundary_state)),
        None => Err(HotColdDBError::MissingEpochBoundaryState(boundary_root).into()),
    }
}

/// Write `item` to the hot database under `key`.
///
/// Thin wrapper: forwards to `self.hot_db.put` and returns its result unchanged.
pub fn put_item<I: StoreItem>(&self, key: &Hash256, item: &I) -> Result<(), Error> {
self.hot_db.put(key, item)
}
Expand Down Expand Up @@ -1483,7 +1453,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// Store a summary of the state.
// We store one even for the epoch boundary states, as we may need their slots
// when doing a look up by state root.
let hot_state_summary = HotStateSummary::new(state_root, state)?;
let hot_state_summary = HotStateSummary::new(state_root, state, &self.hierarchy_hot)?;
other_ops.push(hot_state_summary.as_kv_store_op(*state_root));

// On the epoch boundary, store the full state.
Expand Down Expand Up @@ -1541,10 +1511,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let target_buffer = HDiffBuffer::from_state(state.clone());
let diff = HDiff::compute(&base_buffer, &target_buffer, &self.config)?;
let diff_bytes = diff.as_ssz_bytes();
let key = get_key_for_col(
DBColumn::BeaconStateHotDiff.into(),
&state_root.as_slice().to_vec(),
);
let key = get_key_for_col(DBColumn::BeaconStateHotDiff.into(), state_root.as_slice());
ops.push(KeyValueStoreOp::PutKeyValue(key, diff_bytes));
Ok(())
}
Expand Down Expand Up @@ -1585,37 +1552,45 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}

fn load_hot_hdiff_buffer(&self, state_root: Hash256) -> Result<HDiffBuffer, Error> {
// TODO: Add cache of hot hdiff buffers

// FIXME(tree-states): Add cache of hot hdiff buffers
let Some(HotStateSummary {
slot,
latest_block_root,
storage_strategy,
..
storage_strategy, ..
}) = self.load_hot_state_summary(&state_root)?
else {
return Err(Error::MissingState(state_root));
};

match storage_strategy {
StorageStrategyHot::Snapshot(_) => {
let state = get_full_state(&self.hot_db, &state_root, &self.spec)?
.ok_or(HotColdDBError::MissingEpochBoundaryState(state_root.into()))?;
let buffer = HDiffBuffer::from_state(state.clone());
// FIXME(tree-states): rename error
let state = self
.load_hot_state_as_snapshot(state_root)?
.ok_or(HotColdDBError::MissingEpochBoundaryState(state_root))?;
let buffer = HDiffBuffer::from_state(state);
Ok(buffer)
}
StorageStrategyHot::DiffFrom(from) => {
let mut buffer = self.load_hot_hdiff_buffer(from)?;
let diff = self.load_hot_diff(state_root)?;
let diff = self.load_hot_hdiff(state_root)?;
diff.apply(&mut buffer, &self.config)?;
Ok(buffer)
}
StorageStrategyHot::ReplayFrom(from) => self.load_hot_hdiff_buffer(from),
}
}

fn load_hot_diff(&self, _state_root: Hash256) -> Result<HDiff, Error> {
todo!();
/// Read and decode the hierarchical diff stored for `state_root` in the hot DB.
///
/// The bytes are looked up in the `BeaconStateHotDiff` column keyed by the state root and
/// then SSZ-decoded into an `HDiff`. The DB read and the decode are timed separately.
///
/// # Errors
///
/// `HotColdDBError::MissingHotHDiff` when no diff is stored for `state_root`; DB and SSZ
/// decoding errors are propagated.
fn load_hot_hdiff(&self, state_root: Hash256) -> Result<HDiff, Error> {
    // Phase 1: raw read. The timer is released explicitly so it only spans the DB access.
    let read_timer = metrics::start_timer(&metrics::BEACON_HDIFF_READ_TIMES);
    let Some(encoded) = self
        .hot_db
        .get_bytes(DBColumn::BeaconStateHotDiff.into(), state_root.as_slice())?
    else {
        return Err(HotColdDBError::MissingHotHDiff(state_root).into());
    };
    drop(read_timer);

    // Phase 2: SSZ decode, timed on its own.
    let decode_timer = metrics::start_timer(&metrics::BEACON_HDIFF_DECODE_TIMES);
    let decoded = HDiff::from_ssz_bytes(&encoded)?;
    drop(decode_timer);

    Ok(decoded)
}

/// Load a post-finalization state from the hot database.
Expand All @@ -1642,65 +1617,25 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
storage_strategy,
}) = self.load_hot_state_summary(state_root)?
{
match storage_strategy {
StorageStrategyHot::Snapshot | StorageStrategy::DiffFrom(_) => {
let buffer = self.load_hot_hdiff_buffer(state_root)?;
let mut state = match storage_strategy {
StorageStrategyHot::Snapshot(_) | StorageStrategyHot::DiffFrom(_) => {
let buffer = self.load_hot_hdiff_buffer(*state_root)?;
buffer.as_state(&self.spec)?
}
StorageStrategyHot::ReplayFrom(from) => {
let state = self.load_hot_state_something(state_root)?;
// FIXME(tree-states): better error
let (mut base_state, _) = self
.load_hot_state(&from)?
.ok_or(HotColdDBError::MissingHotStateSummary(from))?;

// Immediately rebase the state from disk on the finalized state so that we can
// reuse parts of the tree for state root calculation in `replay_blocks`.
self.state_cache
.lock()
.rebase_on_finalized(&mut base_state, &self.spec)?;

self.load_hot_state_using_replay(base_state, slot, latest_block_root)?
}
}
let mut boundary_state =
get_full_state(&self.hot_db, &epoch_boundary_state_root, &self.spec)?.ok_or(
HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root),
)?;

// Immediately rebase the state from disk on the finalized state so that we can reuse
// parts of the tree for state root calculation in `replay_blocks`.
self.state_cache
.lock()
.rebase_on_finalized(&mut boundary_state, &self.spec)?;

// Optimization to avoid even *thinking* about replaying blocks if we're already
// on an epoch boundary.
// TODO: review this condition
let mut state = if slot % E::slots_per_epoch() == 0 {
boundary_state
} else {
// Cache ALL intermediate states that are reached during block replay. We may want
// to restrict this in future to only cache epoch boundary states. At worst we will
// cache up to 32 states for each state loaded, which should not flush out the cache
// entirely.
let state_cache_hook = |state_root, state: &mut BeaconState<E>| {
// Ensure all caches are built before attempting to cache.
state.update_tree_hash_cache()?;
state.build_all_caches(&self.spec)?;

let latest_block_root = state.get_latest_block_root(state_root);
if let PutStateOutcome::New =
self.state_cache
.lock()
.put_state(state_root, latest_block_root, state)?
{
debug!(
self.log,
"Cached ancestor state";
"state_root" => ?state_root,
"slot" => slot,
);
}
Ok(())
};
let blocks =
self.load_blocks_to_replay(boundary_state.slot(), slot, latest_block_root)?;
let _t = metrics::start_timer(&metrics::STORE_BEACON_REPLAY_HOT_BLOCKS_TIME);
self.replay_blocks(
boundary_state,
blocks,
slot,
no_state_root_iter(),
Some(Box::new(state_cache_hook)),
)?
};
state.apply_pending_mutations()?;

Expand All @@ -1710,6 +1645,22 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
}

/// Advance `base_state` to `slot` by replaying blocks.
///
/// When `base_state` is already at `slot` it is returned untouched and no blocks are
/// loaded. Otherwise the blocks between the base slot and `slot` (ending at
/// `latest_block_root`) are loaded and replayed on top of `base_state`; the replay is
/// timed with `STORE_BEACON_REPLAY_HOT_BLOCKS_TIME`.
fn load_hot_state_using_replay(
    &self,
    base_state: BeaconState<E>,
    slot: Slot,
    latest_block_root: Hash256,
) -> Result<BeaconState<E>, Error> {
    let base_slot = base_state.slot();

    if base_slot != slot {
        let replay_set = self.load_blocks_to_replay(base_slot, slot, latest_block_root)?;
        let _replay_timer = metrics::start_timer(&metrics::STORE_BEACON_REPLAY_HOT_BLOCKS_TIME);
        // FIXME(tree-states): reconsider caching
        self.replay_blocks(base_state, replay_set, slot, no_state_root_iter(), None)
    } else {
        // Nothing to replay: the base state is the requested state.
        Ok(base_state)
    }
}

pub fn store_cold_state_summary(
&self,
state_root: &Hash256,
Expand Down Expand Up @@ -1816,13 +1767,46 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
}

fn load_hot_state_bytes_as_snapshot(
&self,
state_root: Hash256,
) -> Result<Option<Vec<u8>>, Error> {
match self.hot_db.get_bytes(
DBColumn::BeaconStateHotSnapshot.into(),
state_root.as_slice(),
)? {
Some(bytes) => {
let _timer =
metrics::start_timer(&metrics::STORE_BEACON_STATE_FREEZER_DECOMPRESS_TIME);
let mut ssz_bytes =
Vec::with_capacity(self.config.estimate_decompressed_size(bytes.len()));
let mut decoder = Decoder::new(&*bytes).map_err(Error::Compression)?;
decoder
.read_to_end(&mut ssz_bytes)
.map_err(Error::Compression)?;
Ok(Some(ssz_bytes))
}
None => Ok(None),
}
}

/// Load the cold snapshot stored at `slot` and SSZ-decode it into a `BeaconState`.
///
/// Returns `Ok(None)` when `load_cold_state_bytes_as_snapshot` finds no bytes for `slot`;
/// look-up and decode errors are propagated.
fn load_cold_state_as_snapshot(&self, slot: Slot) -> Result<Option<BeaconState<E>>, Error> {
    match self.load_cold_state_bytes_as_snapshot(slot)? {
        Some(ssz) => Ok(Some(BeaconState::from_ssz_bytes(&ssz, &self.spec)?)),
        None => Ok(None),
    }
}

/// Load the hot snapshot stored for `state_root` and SSZ-decode it into a `BeaconState`.
///
/// Returns `Ok(None)` when `load_hot_state_bytes_as_snapshot` finds no bytes for the root;
/// look-up and decode errors are propagated.
fn load_hot_state_as_snapshot(
    &self,
    state_root: Hash256,
) -> Result<Option<BeaconState<E>>, Error> {
    let Some(ssz) = self.load_hot_state_bytes_as_snapshot(state_root)? else {
        return Ok(None);
    };
    let snapshot_state = BeaconState::from_ssz_bytes(&ssz, &self.spec)?;
    Ok(Some(snapshot_state))
}

pub fn store_cold_state_as_diff(
&self,
state: &BeaconState<E>,
Expand Down Expand Up @@ -3329,7 +3313,11 @@ fn no_state_root_iter() -> Option<std::iter::Empty<Result<(Hash256, Slot), Error
// Summary record for a state in the hot DB, as built by `HotStateSummary::new`.
pub struct HotStateSummary {
/// Slot of the summarised state (`state.slot()` at construction time).
pub slot: Slot,
/// Value of `state.get_latest_block_root(state_root)` at construction time.
pub latest_block_root: Hash256,
// FIXME(tree-states): consider not storing the storage strategy and storing a state root instead
// diff_base_state_root: (Slot, Hash256),
/// How the state is rebuilt when loaded: from a snapshot, by applying a diff on top of
/// another state root, or by starting from another state root and replaying blocks.
pub storage_strategy: StorageStrategyHot,
// FIXME(tree-states): should add this as part of this migration
// pub previous_state_root: Hash256,
}

#[derive(Debug, Clone, Copy, Encode, Decode)]
Expand Down Expand Up @@ -3357,23 +3345,37 @@ impl StoreItem for HotStateSummary {

impl HotStateSummary {
/// Construct a new summary of the given state.
pub fn new<E: EthSpec>(state_root: &Hash256, state: &BeaconState<E>) -> Result<Self, Error> {
pub fn new<E: EthSpec>(
state_root: &Hash256,
state: &BeaconState<E>,
hdiff_moduli: &HierarchyModuli,
) -> Result<Self, Error> {
// Fill in the state root on the latest block header if necessary (this happens on all
// slots where there isn't a skip).
let latest_block_root = state.get_latest_block_root(*state_root);
let epoch_boundary_slot = state.slot() / E::slots_per_epoch() * E::slots_per_epoch();
let epoch_boundary_state_root = if epoch_boundary_slot == state.slot() {
*state_root
} else {
*state
.get_state_root(epoch_boundary_slot)
.map_err(HotColdDBError::HotStateSummaryError)?

let get_state_root = |slot| {
if slot == state.slot() {
Ok(*state_root)
} else {
state
.get_state_root(slot)
.copied()
.map_err(HotColdDBError::HotStateSummaryError)
}
};
let storage_strategy = match hdiff_moduli.storage_strategy(state.slot())? {
StorageStrategy::ReplayFrom(slot) => {
StorageStrategyHot::ReplayFrom(get_state_root(slot)?)
}
StorageStrategy::DiffFrom(slot) => StorageStrategyHot::DiffFrom(get_state_root(slot)?),
StorageStrategy::Snapshot => StorageStrategyHot::Snapshot(0),
};

Ok(HotStateSummary {
slot: state.slot(),
latest_block_root,
storage_strategy: todo!(),
storage_strategy,
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions beacon_node/store/src/impls/beacon_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pub fn store_full_state<E: EthSpec>(
Ok(())
}

// FIXME(tree-states): delete/move to migration
#[allow(dead_code)]
pub fn get_full_state<KV: KeyValueStore<E>, E: EthSpec>(
db: &KV,
state_root: &Hash256,
Expand Down
14 changes: 12 additions & 2 deletions beacon_node/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,20 @@ pub enum DBColumn {
#[strum(serialize = "bdc")]
BeaconDataColumn,
/// For full `BeaconState`s in the hot database (finalized or fork-boundary states).
///
/// DEPRECATED.
#[strum(serialize = "ste")]
BeaconState,
/// For compact `BeaconStateDiff`'s in the hot DB
#[strum(serialize = "bhd")]
/// For compact `BeaconStateDiff`'s in the hot DB.
///
/// hsd = Hot State Diff.
#[strum(serialize = "hsd")]
BeaconStateHotDiff,
/// For beacon state snapshots in the hot DB.
///
/// hsn = Hot Snapshot.
#[strum(serialize = "hsn")]
BeaconStateHotSnapshot,
/// For beacon state snapshots in the freezer DB.
#[strum(serialize = "bsn")]
BeaconStateSnapshot,
Expand Down Expand Up @@ -381,6 +390,7 @@ impl DBColumn {
| Self::BeaconBlob
| Self::BeaconStateSummary
| Self::BeaconStateHotDiff
| Self::BeaconStateHotSnapshot
| Self::BeaconColdStateSummary
| Self::BeaconStateTemporary
| Self::ExecPayload
Expand Down

0 comments on commit d2ed2a8

Please sign in to comment.