diff --git a/.gitignore b/.gitignore index 36f27c8d3..00fbb7737 100644 --- a/.gitignore +++ b/.gitignore @@ -12,7 +12,7 @@ ledger_db/ state_merkle_db/ .etc .movement -.movement-* +.movement* .idea .vscode .data diff --git a/Cargo.lock b/Cargo.lock index 33f45cc49..bcc2d63b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12129,6 +12129,8 @@ dependencies = [ "rocksdb", "serde_json", "sha2 0.10.8", + "syncador", + "syncup", "tokio", "tokio-stream", "tonic 0.12.3", @@ -16639,6 +16641,7 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tracing", + "tracing-subscriber 0.3.18", "uuid", ] diff --git a/docker/compose/movement-full-node/docker-compose.follower.backup.yml b/docker/compose/movement-full-node/docker-compose.follower.backup.yml new file mode 100644 index 000000000..6bf171180 --- /dev/null +++ b/docker/compose/movement-full-node/docker-compose.follower.backup.yml @@ -0,0 +1,121 @@ +services: + setup: + image: ghcr.io/movementlabsxyz/movement-full-node-setup:${CONTAINER_REV} + container_name: setup + environment: + DOT_MOVEMENT_PATH: /.movement + # needs to have a connection to the movement-celestia-da-light-node + MOVEMENT_DA_LIGHT_NODE_CONNECTION_PROTOCOL: ${MOVEMENT_DA_LIGHT_NODE_CONNECTION_PROTOCOL:?MOVEMENT_DA_LIGHT_NODE_CONNECTION_PROTOCOL is not set} + MOVEMENT_DA_LIGHT_NODE_CONNECTION_HOSTNAME: ${MOVEMENT_DA_LIGHT_NODE_CONNECTION_HOSTNAME:?MOVEMENT_DA_LIGHT_NODE_CONNECTION_HOSTNAME is not set} + MOVEMENT_DA_LIGHT_NODE_CONNECTION_PORT: ${MOVEMENT_DA_LIGHT_NODE_CONNECTION_PORT:?MOVEMENT_DA_LIGHT_NODE_CONNECTION_PORT is not set} + INDEXER_PROCESSOR_POSTGRES_CONNECTION_STRING: postgres://postgres:password@postgres:5432/postgres + AWS_REGION: ${AWS_REGION:?AWS_REGION is not set} + AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID} + AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY} + MOVEMENT_SYNC: ${MOVEMENT_SYNC} #:?MOVEMENT_SYNC is not set} + MAYBE_RUN_LOCAL: "false" + MOVEMENT_DA_LIGHT_NODE_HTTP1: ${MOVEMENT_DA_LIGHT_NODE_HTTP1} + RUST_LOG: info,aws_sdk_s3=debug + volumes: + - ${DOT_MOVEMENT_PATH}:/.movement + # mount if exists + - ~/.aws/:/root/.aws:ro + + healthcheck: + test: [ "CMD-SHELL", "echo 'health check'" ] + retries: 10 + interval: 10s + timeout: 5s + + # turn off underlying da light nodes + celestia-light-node: + image: busybox + container_name: celestia-light-node + command: sleep infinity + environment: + - DOT_MOVEMENT_PATH=/.movement + - CELESTIA_RPC_ADDRESS=celestia-light-node:26657 + volumes: + - ${DOT_MOVEMENT_PATH}:/.movement + depends_on: + setup: + condition: service_healthy + healthcheck: + test: [ "CMD-SHELL", "echo 'health check'" ] + retries: 3 + start_period: 3s + restart: on-failure:3 + + # turn off celestia-light-node-synced + celestia-light-node-synced: + image: busybox + container_name: celestia-light-node-synced + command: echo "No sync check when following." 
+ environment: + - DOT_MOVEMENT_PATH=/.movement + volumes: + - ${DOT_MOVEMENT_PATH}:/.movement + depends_on: + celestia-light-node: + condition: service_healthy + + # turn off movement-celestia-da-light-node + movement-celestia-da-light-node: + image: busybox + container_name: movement-celestia-da-light-node + command: sleep infinity + healthcheck: + test: [ "CMD-SHELL", "echo 'health check'" ] + retries: 3 + start_period: 3s + + # turn off movement-faucet-service + movement-faucet-service: + image: busybox + container_name: movement-faucet-service + command: sleep infinity + healthcheck: + test: [ "CMD-SHELL", "echo 'health check'" ] + retries: 3 + start_period: 3s + + movement-snapshot-node: + image: ghcr.io/movementlabsxyz/movement-full-node:${CONTAINER_REV} + container_name: movement-snapshot-node + environment: + - DOT_MOVEMENT_PATH=/.movement + - MOVEMENT_TIMING=info + - SUZUKA_TIMING_LOG=/.movement/movement-timing.log + - RUST_BACKTRACE=1 + - SYNC_PATTERN=${SYNC_PATTERN} + - SYNC_BUCKET=${SYNC_BUCKET} + - SYNC_ARCHIVE=${SYNC_ARCHIVE} + entrypoint: /bin/sh + command: | + sh -c ' + while true; do + timeout 60s movement-full-node run || echo "Node run timed out" + echo "Taking snapshot..." + movement-full-node backup save-and-push $SYNC_BUCKET $SYNC_PATTERN $SYNC_ARCHIVE || echo "Snapshot failed" + done + ' + volumes: + - ${DOT_MOVEMENT_PATH}:/.movement + depends_on: + - movement-celestia-da-light-node + ports: + - "30731:30731" + - "30734:30734" + healthcheck: + test: [ "CMD-SHELL", "nc -zv 0.0.0.0 39731" ] + retries: 10 + interval: 10s + timeout: 5s + restart: on-failure:5 + + movement-full-node: + image: ghcr.io/movementlabsxyz/movement-full-node:${CONTAINER_REV} + container_name: movement-full-node + depends_on: + movement-snapshot-node: + condition: service_healthy \ No newline at end of file diff --git a/docker/compose/movement-full-node/docker-compose.follower.yml b/docker/compose/movement-full-node/docker-compose.follower.yml index 9f71c34d8..93467ebdd 100644 --- a/docker/compose/movement-full-node/docker-compose.follower.yml +++ b/docker/compose/movement-full-node/docker-compose.follower.yml @@ -12,7 +12,7 @@ services: AWS_REGION: ${AWS_REGION:?AWS_REGION is not set} AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID} AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY} - MOVEMENT_SYNC: ${MOVEMENT_SYNC:?MOVEMENT_SYNC is not set} + MOVEMENT_SYNC: ${MOVEMENT_SYNC} MAYBE_RUN_LOCAL: "false" RUST_LOG: info,aws_sdk_s3=debug volumes: diff --git a/docker/compose/movement-full-node/docker-compose.leader.yml b/docker/compose/movement-full-node/docker-compose.leader.yml index 4ec20eaa8..36d68e6d5 100644 --- a/docker/compose/movement-full-node/docker-compose.leader.yml +++ b/docker/compose/movement-full-node/docker-compose.leader.yml @@ -1,7 +1,7 @@ services: setup: environment: - MOVEMENT_SYNC: ${MOVEMENT_SYNC:?MOVEMENT_SYNC is not set} + MOVEMENT_SYNC: ${MOVEMENT_SYNC} AWS_REGION: ${AWS_REGION:?AWS_REGION is not set} AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID} AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY} diff --git a/docker/compose/movement-full-node/snapshot/docker-compose.push.yml b/docker/compose/movement-full-node/snapshot/docker-compose.push.yml new file mode 100644 index 000000000..da8f6f48a --- /dev/null +++ b/docker/compose/movement-full-node/snapshot/docker-compose.push.yml @@ -0,0 +1,20 @@ +version: "3.8" + +services: + + movement-backup-db: + image: ghcr.io/movementlabsxyz/movement-full-node:${CONTAINER_REV} + container_name: movement-backup-db + environment: + - DOT_MOVEMENT_PATH=/.movement + 
      - MOVEMENT_TIMING=info
+      - SUZUKA_TIMING_LOG=/.movement/movement-timing.log
+      - RUST_BACKTRACE=1
+      - AWS_REGION=us-west-1
+      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
+      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
+      - SYNC_BUCKET=${SYNC_BUCKET}
+      - SYNC_ARCHIVE=${SYNC_ARCHIVE}
+    command: backup push $SYNC_BUCKET $SYNC_ARCHIVE
+    volumes:
+      - ${DOT_MOVEMENT_PATH}:/.movement
\ No newline at end of file
diff --git a/docker/compose/movement-full-node/snapshot/docker-compose.restore.yml b/docker/compose/movement-full-node/snapshot/docker-compose.restore.yml
new file mode 100644
index 000000000..7fa3a8e90
--- /dev/null
+++ b/docker/compose/movement-full-node/snapshot/docker-compose.restore.yml
@@ -0,0 +1,20 @@
+version: "3.8"
+
+services:
+
+  movement-restore-db:
+    image: ghcr.io/movementlabsxyz/movement-full-node:${CONTAINER_REV}
+    container_name: movement-restore-db
+    environment:
+      - DOT_MOVEMENT_PATH=/.movement
+      - MOVEMENT_TIMING=info
+      - SUZUKA_TIMING_LOG=/.movement/movement-timing.log
+      - RUST_BACKTRACE=1
+      - AWS_REGION=us-west-1
+      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
+      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
+      - SYNC_BUCKET=${SYNC_BUCKET}
+      - SYNC_PATTERN=${SYNC_PATTERN}
+    command: backup restore $SYNC_BUCKET $SYNC_PATTERN
+    volumes:
+      - ${DOT_MOVEMENT_PATH}:/.movement
\ No newline at end of file
diff --git a/docker/compose/movement-full-node/snapshot/docker-compose.save_and_push.yml b/docker/compose/movement-full-node/snapshot/docker-compose.save_and_push.yml
new file mode 100644
index 000000000..d7dd5c554
--- /dev/null
+++ b/docker/compose/movement-full-node/snapshot/docker-compose.save_and_push.yml
@@ -0,0 +1,21 @@
+version: "3.8"
+
+services:
+
+  movement-backup-db:
+    image: ghcr.io/movementlabsxyz/movement-full-node:${CONTAINER_REV}
+    container_name: movement-backup-db
+    environment:
+      - DOT_MOVEMENT_PATH=/.movement
+      - MOVEMENT_TIMING=info
+      - SUZUKA_TIMING_LOG=/.movement/movement-timing.log
+      - RUST_BACKTRACE=1
+      - AWS_REGION=us-west-1
+      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
+      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
+      - SYNC_PATTERN=${SYNC_PATTERN}
+      - SYNC_BUCKET=${SYNC_BUCKET}
+      - SYNC_ARCHIVE=${SYNC_ARCHIVE}
+    command: backup save-and-push $SYNC_BUCKET $SYNC_PATTERN $SYNC_ARCHIVE
+    volumes:
+      - ${DOT_MOVEMENT_PATH}:/.movement
\ No newline at end of file
diff --git a/docker/compose/movement-full-node/snapshot/docker-compose.save_db.yml b/docker/compose/movement-full-node/snapshot/docker-compose.save_db.yml
new file mode 100644
index 000000000..8957fee5c
--- /dev/null
+++ b/docker/compose/movement-full-node/snapshot/docker-compose.save_db.yml
@@ -0,0 +1,20 @@
+version: "3.8"
+
+services:
+
+  movement-backup-db:
+    image: ghcr.io/movementlabsxyz/movement-full-node:${CONTAINER_REV}
+    container_name: movement-backup-db
+    environment:
+      - DOT_MOVEMENT_PATH=/.movement
+      - MOVEMENT_TIMING=info
+      - SUZUKA_TIMING_LOG=/.movement/movement-timing.log
+      - RUST_BACKTRACE=1
+      - AWS_REGION=us-west-1
+      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
+      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
+      - MOVEMENT_SYNC=${MOVEMENT_SYNC}
+      - SYNC_PATTERN=${SYNC_PATTERN}
+    command: backup save $SYNC_PATTERN
+    volumes:
+      - ${DOT_MOVEMENT_PATH}:/.movement
\ No newline at end of file
diff --git a/networks/movement/movement-full-node/Cargo.toml b/networks/movement/movement-full-node/Cargo.toml
index 75098d6d7..f1731bbbb 100644
--- a/networks/movement/movement-full-node/Cargo.toml
+++ b/networks/movement/movement-full-node/Cargo.toml
@@ -45,6 +45,8 @@ movement-da-light-node-client = {
workspace = true} aptos-framework-elsa-to-biarritz-rc1-migration = { workspace = true } movement-signer = { workspace = true } movement-signer-loader = { workspace = true } +syncador = { workspace = true } +syncup = { workspace = true } [features] default = [] diff --git a/networks/movement/movement-full-node/src/backup/mod.rs b/networks/movement/movement-full-node/src/backup/mod.rs new file mode 100644 index 000000000..767f42fd2 --- /dev/null +++ b/networks/movement/movement-full-node/src/backup/mod.rs @@ -0,0 +1,264 @@ +use clap::Parser; +use clap::Subcommand; +use movement_config::Config; +use std::path::PathBuf; +use syncador::PullOperations; +use syncador::PushOperations; +use syncup::Syncupable; + +#[derive(Subcommand, Debug)] +#[clap(rename_all = "kebab-case", about = "Commands for syncing")] +pub enum Backup { + Save(SaveDbParam), + Push(PushParam), + SaveAndPush(SaveAndPush), + Restore(RestoreParam), +} + +impl Backup { + pub async fn execute(&self) -> Result<(), anyhow::Error> { + match self { + Backup::Save(param) => param.execute().await, + Backup::Push(param) => param.execute().await, + Backup::Restore(param) => param.execute().await, + Backup::SaveAndPush(param) => param.execute().await, + } + } +} + +#[derive(Debug, Parser, Clone)] +#[clap(rename_all = "kebab-case", about = "Save the db using db_sync pattern in root_dir.")] +pub struct SaveDbParam { + #[clap(default_value = "{maptos,maptos-storage,movement-da-db}/**", value_name = "DB PATTERN")] + pub db_sync: String, + #[clap(value_name = "ROOT DIRECTORY")] + pub root_dir: Option, +} + +impl SaveDbParam { + pub async fn execute(&self) -> Result<(), anyhow::Error> { + let root_path = get_root_path(self.root_dir.as_ref())?; + + tracing::info!("Save db with parameters: sync:{} root dir:{:?}", self.db_sync, root_path); + + let archive_pipe = syncador::backend::pipeline::push::Pipeline::new(vec![ + Box::new(syncador::backend::glob::file::FileGlob::try_new( + &self.db_sync.clone(), + root_path.clone(), + )?), + Box::new(syncador::backend::archive::gzip::push::Push::new(root_path)), + ]); + + match archive_pipe.push(syncador::Package::null()).await { + Ok(package) => { + tracing::info!("Backup done in file: {:?}", package); + } + Err(err) => { + tracing::warn!("Error during backup: {:?}", err); + } + } + + Ok(()) + } +} + +#[derive(Debug, Parser, Clone)] +#[clap(rename_all = "kebab-case", about = "Push the archived db to the bucket")] +pub struct PushParam { + #[clap(default_value = "follower-test-ci-backup", value_name = "BUCKET NAME")] + pub bucket: String, + #[clap(default_value = "0.tar.gz", value_name = "ARCHIVE FILENAME")] + pub archive_file: String, + #[clap(value_name = "ROOT DIRECTORY")] + pub root_dir: Option, +} + +impl PushParam { + pub async fn execute(&self) -> Result<(), anyhow::Error> { + //Load node config. 
+ let dot_movement = dot_movement::DotMovement::try_from_env()?; + let config = dot_movement.try_get_config_from_json::()?; + let application_id = config.syncing.try_application_id()?; + let syncer_id = config.syncing.try_syncer_id()?; + let root_path = get_root_path(self.root_dir.as_ref())?; + + tracing::info!( + "Push db with parameters: bucket:{} archive_file:{} root dir:{:?}", + self.bucket, + self.archive_file, + root_path + ); + + let s3_push = syncador::backend::s3::shared_bucket::create_push_with_load_from_env( + self.bucket.clone(), + syncador::backend::s3::shared_bucket::metadata::Metadata::default() + .with_application_id(application_id) + .with_syncer_id(syncer_id), + ) + .await?; + + let push_pipe = syncador::backend::pipeline::push::Pipeline::new(vec![Box::new(s3_push)]); + + let archive_file = root_path.join(&self.archive_file); + + let package = syncador::Package(vec![syncador::PackageElement { + sync_files: vec![archive_file], + root_dir: root_path, + }]); + + match push_pipe.push(package).await { + Ok(package) => { + tracing::info!("Push done {:?}", package); + } + Err(err) => { + tracing::warn!("Error during archive push: {:?}", err); + } + } + + Ok(()) + } +} + +#[derive(Debug, Parser, Clone)] +#[clap( + rename_all = "kebab-case", + about = "Save the db using db_sync pattern in root_dir then push it to the bucket." +)] +pub struct SaveAndPush { + #[clap(default_value = "follower-test-ci-backup", value_name = "BUCKET NAME")] + pub bucket: String, + #[clap(default_value = "{maptos,maptos-storage,movement-da-db}/**", value_name = "DB PATTERN")] + pub db_sync: String, + #[clap(default_value = "0.tar.gz", value_name = "ARCHIVE FILENAME")] + pub archive_file: String, + #[clap(value_name = "ROOT DIRECTORY")] + pub root_dir: Option, +} + +impl SaveAndPush { + pub async fn execute(&self) -> Result<(), anyhow::Error> { + let root_path = get_root_path(self.root_dir.as_ref())?; + + let dot_movement = dot_movement::DotMovement::try_from_env()?; + let config = dot_movement.try_get_config_from_json::()?; + let application_id = config.syncing.try_application_id()?; + let syncer_id = config.syncing.try_syncer_id()?; + let s3_push = syncador::backend::s3::shared_bucket::create_push_with_load_from_env( + self.bucket.clone(), + syncador::backend::s3::shared_bucket::metadata::Metadata::default() + .with_application_id(application_id) + .with_syncer_id(syncer_id), + ) + .await?; + + tracing::info!( + "Save and Push db with parameters: bucket:{} sync:{} archive_file:{} root dir:{:?}", + self.bucket, + self.db_sync, + self.archive_file, + root_path + ); + + let push_pipe = syncador::backend::pipeline::push::Pipeline::new(vec![ + Box::new(syncador::backend::glob::file::FileGlob::try_new( + &self.db_sync.clone(), + root_path.clone(), + )?), + Box::new(syncador::backend::archive::gzip::push::Push::new(root_path)), + Box::new(s3_push), + ]); + + match push_pipe.push(syncador::Package::null()).await { + Ok(package) => { + tracing::info!("Backup done in file: {:?}", package); + } + Err(err) => { + tracing::warn!("Error during backup: {:?}", err); + } + } + + Ok(()) + } +} + +#[derive(Debug, Parser, Clone)] +#[clap( + rename_all = "kebab-case", + about = "Restore from the specified bucket in the root_dir. Db pattern is used to clean before the update." 
+)] +pub struct RestoreParam { + #[clap(default_value = "follower-test-ci-backup", value_name = "BUCKET NAME")] + pub bucket: String, + #[clap(default_value = "{maptos,maptos-storage,movement-da-db}/**", value_name = "DB PATTERN")] + pub db_sync: String, + #[clap(value_name = "ROOT DIRECTORY")] + pub root_dir: Option, +} + +impl RestoreParam { + pub async fn execute(&self) -> Result<(), anyhow::Error> { + let root_path = get_root_path(self.root_dir.as_ref())?; + + //Load node config. + let dot_movement = dot_movement::DotMovement::try_from_env()?; + let config = dot_movement.try_get_or_create_config_from_json::()?; + let application_id = config.syncing.try_application_id()?; + let syncer_id = config.syncing.try_syncer_id()?; + + tracing::info!( + "Restore db with parameters: bucket:{} sync:{} root dir:{:?}", + self.bucket, + self.db_sync, + root_path + ); + + let s3_pull = syncador::backend::s3::shared_bucket::create_pull_with_load_from_env( + self.bucket.clone(), + syncador::backend::s3::shared_bucket::metadata::Metadata::default() + .with_application_id(application_id) + .with_syncer_id(syncer_id), + root_path.clone(), + ) + .await?; + + let push_pipe = syncador::backend::pipeline::pull::Pipeline::new(vec![ + Box::new(s3_pull), + Box::new(syncador::backend::clear::glob::pull::ClearGlob::try_new( + &self.db_sync, + root_path.clone(), + )?), + Box::new(syncador::backend::archive::gzip::pull::Pull::new(root_path.clone())), + ]); + + match push_pipe.pull(Some(syncador::Package::null())).await { + Ok(package) => { + tracing::info!("Files restored"); + } + Err(err) => { + tracing::warn!("Error during archive push: {:?}", err); + } + } + + Ok(()) + } +} + +fn get_root_path(initial_dir: Option<&String>) -> Result { + match initial_dir { + Some(path) => { + let path = std::path::Path::new(&path); + if path.exists() { + Ok(path.to_path_buf()) + } else { + let mut root_path = + std::env::current_dir().expect("Current working dir not defined."); + root_path.push(&path); + Ok(root_path) + } + } + None => { + let dot_movement = dot_movement::DotMovement::try_from_env()?; + Ok(dot_movement.get_path().to_path_buf()) + } + } +} \ No newline at end of file diff --git a/networks/movement/movement-full-node/src/lib.rs b/networks/movement/movement-full-node/src/lib.rs index 017997fb4..a0d89a08b 100644 --- a/networks/movement/movement-full-node/src/lib.rs +++ b/networks/movement/movement-full-node/src/lib.rs @@ -1,4 +1,5 @@ pub mod admin; +pub mod backup; pub mod common_args; pub mod da; pub mod node; @@ -19,6 +20,8 @@ pub enum MovementFullNode { State(state::State), #[clap(subcommand)] Da(da::Da), + #[clap(subcommand)] + Backup(backup::Backup), } impl MovementFullNode { @@ -28,6 +31,7 @@ impl MovementFullNode { Self::Run(run) => run.execute().await, Self::State(state) => state.execute().await, Self::Da(da) => da.execute().await, + Self::Backup(backup) => backup.execute().await, } } } diff --git a/networks/movement/setup/src/main.rs b/networks/movement/setup/src/main.rs index a36417a99..dbf7ddfdf 100644 --- a/networks/movement/setup/src/main.rs +++ b/networks/movement/setup/src/main.rs @@ -2,9 +2,6 @@ use anyhow::Context; use godfig::{backend::config_file::ConfigFile, Godfig}; use movement_config::Config; use movement_full_node_setup::{local::Local, MovementFullNodeSetupOperations}; -use std::future::Future; -use std::pin::Pin; -use syncup::SyncupOperations; use tokio::signal::unix::signal; use tokio::signal::unix::SignalKind; use tokio::sync::watch; @@ -52,7 +49,7 @@ async fn main() -> Result<(), 
anyhow::Error> { let godfig: Godfig = Godfig::new(ConfigFile::new(config_file), vec![]); // Apply all of the setup steps - let (anvil_join_handle, sync_task) = godfig + let anvil_join_handle = godfig .try_transaction_with_result(|config| async move { tracing::info!("Config option: {:?}", config); let config = config.unwrap_or_default(); @@ -61,29 +58,7 @@ async fn main() -> Result<(), anyhow::Error> { // set up anvil let (config, anvil_join_handle) = Local::default().setup(dot_movement, config).await?; - // Wrap the syncing_config in an Arc - // This may be overkill because cloning sync_config is cheap - let syncing_config = config.syncing.clone(); - - // set up sync - let sync_task: Pin> + Send>> = - if syncing_config.wants_movement_sync() { - let sync_task = syncing_config.syncup().await?; - Box::pin(async move { - match sync_task.await { - Ok(_) => info!("Sync task finished successfully."), - Err(err) => info!("Sync task failed: {:?}", err), - } - Ok(()) - }) - } else { - Box::pin(async { - info!("No sync task configured, skipping."); - futures::future::pending::>().await - }) - }; - - Ok((Some(config.clone()), (anvil_join_handle, sync_task))) + Ok((Some(config.clone()), anvil_join_handle)) }) .await?; @@ -98,11 +73,6 @@ async fn main() -> Result<(), anyhow::Error> { _ = stop_rx.changed() => { tracing::info!("Cancellation received, killing anvil task."); } - // sync task - res = sync_task => { - tracing::info!("Sync task finished."); - res?; - } } Ok(()) diff --git a/process-compose/movement-full-node/process-compose.with-snapshot-node.yml b/process-compose/movement-full-node/process-compose.with-snapshot-node.yml new file mode 100644 index 000000000..64ef6da9b --- /dev/null +++ b/process-compose/movement-full-node/process-compose.with-snapshot-node.yml @@ -0,0 +1,18 @@ +version: "3" + +processes: + + movement-snapshot-node: + command: | + ./scripts/movement-full-node/snapshot + depends_on: + movement-celestia-da-light-node: + condition: process_healthy + availability: + exit_on_end: true + + # movement-full-node now depends on movement-snapshot-node having started + movement-full-node: + depends_on: + movement-snapshot-node: + condition: process_healthy \ No newline at end of file diff --git a/protocol-units/execution/maptos/framework/releases/release-script/src/lib.rs b/protocol-units/execution/maptos/framework/releases/release-script/src/lib.rs index d3bb29428..a0d302990 100644 --- a/protocol-units/execution/maptos/framework/releases/release-script/src/lib.rs +++ b/protocol-units/execution/maptos/framework/releases/release-script/src/lib.rs @@ -190,7 +190,6 @@ macro_rules! generate_script_module { impl $struct_name { pub fn new() -> Self { let script = $script_stanza; - Self { with_script: RunScript::new(super::$struct_name::new(), script) } } } diff --git a/protocol-units/syncing/syncup/src/lib.rs b/protocol-units/syncing/syncup/src/lib.rs index 38a80a225..0f269aebd 100644 --- a/protocol-units/syncing/syncup/src/lib.rs +++ b/protocol-units/syncing/syncup/src/lib.rs @@ -137,7 +137,6 @@ pub async fn syncup( info!("Non-leader upsyncing is disabled"); } } - Ok::<(), anyhow::Error>(()) }; Ok(upsync_task) diff --git a/scripts/movement-full-node/snapshot b/scripts/movement-full-node/snapshot new file mode 100644 index 000000000..f60ce54eb --- /dev/null +++ b/scripts/movement-full-node/snapshot @@ -0,0 +1,45 @@ +#!/bin/bash -e +# Copy the snapshot + +export AWS_REGION="us-west-1"; + +echo "Starting node for 60 seconds..." 
+# Run the node for 60 seconds, terminating if it exceeds the time limit
+if timeout 60s movement-full-node run; then
+    echo "Node run completed within the time limit."
+else
+    echo "Node run did not complete within the time limit. Forcefully terminated."
+fi
+echo "Taking snapshot..."
+if movement-full-node backup save-and-push; then
+    echo "Snapshot taken successfully."
+else
+    echo "Snapshot process failed. Exiting."
+    exit 1
+fi
+
+echo "Remove node DB"
+rm -rf "${DOT_MOVEMENT_PATH}"/maptos*;
+rm -rf "${DOT_MOVEMENT_PATH}/movement-da-db";
+
+echo "Restore snapshot..."
+export RUST_BACKTRACE=1;
+if movement-full-node backup restore; then
+    echo "Snapshot restored successfully."
+else
+    echo "Snapshot restoration failed. Exiting."
+    exit 1
+fi
+
+export MAYBE_RUN_LOCAL=true;
+
+echo "Starting new node with the backup db for 60 seconds..."
+if timeout 60s movement-full-node run; then
+    echo "Node run completed within the time limit."
+else
+    echo "Node run did not complete within the time limit. Forcefully terminated."
+fi
+
+
+
+exit 0
\ No newline at end of file
diff --git a/util/signing/interface/src/key/mod.rs b/util/signing/interface/src/key/mod.rs
index fcb13b1b6..2094c355b 100644
--- a/util/signing/interface/src/key/mod.rs
+++ b/util/signing/interface/src/key/mod.rs
@@ -40,14 +40,16 @@ pub enum Environment {
     Prod,
     Dev,
     Staging,
+    Test,
 }
 
 impl ToCanonicalString for Environment {
     fn to_canonical_string(&self) -> String {
         match self {
             Environment::Prod => "prod".to_string(),
-            Environment::Dev => "devNet".to_string(),
+            Environment::Dev => "devnet".to_string(),
             Environment::Staging => "staging".to_string(),
+            Environment::Test => "testnet".to_string(),
         }
     }
 }
@@ -56,8 +58,9 @@ impl TryFromCanonicalString for Environment {
     fn try_from_canonical_string(s: &str) -> Result<Self, String> {
         match s {
             "prod" => Ok(Environment::Prod),
-            "devNet" => Ok(Environment::Dev),
+            "devnet" => Ok(Environment::Dev),
             "staging" => Ok(Environment::Staging),
+            "testnet" => Ok(Environment::Test),
             _ => Err(format!("invalid environment: {}", s)),
         }
     }
diff --git a/util/syncador/Cargo.toml b/util/syncador/Cargo.toml
index 2504fe183..9397b0cf1 100644
--- a/util/syncador/Cargo.toml
+++ b/util/syncador/Cargo.toml
@@ -35,6 +35,7 @@ tracing = { workspace = true }
 
 [dev-dependencies]
 uuid = { workspace = true }
+tracing-subscriber = { workspace = true }
 
 [lints]
 workspace = true
diff --git a/util/syncador/src/backend/archive/gzip/mod.rs b/util/syncador/src/backend/archive/gzip/mod.rs
index 230df2329..a09e3324d 100644
--- a/util/syncador/src/backend/archive/gzip/mod.rs
+++ b/util/syncador/src/backend/archive/gzip/mod.rs
@@ -1,95 +1,2 @@
 pub mod pull;
 pub mod push;
-
-pub(crate) const DEFAULT_CHUNK_SIZE: usize = 500 * 1024 * 1024; // 500 MB per chunk (adjustable)
-pub(crate) const BUFFER_SIZE: usize = 10 * 1024 * 1024; // 10 MB buffer for each read/write operation
-
-#[cfg(test)]
-pub mod test {
-
-    use crate::backend::archive::gzip::pull::Pull;
-    use crate::backend::archive::gzip::push::Push;
-    use crate::backend::PullOperations;
-    use crate::backend::PushOperations;
-    use crate::files::package::{Package, PackageElement};
-    use std::fs::File;
-    use std::io::BufWriter;
-    use std::io::Write;
-    use std::path::PathBuf;
-
-    #[tokio::test]
-    pub async fn test_archive_split() -> Result<(), anyhow::Error> {
-        // 1) Chunk size is bigger than the archive. No split in chunk.
-        process_archive_test("test_archive_split.tmp", 10 * 1024, 1024).await?;
-        // 2) Chunk size is smaller than the archive. Several chunk is create and reconstructed.
- process_archive_test("test_archive_split2.tmp", 2024, 1024).await?; - Ok(()) - } - - async fn process_archive_test( - temp_file_name: &str, - chunk_size: usize, - buffer_size: usize, - ) -> Result<(), anyhow::Error> { - //Create source and destination temp dir. - let source_dir = tempfile::tempdir()?; - let destination_dir = tempfile::tempdir()?; - - //1) First test file too small doesn't archive. - - let archive_file_path = source_dir.path().join(temp_file_name); - { - let file = File::create(&archive_file_path)?; - let mut writer = BufWriter::new(file); - //Fill with some data. 10 Mb - let data: Vec = vec![2; 1024 * 1024]; - (0..10).try_for_each(|_| writer.write_all(&data))?; - } - - let push = Push { archives_dir: source_dir.path().to_path_buf(), chunk_size, buffer_size }; - - let element = PackageElement { - sync_files: vec![archive_file_path], - root_dir: source_dir.path().to_path_buf(), - }; - let package = Package(vec![element]); - let archive_package = push.push(package).await?; - println!("TEST archive_package: {:?}", archive_package); - - let file_metadata = std::fs::metadata(&archive_package.0[0].sync_files[0])?; - let file_size = file_metadata.len() as usize; - println!("TEST Dest chunk file size: {file_size}",); - - // Unarchive and verify - //move archive to dest folder. - let dest_files = archive_package - .0 - .into_iter() - .flat_map(|element| element.sync_files) - .map(|absolute_path| { - let dest = destination_dir.path().join(absolute_path.file_name().unwrap()); - println!("TEST move file source:{absolute_path:?} dest:{dest:?}"); - std::fs::rename(&absolute_path, &dest)?; - Ok(dest) - }) - .collect::>>()?; - - let pull = Pull { destination_dir: destination_dir.path().to_path_buf() }; - let element = PackageElement { - sync_files: dest_files, - root_dir: destination_dir.path().to_path_buf(), - }; - let package = Package(vec![element]); - - let dest_package = pull.pull(Some(package)).await; - println!("ICICICIC dest_package: {:?}", dest_package); - - //verify the dest file has the right size - let file_metadata = std::fs::metadata(&destination_dir.path().join(temp_file_name))?; - let file_size = file_metadata.len() as usize; - println!("Dest fiel size: {file_size}",); - assert_eq!(file_size, 10 * 1024 * 1024, "dest file hasn't the right size: {file_size}"); - - Ok(()) - } -} diff --git a/util/syncador/src/backend/archive/gzip/pull.rs b/util/syncador/src/backend/archive/gzip/pull.rs index 56a08b77a..791ac1266 100644 --- a/util/syncador/src/backend/archive/gzip/pull.rs +++ b/util/syncador/src/backend/archive/gzip/pull.rs @@ -1,12 +1,8 @@ -use crate::backend::archive::gzip::BUFFER_SIZE; use crate::backend::PullOperations; use crate::files::package::{Package, PackageElement}; use flate2::read::GzDecoder; use std::collections::VecDeque; use std::fs::File; -use std::fs::OpenOptions; -use std::io::BufReader; -use std::io::{Read, Write}; use std::path::{Path, PathBuf}; use tar::Archive; use tokio::{fs, task}; @@ -55,29 +51,8 @@ impl Pull { // Create the destination directory if it doesn't exist fs::create_dir_all(&destination).await?; - println!("PULL manifest:{:?}", manifest); - - let mut unsplit_manifest = - PackageElement { sync_files: vec![], root_dir: manifest.root_dir.clone() }; - - // Unpack each archive in the manifest + // Unpack each archive in the unsplit_manifest for (_relative_path, absolute_path) in manifest.try_path_tuples()? 
{ - // Recreate splited file if any - let path_buf = absolute_path.to_path_buf(); - let absolute_path = task::spawn_blocking(move || recreate_archive(path_buf)).await??; - - println!("PULL absolute_path {absolute_path:?}",); - println!("PULL destination {destination:?}",); - - if !unsplit_manifest.sync_files.contains(&absolute_path) { - unsplit_manifest.sync_files.push(absolute_path) - } - } - - println!("PULL unsplit_manifest:{:?}", unsplit_manifest); - - // Unpack each archive in the manifest - for (_relative_path, absolute_path) in unsplit_manifest.try_path_tuples()? { let tar_gz = File::open(&absolute_path)?; let decoder = GzDecoder::new(tar_gz); let mut archive = Archive::new(decoder); @@ -93,7 +68,6 @@ impl Pull { // Recursively add every file (not directory) in the destination directory to the new manifest let mut entries = Vec::new(); Self::collect_files(&destination, &mut entries).await?; - info!("Unarchived files: {:?}", entries.len()); for file_path in entries { new_manifest.add_sync_file(file_path); } @@ -122,69 +96,3 @@ impl PullOperations for Pull { Ok(Some(Package(manifests))) } } - -fn recreate_archive(archive_chunk: PathBuf) -> Result { - if archive_chunk - .extension() - .map(|ext| { - println!("ext:{ext:?}",); - ext != "chunk" - }) - .unwrap_or(true) - { - //not a chunk file return. - return Ok(archive_chunk); - } - - let arhive_file_name = archive_chunk - .file_name() - .and_then(|file_name| file_name.to_str()) - .and_then(|file_name_str| file_name_str.strip_suffix(".chunk")) - .and_then(|base_filename| { - let base_filename_parts: Vec<&str> = base_filename.rsplitn(2, '_').collect(); - (base_filename_parts.len() > 1).then(|| base_filename_parts[1].to_string()) - }) - .ok_or(anyhow::anyhow!(format!( - "Archive filename not found for chunk path:{:?}", - archive_chunk.to_str() - )))?; - - println!("PULL arhive_file_name:{:?}", arhive_file_name); - - let archive_path = archive_chunk.parent().map(|parent| parent.join(arhive_file_name)).ok_or( - anyhow::anyhow!(format!( - "Archive filename no root dir in path:{:?}", - archive_chunk.to_str() - )), - )?; - - println!("PULL archive_path:{:?}", archive_path); - let mut archive_file = OpenOptions::new() - .create(true) // Create the file if it doesn't exist - .append(true) // Open in append mode (do not overwrite) - .open(&archive_path)?; - - let mut buffer = vec![0; BUFFER_SIZE]; - - println!("PULL archive_chunk:{:?}", archive_chunk); - let chunk_file = File::open(&archive_chunk)?; - let mut chunk_reader = BufReader::new(chunk_file); - - loop { - // Read a part of the chunk into the buffer - let bytes_read = chunk_reader.read(&mut buffer)?; - - if bytes_read == 0 { - break; // End of chunk file - } - - // Write the buffer data to the output file - archive_file.write_all(&buffer[..bytes_read])?; - } - - let file_metadata = std::fs::metadata(&archive_path)?; - let file_size = file_metadata.len() as usize; - println!("PULL {archive_path:?} archive_chunk size: {file_size}",); - - Ok(archive_path) -} diff --git a/util/syncador/src/backend/archive/gzip/push.rs b/util/syncador/src/backend/archive/gzip/push.rs index 507ff908f..f14853274 100644 --- a/util/syncador/src/backend/archive/gzip/push.rs +++ b/util/syncador/src/backend/archive/gzip/push.rs @@ -1,57 +1,19 @@ -use crate::backend::archive::gzip::BUFFER_SIZE; -use crate::backend::archive::gzip::DEFAULT_CHUNK_SIZE; use crate::backend::PushOperations; use crate::files::package::{Package, PackageElement}; use flate2::write::GzEncoder; use flate2::Compression; use std::fs::File; -use 
std::io::{BufReader, Read, Write}; -use std::path::Path; use std::path::PathBuf; use tar::Builder; #[derive(Debug, Clone)] pub struct Push { pub archives_dir: PathBuf, - pub chunk_size: usize, - pub buffer_size: usize, } impl Push { pub fn new(archives_dir: PathBuf) -> Self { - Self { archives_dir, chunk_size: DEFAULT_CHUNK_SIZE, buffer_size: BUFFER_SIZE } - } - - /// Tar GZips a manifest. - fn tar_gzip_manifest( - manifest: PackageElement, - destination: PathBuf, - root_dir: PathBuf, - chunk_size: usize, - buffer_size: usize, - ) -> Result { - // create the archive builder - let file = File::create(destination.clone())?; - { - let encoder = GzEncoder::new(file, Compression::default()); - let mut tar_builder = Builder::new(encoder); - - for (relative_path, absolute_path) in manifest.try_path_tuples()? { - let file = &mut std::fs::File::open(absolute_path)?; - tar_builder.append_file(relative_path, file)?; - } - - // Finish writing the tar archive - tar_builder.finish()?; - } - - // Split the archive if needed - let destinations = split_archive(destination, &root_dir, chunk_size, buffer_size)?; - let mut new_manifest = PackageElement::new(root_dir); - for dest in destinations { - new_manifest.add_sync_file(dest); - } - Ok(new_manifest) + Self { archives_dir } } } @@ -62,16 +24,12 @@ impl PushOperations for Push { for (i, manifest) in package.0.into_iter().enumerate() { let new_manifest = tokio::task::spawn_blocking({ let archive_dir = self.archives_dir.clone(); - let chunk_size = self.chunk_size; - let buffer_size = self.buffer_size; move || { - Self::tar_gzip_manifest( + tar_gzip_manifest( manifest, archive_dir.join(format!("{}.tar.gz", i)), archive_dir, - chunk_size, - buffer_size, ) } }) @@ -82,79 +40,29 @@ impl PushOperations for Push { } } -fn split_archive>( - archive: PathBuf, - root_dir: P, - chunk_size: usize, - buffer_size: usize, -) -> Result, anyhow::Error> { - let output_dir = root_dir.as_ref(); - - // Check the file size before proceeding with the split - let file_metadata = std::fs::metadata(&archive)?; - let file_size = file_metadata.len() as usize; - println!("Push split file size{file_size} chunksize:{chunk_size}",); - - if file_size <= chunk_size { - return Ok(vec![archive]); - } - - let archive_file = File::open(&archive)?; - - let file_metadata = std::fs::metadata(&archive)?; - let file_size = file_metadata.len() as usize; - println!("PUSH {archive:?} archive_file size: {file_size}",); - - std::fs::create_dir_all(output_dir)?; - - let mut chunk_num = 0; - let mut buffer = vec![0; buffer_size]; - - let archive_relative_path = archive.strip_prefix(&output_dir)?; - let mut input_reader = BufReader::new(archive_file); - - let mut chunk_list = vec![]; - loop { - // Create a new file for the chunk - let chunk_path = output_dir.join(format!( - "{}_{:03}.chunk", - archive_relative_path.to_string_lossy(), - chunk_num - )); - - let mut chunk_file = File::create(&chunk_path)?; - - let mut all_read_bytes = 0; - let end = loop { - // Read a part of the chunk into the buffer - let bytes_read = input_reader.read(&mut buffer)?; - if bytes_read == 0 { - break true; // End of chunk file - } - - // Write the buffer data to the output file - chunk_file.write_all(&buffer[..bytes_read])?; - all_read_bytes += bytes_read; - if all_read_bytes >= chunk_size { - break false; - } - }; - - if all_read_bytes == 0 { - break; // End of chunk file and discard the current one. +/// Tar GZips a manifest. 
+fn tar_gzip_manifest( + manifest: PackageElement, + destination: PathBuf, + root_dir: PathBuf, +) -> Result { + // create the archive builder + let file = File::create(destination.clone())?; + { + let encoder = GzEncoder::new(file, Compression::default()); + let mut tar_builder = Builder::new(encoder); + + for (relative_path, absolute_path) in manifest.try_path_tuples()? { + let file = &mut std::fs::File::open(absolute_path)?; + tar_builder.append_file(relative_path, file)?; } - let file_metadata = std::fs::metadata(&chunk_path)?; - let file_size = file_metadata.len() as usize; - println!("{chunk_path:?} chunk_file size: {file_size}",); - - chunk_num += 1; - chunk_list.push(chunk_path); - if end { - break; // End of chunk file - } + // Finish writing the tar archive + tar_builder.finish()?; } - println!("split_archive return {chunk_list:?}",); - Ok(chunk_list) + let mut new_manifest = PackageElement::new(root_dir); + new_manifest.add_sync_file(destination); + + Ok(new_manifest) } diff --git a/util/syncador/src/backend/glob/file.rs b/util/syncador/src/backend/glob/file.rs index 209ab8f35..648f30fc0 100644 --- a/util/syncador/src/backend/glob/file.rs +++ b/util/syncador/src/backend/glob/file.rs @@ -60,7 +60,7 @@ async fn walk_directory( impl PushOperations for FileGlob { async fn push(&self, _package: Package) -> Result { // just check the matching glob files - info!("Running glob push"); + info!("Running glob push with pattern:{} in dir:{:?}", self.pattern, self.root_dir); let mut globset_builder = GlobSetBuilder::new(); globset_builder.add(self.pattern.clone()); let globset = globset_builder.build()?; diff --git a/util/syncador/src/backend/pipeline/pull.rs b/util/syncador/src/backend/pipeline/pull.rs index 93bf0501e..05cdd93b2 100644 --- a/util/syncador/src/backend/pipeline/pull.rs +++ b/util/syncador/src/backend/pipeline/pull.rs @@ -22,7 +22,7 @@ impl PullOperations for Pipeline { async fn pull(&self, package: Option) -> Result, anyhow::Error> { let mut package = package; for backend in &self.backends { - info!("Pulling from backend"); + info!("Pulling from backend package:{package:?}"); package = backend.pull(package.clone()).await?; } Ok(package) diff --git a/util/syncador/src/backend/s3/shared_bucket/mod.rs b/util/syncador/src/backend/s3/shared_bucket/mod.rs index f750d6956..2f01c936b 100644 --- a/util/syncador/src/backend/s3/shared_bucket/mod.rs +++ b/util/syncador/src/backend/s3/shared_bucket/mod.rs @@ -3,6 +3,8 @@ use aws_types::region::Region; use tracing::info; const UPLOAD_COMPLETE_MARKER_FILE_NAME: &str = "upload_complete.txt"; +pub(crate) const DEFAULT_CHUNK_SIZE: usize = 500 * 1024 * 1024; // 500 MB per chunk (adjustable) +pub(crate) const BUFFER_SIZE: usize = 10 * 1024 * 1024; // 10 MB buffer for each read/write operation pub mod metadata; pub mod pull; @@ -25,6 +27,39 @@ pub async fn create_with_load_from_env( create(client, bucket, metadata, pull_destination).await } +pub async fn create_push_with_load_from_env( + bucket: String, + metadata: metadata::Metadata, +) -> Result { + let region = match std::env::var("AWS_REGION") { + Ok(region) => Some(Region::new(region)), + Err(_) => None, + }; + let config = aws_config::load_from_env().await.into_builder().region(region).build(); + info!("Create client used region {:?}", config.region()); + let client = aws_sdk_s3::Client::new(&config); + let bucket_connection = bucket_connection::BucketConnection::create(client, bucket).await?; + let push = push::Push::new(bucket_connection, metadata); + Ok(push) +} + +pub async fn 
create_pull_with_load_from_env( + bucket: String, + metadata: metadata::Metadata, + pull_destination: PathBuf, +) -> Result { + let region = match std::env::var("AWS_REGION") { + Ok(region) => Some(Region::new(region)), + Err(_) => None, + }; + let config = aws_config::load_from_env().await.into_builder().region(region).build(); + info!("Create client used region {:?}", config.region()); + let client = aws_sdk_s3::Client::new(&config); + let bucket_connection = bucket_connection::BucketConnection::create(client, bucket).await?; + let pull = pull::Pull::new(bucket_connection, metadata, pull_destination); + Ok(pull) +} + pub async fn destroy_with_load_from_env(bucket: String) -> Result<(), anyhow::Error> { let region = match std::env::var("AWS_REGION") { Ok(region) => Some(Region::new(region)), @@ -73,8 +108,158 @@ pub mod test { //! pub in case we want to reuse helpers use super::*; + use crate::backend::s3::bucket_connection::BucketConnection; + use crate::backend::s3::shared_bucket::pull::Pull; + use crate::backend::s3::shared_bucket::push::Push; use crate::backend::{PullOperations, PushOperations}; use crate::files::package::{Package, PackageElement}; + use movement_types::actor; + use std::fs::File; + use std::io::BufReader; + use std::io::BufWriter; + use std::io::Read; + use std::io::Write; + + #[tokio::test] + pub async fn test_archive_split() -> Result<(), anyhow::Error> { + use tracing_subscriber::EnvFilter; + + tracing_subscriber::fmt() + .with_env_filter( + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")), + ) + .init(); + // 1) Chunk size is bigger than the archive. No split in chunk. + process_archive_test("test_archive_split.tmp", 10 * 1024, 600).await?; + // 2) Chunk size is smaller than the archive. Several chunk is create and reconstructed. + process_archive_test("test_archive_split2.tmp", 1024, 312).await?; + Ok(()) + } + + async fn process_archive_test( + temp_file_name: &str, + chunk_size: usize, + buffer_size: usize, + ) -> Result<(), anyhow::Error> { + //Create source and destination temp dir. + let source_dir = tempfile::tempdir()?; + let destination_dir = tempfile::tempdir()?; + + //create file to Push size 10 * 1024. + let archive_file_path = source_dir.path().join(temp_file_name); + { + let data: Vec = (0..1024usize).map(|i| (i % 256) as u8).collect(); + let file = File::create(&archive_file_path)?; + let mut writer = BufWriter::new(file); + //Fill with some data. 
10 Mb + (0..10).try_for_each(|_| writer.write_all(&data))?; + } + + let bucket = format!("public-test-bucket-{}", uuid::Uuid::new_v4()); + let config = aws_config::load_from_env().await; + let client = aws_sdk_s3::Client::new(&config); + + let connection = BucketConnection::new(client.clone(), bucket.clone()); + + connection.create_bucket_if_not_exists().await?; + + let application_id = application::Id::new([ + 26, 43, 60, 77, 94, 111, 122, 139, 156, 173, 190, 207, 208, 225, 242, 3, 20, 37, 54, + 71, 88, 105, 122, 139, 156, 173, 190, 207, 208, 225, 242, 3, + ]); + let syncer_id = actor::Id::new([ + 10, 64, 193, 217, 99, 233, 100, 32, 31, 1, 244, 166, 56, 79, 213, 208, 112, 158, 162, + 27, 10, 111, 130, 99, 91, 130, 103, 26, 12, 121, 210, 75, + ]); + + let metadata = metadata::Metadata::default() + .with_application_id(application_id) + .with_syncer_id(syncer_id); + + let push = Push { + bucket_connection: connection, + metadata: metadata.clone(), + chunk_size, + buffer_size, + }; + + let element = PackageElement { + sync_files: vec![archive_file_path], + root_dir: source_dir.path().to_path_buf(), + }; + let package = Package(vec![element]); + let archive_package = push.push(package).await?; + + // Pull archive + let connection = BucketConnection::new(client.clone(), bucket.clone()); + let pull = Pull::new(connection, metadata, destination_dir.path().to_path_buf()); + let element = PackageElement { + sync_files: vec![archive_package.0[0].sync_files[0].clone()], + root_dir: destination_dir.path().to_path_buf(), + }; + let package = Package(vec![element]); + + let dest_package = pull + .pull(Some(package.clone())) + .await? + .ok_or(anyhow::anyhow!("Error No file pulled."))?; + + verify_archive(&dest_package.0[0].sync_files[0])?; + + //verify that all chunk has been removed + let has_chunk = std::fs::read_dir(&destination_dir)? + .find(|entry| { + entry + .as_ref() + .ok() + .map(|entry| { + let path = entry.path(); + path.is_file() + && path.extension().and_then(|ext| ext.to_str()) == Some("chunk") + }) + .unwrap_or(false) + }) + .is_some(); + assert!(!has_chunk, "Some chunk are still present."); + + //Do a second pull to validate it manage last pull remaining files. + let dest_package = pull + .pull(Some(package)) + .await? + .ok_or(anyhow::anyhow!("Error second pull failed."))?; + verify_archive(&dest_package.0[0].sync_files[0])?; + + Ok(()) + } + + fn verify_archive + std::marker::Copy>( + archive_file: P, + ) -> Result<(), anyhow::Error> { + let file_metadata = std::fs::metadata(archive_file)?; + let file_size = file_metadata.len() as usize; + assert_eq!(file_size, 10 * 1024, "dest file hasn't the right size: {file_size}"); + + //verify that the file byte are in order. 
+ let pulled_file = File::open(archive_file)?; + let mut reader = BufReader::new(pulled_file); + let mut buffer = [0u8; 1024]; + let mut expected_byte: u8 = 0; + + loop { + let bytes_read = reader.read(&mut buffer)?; + if bytes_read == 0 { + break; // End of file + } + + for &byte in &buffer[..bytes_read] { + if byte != expected_byte { + panic!("Pull file bytes in wrong order."); + } + expected_byte = expected_byte.wrapping_add(1); // Increment and wrap around after 255 + } + } + Ok(()) + } #[tokio::test] async fn test_create() -> Result<(), anyhow::Error> { diff --git a/util/syncador/src/backend/s3/shared_bucket/pull.rs b/util/syncador/src/backend/s3/shared_bucket/pull.rs index 7578e64f6..f4f602e71 100644 --- a/util/syncador/src/backend/s3/shared_bucket/pull.rs +++ b/util/syncador/src/backend/s3/shared_bucket/pull.rs @@ -1,11 +1,15 @@ use super::metadata::Metadata; use crate::backend::s3::bucket_connection::BucketConnection; +use crate::backend::s3::shared_bucket::BUFFER_SIZE; use crate::backend::PullOperations; use crate::files::package::{Package, PackageElement}; -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; +use std::fs::File; +use std::fs::OpenOptions; +use std::io::BufReader; +use std::io::{Read, Write}; use std::path::PathBuf; use tokio::io::AsyncWriteExt; -use tracing::info; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Candidate { @@ -37,6 +41,7 @@ impl Pull { ) -> Result { let bucket = self.bucket_connection.bucket.clone(); let key = format!("{}/{}", candidate_selected.key, relative_path.to_string_lossy()); + tracing::info!("Pulling file from S3 on bucket:{bucket} with key: {key}"); let mut output = self .bucket_connection .client @@ -72,7 +77,6 @@ impl Pull { .list_all_application_file_paths_for(&self.bucket_connection) .await?; - info!("Public file paths: {:?}", public_file_paths); for file_path in public_file_paths { // the first three parts are the candidate key let parts: Vec<&str> = file_path.split('/').into_iter().take(3).collect(); @@ -102,7 +106,6 @@ impl Pull { } }) .collect(); - println!("S3 PUSH to_remove: {to_remove:?}",); to_remove.iter().for_each(|key| { candidates.remove(key); }); @@ -110,37 +113,11 @@ impl Pull { Ok(candidates.keys().cloned().collect()) } - /*pub(crate) async fn download_based_on_manifest( - &self, - candidate_selected: &Candidate, - manifest: PackageElement, - ) -> Result { - // get the path tuples - let path_tuples = manifest.try_path_tuples()?; - - // download each file - let mut manifest_futures = Vec::new(); - for (relative_path, full_path) in path_tuples { - let future = self.download_path( - candidate_selected, - relative_path.to_path_buf().clone(), - full_path.clone(), - ); - manifest_futures.push(future); - } - - // try to join all the manifest_futures - futures::future::try_join_all(manifest_futures).await?; - - // should downloaded into the locations specified in the manifest - Ok(manifest) - }*/ - pub(crate) async fn download_all_files_for_candidate( &self, candidate: &Candidate, ) -> Result { - info!("Downloading all files for candidate: {:?}", candidate); + tracing::debug!("Downloading all files for candidate: {:?}", candidate); // get all of the public file paths for this application let public_file_paths = self @@ -148,8 +125,9 @@ impl Pull { .list_all_application_file_paths_for(&self.bucket_connection) .await?; - // filter the public file paths for the candidate - let file_paths: HashSet = public_file_paths + // Filter the public file paths for the candidate. 
+ // Use BTreeSet to order the file chunks. + let file_paths: BTreeSet = public_file_paths .into_iter() .filter(|file_path| file_path.starts_with(&candidate.key)) .collect(); @@ -178,12 +156,25 @@ impl Pull { // try to join all the manifest_futures futures::future::try_join_all(manifest_futures).await?; + //recreate splited archive if needed. + let mut unsplit_manifest = + PackageElement { sync_files: vec![], root_dir: manifest.root_dir.clone() }; + for absolute_path in &manifest.sync_files { + let path_buf = absolute_path.to_path_buf(); + let absolute_path = + tokio::task::spawn_blocking(move || recreate_archive(path_buf)).await??; + + if !unsplit_manifest.sync_files.contains(&absolute_path) { + tracing::info!("Archive file added {absolute_path:?}",); + unsplit_manifest.sync_files.push(absolute_path) + } + } + // should downloaded into the locations specified in the manifest - Ok(manifest) + Ok(unsplit_manifest) } async fn find_candidates(&self, package: &Package) -> Result, anyhow::Error> { - info!("Finding candidates for package: {:?}", package); let candidates = self.candidates_for(package).await?; Ok(candidates.into_iter().collect()) } @@ -193,7 +184,6 @@ impl Pull { _package: &Package, mut candidates: Vec, ) -> Result { - info!("Selecting from candidates: {:?}", candidates); // sort the intersection of candidates by the epoch (latest first) candidates.sort_by_key(|candidate| -(candidate.sync_epoch as i64)); @@ -210,7 +200,6 @@ impl Pull { _package: Package, candidate: Candidate, ) -> Result { - info!("Pulling candidate: {:?}", candidate); // pull all of the files for the candidate let manifest = self.download_all_files_for_candidate(&candidate).await?; let manifests = vec![manifest]; @@ -222,7 +211,7 @@ impl Pull { #[async_trait::async_trait] impl PullOperations for Pull { async fn pull(&self, package: Option) -> Result, anyhow::Error> { - info!("S3 pulling package: {:?}", package); + tracing::debug!("S3 pulling package: {:?}", package); if package.is_none() { return Ok(None); } @@ -231,7 +220,6 @@ impl PullOperations for Pull { let candidates = self.find_candidates(&package).await?; - info!("Candidates: {:?}", candidates); if candidates.is_empty() { return Ok(None); } @@ -240,3 +228,67 @@ impl PullOperations for Pull { Ok(Some(self.pull_candidate(package, candidate_selected).await?)) } } + +fn recreate_archive(archive_chunk: PathBuf) -> Result { + if archive_chunk.extension().map(|ext| ext != "chunk").unwrap_or(true) { + //not a chunk file return. 
+ return Ok(archive_chunk); + } + + let (chunk, archive_file_name) = archive_chunk + .file_name() + .and_then(|file_name| file_name.to_str()) + .and_then(|file_name_str| file_name_str.strip_suffix(".chunk")) + .and_then(|base_filename| { + let base_filename_parts: Vec<&str> = base_filename.rsplitn(2, '_').collect(); + (base_filename_parts.len() > 1) + .then(|| (base_filename_parts[0].to_string(), base_filename_parts[1].to_string())) + }) + .ok_or(anyhow::anyhow!(format!( + "Archive filename not found for chunk path:{:?}", + archive_chunk.to_str() + )))?; + + let archive_path = archive_chunk.parent().map(|parent| parent.join(archive_file_name)).ok_or( + anyhow::anyhow!(format!( + "Archive filename no root dir in path:{:?}", + archive_chunk.to_str() + )), + )?; + + //remove old archive file + if chunk == "000" && archive_path.exists() { + std::fs::remove_file(&archive_path)?; + } + + let mut archive_file = OpenOptions::new() + .create(true) // Create the file if it doesn't exist + .append(true) // Open in append mode (do not overwrite) + .open(&archive_path)?; + + let mut buffer = vec![0; BUFFER_SIZE]; + + let chunk_file = File::open(&archive_chunk)?; + let mut chunk_reader = BufReader::new(chunk_file); + + loop { + // Read a part of the chunk into the buffer + let bytes_read = chunk_reader.read(&mut buffer)?; + + if bytes_read == 0 { + break; // End of chunk file + } + + // Write the buffer data to the output file + archive_file.write_all(&buffer[..bytes_read])?; + } + + let file_metadata = std::fs::metadata(&archive_path)?; + let file_size = file_metadata.len() as usize; + + //remove the chunk that is useless. + std::fs::remove_file(&archive_chunk)?; + tracing::debug!("PULL {archive_path:?} archive_chunk size: {file_size}",); + + Ok(archive_path) +} diff --git a/util/syncador/src/backend/s3/shared_bucket/push.rs b/util/syncador/src/backend/s3/shared_bucket/push.rs index a0a29efb2..995915b98 100644 --- a/util/syncador/src/backend/s3/shared_bucket/push.rs +++ b/util/syncador/src/backend/s3/shared_bucket/push.rs @@ -1,9 +1,14 @@ use super::metadata::Metadata; use crate::backend::s3::bucket_connection::BucketConnection; +use crate::backend::s3::shared_bucket::BUFFER_SIZE; +use crate::backend::s3::shared_bucket::DEFAULT_CHUNK_SIZE; use crate::backend::PushOperations; use crate::files::package::{Package, PackageElement}; use aws_sdk_s3::operation::put_object::PutObjectOutput; use aws_sdk_s3::primitives::ByteStream; +use std::fs::File; +use std::io::{BufReader as StdBufReader, Read, Write}; +use std::path::Path; use std::path::PathBuf; #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -16,11 +21,18 @@ pub struct Candidate { pub struct Push { pub bucket_connection: BucketConnection, pub metadata: Metadata, + pub chunk_size: usize, + pub buffer_size: usize, } impl Push { pub fn new(bucket_connection: BucketConnection, metadata: Metadata) -> Self { - Self { bucket_connection, metadata } + Self { + bucket_connection, + metadata, + chunk_size: DEFAULT_CHUNK_SIZE, + buffer_size: BUFFER_SIZE, + } } pub(crate) async fn upload_path( @@ -31,6 +43,7 @@ impl Push { let bucket = self.bucket_connection.bucket.clone(); let key = format!("{}/{}", self.metadata.syncer_epoch_prefix()?, relative_path.to_string_lossy()); + tracing::info!("Pushing file on S3 on bucket:{bucket} with key: {key}"); let body = ByteStream::from_path(full_path).await?; let s3_path = format!("s3://{}/{}", bucket, key); let output = self @@ -169,9 +182,25 @@ impl Push { #[async_trait::async_trait] impl PushOperations for Push { async fn 
push(&self, package: Package) -> Result { + tracing::debug!("Pushing package:{package:?}"); // prune the old epochs self.prune().await?; + // Split the too big files + let mut new_package_elements = vec![]; + for element in package.0 { + for file_path in element.sync_files { + let new_files = + split_archive(file_path, &element.root_dir, self.chunk_size, self.buffer_size)?; + let mut new_element = PackageElement::new(element.root_dir.clone()); + for dest in new_files { + new_element.add_sync_file(dest); + } + new_package_elements.push(new_element); + } + } + let package = Package(new_package_elements); + // upload the package let mut manifest_futures = Vec::new(); for manifest in package.into_manifests() { @@ -182,3 +211,70 @@ impl PushOperations for Push { Ok(Package(manifests)) } } + +fn split_archive>( + archive: PathBuf, + root_dir: P, + chunk_size: usize, + buffer_size: usize, +) -> Result, anyhow::Error> { + let output_dir = root_dir.as_ref(); + + // Check the file size before proceeding with the split + let file_metadata = std::fs::metadata(&archive)?; + let file_size = file_metadata.len() as usize; + if file_size <= chunk_size { + return Ok(vec![archive]); + } + + let archive_file = File::open(&archive)?; + + std::fs::create_dir_all(output_dir)?; + + let mut chunk_num = 0; + let mut buffer = vec![0; buffer_size]; + + let archive_relative_path = archive.strip_prefix(&output_dir)?; + let mut input_reader = StdBufReader::new(archive_file); + + let mut chunk_list = vec![]; + loop { + // Create a new file for the chunk + let chunk_path = output_dir.join(format!( + "{}_{:03}.chunk", + archive_relative_path.to_string_lossy(), + chunk_num + )); + + let mut chunk_file = File::create(&chunk_path)?; + + let mut all_read_bytes = 0; + let end = loop { + // Read a part of the chunk into the buffer + let bytes_read = input_reader.read(&mut buffer)?; + if bytes_read == 0 { + break true; // End of chunk file + } + + // Write the buffer data to the output file + chunk_file.write_all(&buffer[..bytes_read])?; + all_read_bytes += bytes_read; + if all_read_bytes >= chunk_size { + break false; + } + }; + + if all_read_bytes == 0 { + break; // End of chunk file and discard the current one. + } + + chunk_num += 1; + chunk_list.push(chunk_path); + if end { + break; // End of chunk file + } + } + + tracing::info!("split_archive return {chunk_list:?}",); + Ok(chunk_list) +}
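
# Usage sketch for the new `movement-full-node backup` subcommands introduced above.
# Illustrative only: the bucket name, glob pattern, and archive filename are the clap
# defaults also used in the compose files, not required values, and DOT_MOVEMENT_PATH
# plus AWS credentials are assumed to already be configured in the environment.

# Archive the local databases found under the root directory (defaults to $DOT_MOVEMENT_PATH).
movement-full-node backup save "{maptos,maptos-storage,movement-da-db}/**"

# Upload a previously created archive to the shared S3 bucket.
movement-full-node backup push follower-test-ci-backup 0.tar.gz

# Archive and upload in a single step (the invocation used by the snapshot compose service).
movement-full-node backup save-and-push follower-test-ci-backup "{maptos,maptos-storage,movement-da-db}/**" 0.tar.gz

# Pull the most recent archive from the bucket and restore it over the local databases.
movement-full-node backup restore follower-test-ci-backup "{maptos,maptos-storage,movement-da-db}/**"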