Skip to content

Commit

Permalink
Add updates_only mode to object store watch
Browse files Browse the repository at this point in the history
  • Loading branch information
caspervonb committed Feb 17, 2025
1 parent 60f58f6 commit 55489ba
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 3 deletions.
17 changes: 14 additions & 3 deletions nats/js/object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,9 +468,16 @@ async def watch(
ignore_deletes=False,
include_history=False,
meta_only=False,
updates_only=False,
) -> ObjectWatcher:
"""
watch for changes in the underlying store and receive meta information updates.
:param ignore_deletes: Whether to ignore deleted objects in the updates
:param include_history: Whether to include historical values
:param meta_only: Whether to only receive metadata
:param updates_only: Whether to only receive updates after the current state
:return: An ObjectWatcher instance
"""
all_meta = OBJ_ALL_META_PRE_TEMPLATE.format(bucket=self._name, )
watcher = ObjectStore.ObjectWatcher(self)
Expand All @@ -484,18 +491,22 @@ async def watch_updates(msg):

# When there are no more updates send an empty marker
# to signal that it is done, this will unblock iterators
if (not watcher._init_done) and meta.num_pending == 0:
# Only send None marker when not in updates_only mode
if (not watcher._init_done) and meta.num_pending == 0 and not updates_only:
watcher._init_done = True
await watcher._updates.put(None)

try:
await self._js.get_last_msg(self._stream, all_meta)
except NotFoundError:
watcher._init_done = True
await watcher._updates.put(None)
if not updates_only:
await watcher._updates.put(None)

deliver_policy = None
if not include_history:
if updates_only:
deliver_policy = api.DeliverPolicy.NEW
elif not include_history:
deliver_policy = api.DeliverPolicy.LAST_PER_SUBJECT

watcher._sub = await self._js.subscribe(
Expand Down
61 changes: 61 additions & 0 deletions tests/test_js.py
Original file line number Diff line number Diff line change
Expand Up @@ -4494,3 +4494,64 @@ async def test_add_stream_invalid_names(self):
),
):
await js.add_stream(name=name)

@async_test
async def test_object_watch_updates_only(self):
errors = []

async def error_handler(e):
print("Error:", e, type(e))
errors.append(e)

nc = await nats.connect(error_cb=error_handler)
js = nc.jetstream()

obs = await js.create_object_store(
"TEST_FILES",
config=nats.js.api.ObjectStoreConfig(description="updates_only_test", ),
)

# Put some initial objects
await obs.put("A", b"A")
await obs.put("B", b"B")
await obs.put("C", b"C")

# Start watching with updates_only=True
watcher = await obs.watch(updates_only=True)

# Since updates_only=True, we should not receive any initial state
# and no None marker since there are existing objects
with pytest.raises(asyncio.TimeoutError):
await watcher.updates(timeout=1)

# New updates should be received
await obs.put("D", b"D")
e = await watcher.updates()
assert e.name == "D"
assert e.bucket == "TEST_FILES"
assert e.size == 1
assert e.chunks == 1

# Updates to existing objects should be received
await obs.put("A", b"AA")
e = await watcher.updates()
assert e.name == "A"
assert e.bucket == "TEST_FILES"
assert e.size == 2

# Deletes should be received
await obs.delete("B")
e = await watcher.updates()
assert e.name == "B"
assert e.deleted == True

# Meta updates should be received
res = await obs.get("C")
to_update_meta = res.info.meta
to_update_meta.description = "changed"
await obs.update_meta("C", to_update_meta)
e = await watcher.updates()
assert e.name == "C"
assert e.description == "changed"

await nc.close()

0 comments on commit 55489ba

Please sign in to comment.