16
16
import {
17
17
cleanup ,
18
18
jetstreamServerConf ,
19
+ NatsServer ,
19
20
setup ,
20
21
} from "../../tests/helpers/mod.ts" ;
21
22
import { setupStreamAndConsumer } from "../../examples/jetstream/util.ts" ;
@@ -25,13 +26,17 @@ import {
25
26
assertExists ,
26
27
assertRejects ,
27
28
} from "https://deno.land/std@0.221.0/assert/mod.ts" ;
28
- import { consumerHbTest } from "./consumers_test.ts" ;
29
29
import { initStream } from "./jstest_util.ts" ;
30
30
import { AckPolicy , DeliverPolicy } from "../jsapi_types.ts" ;
31
31
import { deadline , deferred , delay } from "../../nats-base-client/util.ts" ;
32
32
import { nanos } from "../jsutil.ts" ;
33
- import { ConsumerEvents , PullConsumerMessagesImpl } from "../consumer.ts" ;
33
+ import {
34
+ ConsumerEvents ,
35
+ ConsumerStatus ,
36
+ PullConsumerMessagesImpl ,
37
+ } from "../consumer.ts" ;
34
38
import { syncIterator } from "../../nats-base-client/core.ts" ;
39
+ import { connect } from "../../src/connect.ts" ;
35
40
36
41
Deno . test ( "consumers - consume" , async ( ) => {
37
42
const { ns, nc } = await setup ( jetstreamServerConf ( ) ) ;
@@ -88,11 +93,60 @@ Deno.test("consumers - consume callback rejects iter", async () => {
88
93
await cleanup ( ns , nc ) ;
89
94
} ) ;
90
95
91
- Deno . test ( "consumers - consume heartbeats" , async ( ) => {
92
- await consumerHbTest ( false ) ;
96
+ Deno . test ( "consume - heartbeats" , async ( ) => {
97
+ const servers = await NatsServer . setupDataConnCluster ( 4 ) ;
98
+ const nc = await connect ( { port : servers [ 0 ] . port } ) ;
99
+ const { stream } = await initStream ( nc ) ;
100
+ const jsm = await nc . jetstreamManager ( ) ;
101
+ await jsm . consumers . add ( stream , {
102
+ durable_name : "a" ,
103
+ ack_policy : AckPolicy . Explicit ,
104
+ } ) ;
105
+
106
+ const js = nc . jetstream ( ) ;
107
+ const c = await js . consumers . get ( stream , "a" ) ;
108
+ const iter = await c . consume ( {
109
+ max_messages : 100 ,
110
+ idle_heartbeat : 1000 ,
111
+ expires : 30000 ,
112
+ } ) ;
113
+
114
+ const buf : Promise < void > [ ] = [ ] ;
115
+ // stop the data serverss
116
+ setTimeout ( ( ) => {
117
+ buf . push ( servers [ 1 ] . stop ( ) ) ;
118
+ buf . push ( servers [ 2 ] . stop ( ) ) ;
119
+ buf . push ( servers [ 3 ] . stop ( ) ) ;
120
+ } , 1000 ) ;
121
+
122
+ await Promise . all ( buf ) ;
123
+
124
+ const d = deferred < ConsumerStatus > ( ) ;
125
+
126
+ await ( async ( ) => {
127
+ const status = await iter . status ( ) ;
128
+ for await ( const s of status ) {
129
+ d . resolve ( s ) ;
130
+ iter . stop ( ) ;
131
+ break ;
132
+ }
133
+ } ) ( ) ;
134
+
135
+ await ( async ( ) => {
136
+ for await ( const _r of iter ) {
137
+ // nothing
138
+ }
139
+ } ) ( ) ;
140
+
141
+ const cs = await d ;
142
+ assertEquals ( cs . type , ConsumerEvents . HeartbeatsMissed ) ;
143
+ assertEquals ( cs . data , 2 ) ;
144
+
145
+ await nc . close ( ) ;
146
+ await NatsServer . stopAll ( servers , true ) ;
93
147
} ) ;
94
148
95
- Deno . test ( "consumers - consume deleted consumer" , async ( ) => {
149
+ Deno . test ( "consume - deleted consumer" , async ( ) => {
96
150
const { ns, nc } = await setup ( jetstreamServerConf ( { } ) ) ;
97
151
const { stream } = await initStream ( nc ) ;
98
152
const jsm = await nc . jetstreamManager ( ) ;
@@ -142,7 +196,7 @@ Deno.test("consumers - consume deleted consumer", async () => {
142
196
await cleanup ( ns , nc ) ;
143
197
} ) ;
144
198
145
- Deno . test ( "consumers - sub leaks consume() " , async ( ) => {
199
+ Deno . test ( "consume - sub leaks" , async ( ) => {
146
200
const { ns, nc } = await setup ( jetstreamServerConf ( ) ) ;
147
201
const { stream } = await initStream ( nc ) ;
148
202
@@ -171,7 +225,7 @@ Deno.test("consumers - sub leaks consume()", async () => {
171
225
await cleanup ( ns , nc ) ;
172
226
} ) ;
173
227
174
- Deno . test ( "consumers - consume drain" , async ( ) => {
228
+ Deno . test ( "consume - drain" , async ( ) => {
175
229
const { ns, nc } = await setup ( jetstreamServerConf ( ) ) ;
176
230
const { stream } = await initStream ( nc ) ;
177
231
@@ -198,7 +252,7 @@ Deno.test("consumers - consume drain", async () => {
198
252
await cleanup ( ns , nc ) ;
199
253
} ) ;
200
254
201
- Deno . test ( "consumers - consume sync" , async ( ) => {
255
+ Deno . test ( "consume - sync" , async ( ) => {
202
256
const { ns, nc } = await setup ( jetstreamServerConf ( ) ) ;
203
257
const jsm = await nc . jetstreamManager ( ) ;
204
258
await jsm . streams . add ( { name : "messages" , subjects : [ "hello" ] } ) ;
@@ -225,7 +279,7 @@ Deno.test("consumers - consume sync", async () => {
225
279
await cleanup ( ns , nc ) ;
226
280
} ) ;
227
281
228
- Deno . test ( "consumers - consume stream not found request abort" , async ( ) => {
282
+ Deno . test ( "consume - stream not found request abort" , async ( ) => {
229
283
const { ns, nc } = await setup ( jetstreamServerConf ( ) ) ;
230
284
231
285
const jsm = await nc . jetstreamManager ( ) ;
@@ -258,7 +312,7 @@ Deno.test("consumers - consume stream not found request abort", async () => {
258
312
await cleanup ( ns , nc ) ;
259
313
} ) ;
260
314
261
- Deno . test ( "consumers - consume consumer deleted request abort" , async ( ) => {
315
+ Deno . test ( "consume - consumer deleted request abort" , async ( ) => {
262
316
const { ns, nc } = await setup ( jetstreamServerConf ( ) ) ;
263
317
264
318
const jsm = await nc . jetstreamManager ( ) ;
@@ -294,7 +348,7 @@ Deno.test("consumers - consume consumer deleted request abort", async () => {
294
348
await cleanup ( ns , nc ) ;
295
349
} ) ;
296
350
297
- Deno . test ( "consumers - consume consumer not found request abort" , async ( ) => {
351
+ Deno . test ( "consume - consumer not found request abort" , async ( ) => {
298
352
const { ns, nc } = await setup ( jetstreamServerConf ( ) ) ;
299
353
300
354
const jsm = await nc . jetstreamManager ( ) ;
@@ -328,7 +382,7 @@ Deno.test("consumers - consume consumer not found request abort", async () => {
328
382
await cleanup ( ns , nc ) ;
329
383
} ) ;
330
384
331
- Deno . test ( "consumers - consume consumer bind" , async ( ) => {
385
+ Deno . test ( "consume - consumer bind" , async ( ) => {
332
386
const { ns, nc } = await setup ( jetstreamServerConf ( ) ) ;
333
387
334
388
const jsm = await nc . jetstreamManager ( ) ;
0 commit comments