Skip to content

Commit

Permalink
Add heartbeat handling to key iteration (#203)
Browse files Browse the repository at this point in the history
* Add heartbeat detection to listing keys and history. In cases where the stream is purged as the client is making progress it could stall it as client is relying on numpending to signal out of the processing. By detecting a heartbeat, that means the server didn't have anything to send in the last 5s, providing a hint that we can use to signal that the operation is done.

Introduce a heartbeat case to handle key iteration status updates. This ensures proper key fetching and stops the iteration appropriately upon receiving a heartbeat.

* history for kv has the same issue - if values are purged in flight, the iteration may hang.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

---------

Signed-off-by: Alberto Ricart <alberto@synadia.com>
  • Loading branch information
aricart authored Feb 5, 2025
1 parent 3d448c9 commit 47338f4
Showing 1 changed file with 26 additions and 0 deletions.
26 changes: 26 additions & 0 deletions kv/src/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,19 @@ export class Bucket implements KV {
qi.push(fn);
});

(async () => {
for await (const s of iter.status()) {
switch (s.type) {
// if we get a heartbeat we got all the keys
case "heartbeat":
qi.push(() => {
qi.stop();
});
break;
}
}
})().then();

// if they break from the iterator stop the consumer
qi.iterClosed.then(() => {
iter.stop();
Expand Down Expand Up @@ -899,6 +912,19 @@ export class Bucket implements KV {
},
});

(async () => {
for await (const s of iter.status()) {
switch (s.type) {
// if we get a heartbeat we got all the keys
case "heartbeat":
keys.push(() => {
keys.stop();
});
break;
}
}
})().then();

iter.closed().then(() => {
keys.push(() => {
keys.stop();
Expand Down

0 comments on commit 47338f4

Please sign in to comment.