diff --git a/process-compose/movement-full-node/process-compose.test-followers.yml b/process-compose/movement-full-node/process-compose.test-followers.yml index f081d8969..8e0b4033c 100644 --- a/process-compose/movement-full-node/process-compose.test-followers.yml +++ b/process-compose/movement-full-node/process-compose.test-followers.yml @@ -11,7 +11,7 @@ processes: - "ETH_WS_CONNECTION_HOSTNAME=0.0.0.0" - "ETH_WS_CONNECTION_PORT=8090" - "MAYBE_RUN_LOCAL=true" - - "MOVEMENT_DA_LIGHT_NODE_HTTP1=true" + - "MOVEMENT_DA_LIGHT_NODE_HTTP1=false" command: | export AWS_REGION=us-west-2 export MOVEMENT_SYNC="leader::follower-test-$MOVEMENT_SHARED_RANDOM_1<=>{default_signer_address_whitelist,maptos,maptos-storage,movement-da-db}/**" @@ -35,7 +35,7 @@ processes: - "ETH_WS_CONNECTION_HOSTNAME=0.0.0.0" - "ETH_WS_CONNECTION_PORT=8090" - "MAYBE_RUN_LOCAL=true" - - "MOVEMENT_DA_LIGHT_NODE_HTTP1=true" + - "MOVEMENT_DA_LIGHT_NODE_HTTP1=false" command: | sleep 30 export AWS_REGION=us-west-2 diff --git a/protocol-units/da/movement/protocol/da/src/lib.rs b/protocol-units/da/movement/protocol/da/src/lib.rs index 93009f0c2..ea0003002 100644 --- a/protocol-units/da/movement/protocol/da/src/lib.rs +++ b/protocol-units/da/movement/protocol/da/src/lib.rs @@ -4,7 +4,6 @@ use async_stream::try_stream; use movement_da_util::blob::ir::blob::DaBlob; use std::future::Future; use std::pin::Pin; -use tokio::time::{self, Duration}; use tokio_stream::{Stream, StreamExt}; use tracing::{info, warn}; @@ -111,58 +110,41 @@ pub trait DaOperations: Send + Sync { tracing::info!("TEST Da lib DaOperations stream_da_blobs_from_height start"); let fut = async move { let certificate_stream = self.stream_certificates().await?; - // Tick interval for generating HeartBeat. - let mut tick_interval = time::interval(Duration::from_secs(10)); - let stream = try_stream! { let mut last_height = start_height; let mut certificate_stream = certificate_stream; + while let Some(certificate) = certificate_stream.next().await { + + info!("certificate: {:?}", certificate); - loop { - tokio::select! { - // Yield from the data stream - Some(certificate) = certificate_stream.next() => { - info!("certificate: {:?}", certificate); - match certificate { - Ok(Certificate::Height(height)) if height > last_height => { - tracing::info!("TEST Da lib DaOperations stream_da_blobs_from_height got new accepted blob at height:{height}"); - let blob_stream = self - .stream_da_blobs_between_heights(last_height, height) - .await.unwrap(); // TODO remove the unwrap() - tokio::pin!(blob_stream); - - while let Some(blob) = blob_stream.next().await { - tracing::info!("TEST Da lib DaOperations stream_da_blobs_from_height send new accepted blob at height:{height}"); - yield blob?; - } - - last_height = height; - } - Ok(Certificate::Nolo) => { - tracing::info!("TEST Da lib DaOperations stream_da_blobs_from_height got Certificate::Nolo"); - // Ignore Nolo - } - // Warn log non-fatal certificate errors - Err(DaError::NonFatalCertificate(e)) => { - warn!("non-fatal certificate error: {}", e); - } - // Exit on all other errors - Err(e) => { - yield Err(e)?; - } - // If height is less than last height, ignore - _ => { - warn!("ignoring certificate"); - } + match certificate { + Ok(Certificate::Height(height)) if height > last_height => { + let blob_stream = self + .stream_da_blobs_between_heights(last_height, height) + .await?; + tokio::pin!(blob_stream); + + while let Some(blob) = blob_stream.next().await { + yield blob?; } - } - // Yield the periodic tick - _ = tick_interval.tick() => { - let heart_blob = (DaHeight(0u64), DaBlob::DigestV1(Vec::new())); - tracing::info!("TEST Da lib DaOperations stream_da_blobs_yield heartbeat."); - yield heart_blob; + last_height = height; + } + Ok(Certificate::Nolo) => { + // Ignore Nolo + } + // Warn log non-fatal certificate errors + Err(DaError::NonFatalCertificate(e)) => { + warn!("non-fatal certificate error: {}", e); + } + // Exit on all other errors + Err(e) => { + yield Err(e)?; + } + // If height is less than last height, ignore + _ => { + warn!("ignoring certificate"); } } } diff --git a/protocol-units/da/movement/protocol/light-node/src/passthrough.rs b/protocol-units/da/movement/protocol/light-node/src/passthrough.rs index a6f32a6ce..19df26296 100644 --- a/protocol-units/da/movement/protocol/light-node/src/passthrough.rs +++ b/protocol-units/da/movement/protocol/light-node/src/passthrough.rs @@ -1,5 +1,6 @@ use std::fmt::{self, Debug, Formatter}; use std::sync::Arc; +use tokio::time::{self, Duration}; use tokio_stream::{Stream, StreamExt}; use tracing::info; @@ -127,27 +128,53 @@ where let verifier = self.verifier.clone(); let height = request.into_inner().height; + // Tick interval for generating HeartBeat. + let mut tick_interval = time::interval(Duration::from_secs(10)); + let output = async_stream::try_stream! { let mut blob_stream = da.stream_da_blobs_from_height(height).await.map_err(|e| tonic::Status::internal(e.to_string()))?; - while let Some(blob) = blob_stream.next().await { - let (height, da_blob) = blob.map_err(|e| tonic::Status::internal(e.to_string()))?; - let response_content = if height.as_u64() == 0 { - //Heart beat. The value can be use to indicate some status. - BlobResponse { blob_type: Some(movement_da_light_node_proto::blob_response::BlobType::HeartbeatBlob(true)) } - } else { - let verifed_blob = verifier.verify(da_blob, height.as_u64()).await.map_err(|e| tonic::Status::internal(e.to_string()))?; - verifed_blob.into_inner().to_blob_passed_through_read_response(height.as_u64()).map_err(|e| tonic::Status::internal(e.to_string()))? - }; - let response = StreamReadFromHeightResponse { - blob: Some(response_content) - }; - tracing::info!("TEST passthrough LightNode stream_read_from_height send blob"); - yield response; - } - - info!("Stream read from height closed for height: {}", height); + loop { + let response_content = tokio::select! { + // Yield from the data stream + Some(blob) = blob_stream.next() => { + let (height, da_blob) = blob.map_err(|e| tonic::Status::internal(e.to_string())).unwrap(); //TODO remove unwrap + let verifed_blob = verifier.verify(da_blob, height.as_u64()).await.map_err(|e| tonic::Status::internal(e.to_string())).unwrap(); //TODO remove unwrap; + tracing::info!("TEST passthrough LightNode stream_read_from_height send blob"); + verifed_blob.into_inner().to_blob_passed_through_read_response(height.as_u64()).map_err(|e| tonic::Status::internal(e.to_string())).unwrap() //TODO remove unwrap; + } + // Yield the periodic tick + _ = tick_interval.tick() => { + //Heart beat. The value can be use to indicate some status. + tracing::info!("TEST Da lib DaOperations stream_da_blobs_yield heartbeat."); + BlobResponse { blob_type: Some(movement_da_light_node_proto::blob_response::BlobType::HeartbeatBlob(true)) } + } + }; + let response = StreamReadFromHeightResponse { + blob: Some(response_content) + }; + yield response; + } + + + // while let Some(blob) = blob_stream.next().await { + // let (height, da_blob) = blob.map_err(|e| tonic::Status::internal(e.to_string()))?; + // let response_content = if height.as_u64() == 0 { + // //Heart beat. The value can be use to indicate some status. + // BlobResponse { blob_type: Some(movement_da_light_node_proto::blob_response::BlobType::HeartbeatBlob(true)) } + // } else { + // let verifed_blob = verifier.verify(da_blob, height.as_u64()).await.map_err(|e| tonic::Status::internal(e.to_string()))?; + // verifed_blob.into_inner().to_blob_passed_through_read_response(height.as_u64()).map_err(|e| tonic::Status::internal(e.to_string()))? + // }; + // let response = StreamReadFromHeightResponse { + // blob: Some(response_content) + // }; + // tracing::info!("TEST passthrough LightNode stream_read_from_height send blob"); + // yield response; + // } + + // info!("Stream read from height closed for height: {}", height); };