Skip to content

Commit e266cc2

Browse files
committed
[FIX] improved ordered consumer - api calls on the consumer no longer re-create the consumer
1 parent 8fe83cc commit e266cc2

File tree

2 files changed

+205
-100
lines changed

2 files changed

+205
-100
lines changed

jetstream/consumer.ts

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,11 @@ export enum ConsumerDebugEvents {
239239
* have the format of `{msgsLeft: number, bytesLeft: number}`.
240240
*/
241241
Discard = "discard",
242+
243+
/**
244+
* Notifies that the current consumer will be reset
245+
*/
246+
Reset = "reset",
242247
/**
243248
* Notifies whenever there's a request for additional messages from the server.
244249
* This notification telegraphs the request options, which should be treated as
@@ -1110,7 +1115,7 @@ export class OrderedPullConsumerImpl implements Consumer {
11101115
}
11111116
const dseq = m.info.deliverySequence;
11121117
if (dseq !== this.cursor.deliver_seq + 1) {
1113-
this.reset(this.opts);
1118+
this.notifyOrderedResetAndReset();
11141119
return;
11151120
}
11161121
this.cursor.deliver_seq = dseq;
@@ -1124,14 +1129,37 @@ export class OrderedPullConsumerImpl implements Consumer {
11241129
};
11251130
}
11261131

1127-
async reset(opts: ConsumeOptions | FetchOptions = {
1128-
max_messages: 100,
1129-
expires: 30_000,
1130-
} as ConsumeMessages, fromFetch = false): Promise<ConsumerMessages> {
1131-
this.currentConsumer = await this.resetConsumer(
1132-
this.cursor.stream_seq + 1,
1133-
);
1134-
if (this.iter === null) {
1132+
async reset(
1133+
opts: ConsumeOptions | FetchOptions = {
1134+
max_messages: 100,
1135+
expires: 30_000,
1136+
} as ConsumeMessages,
1137+
info?: Partial<{ fromFetch: boolean; orderedReset: boolean }>,
1138+
): Promise<void> {
1139+
info = info || {};
1140+
// this is known to be directly related to a pull
1141+
const fromFetch = info.fromFetch || false;
1142+
// a sequence order caused the reset
1143+
const orderedReset = info.orderedReset || false;
1144+
1145+
if (this.type === PullConsumerType.Fetch && orderedReset) {
1146+
// the fetch pull simply needs to end the iterator
1147+
this.iter?.src.stop();
1148+
await this.iter?.closed();
1149+
this.currentConsumer = null;
1150+
return;
1151+
}
1152+
1153+
if (this.currentConsumer === null || orderedReset) {
1154+
this.currentConsumer = await this.resetConsumer(
1155+
this.cursor.stream_seq + 1,
1156+
);
1157+
}
1158+
1159+
// if we don't have an iterator, or it is a fetch request
1160+
// we create the iterator - otherwise this is a reset that is happening
1161+
// while the OC is active, so simply bind the new OC to current iterator.
1162+
if (this.iter === null || fromFetch) {
11351163
this.iter = new OrderedConsumerMessages();
11361164
}
11371165
this.consumer = new PullConsumerImpl(this.api, this.currentConsumer);
@@ -1144,19 +1172,21 @@ export class OrderedPullConsumerImpl implements Consumer {
11441172
msgs = await this.consumer.fetch(opts);
11451173
} else if (this.type === PullConsumerType.Consume) {
11461174
msgs = await this.consumer.consume(opts);
1147-
} else {
1148-
return Promise.reject("reset called with unset consumer type");
11491175
}
11501176
const msgsImpl = msgs as PullConsumerMessagesImpl;
11511177
msgsImpl.forOrderedConsumer = true;
11521178
msgsImpl.resetHandler = () => {
1153-
this.reset(this.opts);
1179+
this.notifyOrderedResetAndReset();
11541180
};
11551181
this.iter.setSource(msgsImpl);
1156-
return this.iter;
11571182
}
11581183

1159-
consume(opts: ConsumeOptions = {
1184+
notifyOrderedResetAndReset() {
1185+
this.iter?.notify(ConsumerDebugEvents.Reset, "");
1186+
this.reset(this.opts, { orderedReset: true });
1187+
}
1188+
1189+
async consume(opts: ConsumeOptions = {
11601190
max_messages: 100,
11611191
expires: 30_000,
11621192
} as ConsumeMessages): Promise<ConsumerMessages> {
@@ -1178,10 +1208,11 @@ export class OrderedPullConsumerImpl implements Consumer {
11781208
}
11791209
this.type = PullConsumerType.Consume;
11801210
this.opts = opts;
1181-
return this.reset(opts);
1211+
await this.reset(opts);
1212+
return this.iter!;
11821213
}
11831214

1184-
fetch(
1215+
async fetch(
11851216
opts: FetchOptions = { max_messages: 100, expires: 30_000 },
11861217
): Promise<ConsumerMessages> {
11871218
const copts = opts as ConsumeOptions;
@@ -1208,8 +1239,8 @@ export class OrderedPullConsumerImpl implements Consumer {
12081239
}
12091240
this.type = PullConsumerType.Fetch;
12101241
this.opts = opts;
1211-
this.iter = new OrderedConsumerMessages();
1212-
return this.reset(opts, true);
1242+
await this.reset(opts, { fromFetch: true });
1243+
return this.iter!;
12131244
}
12141245

12151246
async next(

jetstream/tests/consumers_ordered_test.ts

Lines changed: 156 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,12 @@ import {
2121
assertRejects,
2222
assertStringIncludes,
2323
} from "https://deno.land/std@0.221.0/assert/mod.ts";
24-
import { DeliverPolicy, JsMsg } from "../mod.ts";
24+
import {
25+
ConsumerDebugEvents,
26+
ConsumerMessages,
27+
DeliverPolicy,
28+
JsMsg,
29+
} from "../mod.ts";
2530
import {
2631
OrderedConsumerMessages,
2732
OrderedPullConsumerImpl,
@@ -89,87 +94,6 @@ Deno.test("ordered consumers - fetch", async () => {
8994
await cleanup(ns, nc);
9095
});
9196

92-
Deno.test("ordered consumers - fetch reset", async () => {
93-
const { ns, nc } = await setup(jetstreamServerConf());
94-
const js = nc.jetstream();
95-
96-
const jsm = await nc.jetstreamManager();
97-
await jsm.streams.add({ name: "test", subjects: ["test.*"] });
98-
await js.publish("test.a");
99-
await js.publish("test.b");
100-
await js.publish("test.c");
101-
102-
const oc = await js.consumers.get("test") as OrderedPullConsumerImpl;
103-
assertExists(oc);
104-
105-
const seen: number[] = new Array(3).fill(0);
106-
let done = deferred();
107-
108-
const callback = (m: JsMsg) => {
109-
const idx = m.seq - 1;
110-
seen[idx]++;
111-
// mess with the internals so we see these again
112-
if (seen[idx] === 1) {
113-
oc.cursor.deliver_seq--;
114-
oc.cursor.stream_seq--;
115-
}
116-
iter.stop();
117-
done.resolve();
118-
};
119-
120-
let iter = await oc.fetch({
121-
max_messages: 1,
122-
//@ts-ignore: callback not exposed
123-
callback,
124-
});
125-
await done;
126-
done = deferred();
127-
128-
iter = await oc.fetch({
129-
max_messages: 1,
130-
//@ts-ignore: callback not exposed
131-
callback,
132-
});
133-
await done;
134-
done = deferred();
135-
136-
iter = await oc.fetch({
137-
max_messages: 1,
138-
//@ts-ignore: callback not exposed
139-
callback,
140-
});
141-
await done;
142-
done = deferred();
143-
144-
iter = await oc.fetch({
145-
max_messages: 1,
146-
//@ts-ignore: callback not exposed
147-
callback,
148-
});
149-
await done;
150-
done = deferred();
151-
152-
iter = await oc.fetch({
153-
max_messages: 1,
154-
//@ts-ignore: callback not exposed
155-
callback,
156-
});
157-
await done;
158-
done = deferred();
159-
160-
iter = await oc.fetch({
161-
max_messages: 1,
162-
//@ts-ignore: callback not exposed
163-
callback,
164-
});
165-
await done;
166-
167-
assertEquals(seen, [2, 2, 2]);
168-
assertEquals(oc.serial, 6);
169-
170-
await cleanup(ns, nc);
171-
});
172-
17397
Deno.test("ordered consumers - consume reset", async () => {
17498
const { ns, nc } = await setup(jetstreamServerConf());
17599
const js = nc.jetstream();
@@ -978,3 +902,153 @@ Deno.test("ordered consumers - name prefix", async () => {
978902

979903
await cleanup(ns, nc);
980904
});
905+
906+
Deno.test("ordered consumers - fetch reset", async () => {
907+
const { ns, nc } = await setup(jetstreamServerConf());
908+
const jsm = await nc.jetstreamManager();
909+
910+
await jsm.streams.add({ name: "A", subjects: ["a"] });
911+
const js = nc.jetstream();
912+
await js.publish("a", JSON.stringify(1));
913+
914+
const c = await js.consumers.get("A") as OrderedPullConsumerImpl;
915+
916+
let resets = 0;
917+
function countResets(iter: ConsumerMessages): Promise<void> {
918+
return (async () => {
919+
for await (const s of await iter.status()) {
920+
if (s.type === ConsumerDebugEvents.Reset) {
921+
resets++;
922+
}
923+
}
924+
})();
925+
}
926+
927+
// after the first message others will get published
928+
let iter = await c.fetch({ max_messages: 10, expires: 3_000 });
929+
const first = countResets(iter);
930+
const sequences = [];
931+
for await (const m of iter) {
932+
sequences.push(m.json());
933+
// mess with the internal state to cause a reset
934+
if (m.seq === 1) {
935+
c.cursor.deliver_seq = 3;
936+
const buf = [];
937+
for (let i = 2; i < 20; i++) {
938+
buf.push(js.publish("a", JSON.stringify(i)));
939+
}
940+
await Promise.all(buf);
941+
}
942+
}
943+
944+
iter = await c.fetch({ max_messages: 10, expires: 2_000 });
945+
const second = countResets(iter);
946+
947+
const done = (async () => {
948+
for await (const m of iter) {
949+
sequences.push(m.json());
950+
}
951+
})().catch();
952+
953+
await Promise.all([first, second, done]);
954+
assertEquals(c.serial, 2);
955+
assertEquals(resets, 1);
956+
assertEquals(sequences, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
957+
await cleanup(ns, nc);
958+
});
959+
960+
Deno.test("ordered consumers - consume reset", async () => {
961+
const { ns, nc } = await setup(jetstreamServerConf());
962+
const jsm = await nc.jetstreamManager();
963+
964+
await jsm.streams.add({ name: "A", subjects: ["a"] });
965+
const js = nc.jetstream();
966+
await js.publish("a", JSON.stringify(1));
967+
968+
let resets = 0;
969+
function countResets(iter: ConsumerMessages): Promise<void> {
970+
return (async () => {
971+
for await (const s of await iter.status()) {
972+
if (s.type === ConsumerDebugEvents.Reset) {
973+
resets++;
974+
}
975+
}
976+
})();
977+
}
978+
979+
const c = await js.consumers.get("A") as OrderedPullConsumerImpl;
980+
981+
// after the first message others will get published
982+
let iter = await c.consume({ max_messages: 11, expires: 5000 });
983+
countResets(iter).catch();
984+
const sequences = [];
985+
for await (const m of iter) {
986+
sequences.push(m.json());
987+
// mess with the internal state to cause a reset
988+
if (m.seq === 1) {
989+
c.cursor.deliver_seq = 3;
990+
const buf = [];
991+
for (let i = 2; i < 20; i++) {
992+
buf.push(js.publish("a", JSON.stringify(i)));
993+
}
994+
await Promise.all(buf);
995+
}
996+
if (m.seq === 11) {
997+
break;
998+
}
999+
}
1000+
1001+
assertEquals(c.serial, 2);
1002+
assertEquals(resets, 1);
1003+
assertEquals(sequences, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
1004+
1005+
await cleanup(ns, nc);
1006+
});
1007+
1008+
Deno.test("ordered consumers - next reset", async () => {
1009+
const { ns, nc } = await setup(jetstreamServerConf());
1010+
const jsm = await nc.jetstreamManager();
1011+
1012+
await jsm.streams.add({ name: "A", subjects: ["a"] });
1013+
const js = nc.jetstream();
1014+
await js.publish("a", JSON.stringify(1));
1015+
await js.publish("a", JSON.stringify(2));
1016+
1017+
const c = await js.consumers.get("A") as OrderedPullConsumerImpl;
1018+
1019+
// get the first
1020+
let m = await c.next({ expires: 1000 });
1021+
assertExists(m);
1022+
assertEquals(m.json(), 1);
1023+
1024+
// force a reset
1025+
c.cursor.deliver_seq = 3;
1026+
await js.publish("a", JSON.stringify(2));
1027+
1028+
m = await c.next({ expires: 1000 });
1029+
assertEquals(m, null);
1030+
assertEquals(c.serial, 1);
1031+
1032+
await cleanup(ns, nc);
1033+
});
1034+
1035+
Deno.test("ordered consumers - next reset", async () => {
1036+
const { ns, nc } = await setup(jetstreamServerConf());
1037+
const jsm = await nc.jetstreamManager();
1038+
1039+
await jsm.streams.add({ name: "A", subjects: ["a"] });
1040+
const js = nc.jetstream();
1041+
1042+
await js.publish("a", JSON.stringify(1));
1043+
await js.publish("a", JSON.stringify(2));
1044+
1045+
const c = await js.consumers.get("A") as OrderedPullConsumerImpl;
1046+
await c.next();
1047+
await c.next();
1048+
1049+
assertEquals(c.serial, 1);
1050+
await c.info();
1051+
assertEquals(c.serial, 1);
1052+
1053+
await cleanup(ns, nc);
1054+
});

0 commit comments

Comments
 (0)