Skip to content

Commit

Permalink
Implement DB schema upgrade for hierarchical state diffs (#6193)
Browse files Browse the repository at this point in the history
* DB upgrade

* Add flag

* Delete RestorePointHash

* Update docs

* Update docs

* Implement hierarchical state diffs config migration (#6245)

* Implement hierarchical state diffs config migration

* Review PR

* Remove TODO

* Set CURRENT_SCHEMA_VERSION correctly

* Fix genesis state loading

* Re-delete some PartialBeaconState stuff

---------

Co-authored-by: Michael Sproul <michael@sigmaprime.io>
  • Loading branch information
dapplion and michaelsproul authored Aug 19, 2024
1 parent d2049ca commit 57b73df
Show file tree
Hide file tree
Showing 18 changed files with 787 additions and 127 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 13 additions & 22 deletions beacon_node/beacon_chain/src/schema_change.rs
Original file line number Diff line number Diff line change
@@ -1,53 +1,38 @@
//! Utilities for managing database schema changes.
mod migration_schema_v20;
mod migration_schema_v21;
mod migration_schema_v22;

use crate::beacon_chain::BeaconChainTypes;
use crate::types::ChainSpec;
use slog::Logger;
use std::sync::Arc;
use store::hot_cold_store::{HotColdDB, HotColdDBError};
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
use store::Error as StoreError;
use types::Hash256;

/// Migrate the database from one schema version to another, applying all requisite mutations.
#[allow(clippy::only_used_in_recursion)] // spec is not used but likely to be used in future
pub fn migrate_schema<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
deposit_contract_deploy_block: u64,
genesis_state_root: Option<Hash256>,
from: SchemaVersion,
to: SchemaVersion,
log: Logger,
spec: &ChainSpec,
) -> Result<(), StoreError> {
match (from, to) {
// Migrating from the current schema version to itself is always OK, a no-op.
(_, _) if from == to && to == CURRENT_SCHEMA_VERSION => Ok(()),
// Upgrade across multiple versions by recursively migrating one step at a time.
(_, _) if from.as_u64() + 1 < to.as_u64() => {
let next = SchemaVersion(from.as_u64() + 1);
migrate_schema::<T>(
db.clone(),
deposit_contract_deploy_block,
from,
next,
log.clone(),
spec,
)?;
migrate_schema::<T>(db, deposit_contract_deploy_block, next, to, log, spec)
migrate_schema::<T>(db.clone(), genesis_state_root, from, next, log.clone())?;
migrate_schema::<T>(db, genesis_state_root, next, to, log)
}
// Downgrade across multiple versions by recursively migrating one step at a time.
(_, _) if to.as_u64() + 1 < from.as_u64() => {
let next = SchemaVersion(from.as_u64() - 1);
migrate_schema::<T>(
db.clone(),
deposit_contract_deploy_block,
from,
next,
log.clone(),
spec,
)?;
migrate_schema::<T>(db, deposit_contract_deploy_block, next, to, log, spec)
migrate_schema::<T>(db.clone(), genesis_state_root, from, next, log.clone())?;
migrate_schema::<T>(db, genesis_state_root, next, to, log)
}

//
Expand All @@ -69,6 +54,12 @@ pub fn migrate_schema<T: BeaconChainTypes>(
let ops = migration_schema_v21::downgrade_from_v21::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(21), SchemaVersion(22)) => {
let ops =
migration_schema_v22::upgrade_to_v22::<T>(db.clone(), genesis_state_root, log)?;
db.store_schema_version_atomically(to, ops)
}
// FIXME(sproul): consider downgrade
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,
Expand Down
111 changes: 111 additions & 0 deletions beacon_node/beacon_chain/src/schema_change/migration_schema_v22.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use crate::beacon_chain::BeaconChainTypes;
use slog::error;
use slog::{info, Logger};
use std::sync::Arc;
use store::chunked_iter::ChunkedVectorIter;
use store::{
chunked_vector::BlockRoots, get_key_for_col, partial_beacon_state::PartialBeaconState,
DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp,
};
use types::{BeaconState, Hash256, Slot};

const LOG_EVERY: usize = 200_000;

fn load_old_schema_frozen_state<T: BeaconChainTypes>(
db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
state_root: Hash256,
) -> Result<Option<BeaconState<T::EthSpec>>, Error> {
let Some(partial_state_bytes) = db
.cold_db
.get_bytes(DBColumn::BeaconState.into(), state_root.as_bytes())?
else {
return Ok(None);
};
let mut partial_state: PartialBeaconState<T::EthSpec> =
PartialBeaconState::from_ssz_bytes(&partial_state_bytes, db.get_chain_spec())?;

// Fill in the fields of the partial state.
partial_state.load_block_roots(&db.cold_db, db.get_chain_spec())?;
partial_state.load_state_roots(&db.cold_db, db.get_chain_spec())?;
partial_state.load_historical_roots(&db.cold_db, db.get_chain_spec())?;
partial_state.load_randao_mixes(&db.cold_db, db.get_chain_spec())?;
partial_state.load_historical_summaries(&db.cold_db, db.get_chain_spec())?;

partial_state.try_into().map(Some)
}

pub fn upgrade_to_v22<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
genesis_state_root: Option<Hash256>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
info!(log, "Upgrading from v21 to v22");

let anchor = db.get_anchor_info().ok_or(Error::NoAnchorInfo)?;
let split_slot = db.get_split_slot();
let genesis_state_root = genesis_state_root.ok_or(Error::GenesisStateUnknown)?;

if !db.get_config().allow_tree_states_migration && !anchor.no_historic_states_stored(split_slot)
{
error!(
log,
"You are attempting to migrate to tree-states but this is a destructive operation. \
Upgrading will require FIXME(sproul) minutes of downtime before Lighthouse starts again. \
All current historic states will be deleted. Reconstructing the states in the new \
schema will take up to 2 weeks. \
\
To proceed add the flag --allow-tree-states-migration OR run lighthouse db prune-states"
);
return Err(Error::DestructiveFreezerUpgrade);
}

let mut ops = vec![];

rewrite_block_roots::<T>(&db, anchor.oldest_block_slot, split_slot, &mut ops, &log)?;

let mut genesis_state = load_old_schema_frozen_state::<T>(&db, genesis_state_root)?
.ok_or(Error::MissingGenesisState)?;
let genesis_state_root = genesis_state.update_tree_hash_cache()?;

db.prune_historic_states(genesis_state_root, &genesis_state)?;

Ok(ops)
}

pub fn rewrite_block_roots<T: BeaconChainTypes>(
db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
oldest_block_slot: Slot,
split_slot: Slot,
ops: &mut Vec<KeyValueStoreOp>,
log: &Logger,
) -> Result<(), Error> {
// Block roots are available from the `oldest_block_slot` to the `split_slot`.
let start_vindex = oldest_block_slot.as_usize();
let block_root_iter = ChunkedVectorIter::<BlockRoots, _, _, _>::new(
db,
start_vindex,
split_slot,
db.get_chain_spec(),
);

// OK to hold these in memory (10M slots * 43 bytes per KV ~= 430 MB).
for (i, (slot, block_root)) in block_root_iter.enumerate() {
ops.push(KeyValueStoreOp::PutKeyValue(
get_key_for_col(
DBColumn::BeaconBlockRoots.into(),
&(slot as u64).to_be_bytes(),
),
block_root.as_bytes().to_vec(),
));

if i > 0 && i % LOG_EVERY == 0 {
info!(
log,
"Beacon block root migration in progress";
"roots_migrated" => i
);
}
}

Ok(())
}
16 changes: 8 additions & 8 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1057,25 +1057,25 @@ where
.chain_spec
.clone()
.ok_or("disk_store requires a chain spec")?;
let network_config = context
.eth2_network_config
.as_ref()
.ok_or("disk_store requires a network config")?;

self.db_path = Some(hot_path.into());
self.freezer_db_path = Some(cold_path.into());

let inner_spec = spec.clone();
let deposit_contract_deploy_block = context
.eth2_network_config
.as_ref()
.map(|config| config.deposit_contract_deploy_block)
.unwrap_or(0);
let genesis_state_root = network_config
.genesis_state_root::<E>()
.map_err(|e| format!("error determining genesis state root: {e:?}"))?;

let schema_upgrade = |db, from, to| {
migrate_schema::<Witness<TSlotClock, TEth1Backend, _, _, _>>(
db,
deposit_contract_deploy_block,
genesis_state_root,
from,
to,
log,
&inner_spec,
)
};

Expand Down
9 changes: 9 additions & 0 deletions beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,15 @@ pub fn cli_app() -> Command {
.default_value("0")
.display_order(0)
)
.arg(
Arg::new("allow-tree-states-migration")
.long("allow-tree-states-migration")
.value_name("BOOLEAN")
.help("Whether to allow a destructive freezer DB migration for hierarchical state diffs")
.action(ArgAction::Set)
.default_value("false")
.display_order(0)
)

/*
* Misc.
Expand Down
6 changes: 6 additions & 0 deletions beacon_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,12 @@ pub fn get_config<E: EthSpec>(
client_config.store.blob_prune_margin_epochs = blob_prune_margin_epochs;
}

if let Some(allow_tree_states_migration) =
clap_utils::parse_optional(cli_args, "allow-tree-states-migration")?
{
client_config.store.allow_tree_states_migration = allow_tree_states_migration;
}

/*
* Zero-ports
*
Expand Down
1 change: 1 addition & 0 deletions beacon_node/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ parking_lot = { workspace = true }
itertools = { workspace = true }
ethereum_ssz = { workspace = true }
ethereum_ssz_derive = { workspace = true }
superstruct = { workspace = true }
types = { workspace = true }
state_processing = { workspace = true }
slog = { workspace = true }
Expand Down
Loading

0 comments on commit 57b73df

Please sign in to comment.