diff --git a/Cargo.lock b/Cargo.lock
index aff240688..29b101e4a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5498,6 +5498,7 @@ dependencies = [
  "anyhow",
  "serde",
  "serde_json",
+ "syncup",
  "tokio",
 ]
 
@@ -10914,9 +10915,9 @@ dependencies = [
 
 [[package]]
 name = "regex"
-version = "1.10.5"
+version = "1.10.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f"
+checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619"
 dependencies = [
  "aho-corasick",
  "memchr",
@@ -12509,6 +12510,7 @@ dependencies = [
  "celestia-types",
  "commander",
  "dot-movement",
+ "futures",
  "godfig",
  "hex",
  "m1-da-light-node-setup",
@@ -12519,6 +12521,7 @@ dependencies = [
  "serde",
  "serde_json",
  "suzuka-config",
+ "syncup",
  "tempfile",
  "tokio",
  "tokio-stream",
@@ -12597,6 +12600,7 @@ dependencies = [
  "flate2",
  "flocks",
  "futures",
+ "glob",
  "movement-types",
  "serde",
  "serde_json",
@@ -12607,6 +12611,18 @@ dependencies = [
  "uuid",
 ]
 
+[[package]]
+name = "syncup"
+version = "0.0.2"
+dependencies = [
+ "anyhow",
+ "async-stream",
+ "async-trait",
+ "futures",
+ "syncador",
+ "tokio",
+]
+
 [[package]]
 name = "sysinfo"
 version = "0.28.4"
diff --git a/Cargo.toml b/Cargo.toml
index 537306966..07ca7d024 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,6 +10,7 @@ members = [
     "protocol-units/da/m1/*",
     "protocol-units/sequencing/memseq/*",
     "protocol-units/mempool/*",
+    "protocol-units/syncing/*",
     "protocol-units/settlement/mcr/client",
     "protocol-units/settlement/mcr/config",
     "protocol-units/settlement/mcr/manager",
@@ -90,6 +91,8 @@ monza-config = { path = "networks/monza/monza-config" }
 flocks = { path = "util/flocks" }
 godfig = { path = "util/godfig" }
 movement-tracing = { path = "util/tracing" }
+syncup = { path = "protocol-units/syncing/syncup" }
+syncador = { path = "util/syncador" }
 
 # Serialization and Deserialization
 borsh = { version = "0.10" } # todo: internalize jmt and bump
@@ -277,6 +280,8 @@ uuid = { version = "1.10.0", features = ["v4"] }
 tar = "0.4.41"
 flate2 = "1.0.31"
 blake-3 = "1.4.0"
+regex = "1.10.6"
+glob = "0.3.1"
 
 # trying to pin diesel
 # diesel = "=2.1.1"
diff --git a/docker/compose/suzuka-full-node/docker-compose.celestia-local.yml b/docker/compose/suzuka-full-node/docker-compose.local.yml
similarity index 53%
rename from docker/compose/suzuka-full-node/docker-compose.celestia-local.yml
rename to docker/compose/suzuka-full-node/docker-compose.local.yml
index 47ce5b82f..d698be517 100644
--- a/docker/compose/suzuka-full-node/docker-compose.celestia-local.yml
+++ b/docker/compose/suzuka-full-node/docker-compose.local.yml
@@ -1,4 +1,35 @@
 services:
+  setup:
+    image: ghcr.io/movementlabsxyz/suzuka-full-node-setup:${CONTAINER_REV}
+    container_name: setup
+    environment:
+      DOT_MOVEMENT_PATH: /.movement
+      # By default, all the listener hostnames and ports are reasonable.
+      # However, for container networking, we need to specify the hostnames on the connection.
+      # You can also use host.docker.internal for the host machine as all of these ports should be exposed.
+      CELESTIA_RPC_CONNECTION_HOSTNAME: m1-da-light-node-celestia-appd
+      CELESTIA_WEBSOCKET_CONNECTION_HOSTNAME: m1-da-light-node-celestia-bridge
+      M1_DA_LIGHT_NODE_CONNECTION_HOSTNAME: m1-da-light-node
+      MAPTOS_API_CONNECTION_HOSTNAME: suzuka-full-node
+      FAUCET_API_CONNECTION_HOSTNAME: suzuka-faucet-service
+      ETH_RPC_CONNECTION_HOSTNAME: setup
+      ETH_RPC_CONNECTION_PROTOCOL: http
+      ETH_RPC_CONNECTION_PORT: 8090
+      MAYBE_RUN_LOCAL: "true"
+      MAYBE_DEPLOY_MCR: "true"
+      INDEXER_PROCESSOR_POSTGRES_CONNECTION_STRING: postgres://postgres:password@postgres:5432/postgres
+      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
+      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
+      AWS_REGION: ${AWS_DEFAULT_REGION}
+    volumes:
+      - ${DOT_MOVEMENT_PATH}:/.movement
+    ports:
+      - "8090:8090" # port for anvil, this should be moved out into runner
+    healthcheck:
+      test: [ "CMD-SHELL", "nc -zv 0.0.0.0 8090" ]
+      retries: 10
+      interval: 10s
+      timeout: 5s
+
   m1-da-light-node-celestia-appd:
     image: ghcr.io/movementlabsxyz/m1-da-light-node-celestia-appd:${CONTAINER_REV}
diff --git a/docker/compose/suzuka-full-node/docker-compose.remote-no-celestia-light-node.yml b/docker/compose/suzuka-full-node/docker-compose.remote-no-celestia-light-node.yml
new file mode 100644
index 000000000..20de4d886
--- /dev/null
+++ b/docker/compose/suzuka-full-node/docker-compose.remote-no-celestia-light-node.yml
@@ -0,0 +1,38 @@
+services:
+  setup:
+    image: ghcr.io/movementlabsxyz/suzuka-full-node-setup:${CONTAINER_REV}
+    container_name: setup
+    environment:
+      DOT_MOVEMENT_PATH: /.movement
+      # By default, all the listener hostnames and ports are reasonable.
+      # However, for container networking, we need to specify the hostnames on the connection.
+      # You can also use host.docker.internal for the host machine as all of these ports should be exposed.
+      CELESTIA_RPC_CONNECTION_HOSTNAME: ${CELESTIA_RPC_CONNECTION_HOSTNAME:?CELESTIA_RPC_CONNECTION_HOSTNAME is not set}
+      CELESTIA_RPC_CONNECTION_PORT: ${CELESTIA_RPC_CONNECTION_PORT:?CELESTIA_RPC_CONNECTION_PORT is not set}
+      CELESTIA_WEBSOCKET_CONNECTION_HOSTNAME: ${CELESTIA_WEBSOCKET_CONNECTION_HOSTNAME:?CELESTIA_WEBSOCKET_CONNECTION_HOSTNAME is not set}
+      CELESTIA_WEBSOCKET_CONNECTION_PORT: ${CELESTIA_WEBSOCKET_CONNECTION_PORT:?CELESTIA_WEBSOCKET_CONNECTION_PORT is not set}
+      M1_DA_LIGHT_NODE_CONNECTION_HOSTNAME: m1-da-light-node
+      MAPTOS_API_CONNECTION_HOSTNAME: suzuka-full-node
+      FAUCET_API_CONNECTION_HOSTNAME: suzuka-faucet-service
+      ETH_RPC_CONNECTION_HOSTNAME: ${ETH_RPC_CONNECTION_HOSTNAME:?ETH_RPC_CONNECTION_HOSTNAME is not set}
+      ETH_RPC_CONNECTION_PROTOCOL: ${ETH_RPC_CONNECTION_PROTOCOL:?ETH_RPC_CONNECTION_PROTOCOL is not set}
+      ETH_RPC_CONNECTION_PORT: ${ETH_RPC_CONNECTION_PORT:?ETH_RPC_CONNECTION_PORT is not set}
+      INDEXER_PROCESSOR_POSTGRES_CONNECTION_STRING: postgres://postgres:password@postgres:5432/postgres
+      MOVEMENT_SYNC: ${MOVEMENT_SYNC:?MOVEMENT_SYNC is not set}
+      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:?AWS_ACCESS_KEY_ID is not set}
+      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:?AWS_SECRET_ACCESS_KEY is not set}
+      AWS_REGION: ${AWS_DEFAULT_REGION:?AWS_DEFAULT_REGION is not set}
+    volumes:
+      - ${DOT_MOVEMENT_PATH}:/.movement
+
+  celestia-light-node:
+    image: busybox
+    container_name: celestia-light-node
+    command: sh -c 'echo "Starting Celestia light-node services." && sleep infinity'
+    environment:
+      - DOT_MOVEMENT_PATH=/.movement
+    volumes:
+      - ${DOT_MOVEMENT_PATH}:/.movement
+    depends_on:
+      setup:
+        condition: service_healthy
diff --git a/docker/compose/suzuka-full-node/docker-compose.setup-local.yml b/docker/compose/suzuka-full-node/docker-compose.setup-local.yml
deleted file mode 100644
index 309bcac82..000000000
--- a/docker/compose/suzuka-full-node/docker-compose.setup-local.yml
+++ /dev/null
@@ -1,29 +0,0 @@
-services:
-  setup:
-    image: ghcr.io/movementlabsxyz/suzuka-full-node-setup:${CONTAINER_REV}
-    container_name: setup
-    environment:
-      DOT_MOVEMENT_PATH: /.movement
-      # By default, all the listener hostnames and ports are reasonable.
-      # However, for container networking, we need to specify the hostnames on the connection.
-      # You can also use host.docker.internal for the host machine as all of these ports should be exposed.
-      CELESTIA_RPC_CONNECTION_HOSTNAME: m1-da-light-node-celestia-appd
-      CELESTIA_WEBSOCKET_CONNECTION_HOSTNAME: m1-da-light-node-celestia-bridge
-      M1_DA_LIGHT_NODE_CONNECTION_HOSTNAME: m1-da-light-node
-      MAPTOS_API_CONNECTION_HOSTNAME: suzuka-full-node
-      FAUCET_API_CONNECTION_HOSTNAME: suzuka-faucet-service
-      ETH_RPC_CONNECTION_HOSTNAME: setup
-      ETH_RPC_CONNECTION_PROTOCOL: http
-      ETH_RPC_CONNECTION_PORT: 8090
-      MAYBE_RUN_LOCAL: "true"
-      MAYBE_DEPLOY_MCR: "true"
-      INDEXER_PROCESSOR_POSTGRES_CONNECTION_STRING: postgres://postgres:password@postgres:5432/postgres
-    volumes:
-      - ${DOT_MOVEMENT_PATH}:/.movement
-    ports:
-      - "8090:8090" # port for anvil, this should be moved out into runner
-    healthcheck:
-      test: [ "CMD-SHELL", "nc -zv 0.0.0.0 8090" ]
-      retries: 10
-      interval: 10s
-      timeout: 5s
\ No newline at end of file
diff --git a/networks/suzuka/setup/Cargo.toml b/networks/suzuka/setup/Cargo.toml
index fc0eec84c..2349e620e 100644
--- a/networks/suzuka/setup/Cargo.toml
+++ b/networks/suzuka/setup/Cargo.toml
@@ -33,6 +33,8 @@ celestia-types = { workspace = true }
 async-recursion = { workspace = true }
 tracing-subscriber = { workspace = true }
 godfig = { workspace = true }
+syncup = { workspace = true }
+futures = { workspace = true }
 
 [dev-dependencies]
 tempfile = { workspace = true }
diff --git a/networks/suzuka/setup/src/main.rs b/networks/suzuka/setup/src/main.rs
index 06c1961a2..14c74d099 100644
--- a/networks/suzuka/setup/src/main.rs
+++ b/networks/suzuka/setup/src/main.rs
@@ -1,5 +1,7 @@
 use anyhow::Context;
 use godfig::{backend::config_file::ConfigFile, Godfig};
+use std::future::Future;
+use std::pin::Pin;
 use suzuka_config::Config;
 use suzuka_full_node_setup::{local::Local, SuzukaFullNodeSetupOperations};
 use tokio::signal::unix::signal;
@@ -39,6 +41,24 @@ async fn main() -> Result<(), anyhow::Error> {
 	// get the config file
 	let dot_movement = dot_movement::DotMovement::try_from_env()?;
+
+	// check if the MOVEMENT_SYNC environment variable is set
+	let sync_task: Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send>> =
+		if let Ok(bucket_arrow_glob) = std::env::var("MOVEMENT_SYNC") {
+			let mut bucket_arrow_glob = bucket_arrow_glob.split("<=>");
+			let bucket = bucket_arrow_glob.next().context(
+				"MOVEMENT_SYNC environment variable must be in the format <bucket><=><glob>",
+			)?;
+			let glob = bucket_arrow_glob.next().context(
+				"MOVEMENT_SYNC environment variable must be in the format <bucket><=><glob>",
+			)?;
+
+			let sync_task = dot_movement.sync(glob, bucket.to_string()).await?;
+			Box::pin(async { sync_task.await })
+		} else {
+			Box::pin(async { futures::future::pending::<Result<(), anyhow::Error>>().await })
+		};
+
 	let mut config_file = dot_movement.try_get_or_create_config_file().await?;
 
 	// get a matching godfig object
@@ -66,6 +86,10 @@ async fn main() -> Result<(), anyhow::Error> {
 		_ = stop_rx.changed() => {
 			tracing::info!("Cancellation received, killing anvil task.");
 		}
+		// sync task
+		_ = sync_task => {
+			tracing::info!("Sync task finished.");
+		}
 	}
 
 	Ok(())
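For reference, a minimal sketch of the `MOVEMENT_SYNC` convention that the setup binary parses above. The `parse_movement_sync` helper and the example bucket/glob values are illustrative, not part of this change:

```rust
/// Hypothetical helper mirroring the parsing in networks/suzuka/setup/src/main.rs:
/// MOVEMENT_SYNC is "<bucket><=><glob>", e.g. "my-sync-bucket<=>**/*.toml".
fn parse_movement_sync(value: &str) -> Result<(&str, &str), anyhow::Error> {
    let mut parts = value.split("<=>");
    let bucket = parts
        .next()
        .ok_or_else(|| anyhow::anyhow!("MOVEMENT_SYNC must be in the format <bucket><=><glob>"))?;
    let glob = parts
        .next()
        .ok_or_else(|| anyhow::anyhow!("MOVEMENT_SYNC must be in the format <bucket><=><glob>"))?;
    Ok((bucket, glob))
}

#[test]
fn parses_bucket_and_glob() -> Result<(), anyhow::Error> {
    // The bucket is everything before "<=>"; the glob is everything after.
    let (bucket, glob) = parse_movement_sync("my-sync-bucket<=>**/*.toml")?;
    assert_eq!(bucket, "my-sync-bucket");
    assert_eq!(glob, "**/*.toml");
    Ok(())
}
```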
diff --git a/protocol-units/da/m1/runners/src/celestia_bridge/local.rs b/protocol-units/da/m1/runners/src/celestia_bridge/local.rs
index ad52eb05a..8b1662408 100644
--- a/protocol-units/da/m1/runners/src/celestia_bridge/local.rs
+++ b/protocol-units/da/m1/runners/src/celestia_bridge/local.rs
@@ -23,11 +23,13 @@ impl Local {
 		let max_attempts = 30;
 
 		// get the required connection details from the config
+		let connection_protocol = config.bridge.celestia_rpc_connection_protocol.clone();
 		let connection_hostname = config.bridge.celestia_rpc_connection_hostname.clone();
 		let connection_port = config.bridge.celestia_rpc_connection_port.clone();
-		let celestia_rpc_address = format!("{}:{}", connection_hostname, connection_port);
+		let celestia_rpc_address =
+			format!("{}://{}:{}", connection_protocol, connection_hostname, connection_port);
 
-		let first_block_request_url = format!("http://{}/block?height=1", celestia_rpc_address);
+		let first_block_request_url = format!("{}/block?height=1", celestia_rpc_address);
 		while genesis.len() <= 4 && cnt < max_attempts {
 			info!("Waiting for genesis block.");
 			let response = client
@@ -56,7 +58,7 @@ impl Local {
 
 	pub async fn run(
 		&self,
-		dot_movement: dot_movement::DotMovement,
+		_dot_movement: dot_movement::DotMovement,
 		config: m1_da_light_node_util::config::local::Config,
 	) -> Result<()> {
 		let genesis = self.get_genesis_block(&config).await?;
diff --git a/protocol-units/da/m1/runners/src/celestia_bridge/mod.rs b/protocol-units/da/m1/runners/src/celestia_bridge/mod.rs
index 596b82f76..b3720d6c2 100644
--- a/protocol-units/da/m1/runners/src/celestia_bridge/mod.rs
+++ b/protocol-units/da/m1/runners/src/celestia_bridge/mod.rs
@@ -17,10 +17,10 @@ impl Runner for CelestiaBridge {
 				let local = local::Local::new();
 				local.run(dot_movement, config).await?;
 			}
-			m1_da_light_node_util::config::Config::Arabica(config) => {
+			m1_da_light_node_util::config::Config::Arabica(_config) => {
 				Err(anyhow::anyhow!("Arabica not implemented"))?;
 			}
-			m1_da_light_node_util::config::Config::Mocha(config) => {
+			m1_da_light_node_util::config::Config::Mocha(_config) => {
 				Err(anyhow::anyhow!("Mocha not implemented"))?;
 			}
 		}
diff --git a/protocol-units/da/m1/runners/src/celestia_light/arabica.rs b/protocol-units/da/m1/runners/src/celestia_light/arabica.rs
index ab3a6a862..a4959ade4 100644
--- a/protocol-units/da/m1/runners/src/celestia_light/arabica.rs
+++ b/protocol-units/da/m1/runners/src/celestia_light/arabica.rs
@@ -8,8 +8,8 @@ impl Arabica {
 
 	pub async fn run(
 		&self,
-		dot_movement: dot_movement::DotMovement,
-		config: m1_da_light_node_util::config::local::Config,
+		_dot_movement: dot_movement::DotMovement,
+		_config: m1_da_light_node_util::config::local::Config,
 	) -> Result<(), anyhow::Error> {
 		// celestia light start --core.ip validator-1.celestia-arabica-11.com --p2p.network arabica
 		commander::run_command(
diff --git a/protocol-units/da/m1/runners/src/celestia_light/mocha.rs b/protocol-units/da/m1/runners/src/celestia_light/mocha.rs
index bbd6c130d..b31636bc8 100644
--- a/protocol-units/da/m1/runners/src/celestia_light/mocha.rs
+++ b/protocol-units/da/m1/runners/src/celestia_light/mocha.rs
@@ -8,8 +8,8 @@ impl Mocha {
 
 	pub async fn run(
 		&self,
-		dot_movement: dot_movement::DotMovement,
-		config: m1_da_light_node_util::config::local::Config,
+		_dot_movement: dot_movement::DotMovement,
+		_config: m1_da_light_node_util::config::local::Config,
 	) -> Result<(), anyhow::Error> {
 		// celestia light start --core.ip validator-1.celestia-mocha-11.com --p2p.network mocha
 		commander::run_command(
diff --git a/protocol-units/da/m1/runners/src/celestia_light/mod.rs b/protocol-units/da/m1/runners/src/celestia_light/mod.rs
index b68d49f54..6f53718a2 100644
--- a/protocol-units/da/m1/runners/src/celestia_light/mod.rs
+++ b/protocol-units/da/m1/runners/src/celestia_light/mod.rs
@@ -14,7 +14,7 @@ impl Runner for CelestiaLight {
 		config: M1DaLightNodeConfig,
 	) -> Result<(), anyhow::Error> {
 		match config.m1_da_light_node_config {
-			m1_da_light_node_util::config::Config::Local(config) => {
+			m1_da_light_node_util::config::Config::Local(_config) => {
 				Err(anyhow::anyhow!("Local not implemented"))?;
 			}
 			m1_da_light_node_util::config::Config::Arabica(config) => {
diff --git a/protocol-units/da/m1/util/src/config/common.rs b/protocol-units/da/m1/util/src/config/common.rs
index 1c8ddc0d1..4a082d83b 100644
--- a/protocol-units/da/m1/util/src/config/common.rs
+++ b/protocol-units/da/m1/util/src/config/common.rs
@@ -12,6 +12,14 @@ env_default!(
 // The default port for the Celestia RPC
 env_default!(default_celestia_rpc_listen_port, "CELESTIA_RPC_LISTEN_PORT", u16, 26657);
 
+// The default Celestia RPC connection protocol
+env_default!(
+	default_celestia_rpc_connection_protocol,
+	"CELESTIA_RPC_CONNECTION_PROTOCOL",
+	String,
+	"http".to_string()
+);
+
 // The default Celestia RPC connection hostname
 env_default!(
 	default_celestia_rpc_connection_hostname,
@@ -34,6 +42,14 @@ env_default!(
 // The default port for the Celestia Node websocket
 env_default!(default_celestia_websocket_listen_port, "CELESTIA_WEBSOCKET_LISTEN_PORT", u16, 26658);
 
+// The default Celestia Node websocket connection protocol
+env_default!(
+	default_celestia_websocket_connection_protocol,
+	"CELESTIA_WEBSOCKET_CONNECTION_PROTOCOL",
+	String,
+	"ws".to_string()
+);
+
 // The default Celestia Node websocket connection hostname
 env_default!(
 	default_celestia_websocket_connection_hostname,
diff --git a/protocol-units/da/m1/util/src/config/local/appd.rs b/protocol-units/da/m1/util/src/config/local/appd.rs
index f89aec83e..2b61d5983 100644
--- a/protocol-units/da/m1/util/src/config/local/appd.rs
+++ b/protocol-units/da/m1/util/src/config/local/appd.rs
@@ -2,7 +2,7 @@ use crate::config::common::{
 	default_celestia_appd_replace_args, default_celestia_appd_use_replace_args,
 	default_celestia_chain_id, default_celestia_namespace, default_celestia_rpc_listen_hostname,
 	default_celestia_rpc_listen_port, default_celestia_websocket_connection_hostname,
-	default_celestia_websocket_connection_port,
+	default_celestia_websocket_connection_port, default_celestia_websocket_connection_protocol,
 };
 use celestia_types::nmt::Namespace;
 
@@ -19,6 +19,10 @@ pub struct Config {
 	#[serde(default = "default_celestia_rpc_listen_port")]
 	pub celestia_rpc_listen_port: u16,
 
+	/// The protocol for the Celestia Node websocket
+	#[serde(default = "default_celestia_websocket_connection_protocol")]
+	pub celestia_websocket_connection_protocol: String,
+
 	/// The hostname of the Celestia Node websocket
 	#[serde(default = "default_celestia_websocket_connection_hostname")]
 	pub celestia_websocket_connection_hostname: String,
@@ -60,6 +64,8 @@ impl Default for Config {
 		Self {
 			celestia_rpc_listen_hostname: default_celestia_rpc_listen_hostname(),
 			celestia_rpc_listen_port: default_celestia_rpc_listen_port(),
+			celestia_websocket_connection_protocol: default_celestia_websocket_connection_protocol(
+			),
 			celestia_websocket_connection_hostname: default_celestia_websocket_connection_hostname(
 			),
 			celestia_websocket_connection_port: default_celestia_websocket_connection_port(),
diff --git a/protocol-units/da/m1/util/src/config/local/bridge.rs b/protocol-units/da/m1/util/src/config/local/bridge.rs
index 279959cd1..4f5a1bd39 100644
--- a/protocol-units/da/m1/util/src/config/local/bridge.rs
+++ b/protocol-units/da/m1/util/src/config/local/bridge.rs
@@ -1,13 +1,18 @@
 use crate::config::common::{
 	default_celestia_bridge_replace_args, default_celestia_bridge_use_replace_args,
 	default_celestia_rpc_connection_hostname, default_celestia_rpc_connection_port,
-	default_celestia_websocket_listen_hostname, default_celestia_websocket_listen_port,
+	default_celestia_rpc_connection_protocol, default_celestia_websocket_listen_hostname,
+	default_celestia_websocket_listen_port,
 };
 use serde::{Deserialize, Serialize};
 
 /// The inner configuration for the local Celestia Bridge Runner
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct Config {
+	/// The protocol for the Celestia RPC
+	#[serde(default = "default_celestia_rpc_connection_protocol")]
+	pub celestia_rpc_connection_protocol: String,
+
 	/// The URL of the Celestia RPC
 	#[serde(default = "default_celestia_rpc_connection_hostname")]
 	pub celestia_rpc_connection_hostname: String,
@@ -40,6 +45,7 @@ pub struct Config {
 impl Default for Config {
 	fn default() -> Self {
 		Self {
+			celestia_rpc_connection_protocol: default_celestia_rpc_connection_protocol(),
 			celestia_rpc_connection_hostname: default_celestia_rpc_connection_hostname(),
 			celestia_rpc_connection_port: default_celestia_rpc_connection_port(),
 			celestia_websocket_listen_hostname: default_celestia_websocket_listen_hostname(),
diff --git a/protocol-units/da/m1/util/src/config/local/m1_da_light_node.rs b/protocol-units/da/m1/util/src/config/local/m1_da_light_node.rs
index 5f9f467a0..867ce5c58 100644
--- a/protocol-units/da/m1/util/src/config/local/m1_da_light_node.rs
+++ b/protocol-units/da/m1/util/src/config/local/m1_da_light_node.rs
@@ -1,14 +1,19 @@
 use crate::config::common::{
 	default_celestia_rpc_connection_hostname, default_celestia_rpc_connection_port,
-	default_celestia_websocket_connection_hostname, default_celestia_websocket_connection_port,
-	default_m1_da_light_node_connection_hostname, default_m1_da_light_node_connection_port,
-	default_m1_da_light_node_listen_hostname, default_m1_da_light_node_listen_port,
+	default_celestia_rpc_connection_protocol, default_celestia_websocket_connection_hostname,
+	default_celestia_websocket_connection_port, default_m1_da_light_node_connection_hostname,
+	default_m1_da_light_node_connection_port, default_m1_da_light_node_listen_hostname,
+	default_m1_da_light_node_listen_port,
 };
 use serde::{Deserialize, Serialize};
 
 /// The inner configuration for the local Celestia Appd Runner
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct Config {
+	/// The protocol for the Celestia RPC
+	#[serde(default = "default_celestia_rpc_connection_protocol")]
+	pub celestia_rpc_connection_protocol: String,
+
 	/// The URL of the Celestia RPC
 	#[serde(default = "default_celestia_rpc_connection_hostname")]
 	pub celestia_rpc_connection_hostname: String,
@@ -45,6 +50,7 @@ pub struct Config {
 impl Default for Config {
 	fn default() -> Self {
 		Self {
+			celestia_rpc_connection_protocol: default_celestia_rpc_connection_protocol(),
 			celestia_rpc_connection_hostname: default_celestia_rpc_connection_hostname(),
 			celestia_rpc_connection_port: default_celestia_rpc_connection_port(),
 			celestia_websocket_connection_hostname: default_celestia_websocket_connection_hostname(
diff --git a/protocol-units/da/m1/util/src/config/mod.rs b/protocol-units/da/m1/util/src/config/mod.rs
index 5a72be098..25d472da4 100644
--- a/protocol-units/da/m1/util/src/config/mod.rs
+++ b/protocol-units/da/m1/util/src/config/mod.rs
@@ -32,7 +32,8 @@ impl Config {
 		match self {
 			Config::Local(local) => {
 				let celestia_node_url = format!(
-					"ws://{}:{}",
+					"{}://{}:{}",
+					local.appd.celestia_websocket_connection_protocol,
 					local.appd.celestia_websocket_connection_hostname,
 					local.appd.celestia_websocket_connection_port
 				);
@@ -55,7 +56,8 @@ impl Config {
 			Config::Arabica(local) => {
 				// arabica is also local for now
 				let celestia_node_url = format!(
-					"ws://{}:{}",
+					"{}://{}:{}",
+					local.appd.celestia_websocket_connection_protocol,
 					local.appd.celestia_websocket_connection_hostname,
 					local.appd.celestia_websocket_connection_port
 				);
@@ -78,7 +80,8 @@ impl Config {
 			Config::Mocha(local) => {
 				// mocha is also local for now
 				let celestia_node_url = format!(
-					"ws://{}:{}",
+					"{}://{}:{}",
+					local.appd.celestia_websocket_connection_protocol,
 					local.appd.celestia_websocket_connection_hostname,
 					local.appd.celestia_websocket_connection_port
 				);
@@ -178,12 +181,17 @@ impl Config {
 	pub fn try_block_building_parameters(&self) -> Result<(u32, u64), anyhow::Error> {
 		match self {
-			Config::Local(local) => Ok((local.memseq.memseq_max_block_size, local.memseq.memseq_build_time)),
-			Config::Arabica(local) => Ok((local.memseq.memseq_max_block_size, local.memseq.memseq_build_time)),
-			Config::Mocha(local) => Ok((local.memseq.memseq_max_block_size, local.memseq.memseq_build_time)),
+			Config::Local(local) => {
+				Ok((local.memseq.memseq_max_block_size, local.memseq.memseq_build_time))
+			}
+			Config::Arabica(local) => {
+				Ok((local.memseq.memseq_max_block_size, local.memseq.memseq_build_time))
+			}
+			Config::Mocha(local) => {
+				Ok((local.memseq.memseq_max_block_size, local.memseq.memseq_build_time))
+			}
 		}
 	}
-
 }
 
 /// The M1 DA Light Node configuration as should be read from file.
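Since several configs now carry a `*_connection_protocol` field, here is a minimal sketch of how protocol, hostname, and port compose into a connection URL, mirroring the `format!` calls in `config/mod.rs` above. The `Conn` struct is illustrative; the real fields live on the appd and bridge config types:

```rust
// Illustrative only: the real fields live on the generated config structs.
struct Conn {
    protocol: String, // e.g. "ws", overridable via CELESTIA_WEBSOCKET_CONNECTION_PROTOCOL
    hostname: String,
    port: u16,
}

impl Conn {
    fn url(&self) -> String {
        // Same shape as the format! calls above: "{protocol}://{hostname}:{port}"
        format!("{}://{}:{}", self.protocol, self.hostname, self.port)
    }
}

fn main() {
    let ws = Conn { protocol: "ws".to_string(), hostname: "0.0.0.0".to_string(), port: 26658 };
    assert_eq!(ws.url(), "ws://0.0.0.0:26658");
    // Setting CELESTIA_WEBSOCKET_CONNECTION_PROTOCOL=wss would yield "wss://..."
    // with the same hostname and port, which is what the env_default! additions enable.
}
```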
diff --git a/protocol-units/syncing/syncup/Cargo.toml b/protocol-units/syncing/syncup/Cargo.toml
new file mode 100644
index 000000000..dca735661
--- /dev/null
+++ b/protocol-units/syncing/syncup/Cargo.toml
@@ -0,0 +1,23 @@
+[package]
+name = "syncup"
+version = { workspace = true }
+edition = { workspace = true }
+license = { workspace = true }
+authors = { workspace = true }
+repository = { workspace = true }
+homepage = { workspace = true }
+publish = { workspace = true }
+rust-version = { workspace = true }
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+syncador = { workspace = true }
+tokio = { workspace = true }
+async-stream = { workspace = true }
+async-trait = { workspace = true }
+anyhow = { workspace = true }
+futures = { workspace = true }
+
+[lints]
+workspace = true
diff --git a/protocol-units/syncing/syncup/README.md b/protocol-units/syncing/syncup/README.md
new file mode 100644
index 000000000..2f3b15b5d
--- /dev/null
+++ b/protocol-units/syncing/syncup/README.md
@@ -0,0 +1,2 @@
+# `syncup`
+Used to easily set up sync for an application that downsyncs once and then upsyncs continuously.
\ No newline at end of file
diff --git a/protocol-units/syncing/syncup/src/lib.rs b/protocol-units/syncing/syncup/src/lib.rs
new file mode 100644
index 000000000..a80b54a88
--- /dev/null
+++ b/protocol-units/syncing/syncup/src/lib.rs
@@ -0,0 +1,95 @@
+use std::path::PathBuf;
+use syncador::backend::{archive, glob, pipeline, s3, PullOperations, PushOperations};
+use tokio::time::interval;
+
+#[derive(Debug, Clone)]
+pub struct Notifier {
+	pub sender: tokio::sync::mpsc::Sender<()>,
+}
+
+impl Notifier {
+	pub fn new(size: usize) -> (Self, tokio::sync::mpsc::Receiver<()>) {
+		let (sender, receiver) = tokio::sync::mpsc::channel(size);
+		(Self { sender }, receiver)
+	}
+}
+
+#[async_trait::async_trait]
+impl PushOperations for Notifier {
+	async fn push(&self, package: syncador::Package) -> Result<syncador::Package, anyhow::Error> {
+		self.sender.send(()).await?;
+		Ok(package)
+	}
+}
+
+#[async_trait::async_trait]
+impl PullOperations for Notifier {
+	async fn pull(&self, package: syncador::Package) -> Result<syncador::Package, anyhow::Error> {
+		self.sender.send(()).await?;
+		Ok(package)
+	}
+}
+
+#[derive(Debug, Clone)]
+pub enum Target {
+	S3(String),
+}
+
+impl Target {
+	pub async fn create_pipelines(
+		&self,
+		root_dir: PathBuf,
+		glob: &str,
+		notifier: Notifier,
+	) -> Result<(pipeline::push::Pipeline, pipeline::pull::Pipeline), anyhow::Error> {
+		match self {
+			Target::S3(bucket) => {
+				let (s3_push, s3_pull) = s3::shared_bucket::create_random(bucket.clone()).await?;
+
+				let push_pipe = pipeline::push::Pipeline::new(vec![
+					Box::new(glob::file::FileGlob::try_new(glob, root_dir.clone())?),
+					Box::new(archive::gzip::push::Push::new(root_dir.clone())),
+					Box::new(s3_push),
+					Box::new(notifier.clone()),
+				]);
+
+				let pull_pipe = pipeline::pull::Pipeline::new(vec![
+					Box::new(s3_pull),
+					Box::new(archive::gzip::pull::Pull::new(root_dir.clone())),
+					Box::new(notifier.clone()),
+				]);
+
+				Ok((push_pipe, pull_pipe))
+			}
+		}
+	}
+}
+
+/// Takes a glob pattern and a target, runs the initial pull once, and then
+/// returns a future that runs the indefinite push (upsync) loop.
+pub async fn syncup(
+	root_dir: PathBuf,
+	glob: &str,
+	target: Target,
+) -> Result<impl std::future::Future<Output = Result<(), anyhow::Error>>, anyhow::Error> {
+	// create the pipelines for the target
+	let (push_pipeline, pull_pipeline) =
+		target.create_pipelines(root_dir.clone(), glob, Notifier::new(1).0).await?;
+
+	// run the pull pipeline once
+	pull_pipeline.pull(syncador::Package::null()).await?;
+
+	// Create the upsync task using tokio::time::interval
+	let upsync_task = async move {
+		let mut interval = interval(std::time::Duration::from_millis(
+			s3::shared_bucket::metadata::DEFAULT_SYNC_EPOCH_DURATION,
+		));
+		loop {
+			interval.tick().await;
+			push_pipeline.push(syncador::Package::null()).await?;
+		}
+		Ok::<(), anyhow::Error>(())
+	};
+
+	Ok(upsync_task)
+}
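A hedged usage sketch for the `syncup` entry point added above; the bucket name, glob, and `tokio::main` wiring are assumptions for illustration, and AWS credentials are resolved from the environment by `aws_config` inside `create_random`:

```rust
use syncup::{syncup, Target};

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // Illustrative values; the real caller derives these from MOVEMENT_SYNC.
    let root_dir = std::path::PathBuf::from("/.movement");
    // Downsync once from the bucket, then receive the indefinite upsync future.
    let upsync = syncup(root_dir, "**/*.toml", Target::S3("my-sync-bucket".to_string())).await?;
    // Drive the upsync loop; it pushes on every DEFAULT_SYNC_EPOCH_DURATION tick.
    upsync.await?;
    Ok(())
}
```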
diff --git a/scripts/tests/container-test b/scripts/tests/container-test
index ed8ca129a..50e81ae9a 100755
--- a/scripts/tests/container-test
+++ b/scripts/tests/container-test
@@ -1,6 +1,6 @@
 #!/bin/bash
 
 # Run the command and capture the output
-STATUS=$(timeout 4m just suzuka-full-node docker-compose setup-local.celestia-local.faucet-replicas.test --abort-on-container-failure)
+STATUS=$(timeout 4m just suzuka-full-node docker-compose local.faucet-replicas.test --abort-on-container-failure)
 EXIT_CODE=$?
 
 # Check the exit code and act accordingly
diff --git a/util/dot-movement/Cargo.toml b/util/dot-movement/Cargo.toml
index 7650cc818..4344bfa1e 100644
--- a/util/dot-movement/Cargo.toml
+++ b/util/dot-movement/Cargo.toml
@@ -16,6 +16,7 @@ anyhow = { workspace = true }
 serde_json = { workspace = true }
 serde = { workspace = true }
 tokio = { workspace = true }
+syncup = { workspace = true }
 
 [lints]
 workspace = true
diff --git a/util/dot-movement/src/lib.rs b/util/dot-movement/src/lib.rs
index 60135c607..e37695945 100644
--- a/util/dot-movement/src/lib.rs
+++ b/util/dot-movement/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod path;
+pub mod sync;
 
 #[derive(Debug, Clone)]
 pub struct DotMovement(std::path::PathBuf);
diff --git a/util/dot-movement/src/sync.rs b/util/dot-movement/src/sync.rs
new file mode 100644
index 000000000..6ecc2c6c9
--- /dev/null
+++ b/util/dot-movement/src/sync.rs
@@ -0,0 +1,13 @@
+use crate::DotMovement;
+use syncup::{syncup, Target};
+
+impl DotMovement {
+	pub async fn sync(
+		&self,
+		glob: &str,
+		bucket: String,
+	) -> Result<impl std::future::Future<Output = Result<(), anyhow::Error>>, anyhow::Error> {
+		let sync_task = syncup(self.0.clone(), glob, Target::S3(bucket)).await?;
+		Ok(sync_task)
+	}
+}
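A minimal sketch of how a caller can drive the future returned by `DotMovement::sync` alongside its main work, in the same way `networks/suzuka/setup/src/main.rs` races it inside `tokio::select!`; the bucket, glob, and sleep stand-in are illustrative:

```rust
async fn run_with_sync(dot_movement: dot_movement::DotMovement) -> Result<(), anyhow::Error> {
    // The initial downsync happens inside sync(); the returned future is the upsync loop.
    let sync_task = dot_movement.sync("**/*.toml", "my-sync-bucket".to_string()).await?;
    tokio::select! {
        // The upsync loop normally runs forever; reaching this arm means it errored.
        result = sync_task => {
            result?;
        }
        // Stand-in for the application's real work.
        _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {}
    }
    Ok(())
}
```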
diff --git a/util/syncador/Cargo.toml b/util/syncador/Cargo.toml
index 2f57abe42..4735d29c6 100644
--- a/util/syncador/Cargo.toml
+++ b/util/syncador/Cargo.toml
@@ -28,6 +28,7 @@ async-trait = { workspace = true }
 tar = { workspace = true }
 flate2 = { workspace = true }
 clap = { workspace = true }
+glob = { workspace = true }
 
 [dev-dependencies]
 uuid = { workspace = true }
diff --git a/util/syncador/src/backend/archive/gzip/pull.rs b/util/syncador/src/backend/archive/gzip/pull.rs
index 09ad4cc49..9e8ab00fc 100644
--- a/util/syncador/src/backend/archive/gzip/pull.rs
+++ b/util/syncador/src/backend/archive/gzip/pull.rs
@@ -16,6 +16,11 @@ pub struct Pull {
 }
 
 impl Pull {
+	/// Creates a new Pull instance.
+	pub fn new(destination_dir: PathBuf) -> Self {
+		Self { destination_dir }
+	}
+
 	/// Iteratively collects all files (not directories) in the specified directory using BFS.
 	async fn collect_files(dir: &Path, entries: &mut Vec<PathBuf>) -> Result<(), anyhow::Error> {
 		let mut queue = VecDeque::new();
diff --git a/util/syncador/src/backend/constant/mod.rs b/util/syncador/src/backend/constant/mod.rs
new file mode 100644
index 000000000..b49207c16
--- /dev/null
+++ b/util/syncador/src/backend/constant/mod.rs
@@ -0,0 +1,21 @@
+use crate::backend::{PullOperations, PushOperations};
+use crate::files::package::Package;
+
+#[derive(Debug, Clone)]
+pub struct Constant {
+	pub package: Package,
+}
+
+#[async_trait::async_trait]
+impl PullOperations for Constant {
+	async fn pull(&self, _package: Package) -> Result<Package, anyhow::Error> {
+		Ok(self.package.clone())
+	}
+}
+
+#[async_trait::async_trait]
+impl PushOperations for Constant {
+	async fn push(&self, package: Package) -> Result<Package, anyhow::Error> {
+		Ok(package)
+	}
+}
diff --git a/util/syncador/src/backend/copy/mod.rs b/util/syncador/src/backend/copy/mod.rs
deleted file mode 100644
index 07ddfc579..000000000
--- a/util/syncador/src/backend/copy/mod.rs
+++ /dev/null
@@ -1,64 +0,0 @@
-use crate::backend::BackendOperations;
-use crate::files::package::{Package, PackageElement};
-use std::path::PathBuf;
-
-#[derive(Debug, Clone)]
-pub struct Copy {
-	pub copy_dir: PathBuf,
-}
-
-impl Copy {
-	pub fn new(copy_dir: PathBuf) -> Self {
-		Self { copy_dir }
-	}
-
-	pub async fn copy_to_based_on_manifest(
-		&self,
-		manifest: PackageElement,
-	) -> Result<PackageElement, anyhow::Error> {
-		let mut path_copy_futures = Vec::new();
-		for (relative_path, absolute_path) in manifest.try_path_tuples()? {
-			// compute the temp path
-			let mut temp_path = self.copy_dir.clone();
-			temp_path.push(relative_path);
-
-			path_copy_futures.push(async move {
-				// make all of the parent directories
-				if let Some(parent) = temp_path.parent() {
-					std::fs::create_dir_all(parent)?;
-				}
-
-				// copy the file
-				std::fs::copy(absolute_path, &temp_path)?;
-
-				Ok::<(PathBuf, PathBuf), anyhow::Error>((relative_path.to_path_buf(), temp_path))
-			});
-		}
-
-		let put_copy_outputs = futures::future::try_join_all(path_copy_futures).await?;
-		let mut new_manifest = PackageElement::empty_matching(&manifest, self.copy_dir.clone());
-		for (_, path) in put_copy_outputs {
-			new_manifest.add_sync_file(path);
-		}
-
-		Ok(new_manifest)
-	}
-}
-
-#[async_trait::async_trait]
-impl BackendOperations for Copy {
-	async fn push(&self, package: Package) -> Result<Package, anyhow::Error> {
-		let mut manifest_futures = Vec::new();
-		for manifest in package.into_manifests() {
-			let future = self.copy_to_based_on_manifest(manifest);
-			manifest_futures.push(future);
-		}
-		let manifests = futures::future::try_join_all(manifest_futures).await?;
-		Ok(Package(manifests))
-	}
-
-	async fn pull(&self, package: Package) -> Result<Package, anyhow::Error> {
-		// same as push
-		self.push(package).await
-	}
-}
diff --git a/util/syncador/src/backend/glob/file.rs b/util/syncador/src/backend/glob/file.rs
new file mode 100644
index 000000000..c88996f18
--- /dev/null
+++ b/util/syncador/src/backend/glob/file.rs
@@ -0,0 +1,40 @@
+use std::path::PathBuf;
+
+use crate::backend::{PullOperations, PushOperations};
+use crate::files::package::{Package, PackageElement};
+use glob::{glob, Pattern};
+
+#[derive(Debug, Clone)]
+pub struct FileGlob {
+	pub pattern: Pattern,
+	pub root_dir: PathBuf,
+}
+
+impl FileGlob {
+	pub fn try_new(pattern: &str, root_dir: PathBuf) -> Result<Self, anyhow::Error> {
+		// the pattern is actually the pattern applied to the root_dir
+		let root_pattern = format!("{}/{}", root_dir.to_string_lossy(), pattern);
+
+		Ok(Self { pattern: Pattern::new(root_pattern.as_str())?, root_dir })
+	}
+}
+
+#[async_trait::async_trait]
+impl PullOperations for FileGlob {
+	async fn pull(&self, _package: Package) -> Result<Package, anyhow::Error> {
+		// just check the matching glob files
+		let mut sync_files = Vec::new();
+		for path in glob(self.pattern.as_str())? {
+			sync_files.push(path?);
+		}
+		let package_element = PackageElement { sync_files, root_dir: self.root_dir.clone() };
+		Ok(Package(vec![package_element]))
+	}
+}
+
+#[async_trait::async_trait]
+impl PushOperations for FileGlob {
+	async fn push(&self, package: Package) -> Result<Package, anyhow::Error> {
+		self.pull(package).await
+	}
+}
diff --git a/util/syncador/src/backend/glob/mod.rs b/util/syncador/src/backend/glob/mod.rs
new file mode 100644
index 000000000..42f3a2a43
--- /dev/null
+++ b/util/syncador/src/backend/glob/mod.rs
@@ -0,0 +1,2 @@
+pub mod file;
+pub mod package;
diff --git a/util/syncador/src/backend/glob/package.rs b/util/syncador/src/backend/glob/package.rs
new file mode 100644
index 000000000..4e40e8d96
--- /dev/null
+++ b/util/syncador/src/backend/glob/package.rs
@@ -0,0 +1,47 @@
+use crate::backend::{PullOperations, PushOperations};
+use crate::files::package::{Package, PackageElement};
+use glob::Pattern;
+
+#[derive(Debug, Clone)]
+pub struct PackageGlob {
+	pub pattern: Pattern,
+}
+
+impl PackageGlob {
+	pub fn new(pattern: &str) -> Self {
+		Self { pattern: Pattern::new(pattern).unwrap() }
+	}
+
+	pub fn is_match(&self, path: &str) -> bool {
+		self.pattern.matches(path)
+	}
+}
+
+#[async_trait::async_trait]
+impl PullOperations for PackageGlob {
+	async fn pull(&self, package: Package) -> Result<Package, anyhow::Error> {
+		let filtered = package
+			.0
+			.into_iter()
+			.map(
+				// filter the sync files in the entry that match the glob pattern
+				|entry| {
+					let sync_files = entry
+						.sync_files
+						.into_iter()
+						.filter(|entry| self.is_match(&entry.to_string_lossy()))
+						.collect();
+					PackageElement { sync_files, root_dir: entry.root_dir }
+				},
+			)
+			.collect();
+		Ok(Package(filtered))
+	}
+}
+
+#[async_trait::async_trait]
+impl PushOperations for PackageGlob {
+	async fn push(&self, package: Package) -> Result<Package, anyhow::Error> {
+		self.pull(package).await
+	}
+}
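A hedged sketch of the new `FileGlob` backend: pulling yields a `Package` whose `sync_files` are the paths under `root_dir` matching the pattern. The directory layout and pattern here are illustrative:

```rust
use std::path::PathBuf;
use syncador::backend::glob::file::FileGlob;
use syncador::{Package, PullOperations};

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // try_new prefixes the pattern with the root dir, so this matches e.g.
    // /.movement/config.toml and /.movement/maptos/config.toml.
    let file_glob = FileGlob::try_new("**/*.toml", PathBuf::from("/.movement"))?;
    let package = file_glob.pull(Package::null()).await?;
    for element in package.0 {
        for path in element.sync_files {
            println!("would sync: {}", path.display());
        }
    }
    Ok(())
}
```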
diff --git a/util/syncador/src/backend/mod.rs b/util/syncador/src/backend/mod.rs
index d2da7cc12..e021aeb1f 100644
--- a/util/syncador/src/backend/mod.rs
+++ b/util/syncador/src/backend/mod.rs
@@ -1,18 +1,11 @@
 // pub mod archive;
 // pub mod copy;
 pub mod archive;
+pub mod constant;
+pub mod glob;
 pub mod pipeline;
 pub mod s3;
 
 use crate::files::package::Package;
 
-#[async_trait::async_trait]
-pub trait BackendOperations {
-	/// Uploads a package to the backend.
-	async fn push(&self, package: Package) -> Result<Package, anyhow::Error>;
-
-	/// Downloads a package from the backend.
-	async fn pull(&self, package: Package) -> Result<Package, anyhow::Error>;
-}
-
 #[async_trait::async_trait]
 pub trait PushOperations {
 	async fn push(&self, package: Package) -> Result<Package, anyhow::Error>;
diff --git a/util/syncador/src/backend/pipeline/pull.rs b/util/syncador/src/backend/pipeline/pull.rs
index a4dd3a6c1..4b24c3c44 100644
--- a/util/syncador/src/backend/pipeline/pull.rs
+++ b/util/syncador/src/backend/pipeline/pull.rs
@@ -2,15 +2,15 @@ use crate::backend::PullOperations;
 use crate::files::package::Package;
 
 pub struct Pipeline {
-	pub backends: Vec<Box<dyn PullOperations>>,
+	pub backends: Vec<Box<dyn PullOperations + Send + Sync>>,
 }
 
 impl Pipeline {
-	pub fn new(backends: Vec<Box<dyn PullOperations>>) -> Self {
+	pub fn new(backends: Vec<Box<dyn PullOperations + Send + Sync>>) -> Self {
 		Self { backends }
 	}
 
-	pub fn boxed(backends: Vec<Box<dyn PullOperations>>) -> Box<Self> {
+	pub fn boxed(backends: Vec<Box<dyn PullOperations + Send + Sync>>) -> Box<Self> {
 		Box::new(Self::new(backends))
 	}
 }
diff --git a/util/syncador/src/backend/pipeline/push.rs b/util/syncador/src/backend/pipeline/push.rs
index e69de29bb..0e89ce5fa 100644
--- a/util/syncador/src/backend/pipeline/push.rs
+++ b/util/syncador/src/backend/pipeline/push.rs
@@ -0,0 +1,27 @@
+use crate::backend::PushOperations;
+use crate::files::package::Package;
+
+pub struct Pipeline {
+	pub backends: Vec<Box<dyn PushOperations + Send + Sync>>,
+}
+
+impl Pipeline {
+	pub fn new(backends: Vec<Box<dyn PushOperations + Send + Sync>>) -> Self {
+		Self { backends }
+	}
+
+	pub fn boxed(backends: Vec<Box<dyn PushOperations + Send + Sync>>) -> Box<Self> {
+		Box::new(Self::new(backends))
+	}
+}
+
+#[async_trait::async_trait]
+impl PushOperations for Pipeline {
+	async fn push(&self, package: Package) -> Result<Package, anyhow::Error> {
+		let mut package = package;
+		for backend in &self.backends {
+			package = backend.push(package.clone()).await?;
+		}
+		Ok(package)
+	}
+}
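A sketch of the pipeline fold semantics: each backend's output `Package` becomes the next backend's input, which is why ordering (glob, then archive, then S3, then notifier) matters in `syncup`. This assumes the pull `Pipeline` chains its stages the same way the push impl above does, and uses the `Constant` backend for a self-contained run:

```rust
use std::path::PathBuf;
use syncador::backend::constant::Constant;
use syncador::backend::pipeline::pull::Pipeline;
use syncador::{Package, PackageElement, PullOperations};

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let fixed = Package(vec![PackageElement {
        sync_files: vec![PathBuf::from("/.movement/config.toml")],
        root_dir: PathBuf::from("/.movement"),
    }]);
    // Constant ignores its input on pull, so whatever package enters the
    // pipeline, this single stage replaces it with the fixed package.
    let pipeline = Pipeline::new(vec![Box::new(Constant { package: fixed })]);
    let out = pipeline.pull(Package::null()).await?;
    assert_eq!(out.0.len(), 1);
    Ok(())
}
```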
diff --git a/util/syncador/src/backend/s3/shared_bucket/metadata.rs b/util/syncador/src/backend/s3/shared_bucket/metadata.rs
index b1e03297d..08b2efb01 100644
--- a/util/syncador/src/backend/s3/shared_bucket/metadata.rs
+++ b/util/syncador/src/backend/s3/shared_bucket/metadata.rs
@@ -3,6 +3,8 @@ use movement_types::Id;
 use std::collections::HashSet;
 use std::time;
 
+pub const DEFAULT_SYNC_EPOCH_DURATION: u64 = 1000 * 60;
+
 #[derive(Debug, Clone)]
 pub struct Metadata {
 	pub application_id: Id,
@@ -25,8 +27,8 @@ impl Metadata {
 		Self {
 			application_id: Id::random(),
 			syncer_id: Id::random(),
-			sync_epoch_duration: 1000 * 60 * 60 * 24,
-			retain_epochs_count: 7,
+			sync_epoch_duration: DEFAULT_SYNC_EPOCH_DURATION,
+			retain_epochs_count: 16,
 		}
 	}
diff --git a/util/syncador/src/backend/s3/shared_bucket/mod.rs b/util/syncador/src/backend/s3/shared_bucket/mod.rs
index fd4eb3b33..018b55729 100644
--- a/util/syncador/src/backend/s3/shared_bucket/mod.rs
+++ b/util/syncador/src/backend/s3/shared_bucket/mod.rs
@@ -4,6 +4,12 @@ pub mod metadata;
 pub mod pull;
 pub mod push;
 
+pub async fn create_random(bucket: String) -> Result<(push::Push, pull::Pull), anyhow::Error> {
+	let config = aws_config::load_from_env().await;
+	let client = aws_sdk_s3::Client::new(&config);
+	create(client, bucket, metadata::Metadata::random()).await
+}
+
 pub async fn create(
 	client: aws_sdk_s3::Client,
 	bucket: String,
diff --git a/util/syncador/src/files/package/mod.rs b/util/syncador/src/files/package/mod.rs
index 252cfeac0..17cbefdc0 100644
--- a/util/syncador/src/files/package/mod.rs
+++ b/util/syncador/src/files/package/mod.rs
@@ -1,10 +1,14 @@
 use std::path::{Path, PathBuf};
 
 /// A package is a collection of file system locations that are synced together, either publicly or privately.
-#[derive(Clone)]
+#[derive(Debug, Clone)]
 pub struct Package(pub Vec<PackageElement>);
 
 impl Package {
+	pub fn null() -> Self {
+		Self(Vec::new())
+	}
+
 	/// Returns references to all of the package manifests in the package.
 	pub fn as_manifests(&self) -> Vec<&PackageElement> {
 		self.0.iter().collect()
diff --git a/util/syncador/src/lib.rs b/util/syncador/src/lib.rs
index 2ca6e2f63..afcc27ab0 100644
--- a/util/syncador/src/lib.rs
+++ b/util/syncador/src/lib.rs
@@ -1,3 +1,111 @@
 pub mod backend;
 pub mod files;
-pub mod providers;
+
+pub use crate::backend::{PullOperations, PushOperations};
+pub use crate::files::package::{Package, PackageElement};
+use futures::pin_mut;
+use futures::stream::{Stream, StreamExt};
+
+pub async fn sync<
+	Q: PushOperations + Send + Sync + 'static,
+	P: PullOperations + Send + Sync + 'static,
+>(
+	push_stream: impl Stream<Item = Result<Package, anyhow::Error>> + Send,
+	push: Q,
+	pull_stream: impl Stream<Item = Result<Package, anyhow::Error>> + Send,
+	pull: P,
+) -> Result<(), anyhow::Error> {
+	// Pin the streams to use them in loops
+	pin_mut!(push_stream);
+	pin_mut!(pull_stream);
+
+	// run both pull and push operations until the last one closes or one fails
+	let pull = async {
+		while let Some(pkg) = pull_stream.next().await {
+			pull.pull(pkg?).await?;
+		}
+		Ok::<(), anyhow::Error>(())
+	};
+
+	let push = async {
+		while let Some(pkg) = push_stream.next().await {
+			push.push(pkg?).await?;
+		}
+		Ok::<(), anyhow::Error>(())
+	};
+
+	futures::try_join!(pull, push)?;
+
+	Ok(())
+}
+
+#[cfg(test)]
+pub mod test {
+
+	use super::*;
+
+	pub struct TestSyncer {
+		pub sender: tokio::sync::mpsc::Sender<()>,
+	}
+
+	impl TestSyncer {
+		pub fn new(size: usize) -> (Self, tokio::sync::mpsc::Receiver<()>) {
+			let (sender, receiver) = tokio::sync::mpsc::channel(size);
+			(Self { sender }, receiver)
+		}
+	}
+
+	#[async_trait::async_trait]
+	impl PushOperations for TestSyncer {
+		async fn push(&self, package: Package) -> Result<Package, anyhow::Error> {
+			println!("push");
+			self.sender.send(()).await?;
+			Ok(package)
+		}
+	}
+
+	#[async_trait::async_trait]
+	impl PullOperations for TestSyncer {
+		async fn pull(&self, package: Package) -> Result<Package, anyhow::Error> {
+			println!("pull");
+			self.sender.send(()).await?;
+			Ok(package)
+		}
+	}
+
+	#[tokio::test]
+	pub async fn test_example_sync() -> Result<(), anyhow::Error> {
+		let (push_syncer, mut push_receiver) = TestSyncer::new(1);
+		let (pull_syncer, mut pull_receiver) = TestSyncer::new(10);
+
+		// use a once stream for the push stream
+		let push_stream = futures::stream::once(async { Ok(Package::null()) });
+
+		// use a 10 ms interval stream for the pull stream
+		let pull_stream = async_stream::stream! {
+			for _ in 0..10 {
+				tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
+				yield Ok(Package::null());
+			}
+		};
+
+		// run the sync to completion; the pull stream ends after roughly 100 ms
+		sync(push_stream, push_syncer, pull_stream, pull_syncer).await?;
+
+		// check that the push operations were called by draining the receiver
+		let mut push_receiver_count = 0;
+		while let Some(_) = push_receiver.recv().await {
+			push_receiver_count += 1;
+		}
+		assert_eq!(push_receiver_count, 1);
+
+		// check that the pull operations were called by draining the receiver
+		let mut pull_receiver_count = 0;
+		while let Some(_) = pull_receiver.recv().await {
+			pull_receiver_count += 1;
+		}
+		assert_eq!(pull_receiver_count, 10);
+
+		Ok(())
+	}
+}
diff --git a/util/syncador/src/providers/archive_s3/mod.rs b/util/syncador/src/providers/archive_s3/mod.rs
deleted file mode 100644
index b918309c5..000000000
--- a/util/syncador/src/providers/archive_s3/mod.rs
+++ /dev/null
@@ -1,17 +0,0 @@
-/// let push = PushPipeline(vec![
-///	 MatcherPush::new(),
-///	 ArchiveGzipPush::new(),
-///	 S3Push::new(),
-/// ]);
-///
-/// let pull = PullPipeline(vec![
-///	 S3Pull::new(),
-///	 ArchiveGzipPull::new(),
-/// ]);
-///
-/// let push_runner = Every(10, minutes).push(push);
-/// let pull_runner = Once.pull(pull);
-///
-/// let syncer = Syncer::new(push_runner, pull_runner);
-/// syncer.run().await?;
-pub mod nothing_yet {}
diff --git a/util/syncador/src/providers/mod.rs b/util/syncador/src/providers/mod.rs
deleted file mode 100644
index a9d2c2c74..000000000
--- a/util/syncador/src/providers/mod.rs
+++ /dev/null
@@ -1 +0,0 @@
-pub mod archive_s3;