Skip to content

Commit

Permalink
refactor retry logic to occur at command level
Browse files Browse the repository at this point in the history
  • Loading branch information
macovedj committed Feb 17, 2024
1 parent fbe1a4a commit b2bea99
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 100 deletions.
3 changes: 1 addition & 2 deletions crates/client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,7 @@ impl WithWargHeader for RequestBuilder {
type Client = Client;
fn warg_header(self, registry_header: &Option<HeaderValue>) -> reqwest::RequestBuilder {
if let Some(reg) = registry_header {
let registry_header = HeaderName::try_from(REGISTRY_HEADER_NAME).unwrap();
self.header(registry_header, reg)
self.header(REGISTRY_HEADER_NAME, reg)
} else {
self
}
Expand Down
80 changes: 25 additions & 55 deletions crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ impl<R: RegistryStorage, C: ContentStorage, N: NamespaceMapStorage> Client<R, C,
}

/// Check operator log for namespace mapping
pub async fn fetch_namespace(&mut self, namespace: &str) -> ClientResult<()> {
pub async fn refresh_namespace(&mut self, namespace: &str) -> ClientResult<()> {
self.update_checkpoint(&self.api.latest_checkpoint().await?, vec![])
.await?;
let operator = self.registry().load_operator(&None).await.unwrap();
let operator = self.registry().load_operator(&None).await?;
let operator_log_maps_namespace = if let Some(op) = operator {
let namespace_state = op.state.namespace_state(namespace);
if let Ok(Some(nm)) = namespace_state {
Expand Down Expand Up @@ -607,7 +607,8 @@ impl<R: RegistryStorage, C: ContentStorage, N: NamespaceMapStorage> Client<R, C,
.collect::<HashMap<_, _>>();

loop {
let response: FetchLogsResponse = match self
// let response: FetchLogsResponse = match self
let response: FetchLogsResponse = self
.api
.fetch_logs(FetchLogsRequest {
log_length: checkpoint.log_length,
Expand All @@ -619,59 +620,11 @@ impl<R: RegistryStorage, C: ContentStorage, N: NamespaceMapStorage> Client<R, C,
packages: Cow::Borrowed(&last_known),
})
.await
{
Ok(res) => res,
Err(api::ClientError::LogNotFoundWithHint(log_id, header)) => {
let hint_reg = header.to_str().unwrap();
let mut terms = hint_reg.split('=');
let namespace = terms.next();
let registry = terms.next();
let resp = if let (Some(namespace), Some(registry)) = (namespace, registry) {
print!(
"One of the packages you're requesting does not exist in the registry you're using.
However, the package namespace `{namespace}` does exist in the registry at {registry}.\nWould you like to configure your warg cli to use this registry for packages with this namespace in the future? y/N\n",
);
std::io::Write::flush(&mut std::io::stdout()).expect("flush failed!");
let mut buf = String::new();
std::io::stdin().read_line(&mut buf).unwrap();
let lowered = buf.to_lowercase();
if lowered == "y" || lowered == "yes" {
self.store_namespace(namespace.to_string(), registry.to_string())
.await?;
Some(
self.api
.fetch_logs(FetchLogsRequest {
log_length: checkpoint.log_length,
operator: operator
.head_fetch_token
.as_ref()
.map(|t| Cow::Borrowed(t.as_str())),
limit: None,
packages: Cow::Borrowed(&last_known),
})
.await?,
)
} else {
None
}
} else {
None
};
if let Some(resp) = resp {
resp
} else {
return Err(ClientError::translate_log_not_found(
api::ClientError::Fetch(FetchError::LogNotFound(log_id)),
|id| packages.get(id).map(|p| p.name.clone()),
));
}
}
Err(e) => {
return Err(ClientError::translate_log_not_found(e, |id| {
.map_err(|e| {
ClientError::translate_log_not_found(e, |id| {
packages.get(id).map(|p| p.name.clone())
}))
}
};
})
})?;

for record in response.operator {
let proto_envelope: PublishedProtoEnvelope<operator::OperatorRecord> =
Expand Down Expand Up @@ -1096,6 +1049,15 @@ pub enum ClientError {
name: PackageName,
},

/// The package does not exist with hint.
#[error("package `{name}` does not exist")]
PackageDoesNotExistWithHint {
/// The missing package.
name: PackageName,
/// The registry hint
hint: HeaderValue,
},

/// The package version does not exist.
#[error("version `{version}` of package `{name}` does not exist")]
PackageVersionDoesNotExist {
Expand Down Expand Up @@ -1191,6 +1153,14 @@ impl ClientError {
return Self::PackageDoesNotExist { name };
}
}
api::ClientError::LogNotFoundWithHint(log_id, hint) => {
if let Some(name) = lookup(log_id) {
return Self::PackageDoesNotExistWithHint {
name,
hint: hint.clone(),
};
}
}
_ => {}
}

Expand Down
117 changes: 107 additions & 10 deletions src/bin/warg.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use anyhow::Result;
use clap::Parser;
use dialoguer::{theme::ColorfulTheme, Confirm};
use std::process::exit;
use tracing_subscriber::EnvFilter;
use warg_cli::commands::{
BundleCommand, ClearCommand, ConfigCommand, DependenciesCommand, DownloadCommand, InfoCommand,
KeyCommand, LockCommand, PublishCommand, ResetCommand, UpdateCommand,
KeyCommand, LockCommand, PublishCommand, ResetCommand, Retry, UpdateCommand,
};
use warg_client::ClientError;

Expand Down Expand Up @@ -46,17 +47,17 @@ async fn main() -> Result<()> {
WargCli::Config(cmd) => cmd.exec().await,
WargCli::Info(cmd) => cmd.exec().await,
WargCli::Key(cmd) => cmd.exec().await,
WargCli::Lock(cmd) => cmd.exec().await,
WargCli::Bundle(cmd) => cmd.exec().await,
WargCli::Dependencies(cmd) => cmd.exec().await,
WargCli::Download(cmd) => cmd.exec().await,
WargCli::Update(cmd) => cmd.exec().await,
WargCli::Publish(cmd) => cmd.exec().await,
WargCli::Lock(cmd) => cmd.exec(None).await,
WargCli::Bundle(cmd) => cmd.exec(None).await,
WargCli::Dependencies(cmd) => cmd.exec(None).await,
WargCli::Download(cmd) => cmd.exec(None).await,
WargCli::Update(cmd) => cmd.exec(None).await,
WargCli::Publish(cmd) => cmd.exec(None).await,
WargCli::Reset(cmd) => cmd.exec().await,
WargCli::Clear(cmd) => cmd.exec().await,
} {
if let Some(e) = e.downcast_ref::<ClientError>() {
describe_client_error(e);
describe_client_error_or_retry(e).await?;
} else {
eprintln!("error: {e:?}");
}
Expand All @@ -66,7 +67,7 @@ async fn main() -> Result<()> {
Ok(())
}

fn describe_client_error(e: &ClientError) {
async fn describe_client_error_or_retry(e: &ClientError) -> Result<()> {
match e {
ClientError::NoHomeRegistryUrl => {
eprintln!("error: {e}; use the `config` subcommand to set a home registry URL");
Expand All @@ -78,6 +79,102 @@ fn describe_client_error(e: &ClientError) {
eprintln!("error: the log for package `{name}` is empty (the registry could be lying)");
eprintln!("see issue https://github.com/bytecodealliance/registry/issues/66");
}
_ => eprintln!("error: {e}"),
ClientError::PackageDoesNotExistWithHint { name, hint } => {
let hint_reg = hint.to_str().unwrap();
let mut terms = hint_reg.split('=');
let namespace = terms.next();
let registry = terms.next();
if let (Some(namespace), Some(registry)) = (namespace, registry) {
let prompt = format!(
"The package `{}`, does not exist in the registry you're using.\nHowever, the package namespace `{namespace}` does exist in the registry at {registry}.\nWould you like to configure your warg cli to use this registry for packages with this namespace in the future? y/N\n",
name.name()
);
if Confirm::with_theme(&ColorfulTheme::default())
.with_prompt(prompt)
.interact()
.unwrap()
{
if let Err(e) = match WargCli::parse() {
WargCli::Config(cmd) => cmd.exec().await,
WargCli::Info(cmd) => cmd.exec().await,
WargCli::Key(cmd) => cmd.exec().await,
WargCli::Lock(cmd) => {
cmd.exec(Some(Retry::new(
namespace.to_string(),
registry.to_string(),
)))
.await
}
WargCli::Bundle(cmd) => {
cmd.exec(Some(Retry::new(
namespace.to_string(),
registry.to_string(),
)))
.await
}
WargCli::Dependencies(cmd) => {
cmd.exec(Some(Retry::new(
namespace.to_string(),
registry.to_string(),
)))
.await
}
WargCli::Download(cmd) => {
cmd.exec(Some(Retry::new(
namespace.to_string(),
registry.to_string(),
)))
.await
}
WargCli::Update(cmd) => {
cmd.exec(Some(Retry::new(
namespace.to_string(),
registry.to_string(),
)))
.await
}
WargCli::Publish(cmd) => {
cmd.exec(Some(Retry::new(
namespace.to_string(),
registry.to_string(),
)))
.await
}
WargCli::Reset(cmd) => cmd.exec().await,
WargCli::Clear(cmd) => cmd.exec().await,
} {
if let Some(e) = e.downcast_ref::<ClientError>() {
describe_client_error(e).await?;
} else {
eprintln!("error: {e:?}");
}
exit(1);
}
}
}
}
_ => {
eprintln!("error: {e}")
}
}
Ok(())
}

async fn describe_client_error(e: &ClientError) -> Result<()> {
match e {
ClientError::NoHomeRegistryUrl => {
eprintln!("error: {e}; use the `config` subcommand to set a default URL");
}
ClientError::PackageValidationFailed { name, inner } => {
eprintln!("error: the log for package `{name}` is invalid: {inner}")
}
ClientError::PackageLogEmpty { name } => {
eprintln!("error: the log for package `{name}` is empty (the registry could be lying)");
eprintln!("see issue https://github.com/bytecodealliance/registry/issues/66");
}
_ => {
eprintln!("error: {e}")
}
}
Ok(())
}
31 changes: 31 additions & 0 deletions src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ use anyhow::Context;
use anyhow::Result;
use clap::Args;
use std::path::PathBuf;
use warg_client::storage::ContentStorage;
use warg_client::storage::NamespaceMapStorage;
use warg_client::storage::RegistryStorage;
use warg_client::Client;
use warg_client::RegistryUrl;
use warg_client::{ClientError, Config, FileSystemClient, StorageLockResult};
use warg_crypto::signing::PrivateKey;
Expand Down Expand Up @@ -98,3 +102,30 @@ impl CommonOptions {
}
}
}

/// Namespace mapping to store when retrying a command after receiving a hint header
pub struct Retry {
namespace: String,
registry: String,
}

impl Retry {
/// New Retry
pub fn new(namespace: String, registry: String) -> Self {
Self {
namespace,
registry,
}
}

/// Map namespace using Retry information
pub async fn store_namespace<R: RegistryStorage, C: ContentStorage, N: NamespaceMapStorage>(
&self,
client: &Client<R, C, N>,
) -> Result<()> {
client
.store_namespace(self.namespace.clone(), self.registry.clone())
.await?;
Ok(())
}
}
9 changes: 6 additions & 3 deletions src/commands/bundle.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::CommonOptions;
use super::{CommonOptions, Retry};
use anyhow::{bail, Result};
use clap::Args;
use semver::VersionReq;
Expand All @@ -18,10 +18,13 @@ pub struct BundleCommand {

impl BundleCommand {
/// Executes the command.
pub async fn exec(self) -> Result<()> {
pub async fn exec(self, retry: Option<Retry>) -> Result<()> {
let config = self.common.read_config()?;
let mut client = self.common.create_client(&config)?;
client.fetch_namespace(self.package.namespace()).await?;
if let Some(retry) = retry {
retry.store_namespace(&client).await?
}
client.refresh_namespace(self.package.namespace()).await?;
println!("registry: {url}", url = client.url());
if let Some(info) = client
.registry()
Expand Down
9 changes: 6 additions & 3 deletions src/commands/dependencies.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::CommonOptions;
use super::{CommonOptions, Retry};
use anyhow::{bail, Result};
use async_recursion::async_recursion;
use clap::Args;
Expand Down Expand Up @@ -28,10 +28,13 @@ pub struct DependenciesCommand {

impl DependenciesCommand {
/// Executes the command.
pub async fn exec(self) -> Result<()> {
pub async fn exec(self, retry: Option<Retry>) -> Result<()> {
let config = self.common.read_config()?;
let mut client = self.common.create_client(&config)?;
client.fetch_namespace(self.package.namespace()).await?;
if let Some(retry) = retry {
retry.store_namespace(&client).await?
}
client.refresh_namespace(self.package.namespace()).await?;

if let Some(info) = client
.registry()
Expand Down
9 changes: 6 additions & 3 deletions src/commands/download.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::CommonOptions;
use super::{CommonOptions, Retry};
use anyhow::{anyhow, Result};
use clap::Args;
use warg_protocol::{registry::PackageName, VersionReq};
Expand All @@ -20,10 +20,13 @@ pub struct DownloadCommand {

impl DownloadCommand {
/// Executes the command.
pub async fn exec(self) -> Result<()> {
pub async fn exec(self, retry: Option<Retry>) -> Result<()> {
let config = self.common.read_config()?;
let mut client = self.common.create_client(&config)?;
client.fetch_namespace(self.name.namespace()).await?;
if let Some(retry) = retry {
retry.store_namespace(&client).await?
}
client.refresh_namespace(self.name.namespace()).await?;

println!("downloading package `{name}`...", name = self.name);

Expand Down
Loading

0 comments on commit b2bea99

Please sign in to comment.