Skip to content

Commit

Permalink
Merge branch 'main' into mikhail/celestia-mainnet-config
Browse files Browse the repository at this point in the history
  • Loading branch information
mzabaluev committed Feb 27, 2025
2 parents c2c40ad + 52b744e commit 04c5590
Show file tree
Hide file tree
Showing 26 changed files with 280 additions and 98 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions docker/build/movement-indexer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ RUN rust_binary="./target/release/movement-indexer-service"; dest_dir="/tmp/runt

FROM alpine:latest

# curl is needed for the health check
RUN apk add --no-cache curl

# Copy the build artifact from the builder stage
COPY --from=builder /tmp/build/target/release/movement-indexer-service /app/movement-indexer-service
COPY --from=builder /tmp/build/target/release/load_metadata /app/load_metadata
Expand Down
1 change: 1 addition & 0 deletions networks/movement/movement-full-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ movement-signer = { workspace = true }
movement-signer-loader = { workspace = true }
syncador = { workspace = true }
syncup = { workspace = true }
chrono = { workspace = true }

[features]
default = []
Expand Down
19 changes: 18 additions & 1 deletion networks/movement/movement-full-node/src/da/stream_blocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,30 @@ impl StreamBlocks {
.ok_or(anyhow::anyhow!("No blob type in response"))?
{
blob_response::BlobType::SequencedBlobBlock(blob) => {
tracing::info!("Receive SequencedBlobBlock blob");
(blob.data, blob.timestamp, blob.blob_id, blob.height)
}
blob_response::BlobType::PassedThroughBlob(blob) => {
tracing::info!("Receive PassedThroughBlob blob");
(blob.data, blob.timestamp, blob.blob_id, blob.height)
}
blob_response::BlobType::Heartbeat(_) => {
tracing::info!("Receive heartbeat blob");
continue;
}
_ => {
anyhow::bail!("Invalid blob type in response")
}
};
info!("{} {} {}", hex::encode(block_id), block_timestamp, da_height);
// pretty print (with labels) the block_id, block_timestamp, and da_height
tracing::info!(
"Block ID: {}, Block Timestamp: {:?}, DA Height: {}",
hex::encode(block_id),
// unix date string from the block timestamp which is in microseconds
chrono::DateTime::from_timestamp_micros(block_timestamp as i64)
.context("Failed to convert timestamp to date")?,
da_height
);
}

info!("Finished streaming blocks from DA");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ where
let mut blocks_from_da = self
.da_light_node_client
.stream_read_from_height(StreamReadFromHeightRequest { height: synced_height })
.await?;
.await
.map_err(|e| {
error!("Failed to stream blocks from DA: {:?}", e);
e
})?;

loop {
select! {
Expand Down Expand Up @@ -112,6 +116,11 @@ where
blob_response::BlobType::PassedThroughBlob(blob) => {
(blob.data, blob.timestamp, blob.blob_id, blob.height)
}
blob_response::BlobType::Heartbeat(_) => {
tracing::info!("Receive DA heartbeat");
// Do nothing.
return Ok(());
}
_ => anyhow::bail!("Invalid blob type"),
};

Expand Down
99 changes: 99 additions & 0 deletions proto/movementlabs/protocol_units/da/light_node/v1beta2.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
syntax = "proto3";
package movementlabs.protocol_units.da.light_node.v1beta2;


// Request and response messages
message Blob {
bytes blob_id = 1;
bytes data = 2;
uint64 height = 3;
bytes signature = 4;
uint64 timestamp = 5;
bytes signer = 6;
}

message BlobResponse {
oneof blob_type {
Blob passed_through_blob = 1;
Blob sequenced_blob_intent = 2;
Blob sequenced_blob_block = 3;
bool heartbeat = 4;
}
}

message BlobWrite {
bytes data = 1;
}

// StreamReadAtHeight
message StreamReadFromHeightRequest {
uint64 height = 1;
}

message StreamReadFromHeightResponse {
BlobResponse blob = 1;
}

// StreamReadLatest
message StreamReadLatestRequest {

}

message StreamReadLatestResponse {
BlobResponse blob = 1;
}

// StreamWriteBlob
message StreamWriteBlobRequest {
BlobWrite blob = 1;
}

message StreamWriteBlobResponse {
BlobResponse blob = 1;
}

// ReadAtHeight
message ReadAtHeightRequest {
uint64 height = 1;
}

message ReadAtHeightResponse {
repeated BlobResponse blobs = 1;
}

// BatchRead
message BatchReadRequest {
repeated uint64 heights = 1;
}

message BatchReadResponse {
repeated ReadAtHeightResponse responses = 1;
}

message BatchWriteRequest {
repeated BlobWrite blobs = 1;
}

message BatchWriteResponse {
repeated BlobResponse blobs = 1;
}



// LightNode service definition
service LightNodeService {
// Stream blobs from a specified height or from the latest height.
rpc StreamReadFromHeight (StreamReadFromHeightRequest) returns (stream StreamReadFromHeightResponse);
rpc StreamReadLatest (StreamReadLatestRequest) returns (stream StreamReadLatestResponse);

// Stream blobs out, either individually or in batches.
rpc StreamWriteBlob (stream StreamWriteBlobRequest) returns (stream StreamWriteBlobResponse);

// Read blobs at a specified height.
rpc ReadAtHeight (ReadAtHeightRequest) returns (ReadAtHeightResponse);

// Batch read and write operations for efficiency.
rpc BatchRead (BatchReadRequest) returns (BatchReadResponse);
rpc BatchWrite (BatchWriteRequest) returns (BatchWriteResponse);

}
1 change: 1 addition & 0 deletions protocol-units/da/movement/protocol/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tower = { workspace = true }
http-body-util = { workspace = true }
bytes = { workspace = true }
anyhow = { workspace = true }
tracing = { workspace = true }


[lints]
Expand Down
26 changes: 24 additions & 2 deletions protocol-units/da/movement/protocol/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,34 @@ pub enum MovementDaLightNodeClient {
impl MovementDaLightNodeClient {
/// Creates an http1 connection to the light node service.
pub fn try_http1(connection_string: &str) -> Result<Self, anyhow::Error> {
Ok(Self::Http1(http1::Http1::try_new(connection_string)?))
for _ in 0..5 {
match http1::Http1::try_new(connection_string) {
Ok(result) => return Ok(Self::Http1(result)),
Err(err) => {
tracing::warn!("DA Http1 connection failed: {}. Retrying in 5s...", err);
std::thread::sleep(std::time::Duration::from_secs(5));
}
}
}
return Err(
anyhow::anyhow!("Error DA Http1 connection failed more than 5 time aborting.",),
);
}

/// Creates an http2 connection to the light node service.
pub async fn try_http2(connection_string: &str) -> Result<Self, anyhow::Error> {
Ok(Self::Http2(http2::Http2::connect(connection_string).await?))
for _ in 0..5 {
match http2::Http2::connect(connection_string).await {
Ok(result) => return Ok(Self::Http2(result)),
Err(err) => {
tracing::warn!("DA Http2 connection failed: {}. Retrying in 5s...", err);
std::thread::sleep(std::time::Duration::from_secs(5));
}
}
}
return Err(
anyhow::anyhow!("Error DA Http2 connection failed more than 5 time aborting.",),
);
}

/// Stream reads from a given height.
Expand Down
1 change: 1 addition & 0 deletions protocol-units/da/movement/protocol/da/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ async-stream = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
anyhow = { workspace = true }
hex = { workspace = true }

[lints]
workspace = true
2 changes: 1 addition & 1 deletion protocol-units/da/movement/protocol/da/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub enum DaError {
/// Trait for DA operations.
pub trait DaOperations<C>: Send + Sync
where
C: Curve + Send + Sync + 'static,
C: Curve + Send + Sync + 'static + std::fmt::Debug,
{
fn submit_blob(
&self,
Expand Down
2 changes: 1 addition & 1 deletion protocol-units/da/movement/protocol/da/src/mock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ where

impl<C> DaOperations<C> for Mock<C>
where
C: Curve + Send + Sync + 'static,
C: Curve + Send + Sync + 'static + std::fmt::Debug,
{
fn submit_blob(
&self,
Expand Down
70 changes: 46 additions & 24 deletions protocol-units/da/movement/protocol/light-node/src/passthrough.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use movement_da_light_node_digest_store::da::Da as DigestStoreDa;
use movement_da_light_node_proto::light_node_service_server::LightNodeService;
use movement_da_light_node_proto::*;
use movement_da_light_node_verifier::signed::InKnownSignersVerifier;
use movement_da_light_node_verifier::{Error as VerifierError, VerifierOperations};
use movement_da_light_node_verifier::VerifierOperations;
use movement_da_util::{
blob::ir::blob::DaBlob, blob::ir::data::InnerSignedBlobV1Data, config::Config,
};
Expand All @@ -35,7 +35,8 @@ where
+ Serialize
+ for<'de> Deserialize<'de>
+ Clone
+ 'static,
+ 'static
+ std::fmt::Debug,
Da: DaOperations<C>,
V: VerifierOperations<DaBlob<C>, DaBlob<C>>,
{
Expand All @@ -56,7 +57,8 @@ where
+ Serialize
+ for<'de> Deserialize<'de>
+ Clone
+ 'static,
+ 'static
+ std::fmt::Debug,
Da: DaOperations<C>,
V: VerifierOperations<DaBlob<C>, DaBlob<C>>,
{
Expand Down Expand Up @@ -113,7 +115,8 @@ where
+ Serialize
+ for<'de> Deserialize<'de>
+ Clone
+ 'static,
+ 'static
+ std::fmt::Debug,
Da: DaOperations<C> + 'static,
V: VerifierOperations<DaBlob<C>, DaBlob<C>> + Send + Sync + 'static,
{
Expand All @@ -135,31 +138,51 @@ where
let verifier = self.verifier.clone();
let height = request.into_inner().height;

// Tick interval for generating HeartBeat.
let mut tick_interval = tokio::time::interval(tokio::time::Duration::from_secs(10));

let output = async_stream::try_stream! {

let mut blob_stream = da.stream_da_blobs_from_height(height).await.map_err(|e| tonic::Status::internal(e.to_string()))?;

while let Some(blob) = blob_stream.next().await {
let (height, da_blob) = blob.map_err(|e| tonic::Status::internal(e.to_string()))?;
match verifier.verify(da_blob, height.as_u64()).await {
Ok(verifed_blob) => {
let blob = verifed_blob.into_inner().to_blob_passed_through_read_response(height.as_u64()).map_err(|e| tonic::Status::internal(e.to_string()))?;
let response = StreamReadFromHeightResponse {
blob: Some(blob)
};
yield response;
},
Err(VerifierError::Validation(e)) => {
info!("Failed to verify blob: {}", e);
},
Err(VerifierError::Internal(e)) => {
Err(tonic::Status::internal(e.to_string()))?;
loop {
let response_content = tokio::select! {
// Yield from the data stream
block_opt = blob_stream.next() => {
match block_opt {
Some(Ok((height, da_blob))) => {
match verifier.verify(da_blob, height.as_u64()).await.map_err(|e| tonic::Status::internal(e.to_string())).and_then(|verifed_blob| {
verifed_blob.into_inner().to_blob_passed_through_read_response(height.as_u64()).map_err(|e| tonic::Status::internal(e.to_string()))
}) {
Ok(blob) => blob,
Err(err) => {
// Not verified block, skip to next one.
tracing::warn!("Stream blob of height: {} fail to verify error:{err}", height.as_u64());
continue;
}
}
}
Some(Err(err)) => {
tracing::warn!("Stream blob return an error, exit stream :{err}");
return;
},
None => {
tracing::warn!("Stream blob closed , exit stream.");
return;
}
}
}
// Yield the periodic tick
_ = tick_interval.tick() => {
//Heart beat. The value can be use to indicate some status.
BlobResponse { blob_type: Some(movement_da_light_node_proto::blob_response::BlobType::Heartbeat(true)) }
}
}
};
let response = StreamReadFromHeightResponse {
blob: Some(response_content)
};
yield response;
}

info!("Stream read from height closed for height: {}", height);

};

Ok(tonic::Response::new(Box::pin(output) as Self::StreamReadFromHeightStream))
Expand Down Expand Up @@ -211,7 +234,6 @@ where
let blobs = request.into_inner().blobs;
for data in blobs {
let blob = InnerSignedBlobV1Data::now(data.data)
.map_err(|e| tonic::Status::internal(format!("Failed to create blob data: {}", e)))?
.try_to_sign(&self.signer)
.await
.map_err(|e| tonic::Status::internal(format!("Failed to sign blob: {}", e)))?;
Expand Down
Loading

0 comments on commit 04c5590

Please sign in to comment.