From 47338f46b6041f011d180808d708aab5d668da2b Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Wed, 5 Feb 2025 08:30:30 -0600 Subject: [PATCH] Add heartbeat handling to key iteration (#203) * Add heartbeat detection to listing keys and history. In cases where the stream is purged as the client is making progress it could stall it as client is relying on numpending to signal out of the processing. By detecting a heartbeat, that means the server didn't have anything to send in the last 5s, providing a hint that we can use to signal that the operation is done. Introduce a heartbeat case to handle key iteration status updates. This ensures proper key fetching and stops the iteration appropriately upon receiving a heartbeat. * history for kv has the same issue - if values are purged in flight, the iteration may hang. Signed-off-by: Alberto Ricart --------- Signed-off-by: Alberto Ricart --- kv/src/kv.ts | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/kv/src/kv.ts b/kv/src/kv.ts index e7970970..710ba375 100644 --- a/kv/src/kv.ts +++ b/kv/src/kv.ts @@ -797,6 +797,19 @@ export class Bucket implements KV { qi.push(fn); }); + (async () => { + for await (const s of iter.status()) { + switch (s.type) { + // if we get a heartbeat we got all the keys + case "heartbeat": + qi.push(() => { + qi.stop(); + }); + break; + } + } + })().then(); + // if they break from the iterator stop the consumer qi.iterClosed.then(() => { iter.stop(); @@ -899,6 +912,19 @@ export class Bucket implements KV { }, }); + (async () => { + for await (const s of iter.status()) { + switch (s.type) { + // if we get a heartbeat we got all the keys + case "heartbeat": + keys.push(() => { + keys.stop(); + }); + break; + } + } + })().then(); + iter.closed().then(() => { keys.push(() => { keys.stop();