Skip to content

Commit

Permalink
Implement request/response protocol (#2688)
Browse files Browse the repository at this point in the history
* implement r/r protocol

* cargo sort

* cargo sort _again_

* update lock, finish merge

* Update sequencer/src/external_event_handler.rs

Co-authored-by: Mathis <sveitser@gmail.com>

* PR comments

* PR comments pt2

* lints

---------

Co-authored-by: Mathis <sveitser@gmail.com>
  • Loading branch information
rob-maron and sveitser authored Mar 6, 2025
1 parent 1ed39cc commit a2f0fc7
Show file tree
Hide file tree
Showing 15 changed files with 666 additions and 370 deletions.
456 changes: 236 additions & 220 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ async-trait = { workspace = true }
bincode = { workspace = true }
blake3 = { workspace = true }
byteorder = { version = "1", default-features = false }
derive_builder = { workspace = true }
derive_more = { workspace = true }
hotshot-types = { workspace = true }
parking_lot = { workspace = true }
Expand Down
85 changes: 51 additions & 34 deletions request-response/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use std::{

use anyhow::{anyhow, Context, Result};
use data_source::DataSource;
use derive_builder::Builder;
use derive_more::derive::Deref;
use hotshot_types::traits::signature_key::SignatureKey;
use message::{Message, RequestMessage, ResponseMessage};
Expand Down Expand Up @@ -87,39 +86,39 @@ pub trait Serializable: Sized {
}

/// The underlying configuration for the request-response protocol
#[derive(Clone, Builder)]
#[derive(Clone)]
pub struct RequestResponseConfig {
/// The timeout for incoming requests. Do not respond to a request after this threshold
/// has passed.
incoming_request_ttl: Duration,
pub incoming_request_ttl: Duration,
/// The maximum amount of time we will spend trying to both derive a response for a request and
/// send the response over the wire.
response_send_timeout: Duration,
pub response_send_timeout: Duration,
/// The maximum amount of time we will spend trying to validate a response. This is used to prevent
/// an attack where a malicious participant sends us a bunch of requests that take a long time to
/// validate.
response_validate_timeout: Duration,
pub response_validate_timeout: Duration,
/// The batch size for outgoing requests. This is the number of request messages that we will
/// send out at a time for a single request before waiting for the [`request_batch_interval`].
request_batch_size: usize,
pub request_batch_size: usize,
/// The time to wait (per request) between sending out batches of request messages
request_batch_interval: Duration,
pub request_batch_interval: Duration,
/// The maximum (global) number of outgoing responses that can be in flight at any given time
max_outgoing_responses: usize,
pub max_outgoing_responses: usize,
/// The maximum (global) number of incoming responses that can be processed at any given time.
/// We need this because responses coming in need to be validated [asynchronously] that they
/// satisfy the request they are responding to
max_incoming_responses: usize,
pub max_incoming_responses: usize,
}

/// A protocol that allows for request-response communication. Is cheaply cloneable, so there is no
/// need to wrap it in an `Arc`
#[derive(Clone, Deref)]
#[derive(Deref)]
pub struct RequestResponse<
S: Sender<K>,
R: Receiver,
Req: Request,
RS: RecipientSource<K>,
RS: RecipientSource<Req, K>,
DS: DataSource<Req>,
K: SignatureKey + 'static,
> {
Expand All @@ -130,11 +129,30 @@ pub struct RequestResponse<
_receiving_task_handle: Arc<AbortOnDropHandle<()>>,
}

/// We need to manually implement the `Clone` trait for this type because deriving
/// `Deref` will cause an issue where it tries to clone the inner field instead
impl<
S: Sender<K>,
R: Receiver,
Req: Request,
RS: RecipientSource<Req, K>,
DS: DataSource<Req>,
K: SignatureKey + 'static,
> Clone for RequestResponse<S, R, Req, RS, DS, K>
{
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
_receiving_task_handle: Arc::clone(&self._receiving_task_handle),
}
}
}

impl<
S: Sender<K>,
R: Receiver,
Req: Request,
RS: RecipientSource<K>,
RS: RecipientSource<Req, K>,
DS: DataSource<Req>,
K: SignatureKey + 'static,
> RequestResponse<S, R, Req, RS, DS, K>
Expand Down Expand Up @@ -186,14 +204,14 @@ pub struct RequestResponseInner<
S: Sender<K>,
R: Receiver,
Req: Request,
RS: RecipientSource<K>,
RS: RecipientSource<Req, K>,
DS: DataSource<Req>,
K: SignatureKey + 'static,
> {
/// The configuration of the protocol
config: RequestResponseConfig,
/// The sender to use for the protocol
sender: S,
pub sender: S,
/// The recipient source to use for the protocol
recipient_source: RS,
/// The data source to use for the protocol
Expand All @@ -207,7 +225,7 @@ impl<
S: Sender<K>,
R: Receiver,
Req: Request,
RS: RecipientSource<K>,
RS: RecipientSource<Req, K>,
DS: DataSource<Req>,
K: SignatureKey + 'static,
> RequestResponseInner<S, R, Req, RS, DS, K>
Expand Down Expand Up @@ -303,7 +321,7 @@ impl<
// that we don't always send to the same recipients in the same order
let mut recipients = self
.recipient_source
.get_recipients_for(&request_message.request)
.get_expected_responders(&request_message.request)
.await;
recipients.shuffle(&mut rand::thread_rng());

Expand Down Expand Up @@ -617,8 +635,8 @@ mod tests {

// Implement the [`RecipientSource`] trait for the [`TestSender`] type
#[async_trait]
impl RecipientSource<BLSPubKey> for TestSender {
async fn get_recipients_for<R: Request>(&self, _request: &R) -> Vec<BLSPubKey> {
impl RecipientSource<TestRequest, BLSPubKey> for TestSender {
async fn get_expected_responders(&self, _request: &TestRequest) -> Vec<BLSPubKey> {
// Get all the participants in the network
self.network.keys().copied().collect()
}
Expand Down Expand Up @@ -671,14 +689,14 @@ mod tests {
}

#[async_trait]
impl DataSource<Vec<u8>> for TestDataSource {
async fn derive_response_for(&self, request: &Vec<u8>) -> Result<Vec<u8>> {
impl DataSource<TestRequest> for TestDataSource {
async fn derive_response_for(&self, request: &TestRequest) -> Result<Vec<u8>> {
// Return a response if we hit the hit rate
if self.has_data && Instant::now() >= self.data_available_time {
if self.take_data && !self.taken.swap(true, std::sync::atomic::Ordering::Relaxed) {
return Err(anyhow::anyhow!("data already taken"));
}
Ok(blake3::hash(request).as_bytes().to_vec())
Ok(blake3::hash(&request.0).as_bytes().to_vec())
} else {
Err(anyhow::anyhow!("did not have the data"))
}
Expand All @@ -687,16 +705,15 @@ mod tests {

/// Create and return a default protocol configuration
fn default_protocol_config() -> RequestResponseConfig {
RequestResponseConfigBuilder::create_empty()
.incoming_request_ttl(Duration::from_secs(40))
.response_send_timeout(Duration::from_secs(40))
.request_batch_size(10)
.request_batch_interval(Duration::from_millis(100))
.max_outgoing_responses(10)
.response_validate_timeout(Duration::from_secs(1))
.max_incoming_responses(5)
.build()
.expect("failed to build config")
RequestResponseConfig {
incoming_request_ttl: Duration::from_secs(40),
response_send_timeout: Duration::from_secs(40),
request_batch_size: 10,
request_batch_interval: Duration::from_millis(100),
max_outgoing_responses: 10,
response_validate_timeout: Duration::from_secs(1),
max_incoming_responses: 5,
}
}

/// Create fully connected test networks with `num_participants` participants
Expand Down Expand Up @@ -805,10 +822,10 @@ mod tests {
.push(Arc::clone(&protocol._receiving_task_handle));

// Create a random request
let request = vec![rand::thread_rng().gen(); 100];
let request = TestRequest(vec![rand::thread_rng().gen(); 100]);

// Get the hash of the request
let request_hash = blake3::hash(&request).as_bytes().to_vec();
let request_hash = blake3::hash(&request.0).as_bytes().to_vec();

// Create a new request message
let request = RequestMessage::new_signed(&public_key, &private_key, &request)
Expand Down Expand Up @@ -924,7 +941,7 @@ mod tests {
let one = Arc::new(participants.remove(0));

// Create the request that they should all be able to join on
let request = vec![rand::thread_rng().gen(); 100];
let request = TestRequest(vec![rand::thread_rng().gen(); 100]);

// Create a join set to wait for all the tasks to finish
let mut join_set = JoinSet::new();
Expand Down
4 changes: 2 additions & 2 deletions request-response/src/recipient_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::request::Request;
/// expect responses from. In `HotShot` this would go on top of the [`Membership`] trait and determine
/// which nodes are able (quorum/DA) to respond to which requests
#[async_trait]
pub trait RecipientSource<K: SignatureKey + 'static>: Send + Sync + 'static {
pub trait RecipientSource<R: Request, K: SignatureKey + 'static>: Send + Sync + 'static {
/// Get all the recipients that the specific request should expect responses from
async fn get_recipients_for<R: Request>(&self, request: &R) -> Vec<K>;
async fn get_expected_responders(&self, request: &R) -> Vec<K>;
}
12 changes: 12 additions & 0 deletions request-response/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ pub trait Request: Send + Sync + Serializable + 'static + Clone + Debug {

/// A trait that a response needs to implement
#[async_trait]
#[cfg(not(test))]
pub trait Response<R: Request>: Send + Sync + Serializable + Clone + Debug {
/// Validate the response, making sure it is valid for the given request
///
/// # Errors
/// If the response is not valid for the given request
async fn validate(&self, request: &R) -> Result<()>;
}

/// A trait that a response needs to implement
#[async_trait]
#[cfg(test)]
pub trait Response<R: Request>:
Send + Sync + Serializable + Clone + Debug + PartialEq + Eq
{
Expand Down
22 changes: 22 additions & 0 deletions sequencer-sqlite/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ async-lock = { workspace = true }
async-once-cell = { workspace = true }
async-trait = { workspace = true }
bincode = { workspace = true }
byteorder = "1"

# CDN imports
cdn-broker = { git = "https://github.com/EspressoSystems/Push-CDN", tag = "0.5.1-upgrade", package = "cdn-broker", features = ["global-permits"] }
Expand Down Expand Up @@ -101,6 +102,7 @@ priority-queue = { workspace = true }
rand = { workspace = true }
rand_chacha = { workspace = true }
rand_distr = { workspace = true }
request-response = { path = "../request-response" }
sequencer-utils = { path = "../utils" }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
Loading

0 comments on commit a2f0fc7

Please sign in to comment.