Skip to content

Commit f7c41a8

Browse files
committed
[FIX] fixed an issue with ordered consumer where resetting the consumer didn't send the UNSUB, creating a situation where if there was no disconnect from the client, two subscriptions would be active on the server.
1 parent 7fe57c3 commit f7c41a8

File tree

4 files changed

+130
-2
lines changed

4 files changed

+130
-2
lines changed

jetstream/jsclient.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -795,6 +795,8 @@ export class JetStreamSubscriptionImpl extends TypedSubscription<JsMsg>
795795
this.js._request(subj, req, { retries: -1 })
796796
.then((v) => {
797797
const ci = v as ConsumerInfo;
798+
const jinfo = this.sub.info as JetStreamSubscriptionInfo;
799+
jinfo.last = ci;
798800
this.info!.config = ci.config;
799801
this.info!.name = ci.name;
800802
})

nats-base-client/protocol.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -995,6 +995,7 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
995995
if (!s || this.isClosed()) {
996996
return;
997997
}
998+
this.unsub(s);
998999
s.subject = subject;
9991000
this.subscriptions.resub(s);
10001001
// we don't auto-unsub here because we don't

tests/helpers/launcher.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,45 @@ export interface JSZ {
7272
};
7373
}
7474

75+
export interface SubDetails {
76+
subject: string;
77+
sid: string;
78+
msgs: number;
79+
cid: number;
80+
}
81+
82+
export interface Conn {
83+
cid: number;
84+
kind: string;
85+
type: string;
86+
ip: string;
87+
port: number;
88+
start: string;
89+
"last_activity": string;
90+
"rtt": string;
91+
uptime: string;
92+
idle: string;
93+
"pending_bytes": number;
94+
"in_msgs": number;
95+
"out_msgs": number;
96+
subscriptions: number;
97+
name: string;
98+
lang: string;
99+
version: string;
100+
subscriptions_list?: string[];
101+
subscriptions_list_detail?: SubDetails[];
102+
}
103+
104+
export interface ConnZ {
105+
"server_id": string;
106+
now: string;
107+
"num_connections": number;
108+
"total": number;
109+
"offset": number;
110+
"limit": number;
111+
"connections": Conn[];
112+
}
113+
75114
function parseHostport(
76115
s?: string,
77116
): { hostname: string; port: number } | undefined {
@@ -316,6 +355,21 @@ export class NatsServer implements PortInfo {
316355
return await resp.json();
317356
}
318357

358+
async connz(cid?: number, subs: boolean | "detail" = true): Promise<ConnZ> {
359+
if (!this.monitoring) {
360+
return Promise.reject(new Error("server is not monitoring"));
361+
}
362+
const args = [];
363+
args.push(`subs=${subs}`);
364+
if (cid) {
365+
args.push(`cid=${cid}`);
366+
}
367+
368+
const qs = args.length ? args.join("&") : "";
369+
const resp = await fetch(`http://127.0.0.1:${this.monitoring}/connz?${qs}`);
370+
return await resp.json();
371+
}
372+
319373
async dataDir(): Promise<string | null> {
320374
const jsz = await this.jsz();
321375
return jsz.config.store_dir;

tests/resub_test.ts

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,13 @@
1515

1616
import { cleanup, setup } from "./helpers/mod.ts";
1717
import { NatsConnectionImpl } from "../nats-base-client/nats.ts";
18-
import { assertEquals } from "https://deno.land/std@0.221.0/assert/mod.ts";
19-
import { createInbox, Msg } from "../nats-base-client/core.ts";
18+
import {
19+
assertEquals,
20+
assertExists,
21+
fail,
22+
} from "https://deno.land/std@0.221.0/assert/mod.ts";
23+
import { createInbox, Msg, NatsConnection } from "../nats-base-client/core.ts";
24+
import { NatsServer } from "./helpers/launcher.ts";
2025

2126
Deno.test("resub - iter", async () => {
2227
const { ns, nc } = await setup();
@@ -75,3 +80,69 @@ Deno.test("resub - callback", async () => {
7580
assertEquals(buf[1].subject, subjb);
7681
await cleanup(ns, nc);
7782
});
83+
84+
async function assertEqualSubs(
85+
ns: NatsServer,
86+
nc: NatsConnection,
87+
): Promise<void> {
88+
const nci = nc as NatsConnectionImpl;
89+
const cid = nc.info?.client_id || -1;
90+
if (cid === -1) {
91+
fail("client_id not found");
92+
}
93+
94+
const connz = await ns.connz(cid, "detail");
95+
96+
const conn = connz.connections.find((c) => {
97+
return c.cid === cid;
98+
});
99+
assertExists(conn);
100+
assertExists(conn.subscriptions_list_detail);
101+
102+
const subs = nci.protocol.subscriptions.all();
103+
subs.forEach((sub) => {
104+
const ssub = conn.subscriptions_list_detail?.find((d) => {
105+
return d.sid === `${sub.sid}`;
106+
});
107+
assertExists(ssub);
108+
assertEquals(ssub.subject, sub.subject);
109+
});
110+
}
111+
112+
Deno.test("resub - removes server interest", async () => {
113+
const { ns, nc } = await setup();
114+
115+
nc.subscribe("a", {
116+
callback() {
117+
// nothing
118+
},
119+
});
120+
121+
const nci = nc as NatsConnectionImpl;
122+
let sub = nci.protocol.subscriptions.all().find((s) => {
123+
return s.subject === "a";
124+
});
125+
assertExists(sub);
126+
127+
// assert the server sees the same subscriptions
128+
await assertEqualSubs(ns, nc);
129+
130+
// change it
131+
nci._resub(sub, "b");
132+
133+
// make sure we don't find a
134+
sub = nci.protocol.subscriptions.all().find((s) => {
135+
return s.subject === "a";
136+
});
137+
assertEquals(sub, undefined);
138+
139+
// make sure we find b
140+
sub = nci.protocol.subscriptions.all().find((s) => {
141+
return s.subject === "b";
142+
});
143+
assertExists(sub);
144+
145+
// assert server thinks the same thing
146+
await assertEqualSubs(ns, nc);
147+
await cleanup(ns, nc);
148+
});

0 commit comments

Comments
 (0)