Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Archival manager #651

Draft
wants to merge 15 commits into
base: master
Choose a base branch
from
10 changes: 9 additions & 1 deletion components/consensusmanager/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use kaspa_consensus_core::{
trusted::{ExternalGhostdagData, TrustedBlock},
tx::{MutableTransaction, SignableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
utxo::utxo_inquirer::UtxoInquirerError,
BlockHashSet, BlueWorkType, ChainPath, Hash,
ArchivalBlock, BlockHashSet, BlueWorkType, ChainPath, Hash,
};
use kaspa_utils::sync::rwlock::*;
use std::{ops::Deref, sync::Arc};
Expand Down Expand Up @@ -458,6 +458,14 @@ impl ConsensusSessionOwned {
pub async fn async_finality_point(&self) -> Hash {
self.clone().spawn_blocking(move |c| c.finality_point()).await
}

/// Retrieves the pruning window roots as `(index, roots)` pairs — presumably keyed by
/// pruning-point index; confirm against the consensus `get_pruning_window_roots` impl.
/// The underlying consensus call runs on a blocking worker thread (see `spawn_blocking`).
pub async fn async_get_pruning_window_roots(&self) -> Vec<(u64, Vec<Hash>)> {
    self.clone().spawn_blocking(move |c| c.get_pruning_window_roots()).await
}

/// Submits archival blocks to consensus for storage, running the blocking
/// `add_archival_blocks` call on a worker thread. Returns the consensus result
/// (errors presumably include rejection when the node is not archival — see
/// `ArchivalError::NotArchival`; confirm against the archival manager).
pub async fn async_add_archival_blocks(&self, blocks: Vec<ArchivalBlock>) -> ConsensusResult<()> {
    self.clone().spawn_blocking(move |c| c.add_archival_blocks(blocks)).await
}
}

pub type ConsensusProxy = ConsensusSessionOwned;
10 changes: 9 additions & 1 deletion consensus/core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
trusted::{ExternalGhostdagData, TrustedBlock},
tx::{MutableTransaction, SignableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
utxo::utxo_inquirer::UtxoInquirerError,
BlockHashSet, BlueWorkType, ChainPath,
ArchivalBlock, BlockHashSet, BlueWorkType, ChainPath,
};
use kaspa_hashes::Hash;

Expand Down Expand Up @@ -370,6 +370,14 @@ pub trait ConsensusApi: Send + Sync {
fn finality_point(&self) -> Hash {
unimplemented!()
}

/// Returns the pruning window roots as `(index, roots)` pairs.
/// Default implementation panics; concrete consensus implementations override it.
fn get_pruning_window_roots(&self) -> Vec<(u64, Vec<Hash>)> {
    unimplemented!()
}

/// Adds blocks to archival storage.
/// Default implementation panics; concrete consensus implementations override it.
fn add_archival_blocks(&self, blocks: Vec<ArchivalBlock>) -> ConsensusResult<()> {
    unimplemented!()
}
}

pub type DynConsensus = Arc<dyn ConsensusApi>;
24 changes: 24 additions & 0 deletions consensus/core/src/errors/archival.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use kaspa_hashes::Hash;
use thiserror::Error;

use super::block::RuleError;

/// Errors produced while importing or validating archival blocks.
#[derive(Error, Debug, Clone)]
pub enum ArchivalError {
    /// The block referenced as a child of a submitted archival block is unknown.
    #[error("child {0} was not found")]
    ChildNotFound(Hash),

    /// The first hash is not listed as a parent of the second.
    #[error("{0} is not a parent of {1}")]
    NotParentOf(Hash, Hash),

    /// The requested operation requires the node to run in archival mode.
    #[error("node is not on archival mode")]
    NotArchival,

    /// A consensus rule check failed during archival processing.
    /// NOTE(review): the variant is named `DifficultyError` but it wraps a general
    /// `RuleError` and its message says "rule error" — consider renaming the variant
    /// (interface-breaking, so flagged rather than changed here).
    #[error("rule error: {0}")]
    DifficultyError(#[from] RuleError),

    /// The header for the given block hash is missing from storage.
    #[error("header of {0} was not found")]
    NoHeader(Hash),
}

/// Convenience alias for results carrying an [`ArchivalError`].
pub type ArchivalResult<T> = std::result::Result<T, ArchivalError>;
5 changes: 4 additions & 1 deletion consensus/core/src/errors/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use kaspa_hashes::Hash;
use thiserror::Error;

use super::{difficulty::DifficultyError, sync::SyncManagerError, traversal::TraversalError};
use super::{archival::ArchivalError, difficulty::DifficultyError, sync::SyncManagerError, traversal::TraversalError};

#[derive(Error, Debug, Clone)]
pub enum ConsensusError {
Expand Down Expand Up @@ -32,6 +32,9 @@ pub enum ConsensusError {
#[error("difficulty error: {0}")]
DifficultyError(#[from] DifficultyError),

#[error("archival error: {0}")]
ArchivalError(#[from] ArchivalError),

#[error("{0}")]
General(&'static str),

Expand Down
1 change: 1 addition & 0 deletions consensus/core/src/errors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod archival;
pub mod block;
pub mod coinbase;
pub mod config;
Expand Down
7 changes: 7 additions & 0 deletions consensus/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ extern crate self as consensus_core;
use std::collections::{HashMap, HashSet};
use std::hash::{BuildHasher, Hasher};

use block::Block;
pub use kaspa_hashes::Hash;

pub mod acceptance_data;
Expand Down Expand Up @@ -131,6 +132,12 @@ impl BuildHasher for BlockHasher {

pub type BlockLevel = u8;

/// A block submitted for archival storage, optionally paired with a known child hash
/// (presumably used to anchor the block's position in the DAG — the archival error set
/// includes `ChildNotFound`/`NotParentOf`, suggesting the link is validated; confirm
/// against the archival manager).
#[derive(Clone)]
pub struct ArchivalBlock {
    /// The block to archive.
    pub block: Block,
    /// A known child of `block`, if any.
    pub child: Option<Hash>,
}

#[cfg(test)]
mod tests {
use super::BlockHasher;
Expand Down
32 changes: 31 additions & 1 deletion consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::{
ProcessingCounters,
},
processes::{
archival::ArchivalManager,
ghostdag::ordering::SortableBlock,
window::{WindowManager, WindowType},
},
Expand Down Expand Up @@ -71,7 +72,7 @@ use kaspa_consensus_core::{
trusted::{ExternalGhostdagData, TrustedBlock},
tx::{MutableTransaction, SignableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
utxo::utxo_inquirer::UtxoInquirerError,
BlockHashSet, BlueWorkType, ChainPath, HashMapCustomHasher,
ArchivalBlock, BlockHashSet, BlueWorkType, ChainPath, HashMapCustomHasher,
};
use kaspa_consensus_notify::root::ConsensusNotificationRoot;

Expand Down Expand Up @@ -120,6 +121,8 @@ pub struct Consensus {
pub(super) virtual_processor: Arc<VirtualStateProcessor>,
pub(super) pruning_processor: Arc<PruningProcessor>,

archival_manager: ArchivalManager,

// Storage
pub(super) storage: Arc<ConsensusStorage>,

Expand Down Expand Up @@ -290,6 +293,24 @@ impl Consensus {
virtual_processor.process_genesis();
}

let archival_manager = ArchivalManager::new(
params.max_block_level,
config.params.genesis.hash,
config.is_archival,
config.params.crescendo_activation,
storage.clone(),
Arc::new(rayon::ThreadPoolBuilder::new()
.num_threads(0) // TODO: set the number of threads
.thread_name(|i| format!("archival-pool-{i}"))
.build()
.unwrap()),
);

// TODO: Don't run this unless we pass some compile or runtime flag.
if std::env::var("CHECK_PRUNING_WINDOW_ROOTS_CONSISTENCY").is_ok() {
archival_manager.check_pruning_window_roots_consistency();
}

let this = Self {
db,
block_sender: sender,
Expand All @@ -305,6 +326,7 @@ impl Consensus {
config,
creation_timestamp,
is_consensus_exiting,
archival_manager,
};

// Run database upgrades if any
Expand Down Expand Up @@ -1135,4 +1157,12 @@ impl ConsensusApi for Consensus {
/// Computes the virtual finality point from the last-known-good virtual state's
/// ghostdag data and the current pruning point.
fn finality_point(&self) -> Hash {
    self.virtual_processor.virtual_finality_point(&self.lkg_virtual_state.load().ghostdag_data, self.pruning_point())
}

/// Delegates to the archival manager to fetch the stored pruning window roots.
fn get_pruning_window_roots(&self) -> Vec<(u64, Vec<Hash>)> {
    self.archival_manager.get_pruning_window_roots()
}

/// Delegates to the archival manager; its error type converts into
/// `ConsensusError` through the `?` operator's `From` conversion.
fn add_archival_blocks(&self, blocks: Vec<ArchivalBlock>) -> ConsensusResult<()> {
    Ok(self.archival_manager.add_archival_blocks(blocks)?)
}
}
9 changes: 9 additions & 0 deletions consensus/src/consensus/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
pruning::DbPruningStore,
pruning_samples::DbPruningSamplesStore,
pruning_utxoset::PruningUtxosetStores,
pruning_window_root::DbPruningWindowRootStore,
reachability::{DbReachabilityStore, ReachabilityData},
relations::DbRelationsStore,
selected_chain::DbSelectedChainStore,
Expand Down Expand Up @@ -68,6 +69,9 @@ pub struct ConsensusStorage {
pub block_window_cache_for_difficulty: Arc<BlockWindowCacheStore>,
pub block_window_cache_for_past_median_time: Arc<BlockWindowCacheStore>,

// Archival
pub pruning_window_root_store: Arc<RwLock<DbPruningWindowRootStore>>,

// "Last Known Good" caches
/// The "last known good" virtual state. To be used by any logic which does not want to wait
/// for a possible virtual state write to complete but can rather settle with the last known state
Expand Down Expand Up @@ -167,6 +171,7 @@ impl ConsensusStorage {
let transactions_builder = PolicyBuilder::new().bytes_budget(transactions_budget).tracked_bytes();
let acceptance_data_builder = PolicyBuilder::new().bytes_budget(acceptance_data_budget).tracked_bytes();
let past_pruning_points_builder = PolicyBuilder::new().max_items(1024).untracked();
let pruning_window_root_builder = PolicyBuilder::new().max_items(1024).untracked();

// TODO: consider tracking UtxoDiff byte sizes more accurately including the exact size of ScriptPublicKey

Expand Down Expand Up @@ -233,6 +238,9 @@ impl ConsensusStorage {
let virtual_stores =
Arc::new(RwLock::new(VirtualStores::new(db.clone(), lkg_virtual_state.clone(), utxo_set_builder.build())));

let pruning_window_root_store =
Arc::new(RwLock::new(DbPruningWindowRootStore::new(db.clone(), pruning_window_root_builder.build())));

// Ensure that reachability stores are initialized
reachability::init(reachability_store.write().deref_mut()).unwrap();
relations::init(reachability_relations_store.write().deref_mut());
Expand Down Expand Up @@ -262,6 +270,7 @@ impl ConsensusStorage {
block_window_cache_for_difficulty,
block_window_cache_for_past_median_time,
lkg_virtual_state,
pruning_window_root_store,
})
}
}
1 change: 1 addition & 0 deletions consensus/src/model/stores/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod past_pruning_points;
pub mod pruning;
pub mod pruning_samples;
pub mod pruning_utxoset;
pub mod pruning_window_root;
pub mod reachability;
pub mod relations;
pub mod selected_chain;
Expand Down
92 changes: 92 additions & 0 deletions consensus/src/model/stores/pruning_window_root.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use kaspa_consensus_core::blockhash::BlockHashes;
use kaspa_consensus_core::BlockHasher;
use kaspa_database::registry::DatabaseStorePrefixes;
use parking_lot::{RwLock, RwLockWriteGuard};
use rocksdb::WriteBatch;
use std::sync::Arc;

use kaspa_database::prelude::{BatchDbWriter, CachedDbAccess, DirectDbWriter};
use kaspa_database::prelude::{CachePolicy, DB};
use kaspa_database::prelude::{StoreError, StoreResult};
use kaspa_hashes::Hash;

/// Reader API for `PruningWindowRootStore`.
pub trait PruningWindowRootStoreReader {
    /// Returns the root hashes stored for the given pruning point.
    fn get(&self, pruning_point: Hash) -> StoreResult<Vec<Hash>>;
    /// Returns whether an entry exists for the given pruning point.
    fn has(&self, pruning_point: Hash) -> StoreResult<bool>;
}

/// Write API for `PruningWindowRootStore`. The set function is deliberately `mut`
/// since pruning window root is not append-only and thus needs to be guarded.
/// TODO: can be optimized to avoid the locking if needed.
/// NOTE(review): `delete` takes `&self` while `set` takes `&mut self`; if overwrites
/// need guarding, deletes presumably do too — confirm the intended concurrency contract.
pub trait PruningWindowRootStore: PruningWindowRootStoreReader {
    /// Stores (or overwrites) the roots for the given pruning point.
    fn set(&mut self, pruning_point: Hash, roots: Vec<Hash>) -> StoreResult<()>;
    /// Removes the entry for the given pruning point.
    fn delete(&self, pruning_point: Hash) -> Result<(), StoreError>;
}

/// A DB + cache implementation of `PruningWindowRootStore` trait, with concurrent readers support.
#[derive(Clone)]
pub struct DbPruningWindowRootStore {
    // Handle to the underlying database, retained for constructing direct writers.
    db: Arc<DB>,
    // Cached access keyed by pruning-point hash, holding that window's root hashes.
    access: CachedDbAccess<Hash, BlockHashes, BlockHasher>,
}

impl DbPruningWindowRootStore {
pub fn new(db: Arc<DB>, cache_policy: CachePolicy) -> Self {
Self { db: Arc::clone(&db), access: CachedDbAccess::new(db, cache_policy, DatabaseStorePrefixes::PruningWindowRoot.into()) }
}

pub fn clone_with_new_cache(&self, cache_policy: CachePolicy) -> Self {
Self::new(Arc::clone(&self.db), cache_policy)
}

pub fn set_batch(&mut self, batch: &mut WriteBatch, pruning_point: Hash, roots: Vec<Hash>) -> StoreResult<()> {
self.access.write(BatchDbWriter::new(batch), pruning_point, roots.into())
}

pub fn delete_batch(&self, batch: &mut WriteBatch, pruning_point: Hash) -> Result<(), StoreError> {
self.access.delete(BatchDbWriter::new(batch), pruning_point)
}
}

/// Batch-write extension for a shared, lock-guarded pruning window root store:
/// stages the write into `batch` under the write lock and returns the guard so the
/// caller can keep the lock held until the batch is committed.
pub trait PruningWindowRootStoreBatchExtensions {
    fn set_batch(
        &self,
        batch: &mut WriteBatch,
        pruning_point: Hash,
        roots: Vec<Hash>,
    ) -> Result<RwLockWriteGuard<DbPruningWindowRootStore>, StoreError>;
}

impl PruningWindowRootStoreBatchExtensions for Arc<RwLock<DbPruningWindowRootStore>> {
fn set_batch(
&self,
batch: &mut WriteBatch,
pruning_point: Hash,
roots: Vec<Hash>,
) -> Result<RwLockWriteGuard<DbPruningWindowRootStore>, StoreError> {
let write_guard = self.write();
write_guard.access.write(BatchDbWriter::new(batch), pruning_point, roots.into())?;
Ok(write_guard)
}
}

impl PruningWindowRootStoreReader for DbPruningWindowRootStore {
    /// Reads the cached/stored roots and returns an owned copy of the hash list.
    fn get(&self, pruning_point: Hash) -> StoreResult<Vec<Hash>> {
        let roots = self.access.read(pruning_point)?;
        Ok(roots.as_ref().clone())
    }

    /// Returns whether an entry exists for the given pruning point.
    fn has(&self, pruning_point: Hash) -> StoreResult<bool> {
        self.access.has(pruning_point)
    }
}

impl PruningWindowRootStore for DbPruningWindowRootStore {
fn set(&mut self, pruning_point: Hash, roots: Vec<Hash>) -> StoreResult<()> {
self.access.write(DirectDbWriter::new(&self.db), pruning_point, roots.into())
}

fn delete(&self, pruning_point: Hash) -> Result<(), StoreError> {
self.access.delete(DirectDbWriter::new(&self.db), pruning_point)
}
}
Loading
Loading