Skip to content

Commit

Permalink
Store linear blocks in freezer db
Browse files Browse the repository at this point in the history
Co-authored-by: Michael Sproul <michael@sigmaprime.io>
  • Loading branch information
dapplion and michaelsproul committed Feb 8, 2024
1 parent e470596 commit 6156f6f
Show file tree
Hide file tree
Showing 22 changed files with 222 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
warp = { git = "https://github.com/seanmonstar/warp.git", default-features = false, features = ["tls"] }
zeroize = { version = "1", features = ["zeroize_derive"] }
zip = "0.6"
zstd = "0.11.2"

# Local crates.
account_utils = { path = "common/account_utils" }
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/beacon_block_streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
continue;
}

match self.beacon_chain.store.try_get_full_block(&root) {
match self.beacon_chain.store.try_get_full_block(&root, None) {
Err(e) => db_blocks.push((root, Err(e.into()))),
Ok(opt_block) => db_blocks.push((
root,
Expand Down
15 changes: 11 additions & 4 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ pub const INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON: &str =
"Finalized merge transition block is invalid.";

/// Defines the behaviour when a block/block-root for a skipped slot is requested.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WhenSlotSkipped {
/// If the slot is a skip slot, return `None`.
///
Expand Down Expand Up @@ -925,8 +926,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<Option<SignedBlindedBeaconBlock<T::EthSpec>>, Error> {
let root = self.block_root_at_slot(request_slot, skips)?;

// Only hint the slot if expect a block at this exact slot.
let slot_hint = match skips {
WhenSlotSkipped::Prev => None,
WhenSlotSkipped::None => Some(request_slot),
};

if let Some(block_root) = root {
Ok(self.store.get_blinded_block(&block_root)?)
Ok(self.store.get_blinded_block(&block_root, slot_hint)?)
} else {
Ok(None)
}
Expand Down Expand Up @@ -1189,7 +1196,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<Option<SignedBeaconBlock<T::EthSpec>>, Error> {
// Load block from database, returning immediately if we have the full block w payload
// stored.
let blinded_block = match self.store.try_get_full_block(block_root)? {
let blinded_block = match self.store.try_get_full_block(block_root, None)? {
Some(DatabaseBlock::Full(block)) => return Ok(Some(block)),
Some(DatabaseBlock::Blinded(block)) => block,
None => return Ok(None),
Expand Down Expand Up @@ -1257,7 +1264,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
block_root: &Hash256,
) -> Result<Option<SignedBlindedBeaconBlock<T::EthSpec>>, Error> {
Ok(self.store.get_blinded_block(block_root)?)
Ok(self.store.get_blinded_block(block_root, None)?)
}

/// Returns the state at the given root, if any.
Expand Down Expand Up @@ -6365,7 +6372,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

let beacon_block = self
.store
.get_blinded_block(&beacon_block_root)?
.get_blinded_block(&beacon_block_root, None)?
.ok_or_else(|| {
Error::DBInconsistent(format!("Missing block {}", beacon_block_root))
})?;
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/beacon_fork_choice_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ where
metrics::inc_counter(&metrics::BALANCES_CACHE_MISSES);
let justified_block = self
.store
.get_blinded_block(&self.justified_checkpoint.root)
.get_blinded_block(&self.justified_checkpoint.root, None)
.map_err(Error::FailedToReadBlock)?
.ok_or(Error::MissingBlock(self.justified_checkpoint.root))?
.deconstruct()
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ where
.ok_or("Fork choice not found in store")?;

let genesis_block = store
.get_blinded_block(&chain.genesis_block_root)
.get_blinded_block(&chain.genesis_block_root, Some(Slot::new(0)))
.map_err(|e| descriptive_db_error("genesis block", &e))?
.ok_or("Genesis block not found in store")?;
let genesis_state = store
Expand Down Expand Up @@ -710,7 +710,7 @@ where
// Try to decode the head block according to the current fork, if that fails, try
// to backtrack to before the most recent fork.
let (head_block_root, head_block, head_reverted) =
match store.get_full_block(&initial_head_block_root) {
match store.get_full_block(&initial_head_block_root, None) {
Ok(Some(block)) => (initial_head_block_root, block, false),
Ok(None) => return Err("Head block not found in store".into()),
Err(StoreError::SszDecodeError(_)) => {
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/canonical_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ impl<T: BeaconChainTypes> CanonicalHead<T> {
let fork_choice_view = fork_choice.cached_fork_choice_view();
let beacon_block_root = fork_choice_view.head_block_root;
let beacon_block = store
.get_full_block(&beacon_block_root)?
.get_full_block(&beacon_block_root, None)?
.ok_or(Error::MissingBeaconBlock(beacon_block_root))?;
let current_slot = fork_choice.fc_store().get_current_slot();
let (_, beacon_state) = store
Expand Down Expand Up @@ -667,7 +667,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.unwrap_or_else(|| {
let beacon_block = self
.store
.get_full_block(&new_view.head_block_root)?
.get_full_block(&new_view.head_block_root, None)?
.ok_or(Error::MissingBeaconBlock(new_view.head_block_root))?;

let (_, beacon_state) = self
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/fork_revert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
let finalized_checkpoint = head_state.finalized_checkpoint();
let finalized_block_root = finalized_checkpoint.root;
let finalized_block = store
.get_full_block(&finalized_block_root)
.get_full_block(&finalized_block_root, None)
.map_err(|e| format!("Error loading finalized block: {:?}", e))?
.ok_or_else(|| {
format!(
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/light_client_server_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
let signature_slot = block_slot;
let attested_block_root = block_parent_root;

let attested_block = store.get_blinded_block(attested_block_root)?.ok_or(
let attested_block = store.get_blinded_block(attested_block_root, None)?.ok_or(
BeaconChainError::DBInconsistent(format!(
"Block not available {:?}",
attested_block_root
Expand Down Expand Up @@ -127,7 +127,7 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
if is_latest_finality & !cached_parts.finalized_block_root.is_zero() {
// Immediately after checkpoint sync the finalized block may not be available yet.
if let Some(finalized_block) =
store.get_blinded_block(&cached_parts.finalized_block_root)?
store.get_blinded_block(&cached_parts.finalized_block_root, None)?
{
*self.latest_finality_update.write() = Some(LightClientFinalityUpdate {
// TODO: may want to cache this result from latest_optimistic_update if producing a
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
// so delete it from the head tracker but leave it and its states in the database
// This is suboptimal as it wastes disk space, but it's difficult to fix. A re-sync
// can be used to reclaim the space.
let head_state_root = match store.get_blinded_block(&head_hash) {
let head_state_root = match store.get_blinded_block(&head_hash, Some(head_slot)) {
Ok(Some(block)) => block.state_root(),
Ok(None) => {
return Err(BeaconStateError::MissingBeaconBlock(head_hash.into()).into())
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/pre_finalization_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

// 2. Check on disk.
if self.store.get_blinded_block(&block_root)?.is_some() {
if self.store.get_blinded_block(&block_root, None)?.is_some() {
cache.block_roots.put(block_root, ());
return Ok(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn get_slot_clock<T: BeaconChainTypes>(
log: &Logger,
) -> Result<Option<T::SlotClock>, Error> {
let spec = db.get_chain_spec();
let Some(genesis_block) = db.get_blinded_block(&Hash256::zero())? else {
let Some(genesis_block) = db.get_blinded_block(&Hash256::zero(), Some(Slot::new(0)))? else {
error!(log, "Missing genesis block");
return Ok(None);
};
Expand Down
8 changes: 8 additions & 0 deletions beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,14 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.takes_value(true)
.default_value("true")
)
.arg(
Arg::with_name("compression-level")
.long("compression-level")
.value_name("LEVEL")
.help("Compression level (-99 to 22) for zstd compression applied to states on disk \
[default: 1]. You may change the compression level freely without re-syncing.")
.takes_value(true)
)
.arg(
Arg::with_name("prune-payloads")
.long("prune-payloads")
Expand Down
1 change: 1 addition & 0 deletions beacon_node/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ lru = { workspace = true }
sloggers = { workspace = true }
directory = { workspace = true }
strum = { workspace = true }
zstd = { workspace = true }
27 changes: 27 additions & 0 deletions beacon_node/store/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use types::{EthSpec, MinimalEthSpec};
pub const PREV_DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 8192;
pub const DEFAULT_BLOCK_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(5);
pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1;
const EST_COMPRESSION_FACTOR: usize = 2;
pub const DEFAULT_HISTORIC_STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(1);
pub const DEFAULT_EPOCHS_PER_BLOB_PRUNE: u64 = 1;
pub const DEFAULT_BLOB_PUNE_MARGIN_EPOCHS: u64 = 0;
Expand All @@ -22,6 +24,8 @@ pub struct StoreConfig {
pub slots_per_restore_point_set_explicitly: bool,
/// Maximum number of blocks to store in the in-memory block cache.
pub block_cache_size: NonZeroUsize,
/// Compression level for blocks, state diffs and other compressed values.
pub compression_level: i32,
/// Maximum number of states from freezer database to store in the in-memory state cache.
pub historic_state_cache_size: NonZeroUsize,
/// Whether to compact the database on initialization.
Expand All @@ -30,6 +34,8 @@ pub struct StoreConfig {
pub compact_on_prune: bool,
/// Whether to prune payloads on initialization and finalization.
pub prune_payloads: bool,
/// Whether to store finalized blocks compressed and linearised in the freezer database.
pub linear_blocks: bool,
/// Whether to prune blobs older than the blob data availability boundary.
pub prune_blobs: bool,
/// Frequency of blob pruning in epochs. Default: 1 (every epoch).
Expand Down Expand Up @@ -57,10 +63,12 @@ impl Default for StoreConfig {
slots_per_restore_point: MinimalEthSpec::slots_per_historical_root() as u64,
slots_per_restore_point_set_explicitly: false,
block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
compression_level: DEFAULT_COMPRESSION_LEVEL,
historic_state_cache_size: DEFAULT_HISTORIC_STATE_CACHE_SIZE,
compact_on_init: false,
compact_on_prune: true,
prune_payloads: true,
linear_blocks: true,
prune_blobs: true,
epochs_per_blob_prune: DEFAULT_EPOCHS_PER_BLOB_PRUNE,
blob_prune_margin_epochs: DEFAULT_BLOB_PUNE_MARGIN_EPOCHS,
Expand All @@ -87,6 +95,25 @@ impl StoreConfig {
}
Ok(())
}

/// Estimate the size of `len` bytes after compression at the current compression level.
pub fn estimate_compressed_size(&self, len: usize) -> usize {
if self.compression_level == 0 {
len
} else {
len / EST_COMPRESSION_FACTOR
}
}

/// Estimate the size of `len` compressed bytes after decompression at the current compression
/// level.
pub fn estimate_decompressed_size(&self, len: usize) -> usize {
if self.compression_level == 0 {
len
} else {
len * EST_COMPRESSION_FACTOR
}
}
}

impl StoreItem for OnDiskStoreConfig {
Expand Down
1 change: 1 addition & 0 deletions beacon_node/store/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub enum Error {
computed: Hash256,
},
BlockReplayError(BlockReplayError),
Compression(std::io::Error),
AddPayloadLogicError,
SlotClockUnavailableForMigration,
InvalidKey,
Expand Down
Loading

0 comments on commit 6156f6f

Please sign in to comment.