Skip to content

Commit

Permalink
Merge branch 'kpop/backup/disk_usage' into 'master'
Browse files Browse the repository at this point in the history
chore(backup): periodically log the disk stats

Currently, we log the disk usage only after *successfully* running `ic-replay`.
When `ic-replay` takes a long time to finish, the `disk_usage` graph on the `backup` Grafana dashboard shows stale information. For example, the graph below shows that the current disk usage is at 90%, but it is actually at 80%:
![Screenshot_2024-05-07_at_14.10.48](/uploads/d32eb81163b5979a5017dc52d072f229/Screenshot_2024-05-07_at_14.10.48.png)

Also made some public items (functions, struct fields, and modules) private or crate-private.

See merge request dfinity-lab/public/ic!19147
  • Loading branch information
kpop-dfinity committed May 7, 2024
2 parents c2ddaf4 + 88a9313 commit e9f99bc
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 46 deletions.
28 changes: 12 additions & 16 deletions rs/backup/src/backup_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl BackupHelper {
self.root_dir.join("ic_registry_local_store")
}

pub fn data_dir(&self) -> PathBuf {
pub(crate) fn data_dir(&self) -> PathBuf {
self.root_dir.join(format!("data/{}", self.subnet_id))
}

Expand Down Expand Up @@ -315,7 +315,7 @@ impl BackupHelper {
}
}

pub fn sync_files(&self, nodes: &[IpAddr]) {
pub(crate) fn sync_files(&self, nodes: &[IpAddr]) {
let start_time = Instant::now();
let total_succeeded: usize = nodes
.iter()
Expand All @@ -331,13 +331,13 @@ impl BackupHelper {
}
}

pub fn create_spool_dir(&self) {
pub(crate) fn create_spool_dir(&self) {
if !self.spool_dir().exists() {
create_dir_all(self.spool_dir()).expect("Failure creating a directory");
}
}

pub fn collect_nodes(&self, num_nodes: usize) -> Result<Vec<IpAddr>, String> {
pub(crate) fn collect_nodes(&self, num_nodes: usize) -> Result<Vec<IpAddr>, String> {
let mut shuf_nodes = self.collect_all_subnet_nodes()?;
shuf_nodes.shuffle(&mut thread_rng());
Ok(shuf_nodes
Expand Down Expand Up @@ -380,11 +380,11 @@ impl BackupHelper {
.collect()
}

pub fn last_state_checkpoint(&self) -> u64 {
pub(crate) fn last_state_checkpoint(&self) -> u64 {
last_checkpoint(&self.state_dir())
}

pub fn replay(&self) {
pub(crate) fn replay(&self) {
let start_height = self.last_state_checkpoint();
let start_time = Instant::now();
let mut current_replica_version =
Expand Down Expand Up @@ -529,7 +529,7 @@ impl BackupHelper {
Ok(())
}

pub fn retrieve_spool_top_height(&self) -> u64 {
pub(crate) fn retrieve_spool_top_height(&self) -> u64 {
let mut spool_top_height = 0;
let spool_dirs = collect_spool_dirs(&self.log, self.spool_dir());
for spool_dir in spool_dirs {
Expand Down Expand Up @@ -658,7 +658,7 @@ impl BackupHelper {
self.log_disk_stats()
}

fn log_disk_stats(&self) -> Result<(), String> {
pub(crate) fn log_disk_stats(&self) -> Result<(), String> {
let mut stats = Vec::new();
for (dir, threshold) in &[
(&self.root_dir, self.hot_disk_resource_threshold_percentage),
Expand All @@ -669,18 +669,14 @@ impl BackupHelper {
] {
let space = self.get_disk_stats(dir, *threshold, DiskStats::Space)?;
let inodes = self.get_disk_stats(dir, *threshold, DiskStats::Inodes)?;
debug!(
self.log,
"[{:?}] Space: {}% Inodes: {}%", dir, space, inodes
);
stats.push((dir.as_path(), space, inodes));
}
self.notification_client
.push_metrics_disk_stats(stats.as_slice());
Ok(())
}

pub fn need_cold_storage_move(&self) -> Result<bool, String> {
pub(crate) fn need_cold_storage_move(&self) -> Result<bool, String> {
let _guard = self
.artifacts_guard
.lock()
Expand All @@ -689,7 +685,7 @@ impl BackupHelper {
Ok(spool_dirs.len() > self.versions_hot)
}

pub fn do_move_cold_storage(&self) -> Result<(), String> {
pub(crate) fn do_move_cold_storage(&self) -> Result<(), String> {
let max_height = self.cold_store_artifacts()?;
self.cold_store_states(max_height)?;
info!(
Expand Down Expand Up @@ -904,7 +900,7 @@ impl BackupHelper {
}
}

pub fn ls_path(log: &Logger, dir: &Path) -> Result<(), String> {
pub(crate) fn ls_path(log: &Logger, dir: &Path) -> Result<(), String> {
let mut cmd = Command::new("ls");
cmd.arg(dir);
debug!(log, "Will execute: {:?}", cmd);
Expand Down Expand Up @@ -1004,7 +1000,7 @@ fn collect_spool_dirs(log: &Logger, spool_dir: PathBuf) -> Vec<DirEntry> {
}

/// Searches the spool for a directory that contains the block at which the last call to ic-replay finished.
pub fn retrieve_replica_version_last_replayed(
pub(crate) fn retrieve_replica_version_last_replayed(
log: &Logger,
spool_dir: PathBuf,
state_dir: PathBuf,
Expand Down
35 changes: 19 additions & 16 deletions rs/backup/src/backup_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,11 @@ struct SubnetBackup {
}

pub struct BackupManager {
pub root_dir: PathBuf,
pub local_store: Arc<LocalStoreImpl>,
pub registry_client: Arc<RegistryClientImpl>,
pub registry_replicator: Arc<RegistryReplicator>,
_local_store: Arc<LocalStoreImpl>,
_registry_client: Arc<RegistryClientImpl>,
_registry_replicator: Arc<RegistryReplicator>,
subnet_backups: Vec<SubnetBackup>,
pub log: Logger,
log: Logger,
}

impl BackupManager {
Expand Down Expand Up @@ -160,10 +159,9 @@ impl BackupManager {
}

BackupManager {
root_dir: config.root_dir,
local_store,
registry_client,
registry_replicator, // it will be used as a background task, so keep it
_local_store: local_store,
_registry_client: registry_client,
_registry_replicator: registry_replicator, // it will be used as a background task, so keep it
subnet_backups: backups,
log,
}
Expand Down Expand Up @@ -393,15 +391,20 @@ impl BackupManager {

loop {
let mut progress = Vec::new();
for i in 0..size {
let b = &self.subnet_backups[i].backup_helper;
let last_block = b.retrieve_spool_top_height();
let last_cp = b.last_state_checkpoint();
let subnet = &b.subnet_id.to_string()[..5];
for backup in &self.subnet_backups {
let backup_helper = &backup.backup_helper;
let last_block = backup_helper.retrieve_spool_top_height();
let last_cp = backup_helper.last_state_checkpoint();
let subnet = &backup_helper.subnet_id.to_string()[..5];
progress.push(format!("{}: {}/{}", subnet, last_cp, last_block));

b.notification_client.push_metrics_synced_height(last_block);
b.notification_client.push_metrics_restored_height(last_cp);
backup_helper
.notification_client
.push_metrics_synced_height(last_block);
backup_helper
.notification_client
.push_metrics_restored_height(last_cp);
let _ = backup_helper.log_disk_stats();
}
info!(self.log, "Replay/Sync - {}", progress.join(", "));

Expand Down
4 changes: 2 additions & 2 deletions rs/backup/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ pub mod backup_helper;
pub mod backup_manager;
pub mod cmd;
pub mod config;
pub mod notification_client;
pub mod util;
mod notification_client;
mod util;
8 changes: 4 additions & 4 deletions rs/backup/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,25 @@ use serde::{de::Error, Deserialize, Deserializer, Serializer};
use std::future::Future;
use tokio::runtime::Runtime;

pub fn block_on<F: Future>(f: F) -> F::Output {
pub(crate) fn block_on<F: Future>(f: F) -> F::Output {
let rt = Runtime::new().unwrap_or_else(|err| panic!("Could not create tokio runtime: {}", err));
rt.block_on(f)
}

pub fn sleep_secs(secs: u64) {
pub(crate) fn sleep_secs(secs: u64) {
let sleep_duration = std::time::Duration::from_secs(secs);
std::thread::sleep(sleep_duration);
}

pub fn replica_from_string<'de, D>(deserializer: D) -> Result<ReplicaVersion, D::Error>
pub(crate) fn replica_from_string<'de, D>(deserializer: D) -> Result<ReplicaVersion, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
ReplicaVersion::try_from(s).map_err(D::Error::custom)
}

pub fn replica_to_string<S>(ver: &ReplicaVersion, serializer: S) -> Result<S::Ok, S::Error>
pub(crate) fn replica_to_string<S>(ver: &ReplicaVersion, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
Expand Down
13 changes: 5 additions & 8 deletions rs/tests/consensus/backup_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ Success::
end::catalog[] */
use anyhow::Result;
use ic_backup::{
backup_helper::{last_checkpoint, ls_path},
backup_helper::last_checkpoint,
config::{ColdStorage, Config, SubnetConfig},
util::sleep_secs,
};
use ic_base_types::SubnetId;
use ic_recovery::file_sync_helper::{download_binary, write_file};
Expand Down Expand Up @@ -266,7 +265,7 @@ pub fn test(env: TestEnv) {
info!(log, "A checkpoint has been archived");
break;
}
sleep_secs(5);
std::thread::sleep(std::time::Duration::from_secs(5));
}

info!(log, "Proposal to upgrade the subnet replica version");
Expand Down Expand Up @@ -316,7 +315,7 @@ pub fn test(env: TestEnv) {
info!(log, "New version was successfully backed up and archived");
break;
}
sleep_secs(5);
std::thread::sleep(std::time::Duration::from_secs(5));
}

info!(
Expand Down Expand Up @@ -384,7 +383,7 @@ pub fn test(env: TestEnv) {
if hash_mismatch {
break;
}
sleep_secs(10);
std::thread::sleep(std::time::Duration::from_secs(10));
}

info!(log, "Kill child process");
Expand Down Expand Up @@ -430,13 +429,12 @@ fn modify_byte_in_file(file_path: PathBuf) -> std::io::Result<()> {

fn cold_storage_exists(log: &Logger, cold_storage_dir: PathBuf) -> bool {
for _ in 0..12 {
_ = ls_path(log, &cold_storage_dir);
if dir_exists_and_have_file(log, &cold_storage_dir.join("states"))
&& dir_exists_and_have_file(log, &cold_storage_dir.join("artifacts"))
{
return true;
}
sleep_secs(10);
std::thread::sleep(std::time::Duration::from_secs(10));
}
false
}
Expand All @@ -448,7 +446,6 @@ fn dir_exists_and_have_file(log: &Logger, dir: &PathBuf) -> bool {
return false;
}
debug!(log, "Directory exists!");
_ = ls_path(log, dir);
let have_file = fs::read_dir(dir)
.expect("Should be able to read existing directory")
.next()
Expand Down

0 comments on commit e9f99bc

Please sign in to comment.