Skip to content

Commit

Permalink
It works :O
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsproul committed Jan 15, 2024
1 parent 38f34c2 commit 9d1087c
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 9 deletions.
19 changes: 10 additions & 9 deletions beacon_node/beacon_chain/src/schema_change/migration_schema_v24.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
beacon_chain::{BeaconChainTypes, BEACON_CHAIN_DB_KEY},
persisted_beacon_chain::PersistedBeaconChain,
};
use beacon_state_container::{get_full_state, StorageContainer};
use slog::{debug, info, Logger};
use std::collections::HashMap;
use std::sync::Arc;
Expand All @@ -28,6 +29,8 @@ use store::{
};
use types::{BeaconState, Hash256, Slot};

mod beacon_state_container;

/// Chunk size for freezer block roots in the old database schema.
const OLD_SCHEMA_CHUNK_SIZE: u64 = 128;

Expand Down Expand Up @@ -55,15 +58,9 @@ fn get_state_by_replay<T: BeaconChainTypes>(
} = get_summary_v1::<T>(db, state_root)?;

// Load full state from the epoch boundary.
let epoch_boundary_state_bytes = db
.hot_db
.get_bytes(
DBColumn::BeaconState.into(),
epoch_boundary_state_root.as_bytes(),
)?
.ok_or(HotColdDBError::MissingEpochBoundaryState(state_root))?;
let epoch_boundary_state =
BeaconState::from_ssz_bytes(&epoch_boundary_state_bytes, db.get_chain_spec())?;
get_full_state(&db.hot_db, &epoch_boundary_state_root, db.get_chain_spec())?
.ok_or(Error::MissingState(epoch_boundary_state_root))?;

// Replay blocks to reach the target state.
let blocks = db.load_blocks_to_replay(epoch_boundary_state.slot(), slot, latest_block_root)?;
Expand Down Expand Up @@ -250,14 +247,16 @@ fn rewrite_hot_states<T: BeaconChainTypes>(
log: &Logger,
) -> Result<(), Error> {
// Rewrite the split state and delete everything else from the `BeaconState` column.
info!(log, "Rewriting recent states");
let split = db.get_split_info();
let mut split_state_found = false;

for res in db.hot_db.iter_column::<Hash256>(DBColumn::BeaconState) {
let (state_root, state_bytes) = res?;

if state_root == split.state_root {
let state = BeaconState::from_ssz_bytes(&state_bytes, db.get_chain_spec())?;
let container = StorageContainer::from_ssz_bytes(&state_bytes, db.get_chain_spec())?;
let state = container.try_into()?;
db.store_hot_state(&state_root, &state, hot_db_ops)?;
split_state_found = true;
} else {
Expand Down Expand Up @@ -411,5 +410,7 @@ pub fn upgrade_to_v24<T: BeaconChainTypes>(
};
let hot_db_ops = upgrade_hot_database::<T>(&db, &log)?;

info!(log, "Finished rewriting hot DB");

db.store_schema_version_atomically(SchemaVersion(24), hot_db_ops)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use ssz::DecodeError;
use ssz_derive::Encode;
use std::convert::TryInto;
use std::sync::Arc;
use store::*;
use types::beacon_state::{CommitteeCache, CACHED_EPOCHS};

pub fn get_full_state<KV: KeyValueStore<E>, E: EthSpec>(
db: &KV,
state_root: &Hash256,
spec: &ChainSpec,
) -> Result<Option<BeaconState<E>>, Error> {
match db.get_bytes(DBColumn::BeaconState.into(), state_root.as_bytes())? {
Some(bytes) => {
let container = StorageContainer::from_ssz_bytes(&bytes, spec)?;
Ok(Some(container.try_into()?))
}
None => Ok(None),
}
}

/// A container for storing `BeaconState` components.
#[derive(Encode)]
pub struct StorageContainer<T: EthSpec> {
state: BeaconState<T>,
committee_caches: Vec<Arc<CommitteeCache>>,
}

impl<T: EthSpec> StorageContainer<T> {
pub fn from_ssz_bytes(bytes: &[u8], spec: &ChainSpec) -> Result<Self, ssz::DecodeError> {
// We need to use the slot-switching `from_ssz_bytes` of `BeaconState`, which doesn't
// compose with the other SSZ utils, so we duplicate some parts of `ssz_derive` here.
let mut builder = ssz::SszDecoderBuilder::new(bytes);

builder.register_anonymous_variable_length_item()?;
builder.register_type::<Vec<CommitteeCache>>()?;

let mut decoder = builder.build()?;

let state = decoder.decode_next_with(|bytes| BeaconState::from_ssz_bytes(bytes, spec))?;
let committee_caches = decoder.decode_next()?;

Ok(Self {
state,
committee_caches,
})
}
}

impl<T: EthSpec> TryInto<BeaconState<T>> for StorageContainer<T> {
type Error = Error;

fn try_into(mut self) -> Result<BeaconState<T>, Error> {
let mut state = self.state;

for i in (0..CACHED_EPOCHS).rev() {
if i >= self.committee_caches.len() {
return Err(Error::SszDecodeError(DecodeError::BytesInvalid(
"Insufficient committees for BeaconState".to_string(),
)));
};

state.committee_caches_mut()[i] = self.committee_caches.remove(i);
}

Ok(state)
}
}
8 changes: 8 additions & 0 deletions beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,14 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.help("Specifies how many states from the freezer database should cache in memory [default: 1]")
.takes_value(true)
)
.arg(
Arg::with_name("db-linear-blocks")
.long("db-linear-blocks")
.value_name("BOOL")
.help("Customise storage of finalized blocks. Legacy databases must set this value \
to `false`, while new databases should set it to `true`.")
.takes_value(true)
)
/*
* Execution Layer Integration
*/
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,9 @@ pub fn get_config<E: EthSpec>(
if let Some(compression_level) = clap_utils::parse_optional(cli_args, "compression-level")? {
client_config.store.compression_level = compression_level;
}
if let Some(linear_blocks) = clap_utils::parse_optional(cli_args, "db-linear-blocks")? {
client_config.store.linear_blocks = linear_blocks;
}

if let Some(historic_state_cache_size) = cli_args.value_of("historic-state-cache-size") {
client_config.store.historic_state_cache_size = historic_state_cache_size
Expand Down

0 comments on commit 9d1087c

Please sign in to comment.