Skip to content

Commit

Permalink
handle error properly
Browse files Browse the repository at this point in the history
  • Loading branch information
macovedj committed Feb 16, 2024
1 parent cbba788 commit 2be58ea
Show file tree
Hide file tree
Showing 21 changed files with 505 additions and 376 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@
.cargo
vendor/
publish
# Test generated files
bundled.wasm
locked.wasm
214 changes: 109 additions & 105 deletions crates/client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use warg_transparency::{
};

use crate::registry_url::RegistryUrl;

/// Represents an error that occurred while communicating with the registry.
#[derive(Debug, Error)]
pub enum ClientError {
Expand Down Expand Up @@ -102,6 +101,9 @@ pub enum ClientError {
/// Invalid upload HTTP method.
#[error("server returned an invalid HTTP header `{0}: {1}`")]
InvalidHttpHeader(String, String),
/// The provided log was not found with hint header.
#[error("log `{0}` was not found in this registry, but the registry provided the hint header: `{1:?}`")]
LogNotFoundWithHint(LogId, HeaderValue),
/// An other error occurred during the requested operation.
#[error(transparent)]
Other(#[from] anyhow::Error),
Expand Down Expand Up @@ -153,50 +155,61 @@ async fn into_result<T: DeserializeOwned, E: DeserializeOwned + Into<ClientError
}
}

trait WithWargHeader {
type Client;
fn warg_header(self, registry_header: &Option<HeaderValue>) -> RequestBuilder;
}

impl WithWargHeader for RequestBuilder {
type Client = Client;
fn warg_header(self, registry_header: &Option<HeaderValue>) -> reqwest::RequestBuilder {
if let Some(reg) = registry_header {
let registry_header = HeaderName::try_from("warg-registry").unwrap();
self.header(registry_header, reg)
} else {
self
}
}
}

/// Represents a Warg API client for communicating with
/// a Warg registry server.
pub struct Client {
home_url: RegistryUrl,
url: RegistryUrl,
client: reqwest::Client,
namespace_registry: Option<String>,
warg_header: Option<HeaderValue>,
}

impl Client {
/// Creates a new API client with the given URL.
pub fn new(url: impl IntoUrl) -> Result<Self> {
let url = RegistryUrl::new(url)?;
Ok(Self {
home_url: url,
url,
client: reqwest::Client::new(),
namespace_registry: None,
warg_header: None,
})
}

/// Gets the URL of the API client.
pub fn home_url(&self) -> &RegistryUrl {
&self.home_url
pub fn url(&self) -> &RegistryUrl {
&self.url
}

/// Gets the latest checkpoint from the registry.
pub async fn latest_checkpoint(
&self,
) -> Result<SerdeEnvelope<TimestampedCheckpoint>, ClientError> {
let url = self.home_url.join(paths::fetch_checkpoint());
let url = self.url.join(paths::fetch_checkpoint());
tracing::debug!("getting latest checkpoint at `{url}`");
if let Some(nm) = self.namespace_registry() {
let registry_header = HeaderName::try_from("warg-registry").unwrap();
let header_val = HeaderValue::try_from(nm).unwrap();
into_result::<_, FetchError>(
self.client
.get(url)
.header(registry_header, header_val)
.send()
.await?,
)
.await
} else {
into_result::<_, FetchError>(reqwest::get(url).await?).await
}
into_result::<_, FetchError>(
self.client
.get(url)
.warg_header(self.get_warg_header())
.send()
.await?,
)
.await
}

/// Gets the latest checkpoints from registries.
Expand All @@ -206,7 +219,7 @@ impl Client {
) -> Result<HashMap<String, SerdeEnvelope<TimestampedCheckpoint>>> {
let mut timestamps = HashMap::new();
for reg in registries.into_iter() {
let url = self.home_url.join(paths::fetch_checkpoint());
let url = self.url.join(paths::fetch_checkpoint());
let registry_header = HeaderName::try_from("warg-registry").unwrap();
let header_val = HeaderValue::try_from(reg).unwrap();
let res: SerdeEnvelope<TimestampedCheckpoint> = into_result::<_, FetchError>(
Expand All @@ -227,75 +240,51 @@ impl Client {
&self,
request: SerdeEnvelope<TimestampedCheckpoint>,
) -> Result<CheckpointVerificationResponse, ClientError> {
let url = self.home_url.join(paths::verify_checkpoint());
let url = self.url.join(paths::verify_checkpoint());
tracing::debug!("verifying checkpoint at `{url}`");

let response = self.client.post(url).json(&request).send().await?;
let response = self
.client
.post(url)
.json(&request)
.warg_header(self.get_warg_header())
.send()
.await?;
into_result::<_, MonitorError>(response).await
}

/// Add warg header to request
pub fn warg_header(
&self,
namespace_registry: &Option<String>,
req: RequestBuilder,
) -> RequestBuilder {
if let Some(nm) = namespace_registry {
let registry_header = HeaderName::try_from("warg-registry").unwrap();
let header_val = HeaderValue::try_from(nm).unwrap();
req.header(registry_header, header_val)
} else {
req
}
}

/// Fetches package log entries from the registry.
pub async fn fetch_logs(
&self,
request: FetchLogsRequest<'_>,
) -> Result<FetchLogsResponse, ClientError> {
let url = self.home_url.join(paths::fetch_logs());
let url = self.url.join(paths::fetch_logs());
tracing::debug!("fetching logs at `{url}`");
let response = self
.warg_header(self.namespace_registry(), self.client.post(&url))
.client
.post(&url)
.json(&request)
.warg_header(self.get_warg_header())
.send()
.await?;

if let Some(hint) = response.headers().get("Warg-Registry-Hint") {
let hint_reg = hint.to_str().unwrap().to_owned();
let mut terms = hint_reg.split('=');
let namespace = terms.next();
let registry = terms.next();
if let (Some(namespace), Some(registry)) = (namespace, registry) {
print!(
"One of the packages you're requesting does not exist in the registry you're using.
However, the package namespace `{namespace}` does exist in the registry at {registry}.\nWould you like to configure your warg cli to use this registry for packages with this namespace in the future? y/N\n",
);
std::io::Write::flush(&mut std::io::stdout()).expect("flush failed!");
let mut buf = String::new();
std::io::stdin().read_line(&mut buf).unwrap();
let lowered = buf.to_lowercase();
if lowered == "y" || lowered == "yes" {
return into_result::<_, FetchError>(
self.warg_header(&Some(hint_reg), self.client.post(url))
.json(&request)
.send()
.await?,
)
.await;
let header = response.headers().get("Warg-Registry-Hint").cloned();
into_result::<_, FetchError>(response)
.await
.map_err(|err| match err {
ClientError::Fetch(FetchError::LogNotFound(log_id)) if header.is_some() => {
ClientError::LogNotFoundWithHint(log_id, header.unwrap())
}
}
}
into_result::<_, FetchError>(response).await
_ => err,
})
}

/// Fetches package names from the registry.
pub async fn fetch_package_names(
&self,
request: FetchPackageNamesRequest<'_>,
) -> Result<FetchPackageNamesResponse, ClientError> {
let url = self.home_url.join(paths::fetch_package_names());
let url = self.url.join(paths::fetch_package_names());
tracing::debug!("fetching package names at `{url}`");

let response = self.client.post(url).json(&request).send().await?;
Expand All @@ -304,11 +293,17 @@ impl Client {

/// Gets ledger sources from the registry.
pub async fn ledger_sources(&self) -> Result<LedgerSourcesResponse, ClientError> {
let url = self.home_url.join(paths::ledger_sources());
let url = self.url.join(paths::ledger_sources());
tracing::debug!("getting ledger sources at `{url}`");

let response = reqwest::get(url).await?;
into_result::<_, LedgerError>(response).await
into_result::<_, LedgerError>(
self.client
.get(url)
.warg_header(self.get_warg_header())
.send()
.await?,
)
.await
}

/// Publish a new record to a package log.
Expand All @@ -317,15 +312,17 @@ impl Client {
log_id: &LogId,
request: PublishRecordRequest<'_>,
) -> Result<PackageRecord, ClientError> {
let url = self.home_url.join(&paths::publish_package_record(log_id));
let url = self.url.join(&paths::publish_package_record(log_id));
tracing::debug!(
"appending record to package `{name}` at `{url}`",
name = request.package_name
);

let response = self
.warg_header(&self.namespace_registry, self.client.post(url))
.client
.post(url)
.json(&request)
.warg_header(self.get_warg_header())
.send()
.await?;
into_result::<_, PackageError>(response).await
Expand All @@ -337,35 +334,35 @@ impl Client {
log_id: &LogId,
record_id: &RecordId,
) -> Result<PackageRecord, ClientError> {
let url = self
.home_url
.join(&paths::package_record(log_id, record_id));
let url = self.url.join(&paths::package_record(log_id, record_id));
tracing::debug!("getting record `{record_id}` for package `{log_id}` at `{url}`");

let response = if let Some(nm) = self.namespace_registry() {
let registry_header = HeaderName::try_from("warg-registry").unwrap();
let header_val = HeaderValue::try_from(nm).unwrap();
into_result::<_, PackageError>(
self.client
.get(url)
.header(registry_header, header_val)
.warg_header(self.get_warg_header())
.send()
.await?
} else {
reqwest::get(url).await?
};
into_result::<_, PackageError>(response).await
.await?,
)
.await
}

/// Gets a content sources from the registry.
pub async fn content_sources(
&self,
digest: &AnyHash,
) -> Result<ContentSourcesResponse, ClientError> {
let url = self.home_url.join(&paths::content_sources(digest));
let url = self.url.join(&paths::content_sources(digest));
tracing::debug!("getting content sources for digest `{digest}` at `{url}`");

let response = reqwest::get(url).await?;
into_result::<_, ContentError>(response).await
into_result::<_, ContentError>(
self.client
.get(url)
.warg_header(self.get_warg_header())
.send()
.await?,
)
.await
}

/// Downloads the content associated with a given record.
Expand All @@ -386,7 +383,12 @@ impl Client {

tracing::debug!("downloading content `{digest}` from `{url}`");

let response = self.client.get(url).send().await?;
let response = self
.client
.get(url)
.warg_header(self.get_warg_header())
.send()
.await?;
if !response.status().is_success() {
tracing::debug!(
"failed to download content `{digest}` from `{url}`: {status}",
Expand All @@ -402,13 +404,13 @@ impl Client {
}

/// Map namespace
pub fn map_namespace(&mut self, registry: Option<String>) {
self.namespace_registry = registry;
pub fn map_warg_header(&mut self, registry: Option<HeaderValue>) {
self.warg_header = registry;
}

/// Get namespace registry
pub fn namespace_registry(&self) -> &Option<String> {
&self.namespace_registry
pub fn get_warg_header(&self) -> &Option<HeaderValue> {
&self.warg_header
}

/// Proves the inclusion of the given package log heads in the registry.
Expand All @@ -418,12 +420,14 @@ impl Client {
checkpoint: &Checkpoint,
leafs: &[LogLeaf],
) -> Result<(), ClientError> {
let url = self.home_url.join(paths::prove_inclusion());
let url = self.url.join(paths::prove_inclusion());
tracing::debug!("proving checkpoint inclusion at `{url}`");

let response = into_result::<InclusionResponse, ProofError>(
self.warg_header(self.namespace_registry(), self.client.post(url))
self.client
.post(url)
.json(&request)
.warg_header(&self.get_warg_header())
.send()
.await?,
)
Expand All @@ -439,10 +443,12 @@ impl Client {
from_log_root: Cow<'_, AnyHash>,
to_log_root: Cow<'_, AnyHash>,
) -> Result<(), ClientError> {
let url = self.home_url.join(paths::prove_consistency());
let url = self.url.join(paths::prove_consistency());
let response = into_result::<ConsistencyResponse, ProofError>(
self.warg_header(self.namespace_registry(), self.client.post(url))
self.client
.post(url)
.json(&request)
.warg_header(self.get_warg_header())
.send()
.await?,
)
Expand Down Expand Up @@ -494,7 +500,7 @@ impl Client {
content: impl Into<Body>,
) -> Result<(), ClientError> {
// Upload URLs may be relative to the registry URL.
let url = self.home_url.join(url);
let url = self.url.join(url);

let method = match method {
"POST" => Method::POST,
Expand All @@ -519,13 +525,11 @@ impl Client {
tracing::debug!("uploading content to `{url}`");

let response = self
.warg_header(
self.namespace_registry(),
self.client
.request(method, url)
.headers(headers)
.body(content),
)
.client
.request(method, url)
.warg_header(self.get_warg_header())
.headers(headers)
.body(content)
.send()
.await?;
if !response.status().is_success() {
Expand Down
Loading

0 comments on commit 2be58ea

Please sign in to comment.