Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make use of ìnto_future` for requests and messages #782

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 149 additions & 109 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ use lazy_static::lazy_static;
use regex::Regex;
use std::error;
use std::fmt;
use std::future::{Future, IntoFuture};
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{self, ErrorKind};
use tokio::sync::mpsc;
use tracing::trace;

lazy_static! {
static ref VERSION_RE: Regex = Regex::new(r#"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?"#).unwrap();
Expand All @@ -37,6 +38,63 @@ lazy_static! {
/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
pub struct PublishError(mpsc::error::SendError<Command>);

pub struct Publish {
sender: mpsc::Sender<Command>,
subject: String,
payload: Bytes,
headers: Option<HeaderMap>,
respond: Option<String>,
}

impl Publish {
pub fn new(sender: mpsc::Sender<Command>, subject: String, payload: Bytes) -> Publish {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be public?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not.

Publish {
sender,
subject,
payload,
headers: None,
respond: None,
}
}

pub fn headers(mut self, headers: HeaderMap) -> Publish {
self.headers = Some(headers);
self
}

pub fn reply(mut self, subject: String) -> Publish {
self.respond = Some(subject);
self
}
}

impl IntoFuture for Publish {
type Output = Result<(), PublishError>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<(), PublishError>> + Send>>;

fn into_future(self) -> Self::IntoFuture {
let sender = self.sender.clone();
let subject = self.subject;
let payload = self.payload;
let respond = self.respond;
let headers = self.headers;

Box::pin(async move {
sender
.send(Command::Publish {
subject,
payload,
respond,
headers,
})
.map_err(PublishError)
.await?;

Ok(())
})
}
}

impl fmt::Debug for PublishError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PublishError").finish_non_exhaustive()
Expand Down Expand Up @@ -154,17 +212,8 @@ impl Client {
/// # Ok(())
/// # }
/// ```
pub async fn publish(&self, subject: String, payload: Bytes) -> Result<(), PublishError> {
self.sender
.send(Command::Publish {
subject,
payload,
respond: None,
headers: None,
})
.map_err(PublishError)
.await?;
Ok(())
pub fn publish(&self, subject: String, payload: Bytes) -> Publish {
Publish::new(self.sender.clone(), subject, payload)
}

/// Publish a [Message] with headers to a given subject.
Expand All @@ -187,15 +236,8 @@ impl Client {
headers: HeaderMap,
payload: Bytes,
) -> Result<(), PublishError> {
self.sender
.send(Command::Publish {
subject,
payload,
respond: None,
headers: Some(headers),
})
.map_err(PublishError)
.await?;
self.publish(subject, payload).headers(headers).await?;

Ok(())
}

Expand All @@ -219,15 +261,8 @@ impl Client {
reply: String,
payload: Bytes,
) -> Result<(), PublishError> {
self.sender
.send(Command::Publish {
subject,
payload,
respond: Some(reply),
headers: None,
})
.map_err(PublishError)
.await?;
self.publish(subject, payload).reply(reply).await?;

Ok(())
}

Expand All @@ -254,15 +289,11 @@ impl Client {
headers: HeaderMap,
payload: Bytes,
) -> Result<(), PublishError> {
self.sender
.send(Command::Publish {
subject,
payload,
respond: Some(reply),
headers: Some(headers),
})
.map_err(PublishError)
self.publish(subject, payload)
.headers(headers)
.reply(reply)
.await?;

Ok(())
}

Expand All @@ -278,10 +309,8 @@ impl Client {
/// # Ok(())
/// # }
/// ```
pub async fn request(&self, subject: String, payload: Bytes) -> Result<Message, Error> {
trace!("request sent to subject: {} ({})", subject, payload.len());
let request = Request::new().payload(payload);
self.send_request(subject, request).await
pub fn request(&self, subject: String, payload: Bytes) -> Request {
Request::new(self.clone(), subject, payload)
}

/// Sends the request with headers.
Expand All @@ -304,59 +333,11 @@ impl Client {
headers: HeaderMap,
payload: Bytes,
) -> Result<Message, Error> {
let request = Request::new().headers(headers).payload(payload);
self.send_request(subject, request).await
}
let message = Request::new(self.clone(), subject, payload)
.headers(headers)
.await?;

/// Sends the request created by the [Request].
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let request = async_nats::Request::new().payload("data".into());
/// let response = client.send_request("service".into(), request).await?;
/// # Ok(())
/// # }
/// ```
pub async fn send_request(&self, subject: String, request: Request) -> Result<Message, Error> {
let inbox = request.inbox.unwrap_or_else(|| self.new_inbox());
let timeout = request.timeout.unwrap_or(self.request_timeout);
let mut sub = self.subscribe(inbox.clone()).await?;
let payload: Bytes = request.payload.unwrap_or_else(Bytes::new);
match request.headers {
Some(headers) => {
self.publish_with_reply_and_headers(subject, inbox, headers, payload)
.await?
}
None => self.publish_with_reply(subject, inbox, payload).await?,
}
self.flush().await?;
let request = match timeout {
Some(timeout) => {
tokio::time::timeout(timeout, sub.next())
.map_err(|_| std::io::Error::new(ErrorKind::TimedOut, "request timed out"))
.await?
}
None => sub.next().await,
};
match request {
Some(message) => {
if message.status == Some(StatusCode::NO_RESPONDERS) {
return Err(Box::new(std::io::Error::new(
ErrorKind::NotFound,
"nats: no responders",
)));
}
Ok(message)
}
None => Err(Box::new(io::Error::new(
ErrorKind::BrokenPipe,
"did not receive any message",
))),
}
Ok(message)
}

/// Create a new globally unique inbox which can be used for replies.
Expand Down Expand Up @@ -480,18 +461,27 @@ impl Client {
}
}

/// Used for building customized requests.
#[derive(Default)]
/// Used for building and sending requests.
#[derive(Debug)]
pub struct Request {
client: Client,
subject: String,
payload: Option<Bytes>,
headers: Option<HeaderMap>,
timeout: Option<Option<Duration>>,
inbox: Option<String>,
}

impl Request {
pub fn new() -> Request {
Default::default()
pub fn new(client: Client, subject: String, payload: Bytes) -> Request {
Request {
client,
subject,
payload: Some(payload),
headers: None,
timeout: None,
inbox: None,
}
}

/// Sets the payload of the request. If not used, empty payload will be sent.
Expand All @@ -501,8 +491,7 @@ impl Request {
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let request = async_nats::Request::new().payload("data".into());
/// client.send_request("service".into(), request).await?;
/// client.request("subject".into(), "data".into()).await?;
/// # Ok(())
/// # }
/// ```
Expand All @@ -521,10 +510,11 @@ impl Request {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let mut headers = async_nats::HeaderMap::new();
/// headers.insert("X-Example", async_nats::HeaderValue::from_str("Value").unwrap());
/// let request = async_nats::Request::new()
///
/// client.request("subject".into(), "payload".into())
/// .headers(headers)
/// .payload("data".into());
/// client.send_request("service".into(), request).await?;
/// .await?;
///
/// # Ok(())
/// # }
/// ```
Expand All @@ -542,10 +532,10 @@ impl Request {
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let request = async_nats::Request::new()
/// client.request("service".into(), "data".into())
/// .timeout(Some(std::time::Duration::from_secs(15)))
/// .payload("data".into());
/// client.send_request("service".into(), request).await?;
/// .await?;
///
/// # Ok(())
/// # }
/// ```
Expand All @@ -562,15 +552,65 @@ impl Request {
/// # async fn main() -> Result<(), async_nats::Error> {
/// use std::str::FromStr;
/// let client = async_nats::connect("demo.nats.io").await?;
/// let request = async_nats::Request::new()
/// client.request("subject".into(), "payload".into())
/// .inbox("custom_inbox".into())
/// .payload("data".into());
/// client.send_request("service".into(), request).await?;
/// .await?;
/// # Ok(())
/// # }
/// ```
pub fn inbox(mut self, inbox: String) -> Request {
self.inbox = Some(inbox);
self
}

async fn send(self) -> Result<Message, Error> {
let inbox = self.inbox.unwrap_or_else(|| self.client.new_inbox());
let mut subscriber = self.client.subscribe(inbox.clone()).await?;
let mut publish = self
.client
.publish(self.subject, self.payload.unwrap_or_else(Bytes::new));
if let Some(headers) = self.headers {
publish = publish.headers(headers);
}

publish = publish.reply(inbox);
publish.into_future().await?;

self.client.flush().await?;

let period = self.timeout.unwrap_or(self.client.request_timeout);
let message = match period {
Some(period) => {
tokio::time::timeout(period, subscriber.next())
.map_err(|_| std::io::Error::new(ErrorKind::TimedOut, "request timed out"))
.await?
}
None => subscriber.next().await,
};

match message {
Some(message) => {
if message.status == Some(StatusCode::NO_RESPONDERS) {
return Err(Box::new(std::io::Error::new(
ErrorKind::NotFound,
"nats: no responders",
)));
}
Ok(message)
}
None => Err(Box::new(io::Error::new(
ErrorKind::BrokenPipe,
"did not receive any message",
))),
}
}
}

impl IntoFuture for Request {
type Output = Result<Message, Error>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<Message, Error>> + Send>>;

fn into_future(self) -> Self::IntoFuture {
Box::pin(self.send())
}
}
Loading