Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve handling of large responses in nasty client #2320

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ jobs:
run: |
cargo build --locked --release --workspace

- name: Build Espresso Dev Node
# Espresso Dev Node currently requires testing feature, so it is built separately.
- name: Build Dev Tools
# Some dev tools require additional features that we don't want included in the production
# build, so they are built separately.
run: |
cargo build --locked --release --features testing --bin espresso-dev-node
cargo build --locked --release --features nasty-client,testing --bin nasty-client --bin espresso-dev-node

- name: Upload artifacts
uses: actions/upload-artifact@v4
Expand Down Expand Up @@ -100,10 +101,11 @@ jobs:
run: |
cargo build --locked --release --workspace

- name: Build Espresso Dev Node
# Espresso Dev Node currently requires testing feature, so it is built separately.
- name: Build Dev Tools
# Some dev tools require additional features that we don't want included in the production
# build, so they are built separately.
run: |
cargo build --locked --release --features testing --bin espresso-dev-node
cargo build --locked --release --features nasty-client,testing --bin nasty-client --bin espresso-dev-node

- name: Upload artifacts
uses: actions/upload-artifact@v4
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@ testing = [
"hotshot-query-service/testing",
]
benchmarking = []
nasty-client = ["reqwest", "bytesize"]

[[bin]]
name = "espresso-dev-node"
required-features = ["testing"]

[[bin]]
name = "nasty-client"
required-features = ["nasty-client"]

[dev-dependencies]
escargot = "0.5.10"
espresso-macros = { git = "https://github.com/EspressoSystems/espresso-macros.git", tag = "0.1.0" }
Expand Down Expand Up @@ -125,6 +130,10 @@ url = { workspace = true }
vbs = { workspace = true }
vec1 = { workspace = true }

# Dependencies for nasty-client
bytesize = { workspace = true, optional = true }
reqwest = { workspace = true, optional = true }

[package.metadata.cargo-udeps.ignore]
normal = ["hotshot-testing"]

Expand Down
77 changes: 59 additions & 18 deletions sequencer/src/bin/nasty-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use anyhow::{bail, ensure, Context};
use async_lock::RwLock;
use bytesize::ByteSize;
use clap::Parser;
use committable::Committable;
use derivative::Derivative;
Expand All @@ -38,6 +39,7 @@ use jf_merkle_tree::{
ForgetableMerkleTreeScheme, MerkleCommitment, MerkleTreeScheme, UniversalMerkleTreeScheme,
};
use rand::{seq::SliceRandom, RngCore};
use reqwest::header::ACCEPT;
use sequencer::{api::endpoints::NamespaceProofQueryData, SequencerApiVersion};
use sequencer_utils::logging;
use serde::de::DeserializeOwned;
Expand All @@ -50,13 +52,13 @@ use std::{
time::{Duration, Instant},
};
use strum::{EnumDiscriminants, VariantArray};
use surf_disco::{error::ClientError, socket, Error, StatusCode, Url};
use surf_disco::{error::ClientError, socket, StatusCode, Url};
use tide_disco::{error::ServerError, App};
use time::OffsetDateTime;
use tokio::{task::spawn, time::sleep};
use toml::toml;
use tracing::info_span;
use vbs::version::StaticVersionType;
use vbs::{version::StaticVersionType, BinarySerializer, Serializer};

/// An adversarial stress test for sequencer APIs.
#[derive(Clone, Debug, Parser)]
Expand Down Expand Up @@ -90,18 +92,22 @@ struct ClientConfig {
///
/// Requests that take longer than this will fail, causing an error log and an increment of the
/// `failed_actions` metric.
///
/// Note that this time includes the time taken to stream the response body, so it should be set
/// somewhat conservatively to allow time to stream large (few MB) responses.
#[clap(
long,
env = "ESPRESSO_NASTY_CLIENT_HTTP_TIMEOUT_ERROR",
default_value = "5s",
default_value = "60s",
value_parser = parse_duration,
)]
http_timeout_error: Duration,

/// Timeout for issuing a warning due to slow HTTP requests.
///
/// Requests that take longer than this but shorter than HTTP_TIMEOUT_ERROR will not generate an
/// error but will output a warning and increment a counter of slow HTTP requests.
/// Requests that take longer than this, but less than HTTP_TIMEOUT_ERROR, to return at least a
/// response header will not generate an error; instead, they output a warning and increment a
/// counter of slow HTTP requests.
#[clap(
long,
env = "ESPRESSO_NASTY_CLIENT_HTTP_TIMEOUT_WARNING",
Expand Down Expand Up @@ -262,6 +268,8 @@ struct Metrics {
query_fee_state_actions: Box<dyn Counter>,
slow_requests: Box<dyn Counter>,
request_latency: Box<dyn Histogram>,
request_bandwidth: Box<dyn Histogram>,
response_size: Box<dyn Histogram>,
}

impl Metrics {
Expand Down Expand Up @@ -350,6 +358,12 @@ impl Metrics {
request_latency: registry
.subgroup("http".into())
.create_histogram("latency".into(), Some("s".into())),
request_bandwidth: registry
.subgroup("http".into())
.create_histogram("bandwidth".into(), Some("MiB/s".into())),
response_size: registry
.subgroup("http".into())
.create_histogram("response_size".into(), Some("mb".into())),
}
}
}
Expand Down Expand Up @@ -462,7 +476,9 @@ struct Subscription<T: Queryable> {

#[derive(Debug)]
struct ResourceManager<T: Queryable> {
client: surf_disco::Client<ClientError, SequencerApiVersion>,
stream_client: surf_disco::Client<ClientError, SequencerApiVersion>,
get_client: reqwest::Client,
base_url: Url,
open_streams: BTreeMap<u64, Subscription<T>>,
next_stream_id: u64,
metrics: Arc<Metrics>,
Expand All @@ -472,9 +488,12 @@ struct ResourceManager<T: Queryable> {
impl<T: Queryable> ResourceManager<T> {
fn new(opt: &Options, metrics: Arc<Metrics>) -> Self {
Self {
client: surf_disco::Client::builder(opt.url.clone())
.set_timeout(Some(opt.client_config.http_timeout_error))
.build(),
stream_client: surf_disco::Client::builder(opt.url.clone()).build(),
get_client: reqwest::Client::builder()
.timeout(opt.client_config.http_timeout_error)
.build()
.unwrap(),
base_url: opt.url.clone(),
open_streams: BTreeMap::new(),
next_stream_id: 0,
metrics,
Expand Down Expand Up @@ -551,14 +570,25 @@ impl<T: Queryable> ResourceManager<T> {
tracing::debug!("-> GET {path}");

let start = Instant::now();
let res = self.client.get::<R>(&path).send().await;
let res = self
.get_client
.get(self.base_url.join(&path)?)
.header(ACCEPT, "application/octet-stream")
.send()
.await
.context(format!("error sending request {path}"))?;
let elapsed = start.elapsed();
let status = res.status();

let status = match &res {
Ok(_) => StatusCode::OK,
Err(err) => err.status(),
};
tracing::debug!("<- GET {path} {} ({elapsed:?})", u16::from(status));
// Time the body separately: we don't want to penalize the server for time spent streaming a
// large response over the network, because we're more interested in the computation time on
// the server itself to generate the response.
let body_start = Instant::now();
let body = res.bytes().await.context("error streaming response body")?;
let body_elapsed = body_start.elapsed();
let body_size = ByteSize::b(body.len() as u64);

tracing::debug!("<- GET {path} {status} ({elapsed:?}) ({body_size} in {body_elapsed:?})",);

self.metrics
.request_latency
Expand All @@ -568,7 +598,18 @@ impl<T: Queryable> ResourceManager<T> {
tracing::warn!(%path, ?elapsed, "slow request");
}

res.context(format!("GET {path}"))
self.metrics.request_bandwidth.add_point(
((body_size.0 as f64) / (ByteSize::mib(1).0 as f64)) / (body_elapsed.as_secs() as f64),
);
self.metrics
.response_size
.add_point((body_size.0 as f64) / (ByteSize::kb(1).0 as f64));

ensure!(
status == StatusCode::OK,
"{path}: error from server: {status}",
);
Serializer::<SequencerApiVersion>::deserialize(&body).context("decoding response body")
}

async fn query(&self, at: u64) -> anyhow::Result<()> {
Expand Down Expand Up @@ -701,7 +742,7 @@ impl<T: Queryable> ResourceManager<T> {

let from = self.adjust_index(from).await?;
let stream = self
.client
.stream_client
.socket(&format!("availability/stream/{}/{from}", Self::plural()))
.subscribe()
.await
Expand Down Expand Up @@ -804,7 +845,7 @@ impl<T: Queryable> ResourceManager<T> {
// but refresh the connection and try again.
tracing::warn!("error in old connection, refreshing connection: {err:#}");
let conn = self
.client
.stream_client
.socket(&format!("availability/stream/{}/{pos}", Self::plural()))
.subscribe()
.await
Expand Down
Loading