Skip to content

Commit 786c994

Browse files
committed
[FIX] [JS] JsMsg#ackAck() didn't provide a way to customize the timeout. Added optional option with timeout for ackAck(), and also defaulted the timeout to match the JSClient's timeout (as provided by the JetStreamOptions)
1 parent 8b7c1f4 commit 786c994

File tree

3 files changed

+51
-40
lines changed

3 files changed

+51
-40
lines changed

jetstream/consumer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
420420
}
421421
} else {
422422
// push the user message
423-
this._push(toJsMsg(msg));
423+
this._push(toJsMsg(msg, this.consumer.api.timeout));
424424
this.received++;
425425
if (this.pending.msgs) {
426426
this.pending.msgs--;

jetstream/jsclient.ts

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ export class JetStreamClientImpl extends BaseApiClient
276276
if (err) {
277277
throw err;
278278
}
279-
return toJsMsg(msg);
279+
return toJsMsg(msg, this.timeout);
280280
}
281281

282282
/*
@@ -389,7 +389,7 @@ export class JetStreamClientImpl extends BaseApiClient
389389
// if we are doing heartbeats, message resets
390390
monitor?.work();
391391
qi.received++;
392-
qi.push(toJsMsg(msg));
392+
qi.push(toJsMsg(msg, this.timeout));
393393
}
394394
},
395395
});
@@ -653,7 +653,7 @@ export class JetStreamClientImpl extends BaseApiClient
653653
jsi: JetStreamSubscriptionInfo,
654654
): TypedSubscriptionOptions<JsMsg> {
655655
const so = {} as TypedSubscriptionOptions<JsMsg>;
656-
so.adapter = msgAdapter(jsi.callbackFn === undefined);
656+
so.adapter = msgAdapter(jsi.callbackFn === undefined, this.timeout);
657657
so.ingestionFilterFn = JetStreamClientImpl.ingestionFn(jsi.ordered);
658658
so.protocolFilterFn = (jm, ingest = false): boolean => {
659659
const jsmi = jm as JsMsgImpl;
@@ -979,44 +979,50 @@ class JetStreamPullSubscriptionImpl extends JetStreamSubscriptionImpl
979979
}
980980
}
981981

982-
function msgAdapter(iterator: boolean): MsgAdapter<JsMsg> {
982+
function msgAdapter(iterator: boolean, ackTimeout: number): MsgAdapter<JsMsg> {
983983
if (iterator) {
984-
return iterMsgAdapter;
984+
return iterMsgAdapter(ackTimeout);
985985
} else {
986-
return cbMsgAdapter;
986+
return cbMsgAdapter(ackTimeout);
987987
}
988988
}
989989

990-
function cbMsgAdapter(
991-
err: NatsError | null,
992-
msg: Msg,
993-
): [NatsError | null, JsMsg | null] {
994-
if (err) {
995-
return [err, null];
996-
}
997-
err = checkJsError(msg);
998-
if (err) {
999-
return [err, null];
1000-
}
1001-
// assuming that the protocolFilterFn is set!
1002-
return [null, toJsMsg(msg)];
990+
function cbMsgAdapter(ackTimeout: number): MsgAdapter<JsMsg> {
991+
return (
992+
err: NatsError | null,
993+
msg: Msg,
994+
): [NatsError | null, JsMsg | null] => {
995+
if (err) {
996+
return [err, null];
997+
}
998+
err = checkJsError(msg);
999+
if (err) {
1000+
return [err, null];
1001+
}
1002+
// assuming that the protocolFilterFn is set!
1003+
return [null, toJsMsg(msg, ackTimeout)];
1004+
};
10031005
}
10041006

10051007
function iterMsgAdapter(
1006-
err: NatsError | null,
1007-
msg: Msg,
1008-
): [NatsError | null, JsMsg | null] {
1009-
if (err) {
1010-
return [err, null];
1011-
}
1012-
// iterator will close if we have an error
1013-
// check for errors that shouldn't close it
1014-
const ne = checkJsError(msg);
1015-
if (ne !== null) {
1016-
return [hideNonTerminalJsErrors(ne), null];
1017-
}
1018-
// assuming that the protocolFilterFn is set
1019-
return [null, toJsMsg(msg)];
1008+
ackTimeout: number,
1009+
): MsgAdapter<JsMsg> {
1010+
return (
1011+
err: NatsError | null,
1012+
msg: Msg,
1013+
): [NatsError | null, JsMsg | null] => {
1014+
if (err) {
1015+
return [err, null];
1016+
}
1017+
// iterator will close if we have an error
1018+
// check for errors that shouldn't close it
1019+
const ne = checkJsError(msg);
1020+
if (ne !== null) {
1021+
return [hideNonTerminalJsErrors(ne), null];
1022+
}
1023+
// assuming that the protocolFilterFn is set
1024+
return [null, toJsMsg(msg, ackTimeout)];
1025+
};
10201026
}
10211027

10221028
function hideNonTerminalJsErrors(ne: NatsError): NatsError | null {

jetstream/jsmsg.ts

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ export interface JsMsg {
110110
* successfully and that the JetStream server should acknowledge back
111111
* that the acknowledgement was received.
112112
*/
113-
ackAck(): Promise<boolean>;
113+
ackAck(opts?: Partial<{ timeout: number }>): Promise<boolean>;
114114

115115
/**
116116
* Convenience method to parse the message payload as JSON. This method
@@ -125,8 +125,8 @@ export interface JsMsg {
125125
string(): string;
126126
}
127127

128-
export function toJsMsg(m: Msg): JsMsg {
129-
return new JsMsgImpl(m);
128+
export function toJsMsg(m: Msg, ackTimeout = 5000): JsMsg {
129+
return new JsMsgImpl(m, ackTimeout);
130130
}
131131

132132
export function parseInfo(s: string): DeliveryInfo {
@@ -164,10 +164,12 @@ export class JsMsgImpl implements JsMsg {
164164
msg: Msg;
165165
di?: DeliveryInfo;
166166
didAck: boolean;
167+
timeout: number;
167168

168-
constructor(msg: Msg) {
169+
constructor(msg: Msg, timeout: number) {
169170
this.msg = msg;
170171
this.didAck = false;
172+
this.timeout = timeout;
171173
}
172174

173175
get subject(): string {
@@ -220,7 +222,10 @@ export class JsMsgImpl implements JsMsg {
220222

221223
// this has to dig into the internals as the message has access
222224
// to the protocol but not the high-level client.
223-
async ackAck(): Promise<boolean> {
225+
async ackAck(opts?: Partial<{ timeout: number }>): Promise<boolean> {
226+
opts = opts || {};
227+
opts.timeout = opts.timeout || this.timeout;
228+
console.log(opts.timeout);
224229
const d = deferred<boolean>();
225230
if (!this.didAck) {
226231
this.didAck = true;
@@ -229,7 +234,7 @@ export class JsMsgImpl implements JsMsg {
229234
const proto = mi.publisher as unknown as ProtocolHandler;
230235
const trace = !(proto.options?.noAsyncTraces || false);
231236
const r = new RequestOne(proto.muxSubscriptions, this.msg.reply, {
232-
timeout: 1000,
237+
timeout: opts.timeout,
233238
}, trace);
234239
proto.request(r);
235240
try {

0 commit comments

Comments
 (0)