From 33dbbdc36f97400350839bfc51816ac4ad6d022f Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 23 Sep 2024 16:18:42 +0200 Subject: [PATCH] Improve kv::Watcher without messages Until now, if underlying watcher for given consumer did not have any pending messages, it would indefinitely wait for the first one. This commit improves it by checking message pending count on initial consumer info, and returning `None` if there are no messages. Signed-off-by: Tomasz Pietrek --- async-nats/src/jetstream/kv/mod.rs | 6 ++++++ async-nats/tests/kv_tests.rs | 23 +++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index 9f48668e0..439a20d91 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -601,6 +601,8 @@ impl Store { })?; Ok(Watch { + no_messages: deliver_policy != DeliverPolicy::New + && consumer.cached_info().num_pending == 0, subscription: consumer.messages().await.map_err(|err| match err.kind() { crate::jetstream::consumer::StreamErrorKind::TimedOut => { WatchError::new(WatchErrorKind::TimedOut) @@ -1072,6 +1074,7 @@ impl Store { /// A structure representing a watch on a key-value bucket, yielding values whenever there are changes. pub struct Watch { + no_messages: bool, seen_current: bool, subscription: super::consumer::push::Ordered, prefix: String, @@ -1085,6 +1088,9 @@ impl futures::Stream for Watch { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { + if self.no_messages { + return Poll::Ready(None); + } match self.subscription.poll_next_unpin(cx) { Poll::Ready(message) => match message { None => Poll::Ready(None), diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index 1901ca092..19b670478 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -532,6 +532,29 @@ mod kv { } } } + + #[tokio::test] + async fn watch_no_messages() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = async_nats::connect(server.client_url()).await.unwrap(); + + let context = async_nats::jetstream::new(client); + let kv = context + .create_key_value(async_nats::jetstream::kv::Config { + bucket: "history".to_string(), + description: "test_description".to_string(), + history: 15, + storage: StorageType::File, + num_replicas: 1, + ..Default::default() + }) + .await + .unwrap(); + + let mut watcher = kv.watch_with_history("foo").await.unwrap(); + assert!(watcher.next().await.is_none()); + } + #[tokio::test] async fn watch() { let server = nats_server::run_server("tests/configs/jetstream.conf");