Skip to content

Commit 975ef00

Browse files
committed
[FIX] ackAck() now provides a timeout option, and inherits default timeout from jetstream context.
1 parent b8fba59 commit 975ef00

File tree

5 files changed

+124
-44
lines changed

5 files changed

+124
-44
lines changed

jetstream/consumer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
206206
}
207207
} else {
208208
// push the user message
209-
this._push(toJsMsg(msg));
209+
this._push(toJsMsg(msg, this.consumer.api.timeout));
210210
this.received++;
211211
if (this.pending.msgs) {
212212
this.pending.msgs--;

jetstream/jsclient.ts

Lines changed: 38 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ export class JetStreamClientImpl extends BaseApiClientImpl
323323
if (err) {
324324
throw err;
325325
}
326-
return toJsMsg(msg);
326+
return toJsMsg(msg, this.timeout);
327327
}
328328

329329
/*
@@ -436,7 +436,7 @@ export class JetStreamClientImpl extends BaseApiClientImpl
436436
// if we are doing heartbeats, message resets
437437
monitor?.work();
438438
qi.received++;
439-
qi.push(toJsMsg(msg));
439+
qi.push(toJsMsg(msg, this.timeout));
440440
}
441441
},
442442
});
@@ -700,7 +700,7 @@ export class JetStreamClientImpl extends BaseApiClientImpl
700700
jsi: JetStreamSubscriptionInfo,
701701
): TypedSubscriptionOptions<JsMsg> {
702702
const so = {} as TypedSubscriptionOptions<JsMsg>;
703-
so.adapter = msgAdapter(jsi.callbackFn === undefined);
703+
so.adapter = msgAdapter(jsi.callbackFn === undefined, this.timeout);
704704
so.ingestionFilterFn = JetStreamClientImpl.ingestionFn(jsi.ordered);
705705
so.protocolFilterFn = (jm, ingest = false): boolean => {
706706
const jsmi = jm as JsMsgImpl;
@@ -1026,44 +1026,48 @@ class JetStreamPullSubscriptionImpl extends JetStreamSubscriptionImpl
10261026
}
10271027
}
10281028

1029-
function msgAdapter(iterator: boolean): MsgAdapter<JsMsg> {
1029+
function msgAdapter(iterator: boolean, ackTimeout: number): MsgAdapter<JsMsg> {
10301030
if (iterator) {
1031-
return iterMsgAdapter;
1031+
return iterMsgAdapter(ackTimeout);
10321032
} else {
1033-
return cbMsgAdapter;
1033+
return cbMsgAdapter(ackTimeout);
10341034
}
10351035
}
10361036

1037-
function cbMsgAdapter(
1038-
err: NatsError | null,
1039-
msg: Msg,
1040-
): [NatsError | null, JsMsg | null] {
1041-
if (err) {
1042-
return [err, null];
1043-
}
1044-
err = checkJsError(msg);
1045-
if (err) {
1046-
return [err, null];
1047-
}
1048-
// assuming that the protocolFilterFn is set!
1049-
return [null, toJsMsg(msg)];
1037+
function cbMsgAdapter(ackTimeout: number): MsgAdapter<JsMsg> {
1038+
return (
1039+
err: NatsError | null,
1040+
msg: Msg,
1041+
): [NatsError | null, JsMsg | null] => {
1042+
if (err) {
1043+
return [err, null];
1044+
}
1045+
err = checkJsError(msg);
1046+
if (err) {
1047+
return [err, null];
1048+
}
1049+
// assuming that the protocolFilterFn is set!
1050+
return [null, toJsMsg(msg, ackTimeout)];
1051+
};
10501052
}
10511053

1052-
function iterMsgAdapter(
1053-
err: NatsError | null,
1054-
msg: Msg,
1055-
): [NatsError | null, JsMsg | null] {
1056-
if (err) {
1057-
return [err, null];
1058-
}
1059-
// iterator will close if we have an error
1060-
// check for errors that shouldn't close it
1061-
const ne = checkJsError(msg);
1062-
if (ne !== null) {
1063-
return [hideNonTerminalJsErrors(ne), null];
1064-
}
1065-
// assuming that the protocolFilterFn is set
1066-
return [null, toJsMsg(msg)];
1054+
function iterMsgAdapter(ackTimeout: number): MsgAdapter<JsMsg> {
1055+
return (
1056+
err: NatsError | null,
1057+
msg: Msg,
1058+
): [NatsError | null, JsMsg | null] => {
1059+
if (err) {
1060+
return [err, null];
1061+
}
1062+
// iterator will close if we have an error
1063+
// check for errors that shouldn't close it
1064+
const ne = checkJsError(msg);
1065+
if (ne !== null) {
1066+
return [hideNonTerminalJsErrors(ne), null];
1067+
}
1068+
// assuming that the protocolFilterFn is set
1069+
return [null, toJsMsg(msg, ackTimeout)];
1070+
};
10671071
}
10681072

10691073
function hideNonTerminalJsErrors(ne: NatsError): NatsError | null {

jetstream/jsmsg.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,11 @@ export interface JsMsg {
117117
* Indicate to the JetStream server that the message was processed
118118
* successfully and that the JetStream server should acknowledge back
119119
* that the acknowledgement was received.
120+
* @param opts are optional options (currently only a timeout value
121+
* if not specified uses the timeout specified in the JetStreamOptions
122+
* when creating the JetStream context.
120123
*/
121-
ackAck(): Promise<boolean>;
124+
ackAck(opts?: Partial<{ timeout: number }>): Promise<boolean>;
122125

123126
/**
124127
* Convenience method to parse the message payload as JSON. This method
@@ -133,8 +136,8 @@ export interface JsMsg {
133136
string(): string;
134137
}
135138

136-
export function toJsMsg(m: Msg): JsMsg {
137-
return new JsMsgImpl(m);
139+
export function toJsMsg(m: Msg, ackTimeout = 5000): JsMsg {
140+
return new JsMsgImpl(m, ackTimeout);
138141
}
139142

140143
export function parseInfo(s: string): DeliveryInfo {
@@ -172,10 +175,12 @@ export class JsMsgImpl implements JsMsg {
172175
msg: Msg;
173176
di?: DeliveryInfo;
174177
didAck: boolean;
178+
timeout: number;
175179

176-
constructor(msg: Msg) {
180+
constructor(msg: Msg, timeout: number) {
177181
this.msg = msg;
178182
this.didAck = false;
183+
this.timeout = timeout;
179184
}
180185

181186
get subject(): string {
@@ -228,16 +233,18 @@ export class JsMsgImpl implements JsMsg {
228233

229234
// this has to dig into the internals as the message has access
230235
// to the protocol but not the high-level client.
231-
async ackAck(): Promise<boolean> {
236+
async ackAck(opts?: Partial<{ timeout: number }>): Promise<boolean> {
232237
const d = deferred<boolean>();
233238
if (!this.didAck) {
234239
this.didAck = true;
235240
if (this.msg.reply) {
241+
opts = opts || {};
242+
opts.timeout = opts.timeout || this.timeout;
236243
const mi = this.msg as MsgImpl;
237244
const proto = mi.publisher as unknown as ProtocolHandler;
238245
const trace = !(proto.options?.noAsyncTraces || false);
239246
const r = new RequestOne(proto.muxSubscriptions, this.msg.reply, {
240-
timeout: 1000,
247+
timeout: this.timeout,
241248
}, trace);
242249
proto.request(r);
243250
try {

jetstream/tests/jetstream_test.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ import type {
6262
import { defaultJsOptions } from "../jsbaseclient_api.ts";
6363
import {
6464
_setup,
65+
assertBetween,
6566
cleanup,
6667
jetstreamServerConf,
6768
Lock,
@@ -1104,9 +1105,9 @@ Deno.test("jetstream - backoff", async () => {
11041105
return v - start;
11051106
});
11061107

1107-
assert(delta[1] > 250 && delta[1] < 1000, `250 < ${delta[1]} < 1000`);
1108-
assert(delta[2] > 1250 && delta[2] < 1500, `1250 < ${delta[2]} < 1500`);
1109-
assert(delta[3] > 4250 && delta[3] < 4500, `4250 < ${delta[3]} < 4500`);
1108+
assertBetween(delta[1], 250, 1000);
1109+
assertBetween(delta[2], 1250, 1500);
1110+
assertBetween(delta[3], 4250, 4500);
11101111

11111112
await cleanup(ns, nc);
11121113
});

jetstream/tests/jsmsg_test.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import type { JsMsgImpl } from "../jsmsg.ts";
2929
import { parseInfo, toJsMsg } from "../jsmsg.ts";
3030
import {
3131
_setup,
32+
assertBetween,
3233
cleanup,
3334
jetstreamServerConf,
3435
} from "../../test_helpers/mod.ts";
@@ -213,13 +214,80 @@ Deno.test("jsmsg - explicit consumer ackAck timeout", async () => {
213214
// change the subject
214215
((jm as JsMsgImpl).msg as MsgImpl)._reply = "xxxx";
215216
nc.subscribe("xxxx");
217+
const start = Date.now();
218+
await assertRejects(
219+
(): Promise<boolean> => {
220+
return jm!.ackAck({ timeout: 1500 });
221+
},
222+
Error,
223+
"TIMEOUT",
224+
);
225+
assertBetween(Date.now() - start, 1300, 1700);
226+
227+
await cleanup(ns, nc);
228+
});
229+
230+
Deno.test("jsmsg - ackAck js options timeout", async () => {
231+
const { ns, nc } = await _setup(connect, jetstreamServerConf());
232+
const jsm = await jetstreamManager(nc) as JetStreamManagerImpl;
233+
await jsm.streams.add({
234+
name: "A",
235+
subjects: ["a.>"],
236+
storage: StorageType.Memory,
237+
allow_direct: true,
238+
});
239+
240+
// default is 5000
241+
const js = jetstream(nc, { timeout: 1500 });
242+
await js.publish("a.a");
243+
244+
await jsm.consumers.add("A", { durable_name: "a" });
245+
const c = await js.consumers.get("A", "a");
246+
const jm = await c.next();
247+
// change the subject
248+
((jm as JsMsgImpl).msg as MsgImpl)._reply = "xxxx";
249+
nc.subscribe("xxxx");
250+
const start = Date.now();
251+
await assertRejects(
252+
(): Promise<boolean> => {
253+
return jm!.ackAck();
254+
},
255+
Error,
256+
"TIMEOUT",
257+
);
258+
assertBetween(Date.now() - start, 1300, 1700);
259+
260+
await cleanup(ns, nc);
261+
});
262+
263+
Deno.test("jsmsg - ackAck legacy timeout", async () => {
264+
const { ns, nc } = await _setup(connect, jetstreamServerConf());
265+
const jsm = await jetstreamManager(nc) as JetStreamManagerImpl;
266+
await jsm.streams.add({
267+
name: "A",
268+
subjects: ["a.>"],
269+
storage: StorageType.Memory,
270+
allow_direct: true,
271+
});
272+
273+
// default is 5000
274+
const js = jetstream(nc, { timeout: 1500 });
275+
await js.publish("a.a");
276+
277+
await jsm.consumers.add("A", { durable_name: "a" });
278+
const jm = await js.pull("A", "a");
279+
// change the subject
280+
((jm as JsMsgImpl).msg as MsgImpl)._reply = "xxxx";
281+
nc.subscribe("xxxx");
282+
const start = Date.now();
216283
await assertRejects(
217284
(): Promise<boolean> => {
218285
return jm!.ackAck();
219286
},
220287
Error,
221288
"TIMEOUT",
222289
);
290+
assertBetween(Date.now() - start, 1300, 1700);
223291

224292
await cleanup(ns, nc);
225293
});

0 commit comments

Comments
 (0)