Skip to content

Commit 4fc3e17

Browse files
committed
[TEST] flappers that were related to jetstream clustering
[TEST] fixed tests that imported utils from other tests and were run twice.
1 parent a655743 commit 4fc3e17

File tree

6 files changed

+105
-72
lines changed

6 files changed

+105
-72
lines changed

jetstream/tests/consumers_consume_test.ts renamed to jetstream/tests/consume_test.ts

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import {
1717
cleanup,
1818
jetstreamServerConf,
19+
NatsServer,
1920
setup,
2021
} from "../../tests/helpers/mod.ts";
2122
import { setupStreamAndConsumer } from "../../examples/jetstream/util.ts";
@@ -25,13 +26,17 @@ import {
2526
assertExists,
2627
assertRejects,
2728
} from "https://deno.land/std@0.221.0/assert/mod.ts";
28-
import { consumerHbTest } from "./consumers_test.ts";
2929
import { initStream } from "./jstest_util.ts";
3030
import { AckPolicy, DeliverPolicy } from "../jsapi_types.ts";
3131
import { deadline, deferred, delay } from "../../nats-base-client/util.ts";
3232
import { nanos } from "../jsutil.ts";
33-
import { ConsumerEvents, PullConsumerMessagesImpl } from "../consumer.ts";
33+
import {
34+
ConsumerEvents,
35+
ConsumerStatus,
36+
PullConsumerMessagesImpl,
37+
} from "../consumer.ts";
3438
import { syncIterator } from "../../nats-base-client/core.ts";
39+
import { connect } from "../../src/connect.ts";
3540

3641
Deno.test("consumers - consume", async () => {
3742
const { ns, nc } = await setup(jetstreamServerConf());
@@ -88,11 +93,60 @@ Deno.test("consumers - consume callback rejects iter", async () => {
8893
await cleanup(ns, nc);
8994
});
9095

91-
Deno.test("consumers - consume heartbeats", async () => {
92-
await consumerHbTest(false);
96+
Deno.test("consume - heartbeats", async () => {
97+
const servers = await NatsServer.setupDataConnCluster(4);
98+
const nc = await connect({ port: servers[0].port });
99+
const { stream } = await initStream(nc);
100+
const jsm = await nc.jetstreamManager();
101+
await jsm.consumers.add(stream, {
102+
durable_name: "a",
103+
ack_policy: AckPolicy.Explicit,
104+
});
105+
106+
const js = nc.jetstream();
107+
const c = await js.consumers.get(stream, "a");
108+
const iter = await c.consume({
109+
max_messages: 100,
110+
idle_heartbeat: 1000,
111+
expires: 30000,
112+
});
113+
114+
const buf: Promise<void>[] = [];
115+
// stop the data serverss
116+
setTimeout(() => {
117+
buf.push(servers[1].stop());
118+
buf.push(servers[2].stop());
119+
buf.push(servers[3].stop());
120+
}, 1000);
121+
122+
await Promise.all(buf);
123+
124+
const d = deferred<ConsumerStatus>();
125+
126+
await (async () => {
127+
const status = await iter.status();
128+
for await (const s of status) {
129+
d.resolve(s);
130+
iter.stop();
131+
break;
132+
}
133+
})();
134+
135+
await (async () => {
136+
for await (const _r of iter) {
137+
// nothing
138+
}
139+
})();
140+
141+
const cs = await d;
142+
assertEquals(cs.type, ConsumerEvents.HeartbeatsMissed);
143+
assertEquals(cs.data, 2);
144+
145+
await nc.close();
146+
await NatsServer.stopAll(servers, true);
93147
});
94148

95-
Deno.test("consumers - consume deleted consumer", async () => {
149+
Deno.test("consume - deleted consumer", async () => {
96150
const { ns, nc } = await setup(jetstreamServerConf({}));
97151
const { stream } = await initStream(nc);
98152
const jsm = await nc.jetstreamManager();
@@ -142,7 +196,7 @@ Deno.test("consumers - consume deleted consumer", async () => {
142196
await cleanup(ns, nc);
143197
});
144198

145-
Deno.test("consumers - sub leaks consume()", async () => {
199+
Deno.test("consume - sub leaks", async () => {
146200
const { ns, nc } = await setup(jetstreamServerConf());
147201
const { stream } = await initStream(nc);
148202

@@ -171,7 +225,7 @@ Deno.test("consumers - sub leaks consume()", async () => {
171225
await cleanup(ns, nc);
172226
});
173227

174-
Deno.test("consumers - consume drain", async () => {
228+
Deno.test("consume - drain", async () => {
175229
const { ns, nc } = await setup(jetstreamServerConf());
176230
const { stream } = await initStream(nc);
177231

@@ -198,7 +252,7 @@ Deno.test("consumers - consume drain", async () => {
198252
await cleanup(ns, nc);
199253
});
200254

201-
Deno.test("consumers - consume sync", async () => {
255+
Deno.test("consume - sync", async () => {
202256
const { ns, nc } = await setup(jetstreamServerConf());
203257
const jsm = await nc.jetstreamManager();
204258
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
@@ -225,7 +279,7 @@ Deno.test("consumers - consume sync", async () => {
225279
await cleanup(ns, nc);
226280
});
227281

228-
Deno.test("consumers - consume stream not found request abort", async () => {
282+
Deno.test("consume - stream not found request abort", async () => {
229283
const { ns, nc } = await setup(jetstreamServerConf());
230284

231285
const jsm = await nc.jetstreamManager();
@@ -258,7 +312,7 @@ Deno.test("consumers - consume stream not found request abort", async () => {
258312
await cleanup(ns, nc);
259313
});
260314

261-
Deno.test("consumers - consume consumer deleted request abort", async () => {
315+
Deno.test("consume - consumer deleted request abort", async () => {
262316
const { ns, nc } = await setup(jetstreamServerConf());
263317

264318
const jsm = await nc.jetstreamManager();
@@ -294,7 +348,7 @@ Deno.test("consumers - consume consumer deleted request abort", async () => {
294348
await cleanup(ns, nc);
295349
});
296350

297-
Deno.test("consumers - consume consumer not found request abort", async () => {
351+
Deno.test("consume - consumer not found request abort", async () => {
298352
const { ns, nc } = await setup(jetstreamServerConf());
299353

300354
const jsm = await nc.jetstreamManager();
@@ -328,7 +382,7 @@ Deno.test("consumers - consume consumer not found request abort", async () => {
328382
await cleanup(ns, nc);
329383
});
330384

331-
Deno.test("consumers - consume consumer bind", async () => {
385+
Deno.test("consume - consumer bind", async () => {
332386
const { ns, nc } = await setup(jetstreamServerConf());
333387

334388
const jsm = await nc.jetstreamManager();

jetstream/tests/consumers_test.ts

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,7 @@ Deno.test("consumers - push consumer not supported", async () => {
185185
});
186186

187187
Deno.test("consumers - fetch heartbeats", async () => {
188-
await consumerHbTest(true);
189-
});
190-
191-
export async function consumerHbTest(fetch: boolean) {
192-
const servers = await NatsServer.setupDataConnCluster();
188+
const servers = await NatsServer.setupDataConnCluster(4);
193189

194190
const nc = await connect({ port: servers[0].port });
195191
const { stream } = await initStream(nc);
@@ -201,25 +197,22 @@ export async function consumerHbTest(fetch: boolean) {
201197

202198
const js = nc.jetstream();
203199
const c = await js.consumers.get(stream, "a");
204-
const iter: ConsumerMessages = fetch
205-
? await c.fetch({
206-
max_messages: 100,
207-
idle_heartbeat: 1000,
208-
expires: 30000,
209-
})
210-
: await c.consume({
211-
max_messages: 100,
212-
idle_heartbeat: 1000,
213-
expires: 30000,
214-
});
200+
const iter: ConsumerMessages = await c.fetch({
201+
max_messages: 100,
202+
idle_heartbeat: 1000,
203+
expires: 30000,
204+
});
215205

206+
const buf: Promise<void>[] = [];
216207
// stop the data serverss
217208
setTimeout(() => {
218-
servers[1].stop();
219-
servers[2].stop();
220-
servers[3].stop();
209+
buf.push(servers[1].stop());
210+
buf.push(servers[2].stop());
211+
buf.push(servers[3].stop());
221212
}, 1000);
222213

214+
await Promise.all(buf);
215+
223216
const d = deferred<ConsumerStatus>();
224217

225218
await (async () => {
@@ -243,7 +236,7 @@ export async function consumerHbTest(fetch: boolean) {
243236

244237
await nc.close();
245238
await NatsServer.stopAll(servers, true);
246-
}
239+
});
247240

248241
Deno.test("consumers - bad options", async () => {
249242
const { ns, nc } = await setup(jetstreamServerConf({}));

jetstream/tests/consumers_fetch_test.ts renamed to jetstream/tests/fetch_test.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import { NatsConnectionImpl } from "../../nats-base-client/nats.ts";
3333
import { syncIterator } from "../../nats-base-client/core.ts";
3434
import { PullConsumerMessagesImpl } from "../consumer.ts";
3535

36-
Deno.test("consumers - fetch no messages", async () => {
36+
Deno.test("fetch - no messages", async () => {
3737
const { ns, nc } = await setup(jetstreamServerConf());
3838

3939
const { stream } = await initStream(nc);
@@ -58,7 +58,7 @@ Deno.test("consumers - fetch no messages", async () => {
5858
await cleanup(ns, nc);
5959
});
6060

61-
Deno.test("consumers - fetch less messages", async () => {
61+
Deno.test("fetch - less messages", async () => {
6262
const { ns, nc } = await setup(jetstreamServerConf());
6363

6464
const { stream, subj } = await initStream(nc);
@@ -83,7 +83,7 @@ Deno.test("consumers - fetch less messages", async () => {
8383
await cleanup(ns, nc);
8484
});
8585

86-
Deno.test("consumers - fetch exactly messages", async () => {
86+
Deno.test("fetch - exactly messages", async () => {
8787
const { ns, nc } = await setup(jetstreamServerConf());
8888

8989
const { stream, subj } = await initStream(nc);
@@ -115,7 +115,7 @@ Deno.test("consumers - fetch exactly messages", async () => {
115115
await cleanup(ns, nc);
116116
});
117117

118-
Deno.test("consumers - fetch consumer not found", async () => {
118+
Deno.test("fetch - consumer not found", async () => {
119119
const { ns, nc } = await setup(jetstreamServerConf());
120120
const jsm = await nc.jetstreamManager();
121121
await jsm.streams.add({ name: "A", subjects: ["hello"] });
@@ -149,7 +149,7 @@ Deno.test("consumers - fetch consumer not found", async () => {
149149
await cleanup(ns, nc);
150150
});
151151

152-
Deno.test("consumers - fetch deleted consumer", async () => {
152+
Deno.test("fetch - deleted consumer", async () => {
153153
const { ns, nc } = await setup(jetstreamServerConf());
154154
const jsm = await nc.jetstreamManager();
155155
await jsm.streams.add({ name: "A", subjects: ["a"] });
@@ -184,7 +184,7 @@ Deno.test("consumers - fetch deleted consumer", async () => {
184184
await cleanup(ns, nc);
185185
});
186186

187-
Deno.test("consumers - fetch stream not found", async () => {
187+
Deno.test("fetch - stream not found", async () => {
188188
const { ns, nc } = await setup(jetstreamServerConf());
189189

190190
const jsm = await nc.jetstreamManager();
@@ -216,7 +216,7 @@ Deno.test("consumers - fetch stream not found", async () => {
216216
await cleanup(ns, nc);
217217
});
218218

219-
Deno.test("consumers - fetch listener leaks", async () => {
219+
Deno.test("fetch - listener leaks", async () => {
220220
const { ns, nc } = await setup(jetstreamServerConf());
221221
const jsm = await nc.jetstreamManager();
222222
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
@@ -256,7 +256,7 @@ Deno.test("consumers - fetch listener leaks", async () => {
256256
await cleanup(ns, nc);
257257
});
258258

259-
Deno.test("consumers - fetch sync", async () => {
259+
Deno.test("fetch - sync", async () => {
260260
const { ns, nc } = await setup(jetstreamServerConf());
261261
const jsm = await nc.jetstreamManager();
262262
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
@@ -282,7 +282,7 @@ Deno.test("consumers - fetch sync", async () => {
282282
await cleanup(ns, nc);
283283
});
284284

285-
Deno.test("consumers - fetch consumer bind", async () => {
285+
Deno.test("fetch - consumer bind", async () => {
286286
const { ns, nc } = await setup(jetstreamServerConf());
287287

288288
const jsm = await nc.jetstreamManager();

jetstream/tests/jetstream_test.ts

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1103,7 +1103,7 @@ Deno.test("jetstream - mirror alternates", async () => {
11031103
const servers = await NatsServer.jetstreamCluster(3);
11041104
const nc = await connect({ port: servers[0].port });
11051105
if (await notCompatible(servers[0], nc, "2.8.2")) {
1106-
await NatsServer.stopAll([servers[1], servers[2]]);
1106+
await NatsServer.stopAll(servers, true);
11071107
return;
11081108
}
11091109

@@ -1213,22 +1213,8 @@ Deno.test("jetstream - detailed errors", async () => {
12131213
});
12141214

12151215
Deno.test("jetstream - repub on 503", async () => {
1216-
let servers = await NatsServer.jetstreamCluster(4, {});
1217-
servers[0].config.jetstream = "disabled";
1218-
await NatsServer.stopAll(servers);
1219-
const proms = servers.map((s) => {
1220-
return s.restart();
1221-
});
1222-
1223-
const connection = await proms[0];
1224-
const data = await NatsServer.dataClusterFormed([
1225-
proms[1],
1226-
proms[2],
1227-
proms[3],
1228-
]);
1229-
servers = [connection, data[0], data[1], data[2]];
1230-
1231-
const nc = await connect({ port: connection.port });
1216+
let servers = await NatsServer.setupDataConnCluster(4);
1217+
const nc = await connect({ port: servers[0].port });
12321218

12331219
const { stream, subj } = await initStream(nc, nuid.next(), {
12341220
num_replicas: 3,
@@ -1346,7 +1332,7 @@ Deno.test("jetstream - mem_storage consumer option", async () => {
13461332
});
13471333

13481334
Deno.test("jetstream - num_replicas consumer option", async () => {
1349-
const servers = await NatsServer.jetstreamCluster(3);
1335+
const servers = await NatsServer.setupDataConnCluster();
13501336
const nc = await connect({ port: servers[0].port });
13511337
if (await notCompatible(servers[0], nc, "2.9.0")) {
13521338
await NatsServer.stopAll(servers, true);

0 commit comments

Comments
 (0)