From 568ee26568e04b93bc493202b41c0a6f6121a163 Mon Sep 17 00:00:00 2001
From: Radu Popa
Date: Mon, 24 Feb 2025 12:53:49 +0200
Subject: [PATCH 1/4] feat: add curl in the final container

---
 docker/build/movement-indexer/Dockerfile | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/docker/build/movement-indexer/Dockerfile b/docker/build/movement-indexer/Dockerfile
index 6c093bdec..64a23dad0 100644
--- a/docker/build/movement-indexer/Dockerfile
+++ b/docker/build/movement-indexer/Dockerfile
@@ -17,6 +17,9 @@ RUN rust_binary="./target/release/movement-indexer-service"; dest_dir="/tmp/runt
 
 FROM alpine:latest
 
+# curl is needed for the health check
+RUN apk add --no-cache curl
+
 # Copy the build artifact from the builder stage
 COPY --from=builder /tmp/build/target/release/movement-indexer-service /app/movement-indexer-service
 COPY --from=builder /tmp/build/target/release/load_metadata /app/load_metadata

From 50612dee8e275ecd6a38f1dbdf415cf2d5657df2 Mon Sep 17 00:00:00 2001
From: Philippe Delrieu
Date: Tue, 25 Feb 2025 16:00:35 +0100
Subject: [PATCH 2/4] [Node] Add retry on DA connection (#1054)

---
 Cargo.lock                                 |  1 +
 .../da/movement/protocol/client/Cargo.toml |  1 +
 .../da/movement/protocol/client/src/lib.rs | 26 +++++++++++++++++--
 util/godfig/src/backend/config_file/mod.rs |  2 +-
 util/godfig/src/backend/mod.rs             |  4 +--
 util/signing/interface/src/key/mod.rs      | 18 ++++-------
 6 files changed, 38 insertions(+), 14 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 91a9f3150..e7318276a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -11899,6 +11899,7 @@ dependencies = [
 "tonic 0.12.3",
 "tonic-web",
 "tower 0.5.1",
+ "tracing",
 ]
 
 [[package]]
diff --git a/protocol-units/da/movement/protocol/client/Cargo.toml b/protocol-units/da/movement/protocol/client/Cargo.toml
index 62acd8002..6a65454e8 100644
--- a/protocol-units/da/movement/protocol/client/Cargo.toml
+++ b/protocol-units/da/movement/protocol/client/Cargo.toml
@@ -20,6 +20,7 @@ tower = { workspace = true }
 http-body-util = { workspace = true }
 bytes = { workspace = true }
 anyhow = { workspace = true }
+tracing = { workspace = true }
 
 [lints]
diff --git a/protocol-units/da/movement/protocol/client/src/lib.rs b/protocol-units/da/movement/protocol/client/src/lib.rs
index a52577be2..85b450b41 100644
--- a/protocol-units/da/movement/protocol/client/src/lib.rs
+++ b/protocol-units/da/movement/protocol/client/src/lib.rs
@@ -15,12 +15,34 @@ pub enum MovementDaLightNodeClient {
 impl MovementDaLightNodeClient {
     /// Creates an http1 connection to the light node service.
     pub fn try_http1(connection_string: &str) -> Result {
-        Ok(Self::Http1(http1::Http1::try_new(connection_string)?))
+        for _ in 0..5 {
+            match http1::Http1::try_new(connection_string) {
+                Ok(result) => return Ok(Self::Http1(result)),
+                Err(err) => {
+                    tracing::warn!("DA Http1 connection failed: {}. Retrying in 5s...", err);
+                    std::thread::sleep(std::time::Duration::from_secs(5));
+                }
+            }
+        }
+        return Err(
+            anyhow::anyhow!("Error: DA Http1 connection failed more than 5 times, aborting.",),
+        );
     }
 
     /// Creates an http2 connection to the light node service.
     pub async fn try_http2(connection_string: &str) -> Result {
-        Ok(Self::Http2(http2::Http2::connect(connection_string).await?))
+        for _ in 0..5 {
+            match http2::Http2::connect(connection_string).await {
+                Ok(result) => return Ok(Self::Http2(result)),
+                Err(err) => {
+                    tracing::warn!("DA Http2 connection failed: {}. Retrying in 5s...", err);
+                    std::thread::sleep(std::time::Duration::from_secs(5));
+                }
+            }
+        }
+        return Err(
+            anyhow::anyhow!("Error: DA Http2 connection failed more than 5 times, aborting.",),
+        );
     }
 
     /// Stream reads from a given height.
diff --git a/util/godfig/src/backend/config_file/mod.rs b/util/godfig/src/backend/config_file/mod.rs
index 1bfd39176..de2def7e1 100644
--- a/util/godfig/src/backend/config_file/mod.rs
+++ b/util/godfig/src/backend/config_file/mod.rs
@@ -44,7 +44,7 @@ impl ConfigFile {
         }
 
         let json: serde_json::Value = serde_json::from_str(&contents)
-            .map_err(|e| GodfigBackendError::TypeContractMismatch(e.to_string()))?;
+            .map_err(|e| GodfigBackendError::ConfigDeserializationError(e.to_string()))?;
 
         let keys = key.into();
         let mut current = &json;
diff --git a/util/godfig/src/backend/mod.rs b/util/godfig/src/backend/mod.rs
index de2d0c101..19cca0853 100644
--- a/util/godfig/src/backend/mod.rs
+++ b/util/godfig/src/backend/mod.rs
@@ -7,8 +7,8 @@ use thiserror::Error;
 
 #[derive(Debug, Error)]
 pub enum GodfigBackendError {
-    #[error("Type Contract Mismatch")]
-    TypeContractMismatch(String),
+    #[error("An error occurred during config deserialization: {0}")]
+    ConfigDeserializationError(String),
     #[error("Backend Error: {0}")]
     BackendError(#[from] anyhow::Error),
     #[error("IO Error: {0}")]
diff --git a/util/signing/interface/src/key/mod.rs b/util/signing/interface/src/key/mod.rs
index 2094c355b..0ecdfd4e8 100644
--- a/util/signing/interface/src/key/mod.rs
+++ b/util/signing/interface/src/key/mod.rs
@@ -220,18 +220,18 @@ impl TryFromCanonicalString for Key {
     /// Example canonical string: "movement/prod/full_node/mcr_settlement/signer/validator/0"
     fn try_from_canonical_string(s: &str) -> Result {
         let parts: Vec<&str> = s.split('/').collect();
-        if parts.len() != 7 {
-            return Err(format!("invalid key: {}", s));
+        if parts.len() != 8 {
+            return Err(format!("invalid key, wrong number of elements {:?}: '{}'", parts, s));
         }
 
         Ok(Self {
-            org: Organization::try_from_canonical_string(parts[0])?,
-            environment: Environment::try_from_canonical_string(parts[1])?,
-            software_unit: SoftwareUnit::try_from_canonical_string(parts[2])?,
-            usage: Usage::try_from_canonical_string(parts[3])?,
-            allowed_roles: AllowedRoles::try_from_canonical_string(parts[4])?,
-            key_name: parts[5].to_string(),
-            app_replica: Some(parts[6].to_string()),
+            org: Organization::try_from_canonical_string(parts[1])?,
+            environment: Environment::try_from_canonical_string(parts[2])?,
+            software_unit: SoftwareUnit::try_from_canonical_string(parts[3])?,
+            usage: Usage::try_from_canonical_string(parts[4])?,
+            allowed_roles: AllowedRoles::try_from_canonical_string(parts[5])?,
+            key_name: parts[6].to_string(),
+            app_replica: Some(parts[7].to_string()),
         })
     }
 }

From fc5109ed13e0b6ac251f837cf3baa8629e4fd719 Mon Sep 17 00:00:00 2001
From: Philippe Delrieu
Date: Thu, 27 Feb 2025 14:08:55 +0100
Subject: [PATCH 3/4] [DA] Update light client protocol and heartbeat.
 (#1064)

---
 Cargo.lock                                        |   2 +
 .../movement/movement-full-node/Cargo.toml        |   1 +
 .../src/da/stream_blocks/mod.rs                   |  19 +++-
 .../src/node/tasks/execute_settle.rs              |  11 ++-
 .../da/light_node/v1beta2.proto                   |  99 +++++++++++++++++++
 .../celestia/light-node/src/v1/passthrough.rs     |   3 +-
 .../da/movement/protocol/da/Cargo.toml            |   1 +
 .../da/movement/protocol/da/src/lib.rs            |   2 +-
 .../da/movement/protocol/da/src/mock/mod.rs       |   2 +-
 .../protocol/light-node/src/passthrough.rs        |  69 ++++++++-----
 .../protocol/light-node/src/sequencer.rs          |  12 ++-
 .../da/movement/protocol/proto/build.rs           |   5 +-
 .../da/movement/protocol/proto/src/lib.rs         |   6 +-
 .../protocol/util/src/blob/ir/blob.rs             |  21 ++--
 .../protocol/util/src/config/common.rs            |   2 +-
 .../movement/providers/celestia/src/da/mod.rs     |  15 ++-
 .../providers/digest-store/src/da/mod.rs          |  14 ++-
 17 files changed, 232 insertions(+), 52 deletions(-)
 create mode 100644 proto/movementlabs/protocol_units/da/light_node/v1beta2.proto

diff --git a/Cargo.lock b/Cargo.lock
index e7318276a..61d4445bd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -11909,6 +11909,7 @@ dependencies = [
 "anyhow",
 "async-stream",
 "futures",
+ "hex",
 "movement-da-light-node-proto",
 "movement-da-util",
 "movement-signer",
@@ -12106,6 +12107,7 @@ dependencies = [
 "anyhow",
 "aptos-framework-elsa-to-biarritz-rc1-migration",
 "bcs 0.1.6 (git+https://github.com/movementlabsxyz/bcs.git?rev=bc16d2d39cabafaabd76173dd1b04b2aa170cf0c)",
+ "chrono",
 "clap 4.5.21",
 "console-subscriber",
 "dot-movement",
diff --git a/networks/movement/movement-full-node/Cargo.toml b/networks/movement/movement-full-node/Cargo.toml
index f1731bbbb..2a1a69330 100644
--- a/networks/movement/movement-full-node/Cargo.toml
+++ b/networks/movement/movement-full-node/Cargo.toml
@@ -47,6 +47,7 @@ movement-signer = { workspace = true }
 movement-signer-loader = { workspace = true }
 syncador = { workspace = true }
 syncup = { workspace = true }
+chrono = { workspace = true }
 
 [features]
 default = []
diff --git a/networks/movement/movement-full-node/src/da/stream_blocks/mod.rs b/networks/movement/movement-full-node/src/da/stream_blocks/mod.rs
index 44750a028..4fba0f5c6 100644
--- a/networks/movement/movement-full-node/src/da/stream_blocks/mod.rs
+++ b/networks/movement/movement-full-node/src/da/stream_blocks/mod.rs
@@ -38,13 +38,30 @@ impl StreamBlocks {
                 .ok_or(anyhow::anyhow!("No blob type in response"))?
             {
                 blob_response::BlobType::SequencedBlobBlock(blob) => {
+                    tracing::info!("Received SequencedBlobBlock blob");
                     (blob.data, blob.timestamp, blob.blob_id, blob.height)
                 }
+                blob_response::BlobType::PassedThroughBlob(blob) => {
+                    tracing::info!("Received PassedThroughBlob blob");
+                    (blob.data, blob.timestamp, blob.blob_id, blob.height)
+                }
+                blob_response::BlobType::Heartbeat(_) => {
+                    tracing::info!("Received heartbeat blob");
+                    continue;
+                }
                 _ => {
                     anyhow::bail!("Invalid blob type in response")
                 }
             };
-            info!("{} {} {}", hex::encode(block_id), block_timestamp, da_height);
+            // pretty print (with labels) the block_id, block_timestamp, and da_height
+            tracing::info!(
+                "Block ID: {}, Block Timestamp: {:?}, DA Height: {}",
+                hex::encode(block_id),
+                // unix date string from the block timestamp, which is in microseconds
+                chrono::DateTime::from_timestamp_micros(block_timestamp as i64)
+                    .context("Failed to convert timestamp to date")?,
+                da_height
+            );
         }
 
         info!("Finished streaming blocks from DA");
diff --git a/networks/movement/movement-full-node/src/node/tasks/execute_settle.rs b/networks/movement/movement-full-node/src/node/tasks/execute_settle.rs
index 974a82d39..41b7acece 100644
--- a/networks/movement/movement-full-node/src/node/tasks/execute_settle.rs
+++ b/networks/movement/movement-full-node/src/node/tasks/execute_settle.rs
@@ -73,7 +73,11 @@ where
         let mut blocks_from_da = self
             .da_light_node_client
             .stream_read_from_height(StreamReadFromHeightRequest { height: synced_height })
-            .await?;
+            .await
+            .map_err(|e| {
+                error!("Failed to stream blocks from DA: {:?}", e);
+                e
+            })?;
 
         loop {
             select! {
@@ -112,6 +116,11 @@ where
                     blob_response::BlobType::PassedThroughBlob(blob) => {
                         (blob.data, blob.timestamp, blob.blob_id, blob.height)
                     }
+                    blob_response::BlobType::Heartbeat(_) => {
+                        tracing::info!("Received DA heartbeat");
+                        // Do nothing.
+                        return Ok(());
+                    }
                     _ => anyhow::bail!("Invalid blob type"),
                 };
diff --git a/proto/movementlabs/protocol_units/da/light_node/v1beta2.proto b/proto/movementlabs/protocol_units/da/light_node/v1beta2.proto
new file mode 100644
index 000000000..d17f7aca3
--- /dev/null
+++ b/proto/movementlabs/protocol_units/da/light_node/v1beta2.proto
@@ -0,0 +1,99 @@
+syntax = "proto3";
+package movementlabs.protocol_units.da.light_node.v1beta2;
+
+
+// Request and response messages
+message Blob {
+  bytes blob_id = 1;
+  bytes data = 2;
+  uint64 height = 3;
+  bytes signature = 4;
+  uint64 timestamp = 5;
+  bytes signer = 6;
+}
+
+message BlobResponse {
+  oneof blob_type {
+    Blob passed_through_blob = 1;
+    Blob sequenced_blob_intent = 2;
+    Blob sequenced_blob_block = 3;
+    bool heartbeat = 4;
+  }
+}
+
+message BlobWrite {
+  bytes data = 1;
+}
+
+// StreamReadAtHeight
+message StreamReadFromHeightRequest {
+  uint64 height = 1;
+}
+
+message StreamReadFromHeightResponse {
+  BlobResponse blob = 1;
+}
+
+// StreamReadLatest
+message StreamReadLatestRequest {
+
+}
+
+message StreamReadLatestResponse {
+  BlobResponse blob = 1;
+}
+
+// StreamWriteBlob
+message StreamWriteBlobRequest {
+  BlobWrite blob = 1;
+}
+
+message StreamWriteBlobResponse {
+  BlobResponse blob = 1;
+}
+
+// ReadAtHeight
+message ReadAtHeightRequest {
+  uint64 height = 1;
+}
+
+message ReadAtHeightResponse {
+  repeated BlobResponse blobs = 1;
+}
+
+// BatchRead
+message BatchReadRequest {
+  repeated uint64 heights = 1;
+}
+
+message BatchReadResponse {
+  repeated ReadAtHeightResponse responses = 1;
+}
+
+message BatchWriteRequest {
+  repeated BlobWrite blobs = 1;
+}
+
+message BatchWriteResponse {
+  repeated BlobResponse blobs = 1;
+}
+
+
+
+// LightNode service definition
+service LightNodeService {
+  // Stream blobs from a specified height or from the latest height.
+  rpc StreamReadFromHeight (StreamReadFromHeightRequest) returns (stream StreamReadFromHeightResponse);
+  rpc StreamReadLatest (StreamReadLatestRequest) returns (stream StreamReadLatestResponse);
+
+  // Stream blobs out, either individually or in batches.
+  rpc StreamWriteBlob (stream StreamWriteBlobRequest) returns (stream StreamWriteBlobResponse);
+
+  // Read blobs at a specified height.
+  rpc ReadAtHeight (ReadAtHeightRequest) returns (ReadAtHeightResponse);
+
+  // Batch read and write operations for efficiency.
+  rpc BatchRead (BatchReadRequest) returns (BatchReadResponse);
+  rpc BatchWrite (BatchWriteRequest) returns (BatchWriteResponse);
+
+}
\ No newline at end of file
diff --git a/protocol-units/da/movement/celestia/light-node/src/v1/passthrough.rs b/protocol-units/da/movement/celestia/light-node/src/v1/passthrough.rs
index b74178baf..3c31585b2 100644
--- a/protocol-units/da/movement/celestia/light-node/src/v1/passthrough.rs
+++ b/protocol-units/da/movement/celestia/light-node/src/v1/passthrough.rs
@@ -184,6 +184,7 @@ where
                     verified_blobs.push(blob);
                 }
                 Err(e) => {
+                    tracing::error!(error = %e, "ICI failed to verify blob");
                     error!(error = %e, "failed to verify blob");
                 }
             }
@@ -265,7 +266,7 @@ where
 
             while let Some(blob) = blob_stream.next().await {
-                debug!("Stream got blob: {:?}", blob);
+                debug!("Back-fetch stream got blob: {:?}", blob);
                 yield blob?;
             }
diff --git a/protocol-units/da/movement/protocol/da/Cargo.toml b/protocol-units/da/movement/protocol/da/Cargo.toml
index c1cbf9f9f..2c24ae8d1 100644
--- a/protocol-units/da/movement/protocol/da/Cargo.toml
+++ b/protocol-units/da/movement/protocol/da/Cargo.toml
@@ -23,6 +23,7 @@ async-stream = { workspace = true }
 tokio = { workspace = true }
 tracing = { workspace = true }
 anyhow = { workspace = true }
+hex = { workspace = true }
 
 [lints]
 workspace = true
diff --git a/protocol-units/da/movement/protocol/da/src/lib.rs b/protocol-units/da/movement/protocol/da/src/lib.rs
index a28cf7779..7176e21e6 100644
--- a/protocol-units/da/movement/protocol/da/src/lib.rs
+++ b/protocol-units/da/movement/protocol/da/src/lib.rs
@@ -54,7 +54,7 @@ pub enum DaError {
 /// Trait for DA operations.
 pub trait DaOperations: Send + Sync
 where
-    C: Curve + Send + Sync + 'static,
+    C: Curve + Send + Sync + 'static + std::fmt::Debug,
 {
     fn submit_blob(
         &self,
diff --git a/protocol-units/da/movement/protocol/da/src/mock/mod.rs b/protocol-units/da/movement/protocol/da/src/mock/mod.rs
index c5ed31dd9..41ebe307c 100644
--- a/protocol-units/da/movement/protocol/da/src/mock/mod.rs
+++ b/protocol-units/da/movement/protocol/da/src/mock/mod.rs
@@ -64,7 +64,7 @@ where
 
 impl DaOperations for Mock
 where
-    C: Curve + Send + Sync + 'static,
+    C: Curve + Send + Sync + 'static + std::fmt::Debug,
 {
     fn submit_blob(
         &self,
diff --git a/protocol-units/da/movement/protocol/light-node/src/passthrough.rs b/protocol-units/da/movement/protocol/light-node/src/passthrough.rs
index 47ec387fc..88af1b9b6 100644
--- a/protocol-units/da/movement/protocol/light-node/src/passthrough.rs
+++ b/protocol-units/da/movement/protocol/light-node/src/passthrough.rs
@@ -10,7 +10,7 @@ use movement_da_light_node_digest_store::da::Da as DigestStoreDa;
 use movement_da_light_node_proto::light_node_service_server::LightNodeService;
 use movement_da_light_node_proto::*;
 use movement_da_light_node_verifier::signed::InKnownSignersVerifier;
-use movement_da_light_node_verifier::{Error as VerifierError, VerifierOperations};
+use movement_da_light_node_verifier::VerifierOperations;
 use movement_da_util::{
     blob::ir::blob::DaBlob, blob::ir::data::InnerSignedBlobV1Data, config::Config,
 };
@@ -35,7 +35,8 @@ where
         + Serialize
         + for<'de> Deserialize<'de>
         + Clone
-        + 'static,
+        + 'static
+        + std::fmt::Debug,
     Da: DaOperations,
     V: VerifierOperations, DaBlob>,
 {
@@ -56,7 +57,8 @@ where
         + Serialize
         + for<'de> Deserialize<'de>
         + Clone
-        + 'static,
+        + 'static
+        + std::fmt::Debug,
     Da: DaOperations,
     V: VerifierOperations, DaBlob>,
 {
@@ -113,7 +115,8 @@ where
         + Serialize
         + for<'de> Deserialize<'de>
         + Clone
-        + 'static,
+        + 'static
+        + std::fmt::Debug,
     Da: DaOperations + 'static,
     V: VerifierOperations, DaBlob> + Send + Sync + 'static,
 {
@@ -135,31 +138,51 @@ where
         let verifier = self.verifier.clone();
         let height = request.into_inner().height;
 
+        // Tick interval for generating heartbeats.
+        let mut tick_interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
+
         let output = async_stream::try_stream! {
 
             let mut blob_stream = da.stream_da_blobs_from_height(height).await.map_err(|e| tonic::Status::internal(e.to_string()))?;
 
-            while let Some(blob) = blob_stream.next().await {
-                let (height, da_blob) = blob.map_err(|e| tonic::Status::internal(e.to_string()))?;
-                match verifier.verify(da_blob, height.as_u64()).await {
-                    Ok(verifed_blob) => {
-                        let blob = verifed_blob.into_inner().to_blob_passed_through_read_response(height.as_u64()).map_err(|e| tonic::Status::internal(e.to_string()))?;
-                        let response = StreamReadFromHeightResponse {
-                            blob: Some(blob)
-                        };
-                        yield response;
-                    },
-                    Err(VerifierError::Validation(e)) => {
-                        info!("Failed to verify blob: {}", e);
-                    },
-                    Err(VerifierError::Internal(e)) => {
-                        Err(tonic::Status::internal(e.to_string()))?;
+            loop {
+                let response_content = tokio::select! {
+                    // Yield from the data stream
+                    block_opt = blob_stream.next() => {
+                        match block_opt {
+                            Some(Ok((height, da_blob))) => {
+                                match verifier.verify(da_blob, height.as_u64()).await.map_err(|e| tonic::Status::internal(e.to_string())).and_then(|verifed_blob| {
+                                    verifed_blob.into_inner().to_blob_passed_through_read_response(height.as_u64()).map_err(|e| tonic::Status::internal(e.to_string()))
+                                }) {
+                                    Ok(blob) => blob,
+                                    Err(err) => {
+                                        // Blob failed verification; skip to the next one.
+                                        tracing::warn!("Blob at height {} failed to verify: {err}", height.as_u64());
+                                        continue;
+                                    }
+                                }
+                            }
+                            Some(Err(err)) => {
+                                tracing::warn!("Blob stream returned an error, exiting stream: {err}");
+                                return;
+                            },
+                            None => {
+                                tracing::warn!("Blob stream closed, exiting stream.");
+                                return;
+                            }
+                        }
+                    }
+                    // Yield the periodic tick
+                    _ = tick_interval.tick() => {
+                        // Heartbeat. The value can be used to indicate some status.
+                        BlobResponse { blob_type: Some(movement_da_light_node_proto::blob_response::BlobType::Heartbeat(true)) }
                     }
-                }
+                };
+                let response = StreamReadFromHeightResponse {
+                    blob: Some(response_content)
+                };
+                yield response;
             }
-
-            info!("Stream read from height closed for height: {}", height);
-
         };
 
         Ok(tonic::Response::new(Box::pin(output) as Self::StreamReadFromHeightStream))
diff --git a/protocol-units/da/movement/protocol/light-node/src/sequencer.rs b/protocol-units/da/movement/protocol/light-node/src/sequencer.rs
index 044ff1f8b..825dc8e2a 100644
--- a/protocol-units/da/movement/protocol/light-node/src/sequencer.rs
+++ b/protocol-units/da/movement/protocol/light-node/src/sequencer.rs
@@ -46,7 +46,8 @@ where
         + Serialize
         + for<'de> Deserialize<'de>
         + Clone
-        + 'static,
+        + 'static
+        + std::fmt::Debug,
     Da: DaOperations,
     V: VerifierOperations, DaBlob>,
 {
@@ -66,7 +67,8 @@ where
         + Serialize
         + for<'de> Deserialize<'de>
         + Clone
-        + 'static,
+        + 'static
+        + std::fmt::Debug,
     Da: DaOperations,
     V: VerifierOperations, DaBlob>,
 {
@@ -132,7 +134,8 @@ where
         + Serialize
         + for<'de> Deserialize<'de>
         + Clone
-        + 'static,
+        + 'static
+        + std::fmt::Debug,
     Da: DaOperations,
     V: VerifierOperations, DaBlob>,
 {
@@ -307,7 +310,8 @@ where
         + Serialize
         + for<'de> Deserialize<'de>
         + Clone
-        + 'static,
+        + 'static
+        + std::fmt::Debug,
     Da: DaOperations + 'static,
     V: VerifierOperations, DaBlob> + Send + Sync + 'static,
 {
diff --git a/protocol-units/da/movement/protocol/proto/build.rs b/protocol-units/da/movement/protocol/proto/build.rs
index 0631f8820..ba94fa22e 100644
--- a/protocol-units/da/movement/protocol/proto/build.rs
+++ b/protocol-units/da/movement/protocol/proto/build.rs
@@ -1 +1,4 @@
-buildtime::proto_build_main!("movementlabs/protocol_units/da/light_node/v1beta1.proto");
+buildtime::proto_build_main!(
+    "movementlabs/protocol_units/da/light_node/v1beta1.proto",
+    "movementlabs/protocol_units/da/light_node/v1beta2.proto"
+);
diff --git a/protocol-units/da/movement/protocol/proto/src/lib.rs b/protocol-units/da/movement/protocol/proto/src/lib.rs
index 4defff541..9ce09e331 100644
--- a/protocol-units/da/movement/protocol/proto/src/lib.rs
+++ b/protocol-units/da/movement/protocol/proto/src/lib.rs
@@ -1,8 +1,8 @@
-pub mod v1beta1 {
-    tonic::include_proto!("movementlabs.protocol_units.da.light_node.v1beta1"); // The string specified here
+pub mod v1beta2 {
+    tonic::include_proto!("movementlabs.protocol_units.da.light_node.v1beta2"); // The string specified here
     pub const FILE_DESCRIPTOR_SET: &[u8] =
         tonic::include_file_descriptor_set!("movement-da-light-node-proto-descriptor");
 }
 
 // Re-export the latest version at the crate root
-pub use v1beta1::*;
+pub use v1beta2::*;
diff --git a/protocol-units/da/movement/protocol/util/src/blob/ir/blob.rs b/protocol-units/da/movement/protocol/util/src/blob/ir/blob.rs
index 76eba7246..fd81d9cec 100644
--- a/protocol-units/da/movement/protocol/util/src/blob/ir/blob.rs
+++ b/protocol-units/da/movement/protocol/util/src/blob/ir/blob.rs
@@ -66,7 +66,7 @@ where
 
 impl DaBlob
 where
-    C: Curve + Verify + Digester,
+    C: Curve,
 {
     pub fn blob(&self) -> &[u8] {
         match self {
@@ -107,13 +107,6 @@ where
         }
     }
 
-    pub fn verify_signature(&self) -> Result<(), anyhow::Error> {
-        match self {
-            DaBlob::SignedV1(inner) => inner.try_verify(),
-            DaBlob::DigestV1(_) => Ok(()),
-        }
-    }
-
     pub fn to_blob(self, height: u64) -> Result {
         Ok(Blob {
             data: self.blob().to_vec(),
@@ -160,6 +153,18 @@ where
     }
 }
 
+impl DaBlob
+where
+    C: Curve + Verify + Digester,
+{
+    pub fn verify_signature(&self) -> Result<(), anyhow::Error> {
+        match self {
+            DaBlob::SignedV1(inner) => inner.try_verify(),
+            DaBlob::DigestV1(_) => Ok(()),
+        }
+    }
+}
+
 #[cfg(test)]
 pub mod test {
diff --git a/protocol-units/da/movement/protocol/util/src/config/common.rs b/protocol-units/da/movement/protocol/util/src/config/common.rs
index 17cf5acfb..8ec56bbc5 100644
--- a/protocol-units/da/movement/protocol/util/src/config/common.rs
+++ b/protocol-units/da/movement/protocol/util/src/config/common.rs
@@ -156,4 +156,4 @@ pub fn default_celestia_bridge_replace_args() -> Vec {
 env_default!(default_da_light_node_is_initial, "MOVEMENT_DA_LIGHT_NODE_IS_INITIAL", bool, true);
 
 // Whether to use http1 for Movement Light Node Connections
-env_default!(default_movement_da_light_node_http1, "MOVEMENT_DA_LIGHT_NODE_HTTP1", bool, true);
+env_default!(default_movement_da_light_node_http1, "MOVEMENT_DA_LIGHT_NODE_HTTP1", bool, false);
diff --git a/protocol-units/da/movement/providers/celestia/src/da/mod.rs b/protocol-units/da/movement/providers/celestia/src/da/mod.rs
index 84135c6e6..361df7691 100644
--- a/protocol-units/da/movement/providers/celestia/src/da/mod.rs
+++ b/protocol-units/da/movement/providers/celestia/src/da/mod.rs
@@ -53,7 +53,14 @@ where
 
 impl DaOperations for Da
 where
-    C: Curve + Send + Sync + Clone + Serialize + for<'de> Deserialize<'de> + 'static,
+    C: Curve
+        + Send
+        + Sync
+        + Clone
+        + Serialize
+        + for<'de> Deserialize<'de>
+        + 'static
+        + std::fmt::Debug,
 {
     fn submit_blob(
         &self,
@@ -63,12 +70,12 @@ where
             // create the blob
             let celestia_blob = self
                 .create_new_celestia_blob(data)
-                .map_err(|e| DaError::Internal("failed to create celestia blob".to_string()))?;
+                .map_err(|e| DaError::Internal(format!("failed to create celestia blob: {e}")))?;
 
             // submit the blob to the celestia node
             self.submit_celestia_blob(celestia_blob)
                 .await
-                .map_err(|e| DaError::Internal("failed to submit celestia blob".to_string()))?;
+                .map_err(|e| DaError::Internal(format!("failed to submit celestia blob: {e}")))?;
 
             Ok(())
         })
@@ -114,7 +121,7 @@ where
         let me = self.clone();
         Box::pin(async move {
             let mut subscription = me.default_client.header_subscribe().await.map_err(|e| {
-                DaError::Certificate("failed to subscribe to headers".to_string().into())
+                DaError::Certificate(format!("failed to subscribe to headers: {e}").into())
             })?;
 
             let stream = async_stream::try_stream! {
diff --git a/protocol-units/da/movement/providers/digest-store/src/da/mod.rs b/protocol-units/da/movement/providers/digest-store/src/da/mod.rs
index d5b45900a..7db425085 100644
--- a/protocol-units/da/movement/providers/digest-store/src/da/mod.rs
+++ b/protocol-units/da/movement/providers/digest-store/src/da/mod.rs
@@ -13,7 +13,7 @@ use std::sync::Arc;
 #[derive(Clone)]
 pub struct Da
 where
-    C: Curve + Send + Sync + Clone + 'static,
+    C: Curve + Send + Sync + Clone + 'static + std::fmt::Debug,
     D: DaOperations,
 {
     /// The namespace on Celestia which the Da will use.
@@ -26,7 +26,14 @@ where
 
 impl Da
 where
-    C: Curve + Send + Sync + Clone + Serialize + for<'de> Deserialize<'de> + 'static,
+    C: Curve
+        + Send
+        + Sync
+        + Clone
+        + Serialize
+        + for<'de> Deserialize<'de>
+        + 'static
+        + std::fmt::Debug,
     D: DaOperations,
 {
     /// Creates a new Da instance with the provided Celestia namespace and RPC client.
@@ -49,7 +56,8 @@ where
         + Send
         + Sync
         + Clone
-        + 'static,
+        + 'static
+        + std::fmt::Debug,
     D: DaOperations,
 {
     fn submit_blob(

From 52b744ef287f6dee57ed98a17ccedc7689b05559 Mon Sep 17 00:00:00 2001
From: Liam Monninger <79056955+l-monninger@users.noreply.github.com>
Date: Thu, 27 Feb 2025 05:12:09 -0800
Subject: [PATCH 4/4] Gas Upgrades Beta Fixes pt. 2 (#1070)

Co-authored-by: Andy Golay
Co-authored-by: musitdev
Co-authored-by: Mikhail Zabaluev
Co-authored-by: Richard Melkonian <35300528+0xmovses@users.noreply.github.com>
Co-authored-by: Icarus131
Co-authored-by: primata
Co-authored-by: Icarus131
Co-authored-by: Richard Melkonian
Co-authored-by: Radu Popa
---
 .github/workflows/build-push-container.yml |  2 +-
 Cargo.lock                                 |  1 +
 .../protocol/light-node/src/passthrough.rs |  1 -
 .../da/movement/protocol/util/Cargo.toml   |  1 +
 .../protocol/util/src/blob/ir/data.rs      | 37 ++++---------------
 .../releases/biarritz-rc1/src/cached.rs    |  2 +-
 .../src/background/transaction_pipe.rs     |  9 ++++-
 7 files changed, 18 insertions(+), 35 deletions(-)

diff --git a/.github/workflows/build-push-container.yml b/.github/workflows/build-push-container.yml
index b7b7cd04a..d32b06e65 100644
--- a/.github/workflows/build-push-container.yml
+++ b/.github/workflows/build-push-container.yml
@@ -21,7 +21,7 @@ jobs:
     strategy:
       matrix:
         architecture: [x86_64, arm64]
-    runs-on: ${{ matrix.architecture == 'x86_64' && 'buildjet-8vcpu-ubuntu-2204' || 'buildjet-8vcpu-ubuntu-2204-arm' }}
+    runs-on: ${{ matrix.architecture == 'x86_64' && 'buildjet-16vcpu-ubuntu-2204' || 'buildjet-16vcpu-ubuntu-2204-arm' }}
     steps:
       - name: Checkout repository
         uses: actions/checkout@v4
diff --git a/Cargo.lock b/Cargo.lock
index 61d4445bd..38a44c855 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -12055,6 +12055,7 @@ dependencies = [
 "anyhow",
 "aptos-framework-elsa-to-biarritz-rc1-migration",
 "bcs 0.1.6 (git+https://github.com/movementlabsxyz/bcs.git?rev=bc16d2d39cabafaabd76173dd1b04b2aa170cf0c)",
 "celestia-rpc",
 "celestia-types",
+ "chrono",
 "dot-movement",
 "ecdsa 0.16.9",
 "godfig",
diff --git a/protocol-units/da/movement/protocol/light-node/src/passthrough.rs b/protocol-units/da/movement/protocol/light-node/src/passthrough.rs
index 88af1b9b6..241d9dfb6 100644
--- a/protocol-units/da/movement/protocol/light-node/src/passthrough.rs
+++ b/protocol-units/da/movement/protocol/light-node/src/passthrough.rs
@@ -234,7 +234,6 @@ where
         let blobs = request.into_inner().blobs;
         for data in blobs {
             let blob = InnerSignedBlobV1Data::now(data.data)
-                .map_err(|e| tonic::Status::internal(format!("Failed to create blob data: {}", e)))?
                 .try_to_sign(&self.signer)
                 .await
                 .map_err(|e| tonic::Status::internal(format!("Failed to sign blob: {}", e)))?;
diff --git a/protocol-units/da/movement/protocol/util/Cargo.toml b/protocol-units/da/movement/protocol/util/Cargo.toml
index 3b2679889..bdd22513a 100644
--- a/protocol-units/da/movement/protocol/util/Cargo.toml
+++ b/protocol-units/da/movement/protocol/util/Cargo.toml
@@ -49,6 +49,7 @@ movement-da-light-node-signer = { workspace = true }
 movement-signer = { workspace = true }
 movement-signer-loader = { workspace = true }
 movement-types = { workspace = true }
+chrono = { workspace = true }
 
 [dev-dependencies]
 tempfile = { workspace = true }
diff --git a/protocol-units/da/movement/protocol/util/src/blob/ir/data.rs b/protocol-units/da/movement/protocol/util/src/blob/ir/data.rs
index c6e9dca45..414a77e9b 100644
--- a/protocol-units/da/movement/protocol/util/src/blob/ir/data.rs
+++ b/protocol-units/da/movement/protocol/util/src/blob/ir/data.rs
@@ -27,11 +27,12 @@ where
         Self { blob, timestamp, __curve_marker: std::marker::PhantomData }
     }
 
-    pub fn now(blob: Vec) -> Result {
-        Ok(Self::new(
-            blob,
-            std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?.as_secs(),
-        ))
+    pub fn now(blob: Vec) -> Self {
+        // Define the block timestamp value. The Aptos framework needs a timestamp in microseconds.
+        // Changing this value will generate blocks that can't be executed.
+        let timestamp = chrono::Utc::now().timestamp_micros() as u64;
+
+        Self::new(blob, timestamp)
     }
 
     /// Gets an owned copy of the bytes to be signed
@@ -76,31 +77,7 @@ pub mod block {
         fn try_from(block: block::Block) -> Result {
             let blob = bcs::to_bytes(&block)?;
-            Self::now(blob)
-        }
-    }
-
-    impl TryFrom for InnerSignedBlobV1Data
-    where
-        C: Curve + Verify + Digester,
-    {
-        type Error = anyhow::Error;
-
-        fn try_from(id: block::Id) -> Result {
-            let blob = id.as_bytes().to_vec();
-            Self::now(blob)
-        }
-    }
-
-    impl TryFrom> for InnerSignedBlobV1Data
-    where
-        C: Curve + Verify + Digester,
-    {
-        type Error = anyhow::Error;
-
-        fn try_from(ids: Vec) -> Result {
-            let blob = bcs::to_bytes(&ids)?;
-            Self::now(blob)
+            Ok(Self::now(blob))
         }
     }
 }
diff --git a/protocol-units/execution/maptos/framework/releases/biarritz-rc1/src/cached.rs b/protocol-units/execution/maptos/framework/releases/biarritz-rc1/src/cached.rs
index 0a4ebb474..ad711be14 100644
--- a/protocol-units/execution/maptos/framework/releases/biarritz-rc1/src/cached.rs
+++ b/protocol-units/execution/maptos/framework/releases/biarritz-rc1/src/cached.rs
@@ -11,7 +11,7 @@ generate_gas_upgrade_module!(gas_upgrade, BiarritzRc1, {
     let mut gas_parameters = AptosGasParameters::initial();
     gas_parameters.vm.txn.max_transaction_size_in_bytes = GasQuantity::new(100_000_000);
     gas_parameters.vm.txn.max_execution_gas = GasQuantity::new(10_000_000_000);
-
+    gas_parameters.vm.txn.gas_unit_scaling_factor = GasQuantity::new(50_000);
     aptos_types::on_chain_config::GasScheduleV2 {
         feature_version: aptos_gas_schedule::LATEST_GAS_FEATURE_VERSION,
         entries: gas_parameters
diff --git a/protocol-units/execution/maptos/opt-executor/src/background/transaction_pipe.rs b/protocol-units/execution/maptos/opt-executor/src/background/transaction_pipe.rs
index e34503b5d..875377486 100644
--- a/protocol-units/execution/maptos/opt-executor/src/background/transaction_pipe.rs
+++ b/protocol-units/execution/maptos/opt-executor/src/background/transaction_pipe.rs
@@ -102,9 +102,14 @@ impl TransactionPipe {
         }
     }
 
-    /// Pipes a batch of transactions from the mempool to the transaction channel.
-    /// todo: it may be wise to move the batching logic up a level to the consuming structs.
+    /// Performs a transaction read, mempool batch formation, and garbage collection.
     pub(crate) async fn tick(&mut self) -> Result<(), Error> {
+        self.receive_transaction_tick().await
+    }
+
+    /// Receives a transaction and adds it to the mempool.
+    /// todo: it may be wise to move the batching logic up a level to the consuming structs.
+    pub(crate) async fn receive_transaction_tick(&mut self) -> Result<(), Error> {
         let next = self.mempool_client_receiver.next().await;
         if let Some(request) = next {
             match request {