Skip to content

Commit 13d5602

Browse files
committed
[MERGE] manual merge of ordered consumer updates from #699
1 parent 0d8d630 commit 13d5602

File tree

4 files changed

+206
-101
lines changed

4 files changed

+206
-101
lines changed

jetstream/consumer.ts

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -886,7 +886,7 @@ export class OrderedPullConsumerImpl implements Consumer {
886886
}
887887
const dseq = m.info.deliverySequence;
888888
if (dseq !== this.cursor.deliver_seq + 1) {
889-
this.reset(this.opts);
889+
this.notifyOrderedResetAndReset();
890890
return;
891891
}
892892
this.cursor.deliver_seq = dseq;
@@ -900,14 +900,37 @@ export class OrderedPullConsumerImpl implements Consumer {
900900
};
901901
}
902902

903-
async reset(opts: ConsumeOptions | FetchOptions = {
904-
max_messages: 100,
905-
expires: 30_000,
906-
} as ConsumeMessages, fromFetch = false): Promise<ConsumerMessages> {
907-
this.currentConsumer = await this.resetConsumer(
908-
this.cursor.stream_seq + 1,
909-
);
910-
if (this.iter === null) {
903+
async reset(
904+
opts: ConsumeOptions | FetchOptions = {
905+
max_messages: 100,
906+
expires: 30_000,
907+
} as ConsumeMessages,
908+
info?: Partial<{ fromFetch: boolean; orderedReset: boolean }>,
909+
): Promise<void> {
910+
info = info || {};
911+
// this is known to be directly related to a pull
912+
const fromFetch = info.fromFetch || false;
913+
// a sequence order caused the reset
914+
const orderedReset = info.orderedReset || false;
915+
916+
if (this.type === PullConsumerType.Fetch && orderedReset) {
917+
// the fetch pull simply needs to end the iterator
918+
this.iter?.src.stop();
919+
await this.iter?.closed();
920+
this.currentConsumer = null;
921+
return;
922+
}
923+
924+
if (this.currentConsumer === null || orderedReset) {
925+
this.currentConsumer = await this.resetConsumer(
926+
this.cursor.stream_seq + 1,
927+
);
928+
}
929+
930+
// if we don't have an iterator, or it is a fetch request
931+
// we create the iterator - otherwise this is a reset that is happening
932+
// while the OC is active, so simply bind the new OC to current iterator.
933+
if (this.iter === null || fromFetch) {
911934
this.iter = new OrderedConsumerMessages();
912935
}
913936
this.consumer = new PullConsumerImpl(this.api, this.currentConsumer);
@@ -920,19 +943,21 @@ export class OrderedPullConsumerImpl implements Consumer {
920943
msgs = await this.consumer.fetch(opts);
921944
} else if (this.type === PullConsumerType.Consume) {
922945
msgs = await this.consumer.consume(opts);
923-
} else {
924-
return Promise.reject("reset called with unset consumer type");
925946
}
926947
const msgsImpl = msgs as PullConsumerMessagesImpl;
927948
msgsImpl.forOrderedConsumer = true;
928949
msgsImpl.resetHandler = () => {
929-
this.reset(this.opts);
950+
this.notifyOrderedResetAndReset();
930951
};
931952
this.iter.setSource(msgsImpl);
932-
return this.iter;
933953
}
934954

935-
consume(opts: ConsumeOptions = {
955+
notifyOrderedResetAndReset() {
956+
this.iter?.notify(ConsumerDebugEvents.Reset, "");
957+
this.reset(this.opts, { orderedReset: true });
958+
}
959+
960+
async consume(opts: ConsumeOptions = {
936961
max_messages: 100,
937962
expires: 30_000,
938963
} as ConsumeMessages): Promise<ConsumerMessages> {
@@ -954,10 +979,11 @@ export class OrderedPullConsumerImpl implements Consumer {
954979
}
955980
this.type = PullConsumerType.Consume;
956981
this.opts = opts;
957-
return this.reset(opts);
982+
await this.reset(opts);
983+
return this.iter!;
958984
}
959985

960-
fetch(
986+
async fetch(
961987
opts: FetchOptions = { max_messages: 100, expires: 30_000 },
962988
): Promise<ConsumerMessages> {
963989
const copts = opts as ConsumeOptions;
@@ -985,7 +1011,8 @@ export class OrderedPullConsumerImpl implements Consumer {
9851011
this.type = PullConsumerType.Fetch;
9861012
this.opts = opts;
9871013
this.iter = new OrderedConsumerMessages();
988-
return this.reset(opts, true);
1014+
await this.reset(opts, { fromFetch: true });
1015+
return this.iter!;
9891016
}
9901017

9911018
async next(

jetstream/tests/consumers_ordered_test.ts

Lines changed: 157 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,13 @@ import {
2121
assertRejects,
2222
assertStringIncludes,
2323
} from "jsr:@std/assert";
24-
import { DeliverPolicy, jetstream, jetstreamManager } from "../mod.ts";
25-
import type { JsMsg } from "../mod.ts";
24+
import {
25+
ConsumerDebugEvents,
26+
DeliverPolicy,
27+
jetstream,
28+
jetstreamManager,
29+
} from "../mod.ts";
30+
import type { ConsumerMessages, JsMsg } from "../mod.ts";
2631
import type {
2732
OrderedConsumerMessages,
2833
OrderedPullConsumerImpl,
@@ -94,87 +99,6 @@ Deno.test("ordered consumers - fetch", async () => {
9499
await cleanup(ns, nc);
95100
});
96101

97-
Deno.test("ordered consumers - fetch reset", async () => {
98-
const { ns, nc } = await _setup(connect, jetstreamServerConf());
99-
const js = jetstream(nc);
100-
101-
const jsm = await jetstreamManager(nc);
102-
await jsm.streams.add({ name: "test", subjects: ["test.*"] });
103-
await js.publish("test.a");
104-
await js.publish("test.b");
105-
await js.publish("test.c");
106-
107-
const oc = await js.consumers.get("test") as OrderedPullConsumerImpl;
108-
assertExists(oc);
109-
110-
const seen: number[] = new Array(3).fill(0);
111-
let done = deferred();
112-
113-
const callback = (m: JsMsg) => {
114-
const idx = m.seq - 1;
115-
seen[idx]++;
116-
// mess with the internals so we see these again
117-
if (seen[idx] === 1) {
118-
oc.cursor.deliver_seq--;
119-
oc.cursor.stream_seq--;
120-
}
121-
iter.stop();
122-
done.resolve();
123-
};
124-
125-
let iter = await oc.fetch({
126-
max_messages: 1,
127-
//@ts-ignore: callback not exposed
128-
callback,
129-
});
130-
await done;
131-
done = deferred();
132-
133-
iter = await oc.fetch({
134-
max_messages: 1,
135-
//@ts-ignore: callback not exposed
136-
callback,
137-
});
138-
await done;
139-
done = deferred();
140-
141-
iter = await oc.fetch({
142-
max_messages: 1,
143-
//@ts-ignore: callback not exposed
144-
callback,
145-
});
146-
await done;
147-
done = deferred();
148-
149-
iter = await oc.fetch({
150-
max_messages: 1,
151-
//@ts-ignore: callback not exposed
152-
callback,
153-
});
154-
await done;
155-
done = deferred();
156-
157-
iter = await oc.fetch({
158-
max_messages: 1,
159-
//@ts-ignore: callback not exposed
160-
callback,
161-
});
162-
await done;
163-
done = deferred();
164-
165-
iter = await oc.fetch({
166-
max_messages: 1,
167-
//@ts-ignore: callback not exposed
168-
callback,
169-
});
170-
await done;
171-
172-
assertEquals(seen, [2, 2, 2]);
173-
assertEquals(oc.serial, 6);
174-
175-
await cleanup(ns, nc);
176-
});
177-
178102
Deno.test("ordered consumers - consume reset", async () => {
179103
const { ns, nc } = await _setup(connect, jetstreamServerConf());
180104
const js = jetstream(nc);
@@ -985,3 +909,153 @@ Deno.test("ordered consumers - name prefix", async () => {
985909

986910
await cleanup(ns, nc);
987911
});
912+
913+
Deno.test("ordered consumers - fetch reset", async () => {
914+
const { ns, nc } = await _setup(connect, jetstreamServerConf());
915+
const jsm = await jetstreamManager(nc);
916+
917+
await jsm.streams.add({ name: "A", subjects: ["a"] });
918+
const js = jetstream(nc);
919+
await js.publish("a", JSON.stringify(1));
920+
921+
const c = await js.consumers.get("A") as OrderedPullConsumerImpl;
922+
923+
let resets = 0;
924+
function countResets(iter: ConsumerMessages): Promise<void> {
925+
return (async () => {
926+
for await (const s of await iter.status()) {
927+
if (s.type === ConsumerDebugEvents.Reset) {
928+
resets++;
929+
}
930+
}
931+
})();
932+
}
933+
934+
// after the first message others will get published
935+
let iter = await c.fetch({ max_messages: 10, expires: 3_000 });
936+
const first = countResets(iter);
937+
const sequences = [];
938+
for await (const m of iter) {
939+
sequences.push(m.json());
940+
// mess with the internal state to cause a reset
941+
if (m.seq === 1) {
942+
c.cursor.deliver_seq = 3;
943+
const buf = [];
944+
for (let i = 2; i < 20; i++) {
945+
buf.push(js.publish("a", JSON.stringify(i)));
946+
}
947+
await Promise.all(buf);
948+
}
949+
}
950+
951+
iter = await c.fetch({ max_messages: 10, expires: 2_000 });
952+
const second = countResets(iter);
953+
954+
const done = (async () => {
955+
for await (const m of iter) {
956+
sequences.push(m.json());
957+
}
958+
})().catch();
959+
960+
await Promise.all([first, second, done]);
961+
assertEquals(c.serial, 2);
962+
assertEquals(resets, 1);
963+
assertEquals(sequences, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
964+
await cleanup(ns, nc);
965+
});
966+
967+
Deno.test("ordered consumers - consume reset", async () => {
968+
const { ns, nc } = await _setup(connect, jetstreamServerConf());
969+
const jsm = await jetstreamManager(nc);
970+
971+
await jsm.streams.add({ name: "A", subjects: ["a"] });
972+
const js = jetstream(nc);
973+
await js.publish("a", JSON.stringify(1));
974+
975+
let resets = 0;
976+
function countResets(iter: ConsumerMessages): Promise<void> {
977+
return (async () => {
978+
for await (const s of await iter.status()) {
979+
if (s.type === ConsumerDebugEvents.Reset) {
980+
resets++;
981+
}
982+
}
983+
})();
984+
}
985+
986+
const c = await js.consumers.get("A") as OrderedPullConsumerImpl;
987+
988+
// after the first message others will get published
989+
let iter = await c.consume({ max_messages: 11, expires: 5000 });
990+
countResets(iter).catch();
991+
const sequences = [];
992+
for await (const m of iter) {
993+
sequences.push(m.json());
994+
// mess with the internal state to cause a reset
995+
if (m.seq === 1) {
996+
c.cursor.deliver_seq = 3;
997+
const buf = [];
998+
for (let i = 2; i < 20; i++) {
999+
buf.push(js.publish("a", JSON.stringify(i)));
1000+
}
1001+
await Promise.all(buf);
1002+
}
1003+
if (m.seq === 11) {
1004+
break;
1005+
}
1006+
}
1007+
1008+
assertEquals(c.serial, 2);
1009+
assertEquals(resets, 1);
1010+
assertEquals(sequences, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
1011+
1012+
await cleanup(ns, nc);
1013+
});
1014+
1015+
Deno.test("ordered consumers - next reset", async () => {
1016+
const { ns, nc } = await _setup(connect, jetstreamServerConf());
1017+
const jsm = await jetstreamManager(nc);
1018+
1019+
await jsm.streams.add({ name: "A", subjects: ["a"] });
1020+
const js = jetstream(nc);
1021+
await js.publish("a", JSON.stringify(1));
1022+
await js.publish("a", JSON.stringify(2));
1023+
1024+
const c = await js.consumers.get("A") as OrderedPullConsumerImpl;
1025+
1026+
// get the first
1027+
let m = await c.next({ expires: 1000 });
1028+
assertExists(m);
1029+
assertEquals(m.json(), 1);
1030+
1031+
// force a reset
1032+
c.cursor.deliver_seq = 3;
1033+
await js.publish("a", JSON.stringify(2));
1034+
1035+
m = await c.next({ expires: 1000 });
1036+
assertEquals(m, null);
1037+
assertEquals(c.serial, 1);
1038+
1039+
await cleanup(ns, nc);
1040+
});
1041+
1042+
Deno.test("ordered consumers - next reset", async () => {
1043+
const { ns, nc } = await _setup(connect, jetstreamServerConf());
1044+
const jsm = await jetstreamManager(nc);
1045+
1046+
await jsm.streams.add({ name: "A", subjects: ["a"] });
1047+
const js = jetstream(nc);
1048+
1049+
await js.publish("a", JSON.stringify(1));
1050+
await js.publish("a", JSON.stringify(2));
1051+
1052+
const c = await js.consumers.get("A") as OrderedPullConsumerImpl;
1053+
await c.next();
1054+
await c.next();
1055+
1056+
assertEquals(c.serial, 1);
1057+
await c.info();
1058+
assertEquals(c.serial, 1);
1059+
1060+
await cleanup(ns, nc);
1061+
});

jetstream/types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,10 @@ export enum ConsumerDebugEvents {
583583
* have the format of `{msgsLeft: number, bytesLeft: number}`.
584584
*/
585585
Discard = "discard",
586+
/**
587+
* Notifies that the current consumer will be reset
588+
*/
589+
Reset = "reset",
586590
/**
587591
* Notifies whenever there's a request for additional messages from the server.
588592
* This notification telegraphs the request options, which should be treated as

kv/tests/kv_test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import {
3737
jetstream,
3838
jetstreamManager,
3939
StorageType,
40-
} from "../../jetstream/mod.ts";
40+
} from "jsr:@nats-io/jetstream@3.0.0-3";
4141

4242
import type {
4343
JetStreamOptions,

0 commit comments

Comments
 (0)