Skip to content

Commit

Permalink
use a timer to generate heartbreak
Browse files Browse the repository at this point in the history
  • Loading branch information
musitdev committed Jan 22, 2025
1 parent 5f6de96 commit 94c8e06
Showing 1 changed file with 46 additions and 49 deletions.
95 changes: 46 additions & 49 deletions protocol-units/da/movement/protocol/da/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use async_stream::try_stream;
use movement_da_util::blob::ir::blob::DaBlob;
use std::future::Future;
use std::pin::Pin;
use tokio::time::{self, Duration};
use tokio_stream::{Stream, StreamExt};
use tracing::{info, warn};

Expand Down Expand Up @@ -110,63 +111,59 @@ pub trait DaOperations: Send + Sync {
tracing::info!("TEST Da lib DaOperations stream_da_blobs_from_height start");
let fut = async move {
let certificate_stream = self.stream_certificates().await?;
// Tick interval for generating HeartBeat.
let mut tick_interval = time::interval(Duration::from_secs(10));

let stream = try_stream! {
let mut last_height = start_height;
let mut certificate_stream = certificate_stream;

while let Some(certificate) = certificate_stream.next().await {

info!("certificate: {:?}", certificate);

match certificate {
Ok(Certificate::Height(height)) if height > last_height => {
tracing::info!("TEST Da lib DaOperations stream_da_blobs_from_height got new accepted blob at height:{height}");
let blob_stream = self
.stream_da_blobs_between_heights(last_height, height)
.await?;
tokio::pin!(blob_stream);

while let Some(blob) = blob_stream.next().await {
tracing::info!("TEST Da lib DaOperations stream_da_blobs_from_height send new accepted blob at height:{height}");
yield blob?;
loop {
tokio::select! {
// Yield from the data stream
Some(certificate) = certificate_stream.next() => {
info!("certificate: {:?}", certificate);
match certificate {
Ok(Certificate::Height(height)) if height > last_height => {
tracing::info!("TEST Da lib DaOperations stream_da_blobs_from_height got new accepted blob at height:{height}");
let blob_stream = self
.stream_da_blobs_between_heights(last_height, height)
.await.unwrap(); // TODO remove the unwrap()
tokio::pin!(blob_stream);

while let Some(blob) = blob_stream.next().await {
tracing::info!("TEST Da lib DaOperations stream_da_blobs_from_height send new accepted blob at height:{height}");
yield blob?;
}

last_height = height;
}
Ok(Certificate::Nolo) => {
tracing::info!("TEST Da lib DaOperations stream_da_blobs_from_height got Certificate::Nolo");
// Ignore Nolo
}
// Warn log non-fatal certificate errors
Err(DaError::NonFatalCertificate(e)) => {
warn!("non-fatal certificate error: {}", e);
}
// Exit on all other errors
Err(e) => {
yield Err(e)?;
}
// If height is less than last height, ignore
_ => {
warn!("ignoring certificate");
}
}

last_height = height;
}
// Already executed Height are use to send Heartbeat.
Ok(Certificate::Height(height)) => {
tracing::info!("TEST Da lib DaOperations stream_da_blobs_from_height got old height:{height}");
//old certificate, use to send Heartbeat block.
let blob_stream = self
.stream_da_blobs_between_heights(height, height)
.await?;
tokio::pin!(blob_stream);

while let Some(blob_res) = blob_stream.next().await {
let (_, blob) = blob_res?;
// Ack use heigth zero to identify heart beat block.
// Should be changed to a type.
let heart_blob = (DaHeight(0u64), blob);
tracing::info!("TEST Da lib DaOperations stream_da_blobs_from_height got old height:{height} send heartbeat.");
yield heart_blob;
}
}
Ok(Certificate::Nolo) => {
tracing::info!("TEST Da lib DaOperations stream_da_blobs_from_height got Certificate::Nolo");
// Ignore Nolo
}
// Warn log non-fatal certificate errors
Err(DaError::NonFatalCertificate(e)) => {
warn!("non-fatal certificate error: {}", e);
}
// Exit on all other errors
Err(e) => {
yield Err(e)?;

// Yield the periodic tick
_ = tick_interval.tick() => {
let heart_blob = (DaHeight(0u64), DaBlob::DigestV1(Vec::new()));
tracing::info!("TEST Da lib DaOperations stream_da_blobs_yield heartbeat.");
yield heart_blob;
}
// If height is less than last height, ignore
// _ => {
// warn!("ignoring certificate");
// }
}
}
};
Expand Down

0 comments on commit 94c8e06

Please sign in to comment.