Skip to content

Commit

Permalink
feat: use cbor instead of json serialization (#746)
Browse files Browse the repository at this point in the history
Co-authored-by: Mariano A. Nicolini <mariano.nicolini.91@gmail.com>
  • Loading branch information
taturosati and entropidelic authored Aug 21, 2024
1 parent 8a35c9b commit 39b08e4
Show file tree
Hide file tree
Showing 19 changed files with 253 additions and 58 deletions.
51 changes: 51 additions & 0 deletions batcher/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions batcher/aligned-batcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ halo2_backend = { git = "https://github.com/yetanotherco/yet-another-halo2-fork.
halo2_proofs = { git = "https://github.com/yetanotherco/yet-another-halo2-fork.git", rev = "a3a56819d9183ac0b11c8d0543c7673c4a4c71a6" }
bincode = "1.3.3"
aligned-sdk = { path = "../aligned-sdk" }
ciborium = "=0.2.2"
101 changes: 59 additions & 42 deletions batcher/aligned-batcher/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
extern crate core;

use aligned_sdk::communication::serialization::{cbor_deserialize, cbor_serialize};
use aligned_sdk::eth::batcher_payment_service::SignatureData;
use config::NonPayingConfig;
use dotenv::dotenv;
Expand All @@ -24,7 +25,7 @@ use eth::{try_create_new_task, BatcherPaymentService, SignerMiddlewareT};
use ethers::prelude::{Middleware, Provider};
use ethers::providers::Ws;
use ethers::types::{Address, Signature, TransactionReceipt, U256};
use futures_util::stream::{self, SplitSink};
use futures_util::stream::SplitSink;
use futures_util::{future, SinkExt, StreamExt, TryStreamExt};
use lambdaworks_crypto::merkle_tree::merkle::MerkleTree;
use lambdaworks_crypto::merkle_tree::traits::IsMerkleTreeBackend;
Expand Down Expand Up @@ -263,7 +264,7 @@ impl Batcher {
aligned_sdk::communication::protocol::EXPECTED_PROTOCOL_VERSION,
);

let serialized_protocol_version_msg = serde_json::to_vec(&protocol_version_msg)
let serialized_protocol_version_msg = cbor_serialize(&protocol_version_msg)
.expect("Could not serialize protocol version message");

outgoing
Expand All @@ -274,7 +275,7 @@ impl Batcher {
.expect("Could not send protocol version message");

match incoming
.try_filter(|msg| future::ready(msg.is_text()))
.try_filter(|msg| future::ready(msg.is_binary()))
.try_for_each(|msg| self.clone().handle_message(msg, outgoing.clone()))
.await
{
Expand All @@ -291,8 +292,7 @@ impl Batcher {
) -> Result<(), Error> {
// Deserialize verification data from message
let client_msg: ClientMessage =
serde_json::from_str(message.to_text().expect("Message is not text"))
.expect("Failed to deserialize task");
cbor_deserialize(message.into_data().as_slice()).expect("Failed to deserialize task");

info!(
"Received message with nonce: {}",
Expand Down Expand Up @@ -502,15 +502,35 @@ impl Batcher {
// Set the batch posting flag to true
*batch_posting = true;

let current_batch_size = serde_json::to_vec(&batch_verification_data).unwrap().len();
let current_batch_size = match cbor_serialize(&batch_verification_data) {
Ok(serialized) => serialized.len(),
Err(e) => {
error!(
"Failed to serialize verification data: {:?}, resetting batch state",
e
);
self.flush_queue_and_clear_nonce_cache().await;
return None;
}
};

// check if the current batch needs to be splitted into smaller batches
if current_batch_size > self.max_batch_size {
info!("Batch max size exceded. Splitting current batch...");
let mut acc_batch_size = 0;
let mut finalized_batch_idx = 0;
for (idx, (verification_data, _, _, _)) in batch_state.batch_queue.iter().enumerate() {
acc_batch_size += serde_json::to_vec(verification_data).unwrap().len();
acc_batch_size += match cbor_serialize(verification_data) {
Ok(serialized) => serialized.len(),
Err(e) => {
error!(
"Failed to serialize verification data: {:?}, resetting batch",
e
);
self.flush_queue_and_clear_nonce_cache().await;
return None;
}
};
if acc_batch_size > self.max_batch_size {
finalized_batch_idx = idx;
break;
Expand Down Expand Up @@ -552,8 +572,8 @@ impl Batcher {
.map(|vd| vd.verification_data.clone())
.collect();

let batch_bytes = serde_json::to_vec(batch_verification_data.as_slice())
.expect("Failed to serialize batch");
let batch_bytes = cbor_serialize(&batch_verification_data)
.map_err(|e| BatcherError::TaskCreationError(e.to_string()))?;

info!("Finalizing batch. Length: {}", finalized_batch.len());
let batch_data_comm: Vec<VerificationDataCommitment> = finalized_batch
Expand Down Expand Up @@ -615,9 +635,7 @@ impl Batcher {
return Err(e);
};

send_batch_inclusion_data_responses(finalized_batch, &batch_merkle_tree).await;

Ok(())
send_batch_inclusion_data_responses(finalized_batch, &batch_merkle_tree).await
}

async fn flush_queue_and_clear_nonce_cache(&self) {
Expand Down Expand Up @@ -896,44 +914,43 @@ impl Batcher {
async fn send_batch_inclusion_data_responses(
finalized_batch: BatchQueue,
batch_merkle_tree: &MerkleTree<VerificationCommitmentBatch>,
) {
stream::iter(finalized_batch.iter())
.enumerate()
.for_each(|(vd_batch_idx, (_, _, ws_sink, _))| async move {
let batch_inclusion_data = BatchInclusionData::new(vd_batch_idx, batch_merkle_tree);
let response = ResponseMessage::BatchInclusionData(batch_inclusion_data);
) -> Result<(), BatcherError> {
for (vd_batch_idx, (_, _, ws_sink, _)) in finalized_batch.iter().enumerate() {
let batch_inclusion_data = BatchInclusionData::new(vd_batch_idx, batch_merkle_tree);
let response = ResponseMessage::BatchInclusionData(batch_inclusion_data);

let serialized_response =
serde_json::to_vec(&response).expect("Could not serialize response");
let serialized_response = cbor_serialize(&response)
.map_err(|e| BatcherError::SerializationError(e.to_string()))?;

let sending_result = ws_sink
.write()
.await
.send(Message::binary(serialized_response))
.await;
let sending_result = ws_sink
.write()
.await
.send(Message::binary(serialized_response))
.await;

match sending_result {
Err(Error::AlreadyClosed) => (),
Err(e) => error!("Error while sending batch inclusion data response: {}", e),
Ok(_) => (),
}
match sending_result {
Err(Error::AlreadyClosed) => (),
Err(e) => error!("Error while sending batch inclusion data response: {}", e),
Ok(_) => (),
}

info!("Response sent");
}

info!("Response sent");
})
.await;
Ok(())
}

async fn send_message<T: Serialize>(
ws_conn_sink: Arc<RwLock<SplitSink<WebSocketStream<TcpStream>, Message>>>,
message: T,
) {
let serialized_response = serde_json::to_vec(&message).expect("Could not serialize response");

// Send error message
ws_conn_sink
.write()
.await
.send(Message::binary(serialized_response))
.await
.expect("Failed to send message");
match cbor_serialize(&message) {
Ok(serialized_response) => ws_conn_sink
.write()
.await
.send(Message::binary(serialized_response))
.await
.expect("Failed to send message"),
Err(e) => error!("Error while serializing message: {}", e),
}
}
4 changes: 4 additions & 0 deletions batcher/aligned-batcher/src/types/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub enum BatcherError {
ReceiptNotFoundError,
TransactionSendError,
MaxRetriesReachedError,
SerializationError(String),
}

impl From<tungstenite::Error> for BatcherError {
Expand Down Expand Up @@ -56,6 +57,9 @@ impl fmt::Debug for BatcherError {
"Maximum tries reached. Could not send createNewTask call"
)
}
BatcherError::SerializationError(e) => {
write!(f, "Serialization error: {}", e)
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions batcher/aligned-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ serde = { version = "1.0.201", features = ["derive"] }
sha3 = { version = "0.10.8"}
url = "2.5.0"
hex = "0.4.3"
ciborium = "=0.2.2"
serde_repr = "0.1.19"
9 changes: 5 additions & 4 deletions batcher/aligned-sdk/src/communication/messaging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures_util::future::Ready;
use futures_util::stream::{SplitSink, TryFilter};
use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream};

use crate::communication::serialization::{cbor_deserialize, cbor_serialize};
use crate::{
communication::batch::handle_batch_inclusion_data,
core::{
Expand Down Expand Up @@ -47,9 +48,9 @@ pub async fn send_messages(
nonce += U256::one();

let msg = ClientMessage::new(verification_data.clone(), wallet.clone());
let msg_str = serde_json::to_string(&msg).map_err(SubmitError::SerializationError)?;
let msg_bin = cbor_serialize(&msg).map_err(SubmitError::SerializationError)?;
ws_write
.send(Message::Text(msg_str.clone()))
.send(Message::Binary(msg_bin.clone()))
.await
.map_err(SubmitError::WebSocketConnectionError)?;

Expand All @@ -65,7 +66,7 @@ pub async fn send_messages(
}
};

let response_msg = serde_json::from_slice::<ValidityResponseMessage>(&msg.into_data())
let response_msg: ValidityResponseMessage = cbor_deserialize(msg.into_data().as_slice())
.map_err(SubmitError::SerializationError)?;

match response_msg {
Expand Down Expand Up @@ -154,7 +155,7 @@ async fn process_batch_inclusion_data(
*num_responses_lock += 1;

let data = msg.into_data();
match serde_json::from_slice::<ResponseMessage>(&data) {
match cbor_deserialize(data.as_slice()) {
Ok(ResponseMessage::BatchInclusionData(batch_inclusion_data)) => {
let _ = handle_batch_inclusion_data(
batch_inclusion_data,
Expand Down
6 changes: 4 additions & 2 deletions batcher/aligned-sdk/src/communication/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};

use crate::core::{errors::SubmitError, types::ResponseMessage};

pub const EXPECTED_PROTOCOL_VERSION: u16 = 2;
use super::serialization::cbor_deserialize;

pub const EXPECTED_PROTOCOL_VERSION: u16 = 3;

pub async fn check_protocol_version(
ws_read: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
) -> Result<(), SubmitError> {
if let Some(Ok(msg)) = ws_read.next().await {
match serde_json::from_slice::<ResponseMessage>(&msg.into_data()) {
match cbor_deserialize(msg.into_data().as_slice()) {
Ok(ResponseMessage::ProtocolVersion(protocol_version)) => {
if protocol_version > EXPECTED_PROTOCOL_VERSION {
return Err(SubmitError::ProtocolVersionMismatch {
Expand Down
22 changes: 22 additions & 0 deletions batcher/aligned-sdk/src/communication/serialization.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::io::Read;

use serde::{de::DeserializeOwned, Serialize};

pub fn cbor_serialize<T: Serialize>(value: &T) -> Result<Vec<u8>, SerializationError> {
let mut buf = Vec::new();
ciborium::into_writer(value, &mut buf).map_err(|_| SerializationError)?;
Ok(buf)
}

pub fn cbor_deserialize<R: Read, T: DeserializeOwned>(buf: R) -> Result<T, SerializationError> {
ciborium::from_reader(buf).map_err(|_| SerializationError)
}

#[derive(Debug)]
pub struct SerializationError;

impl std::fmt::Display for SerializationError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "Serialization error")
}
}
Loading

0 comments on commit 39b08e4

Please sign in to comment.