Skip to content

Commit

Permalink
move heartbeat in the grpc send side
Browse files Browse the repository at this point in the history
  • Loading branch information
musitdev committed Jan 22, 2025
1 parent 94c8e06 commit f8594da
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ processes:
- "ETH_WS_CONNECTION_HOSTNAME=0.0.0.0"
- "ETH_WS_CONNECTION_PORT=8090"
- "MAYBE_RUN_LOCAL=true"
- "MOVEMENT_DA_LIGHT_NODE_HTTP1=true"
- "MOVEMENT_DA_LIGHT_NODE_HTTP1=false"
command: |
export AWS_REGION=us-west-2
export MOVEMENT_SYNC="leader::follower-test-$MOVEMENT_SHARED_RANDOM_1<=>{default_signer_address_whitelist,maptos,maptos-storage,movement-da-db}/**"
Expand All @@ -35,7 +35,7 @@ processes:
- "ETH_WS_CONNECTION_HOSTNAME=0.0.0.0"
- "ETH_WS_CONNECTION_PORT=8090"
- "MAYBE_RUN_LOCAL=true"
- "MOVEMENT_DA_LIGHT_NODE_HTTP1=true"
- "MOVEMENT_DA_LIGHT_NODE_HTTP1=false"
command: |
sleep 30
export AWS_REGION=us-west-2
Expand Down
74 changes: 28 additions & 46 deletions protocol-units/da/movement/protocol/da/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use async_stream::try_stream;
use movement_da_util::blob::ir::blob::DaBlob;
use std::future::Future;
use std::pin::Pin;
use tokio::time::{self, Duration};
use tokio_stream::{Stream, StreamExt};
use tracing::{info, warn};

Expand Down Expand Up @@ -111,58 +110,41 @@ pub trait DaOperations: Send + Sync {
tracing::info!("TEST Da lib DaOperations stream_da_blobs_from_height start");
let fut = async move {
let certificate_stream = self.stream_certificates().await?;
// Tick interval for generating HeartBeat.
let mut tick_interval = time::interval(Duration::from_secs(10));

let stream = try_stream! {
let mut last_height = start_height;
let mut certificate_stream = certificate_stream;

while let Some(certificate) = certificate_stream.next().await {

info!("certificate: {:?}", certificate);

loop {
tokio::select! {
// Yield from the data stream
Some(certificate) = certificate_stream.next() => {
info!("certificate: {:?}", certificate);
match certificate {
Ok(Certificate::Height(height)) if height > last_height => {
tracing::info!("TEST Da lib DaOperations stream_da_blobs_from_height got new accepted blob at height:{height}");
let blob_stream = self
.stream_da_blobs_between_heights(last_height, height)
.await.unwrap(); // TODO remove the unwrap()
tokio::pin!(blob_stream);

while let Some(blob) = blob_stream.next().await {
tracing::info!("TEST Da lib DaOperations stream_da_blobs_from_height send new accepted blob at height:{height}");
yield blob?;
}

last_height = height;
}
Ok(Certificate::Nolo) => {
tracing::info!("TEST Da lib DaOperations stream_da_blobs_from_height got Certificate::Nolo");
// Ignore Nolo
}
// Warn log non-fatal certificate errors
Err(DaError::NonFatalCertificate(e)) => {
warn!("non-fatal certificate error: {}", e);
}
// Exit on all other errors
Err(e) => {
yield Err(e)?;
}
// If height is less than last height, ignore
_ => {
warn!("ignoring certificate");
}
match certificate {
Ok(Certificate::Height(height)) if height > last_height => {
let blob_stream = self
.stream_da_blobs_between_heights(last_height, height)
.await?;
tokio::pin!(blob_stream);

while let Some(blob) = blob_stream.next().await {
yield blob?;
}
}

// Yield the periodic tick
_ = tick_interval.tick() => {
let heart_blob = (DaHeight(0u64), DaBlob::DigestV1(Vec::new()));
tracing::info!("TEST Da lib DaOperations stream_da_blobs_yield heartbeat.");
yield heart_blob;
last_height = height;
}
Ok(Certificate::Nolo) => {
// Ignore Nolo
}
// Warn log non-fatal certificate errors
Err(DaError::NonFatalCertificate(e)) => {
warn!("non-fatal certificate error: {}", e);
}
// Exit on all other errors
Err(e) => {
yield Err(e)?;
}
// If height is less than last height, ignore
_ => {
warn!("ignoring certificate");
}
}
}
Expand Down
61 changes: 44 additions & 17 deletions protocol-units/da/movement/protocol/light-node/src/passthrough.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
use tokio::time::{self, Duration};
use tokio_stream::{Stream, StreamExt};
use tracing::info;

Expand Down Expand Up @@ -127,27 +128,53 @@ where
let verifier = self.verifier.clone();
let height = request.into_inner().height;

// Tick interval for generating HeartBeat.
let mut tick_interval = time::interval(Duration::from_secs(10));

let output = async_stream::try_stream! {

let mut blob_stream = da.stream_da_blobs_from_height(height).await.map_err(|e| tonic::Status::internal(e.to_string()))?;

while let Some(blob) = blob_stream.next().await {
let (height, da_blob) = blob.map_err(|e| tonic::Status::internal(e.to_string()))?;
let response_content = if height.as_u64() == 0 {
//Heart beat. The value can be use to indicate some status.
BlobResponse { blob_type: Some(movement_da_light_node_proto::blob_response::BlobType::HeartbeatBlob(true)) }
} else {
let verifed_blob = verifier.verify(da_blob, height.as_u64()).await.map_err(|e| tonic::Status::internal(e.to_string()))?;
verifed_blob.into_inner().to_blob_passed_through_read_response(height.as_u64()).map_err(|e| tonic::Status::internal(e.to_string()))?
};
let response = StreamReadFromHeightResponse {
blob: Some(response_content)
};
tracing::info!("TEST passthrough LightNode stream_read_from_height send blob");
yield response;
}

info!("Stream read from height closed for height: {}", height);
loop {
let response_content = tokio::select! {
// Yield from the data stream
Some(blob) = blob_stream.next() => {
let (height, da_blob) = blob.map_err(|e| tonic::Status::internal(e.to_string())).unwrap(); //TODO remove unwrap
let verifed_blob = verifier.verify(da_blob, height.as_u64()).await.map_err(|e| tonic::Status::internal(e.to_string())).unwrap(); //TODO remove unwrap;
tracing::info!("TEST passthrough LightNode stream_read_from_height send blob");
verifed_blob.into_inner().to_blob_passed_through_read_response(height.as_u64()).map_err(|e| tonic::Status::internal(e.to_string())).unwrap() //TODO remove unwrap;
}
// Yield the periodic tick
_ = tick_interval.tick() => {
//Heart beat. The value can be use to indicate some status.
tracing::info!("TEST Da lib DaOperations stream_da_blobs_yield heartbeat.");
BlobResponse { blob_type: Some(movement_da_light_node_proto::blob_response::BlobType::HeartbeatBlob(true)) }
}
};
let response = StreamReadFromHeightResponse {
blob: Some(response_content)
};
yield response;
}


// while let Some(blob) = blob_stream.next().await {
// let (height, da_blob) = blob.map_err(|e| tonic::Status::internal(e.to_string()))?;
// let response_content = if height.as_u64() == 0 {
// //Heart beat. The value can be use to indicate some status.
// BlobResponse { blob_type: Some(movement_da_light_node_proto::blob_response::BlobType::HeartbeatBlob(true)) }
// } else {
// let verifed_blob = verifier.verify(da_blob, height.as_u64()).await.map_err(|e| tonic::Status::internal(e.to_string()))?;
// verifed_blob.into_inner().to_blob_passed_through_read_response(height.as_u64()).map_err(|e| tonic::Status::internal(e.to_string()))?
// };
// let response = StreamReadFromHeightResponse {
// blob: Some(response_content)
// };
// tracing::info!("TEST passthrough LightNode stream_read_from_height send blob");
// yield response;
// }

// info!("Stream read from height closed for height: {}", height);

};

Expand Down

0 comments on commit f8594da

Please sign in to comment.