Skip to content

Commit

Permalink
refactor(batcher): add user state & other code quality refactors (#1106)
Browse files Browse the repository at this point in the history
Co-authored-by: Urix <43704209+uri-99@users.noreply.github.com>
  • Loading branch information
entropidelic and uri-99 authored Oct 3, 2024
1 parent 34fb21e commit 7d04367
Show file tree
Hide file tree
Showing 12 changed files with 917 additions and 662 deletions.
184 changes: 135 additions & 49 deletions batcher/Cargo.lock

Large diffs are not rendered by default.

68 changes: 68 additions & 0 deletions batcher/aligned-batcher/src/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::sync::Arc;

use aligned_sdk::{
communication::serialization::cbor_serialize,
core::types::{BatchInclusionData, ResponseMessage, VerificationCommitmentBatch},
};
use futures_util::{stream::SplitSink, SinkExt};
use lambdaworks_crypto::merkle_tree::merkle::MerkleTree;
use log::{error, info};
use serde::Serialize;
use tokio::{net::TcpStream, sync::RwLock};
use tokio_tungstenite::{
tungstenite::{Error, Message},
WebSocketStream,
};

use crate::types::{batch_queue::BatchQueueEntry, errors::BatcherError};

pub(crate) type WsMessageSink = Arc<RwLock<SplitSink<WebSocketStream<TcpStream>, Message>>>;

pub(crate) async fn send_batch_inclusion_data_responses(
finalized_batch: Vec<BatchQueueEntry>,
batch_merkle_tree: &MerkleTree<VerificationCommitmentBatch>,
) -> Result<(), BatcherError> {
for (vd_batch_idx, entry) in finalized_batch.iter().enumerate() {
let batch_inclusion_data = BatchInclusionData::new(vd_batch_idx, batch_merkle_tree);
let response = ResponseMessage::BatchInclusionData(batch_inclusion_data);

let serialized_response = cbor_serialize(&response)
.map_err(|e| BatcherError::SerializationError(e.to_string()))?;

let Some(ws_sink) = entry.messaging_sink.as_ref() else {
return Err(BatcherError::WsSinkEmpty);
};

let sending_result = ws_sink
.write()
.await
.send(Message::binary(serialized_response))
.await;

match sending_result {
Err(Error::AlreadyClosed) => (),
Err(e) => error!("Error while sending batch inclusion data response: {}", e),
Ok(_) => (),
}

info!("Response sent");
}

Ok(())
}

pub(crate) async fn send_message<T: Serialize>(ws_conn_sink: WsMessageSink, message: T) {
match cbor_serialize(&message) {
Ok(serialized_response) => {
if let Err(err) = ws_conn_sink
.write()
.await
.send(Message::binary(serialized_response))
.await
{
error!("Error while sending message: {}", err)
}
}
Err(e) => error!("Error while serializing message: {}", e),
}
}
Loading

0 comments on commit 7d04367

Please sign in to comment.