Skip to content

Commit

Permalink
add DA heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
musitdev committed Feb 12, 2025
1 parent 64f3bfa commit 0ac1296
Show file tree
Hide file tree
Showing 10 changed files with 182 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions networks/movement/movement-full-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ movement-da-light-node-client = { workspace = true}
aptos-framework-elsa-to-biarritz-rc1-migration = { workspace = true }
movement-signer = { workspace = true }
movement-signer-loader = { workspace = true }
chrono = { workspace = true }

[features]
default = []
Expand Down
20 changes: 17 additions & 3 deletions networks/movement/movement-full-node/src/da/stream_blocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,28 @@ impl StreamBlocks {
blob_response::BlobType::SequencedBlobBlock(blob) => {
(blob.data, blob.timestamp, blob.blob_id, blob.height)
}
blob_response::BlobType::PassedThroughBlob(blob) => {
(blob.data, blob.timestamp, blob.blob_id, blob.height)
}
blob_response::BlobType::HeartbeatBlob(_) => {
tracing::info!("Receive heartbeat blob");
continue;
}
_ => {
anyhow::bail!("Invalid blob type in response")
}
};
info!("{} {} {}", hex::encode(block_id), block_timestamp, da_height);
}

info!("Finished streaming blocks from DA");
// pretty print (with labels) the block_id, block_timestamp, and da_height
tracing::info!(
"Block ID: {}, Block Timestamp: {}, DA Height: {}",
hex::encode(block_id),
// unix date string from the block timestamp which is in microseconds
chrono::DateTime::from_timestamp((block_timestamp / 1_000_000) as i64, 0)
.context("Failed to convert timestamp to date")?,
da_height
);
}

Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ where
blob_response::BlobType::PassedThroughBlob(blob) => {
(blob.data, blob.timestamp, blob.blob_id, blob.height)
}
blob_response::BlobType::HeartbeatBlob(_) => {
tracing::info!("Receive heartbeat blob");
// Do nothing.
return Ok(());
}
_ => anyhow::bail!("Invalid blob type"),
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,5 @@ processes:
condition: process_healthy
movement-faucet:
condition: process_healthy
availability:
exit_on_end: true


99 changes: 99 additions & 0 deletions proto/movementlabs/protocol_units/da/light_node/v1beta2.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
syntax = "proto3";
package movementlabs.protocol_units.da.light_node.v1beta2;


// Request and response messages
message Blob {
bytes blob_id = 1;
bytes data = 2;
uint64 height = 3;
bytes signature = 4;
uint64 timestamp = 5;
bytes signer = 6;
}

message BlobResponse {
oneof blob_type {
Blob passed_through_blob = 1;
Blob sequenced_blob_intent = 2;
Blob sequenced_blob_block = 3;
bool heartbeat_blob = 4;
}
}

message BlobWrite {
bytes data = 1;
}

// StreamReadAtHeight
message StreamReadFromHeightRequest {
uint64 height = 1;
}

message StreamReadFromHeightResponse {
BlobResponse blob = 1;
}

// StreamReadLatest
message StreamReadLatestRequest {

}

message StreamReadLatestResponse {
BlobResponse blob = 1;
}

// StreamWriteBlob
message StreamWriteBlobRequest {
BlobWrite blob = 1;
}

message StreamWriteBlobResponse {
BlobResponse blob = 1;
}

// ReadAtHeight
message ReadAtHeightRequest {
uint64 height = 1;
}

message ReadAtHeightResponse {
repeated BlobResponse blobs = 1;
}

// BatchRead
message BatchReadRequest {
repeated uint64 heights = 1;
}

message BatchReadResponse {
repeated ReadAtHeightResponse responses = 1;
}

message BatchWriteRequest {
repeated BlobWrite blobs = 1;
}

message BatchWriteResponse {
repeated BlobResponse blobs = 1;
}



// LightNode service definition
service LightNodeService {
// Stream blobs from a specified height or from the latest height.
rpc StreamReadFromHeight (StreamReadFromHeightRequest) returns (stream StreamReadFromHeightResponse);
rpc StreamReadLatest (StreamReadLatestRequest) returns (stream StreamReadLatestResponse);

// Stream blobs out, either individually or in batches.
rpc StreamWriteBlob (stream StreamWriteBlobRequest) returns (stream StreamWriteBlobResponse);

// Read blobs at a specified height.
rpc ReadAtHeight (ReadAtHeightRequest) returns (ReadAtHeightResponse);

// Batch read and write operations for efficiency.
rpc BatchRead (BatchReadRequest) returns (BatchReadResponse);
rpc BatchWrite (BatchWriteRequest) returns (BatchWriteResponse);

}
62 changes: 42 additions & 20 deletions protocol-units/da/movement/protocol/light-node/src/passthrough.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
use tokio::time::{self, Duration};
use tokio_stream::{Stream, StreamExt};
use tracing::info;

Expand All @@ -10,7 +11,7 @@ use movement_da_light_node_digest_store::da::Da as DigestStoreDa;
use movement_da_light_node_proto::light_node_service_server::LightNodeService;
use movement_da_light_node_proto::*;
use movement_da_light_node_verifier::signed::InKnownSignersVerifier;
use movement_da_light_node_verifier::{Error as VerifierError, VerifierOperations};
use movement_da_light_node_verifier::VerifierOperations;
use movement_da_util::{
blob::ir::blob::DaBlob, blob::ir::data::InnerSignedBlobV1Data, config::Config,
};
Expand Down Expand Up @@ -135,31 +136,52 @@ where
let verifier = self.verifier.clone();
let height = request.into_inner().height;

// Tick interval for generating HeartBeat.
let mut tick_interval = time::interval(Duration::from_secs(10));

let output = async_stream::try_stream! {

let mut blob_stream = da.stream_da_blobs_from_height(height).await.map_err(|e| tonic::Status::internal(e.to_string()))?;

while let Some(blob) = blob_stream.next().await {
let (height, da_blob) = blob.map_err(|e| tonic::Status::internal(e.to_string()))?;
match verifier.verify(da_blob, height.as_u64()).await {
Ok(verifed_blob) => {
let blob = verifed_blob.into_inner().to_blob_passed_through_read_response(height.as_u64()).map_err(|e| tonic::Status::internal(e.to_string()))?;
let response = StreamReadFromHeightResponse {
blob: Some(blob)
};
yield response;
},
Err(VerifierError::Validation(e)) => {
info!("Failed to verify blob: {}", e);
},
Err(VerifierError::Internal(e)) => {
Err(tonic::Status::internal(e.to_string()))?;
loop {
let response_content = tokio::select! {
// Yield from the data stream
block_opt = blob_stream.next() => {
match block_opt {
Some(Ok((height, da_blob))) => {
//let (height, da_blob) = blob.map_err(|e| tonic::Status::internal(e.to_string()))?;
match verifier.verify(da_blob, height.as_u64()).await.map_err(|e| tonic::Status::internal(e.to_string())).and_then(|verifed_blob| {
verifed_blob.into_inner().to_blob_passed_through_read_response(height.as_u64()).map_err(|e| tonic::Status::internal(e.to_string()))
}) {
Ok(blob) => blob,
Err(err) => {
// Not verified block, skip to next one.
tracing::warn!("Stream blob of height: {} fail to verify error:{err}", height.as_u64());
continue;
}
}
}
Some(Err(err)) => {
tracing::warn!("Stream blob return an error, exit stream :{err}");
return;
},
None => {
info!("Stream blob closed , exit stream.");
return;
}
}
}
// Yield the periodic tick
_ = tick_interval.tick() => {
//Heart beat. The value can be use to indicate some status.
BlobResponse { blob_type: Some(movement_da_light_node_proto::blob_response::BlobType::HeartbeatBlob(true)) }
}
}
};
let response = StreamReadFromHeightResponse {
blob: Some(response_content)
};
yield response;
}

info!("Stream read from height closed for height: {}", height);

};

Ok(tonic::Response::new(Box::pin(output) as Self::StreamReadFromHeightStream))
Expand Down
5 changes: 4 additions & 1 deletion protocol-units/da/movement/protocol/proto/build.rs
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
buildtime::proto_build_main!("movementlabs/protocol_units/da/light_node/v1beta1.proto");
buildtime::proto_build_main!(
"movementlabs/protocol_units/da/light_node/v1beta1.proto",
"movementlabs/protocol_units/da/light_node/v1beta2.proto"
);
15 changes: 12 additions & 3 deletions protocol-units/da/movement/protocol/proto/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
pub mod v1beta1 {
tonic::include_proto!("movementlabs.protocol_units.da.light_node.v1beta1"); // The string specified here
// pub mod v1beta1 {
// tonic::include_proto!("movementlabs.protocol_units.da.light_node.v1beta1"); // The string specified here
// pub const FILE_DESCRIPTOR_SET: &[u8] =
// tonic::include_file_descriptor_set!("movement-da-light-node-proto-descriptor");
// }

// // Re-export the latest version at the crate root
// pub use v1beta1::*;

pub mod v1beta2 {
tonic::include_proto!("movementlabs.protocol_units.da.light_node.v1beta2"); // The string specified here
pub const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("movement-da-light-node-proto-descriptor");
}

// Re-export the latest version at the crate root
pub use v1beta1::*;
pub use v1beta2::*;
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,4 @@ pub fn default_celestia_bridge_replace_args() -> Vec<String> {
env_default!(default_da_light_node_is_initial, "MOVEMENT_DA_LIGHT_NODE_IS_INITIAL", bool, true);

// Whether to use http1 for Movement Light Node Connections
env_default!(default_movement_da_light_node_http1, "MOVEMENT_DA_LIGHT_NODE_HTTP1", bool, true);
env_default!(default_movement_da_light_node_http1, "MOVEMENT_DA_LIGHT_NODE_HTTP1", bool, false);

0 comments on commit 0ac1296

Please sign in to comment.