Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Node] Correct DA stream timeout error. #1049

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions networks/movement/movement-full-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ movement-da-light-node-client = { workspace = true}
aptos-framework-elsa-to-biarritz-rc1-migration = { workspace = true }
movement-signer = { workspace = true }
movement-signer-loader = { workspace = true }
chrono = { workspace = true }

[features]
default = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,28 @@ impl StreamBlocks {
blob_response::BlobType::SequencedBlobBlock(blob) => {
(blob.data, blob.timestamp, blob.blob_id, blob.height)
}
blob_response::BlobType::PassedThroughBlob(blob) => {
(blob.data, blob.timestamp, blob.blob_id, blob.height)
}
blob_response::BlobType::HeartbeatBlob(_) => {
tracing::info!("Receive heartbeat blob");
continue;
}
_ => {
anyhow::bail!("Invalid blob type in response")
}
};
info!("{} {} {}", hex::encode(block_id), block_timestamp, da_height);
}

info!("Finished streaming blocks from DA");
// pretty print (with labels) the block_id, block_timestamp, and da_height
tracing::info!(
"Block ID: {}, Block Timestamp: {}, DA Height: {}",
hex::encode(block_id),
// unix date string from the block timestamp which is in microseconds
chrono::DateTime::from_timestamp((block_timestamp / 1_000_000) as i64, 0)
.context("Failed to convert timestamp to date")?,
da_height
);
}

Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ where
blob_response::BlobType::PassedThroughBlob(blob) => {
(blob.data, blob.timestamp, blob.blob_id, blob.height)
}
blob_response::BlobType::HeartbeatBlob(_) => {
tracing::info!("Receive heartbeat blob");
// Do nothing.
return Ok(());
}
_ => anyhow::bail!("Invalid blob type"),
};

Expand Down
99 changes: 99 additions & 0 deletions proto/movementlabs/protocol_units/da/light_node/v1beta2.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
syntax = "proto3";
package movementlabs.protocol_units.da.light_node.v1beta2;


// Request and response messages
message Blob {
bytes blob_id = 1;
bytes data = 2;
uint64 height = 3;
bytes signature = 4;
uint64 timestamp = 5;
bytes signer = 6;
}

message BlobResponse {
oneof blob_type {
Blob passed_through_blob = 1;
Blob sequenced_blob_intent = 2;
Blob sequenced_blob_block = 3;
bool heartbeat_blob = 4;
}
}

message BlobWrite {
bytes data = 1;
}

// StreamReadAtHeight
message StreamReadFromHeightRequest {
uint64 height = 1;
}

message StreamReadFromHeightResponse {
BlobResponse blob = 1;
}

// StreamReadLatest
message StreamReadLatestRequest {

}

message StreamReadLatestResponse {
BlobResponse blob = 1;
}

// StreamWriteBlob
message StreamWriteBlobRequest {
BlobWrite blob = 1;
}

message StreamWriteBlobResponse {
BlobResponse blob = 1;
}

// ReadAtHeight
message ReadAtHeightRequest {
uint64 height = 1;
}

message ReadAtHeightResponse {
repeated BlobResponse blobs = 1;
}

// BatchRead
message BatchReadRequest {
repeated uint64 heights = 1;
}

message BatchReadResponse {
repeated ReadAtHeightResponse responses = 1;
}

message BatchWriteRequest {
repeated BlobWrite blobs = 1;
}

message BatchWriteResponse {
repeated BlobResponse blobs = 1;
}



// LightNode service definition
service LightNodeService {
// Stream blobs from a specified height or from the latest height.
rpc StreamReadFromHeight (StreamReadFromHeightRequest) returns (stream StreamReadFromHeightResponse);
rpc StreamReadLatest (StreamReadLatestRequest) returns (stream StreamReadLatestResponse);

// Stream blobs out, either individually or in batches.
rpc StreamWriteBlob (stream StreamWriteBlobRequest) returns (stream StreamWriteBlobResponse);

// Read blobs at a specified height.
rpc ReadAtHeight (ReadAtHeightRequest) returns (ReadAtHeightResponse);

// Batch read and write operations for efficiency.
rpc BatchRead (BatchReadRequest) returns (BatchReadResponse);
rpc BatchWrite (BatchWriteRequest) returns (BatchWriteResponse);

}
61 changes: 41 additions & 20 deletions protocol-units/da/movement/protocol/light-node/src/passthrough.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
use tokio::time::{self, Duration};
use tokio_stream::{Stream, StreamExt};
use tracing::info;

Expand All @@ -10,7 +11,7 @@ use movement_da_light_node_digest_store::da::Da as DigestStoreDa;
use movement_da_light_node_proto::light_node_service_server::LightNodeService;
use movement_da_light_node_proto::*;
use movement_da_light_node_verifier::signed::InKnownSignersVerifier;
use movement_da_light_node_verifier::{Error as VerifierError, VerifierOperations};
use movement_da_light_node_verifier::VerifierOperations;
use movement_da_util::{
blob::ir::blob::DaBlob, blob::ir::data::InnerSignedBlobV1Data, config::Config,
};
Expand Down Expand Up @@ -135,31 +136,51 @@ where
let verifier = self.verifier.clone();
let height = request.into_inner().height;

// Tick interval for generating HeartBeat.
let mut tick_interval = time::interval(Duration::from_secs(10));

let output = async_stream::try_stream! {

let mut blob_stream = da.stream_da_blobs_from_height(height).await.map_err(|e| tonic::Status::internal(e.to_string()))?;

while let Some(blob) = blob_stream.next().await {
let (height, da_blob) = blob.map_err(|e| tonic::Status::internal(e.to_string()))?;
match verifier.verify(da_blob, height.as_u64()).await {
Ok(verifed_blob) => {
let blob = verifed_blob.into_inner().to_blob_passed_through_read_response(height.as_u64()).map_err(|e| tonic::Status::internal(e.to_string()))?;
let response = StreamReadFromHeightResponse {
blob: Some(blob)
};
yield response;
},
Err(VerifierError::Validation(e)) => {
info!("Failed to verify blob: {}", e);
},
Err(VerifierError::Internal(e)) => {
Err(tonic::Status::internal(e.to_string()))?;
loop {
let response_content = tokio::select! {
// Yield from the data stream
block_opt = blob_stream.next() => {
match block_opt {
Some(Ok((height, da_blob))) => {
match verifier.verify(da_blob, height.as_u64()).await.map_err(|e| tonic::Status::internal(e.to_string())).and_then(|verifed_blob| {
verifed_blob.into_inner().to_blob_passed_through_read_response(height.as_u64()).map_err(|e| tonic::Status::internal(e.to_string()))
}) {
Ok(blob) => blob,
Err(err) => {
// Not verified block, skip to next one.
tracing::warn!("Stream blob of height: {} fail to verify error:{err}", height.as_u64());
continue;
}
}
}
Some(Err(err)) => {
tracing::warn!("Stream blob return an error, exit stream :{err}");
return;
},
None => {
info!("Stream blob closed , exit stream.");
return;
}
}
}
// Yield the periodic tick
_ = tick_interval.tick() => {
//Heart beat. The value can be use to indicate some status.
BlobResponse { blob_type: Some(movement_da_light_node_proto::blob_response::BlobType::HeartbeatBlob(true)) }
}
}
};
let response = StreamReadFromHeightResponse {
blob: Some(response_content)
};
yield response;
}

info!("Stream read from height closed for height: {}", height);

};

Ok(tonic::Response::new(Box::pin(output) as Self::StreamReadFromHeightStream))
Expand Down
5 changes: 4 additions & 1 deletion protocol-units/da/movement/protocol/proto/build.rs
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
buildtime::proto_build_main!("movementlabs/protocol_units/da/light_node/v1beta1.proto");
buildtime::proto_build_main!(
"movementlabs/protocol_units/da/light_node/v1beta1.proto",
"movementlabs/protocol_units/da/light_node/v1beta2.proto"
);
6 changes: 3 additions & 3 deletions protocol-units/da/movement/protocol/proto/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
pub mod v1beta1 {
tonic::include_proto!("movementlabs.protocol_units.da.light_node.v1beta1"); // The string specified here
pub mod v1beta2 {
tonic::include_proto!("movementlabs.protocol_units.da.light_node.v1beta2"); // The string specified here
pub const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("movement-da-light-node-proto-descriptor");
}

// Re-export the latest version at the crate root
pub use v1beta1::*;
pub use v1beta2::*;
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,4 @@ pub fn default_celestia_bridge_replace_args() -> Vec<String> {
env_default!(default_da_light_node_is_initial, "MOVEMENT_DA_LIGHT_NODE_IS_INITIAL", bool, true);

// Whether to use http1 for Movement Light Node Connections
env_default!(default_movement_da_light_node_http1, "MOVEMENT_DA_LIGHT_NODE_HTTP1", bool, true);
env_default!(default_movement_da_light_node_http1, "MOVEMENT_DA_LIGHT_NODE_HTTP1", bool, false);
Loading