Skip to content

Commit e2738ee

Browse files
committed
moved additional types to prevent circ deps in the module
1 parent 292097d commit e2738ee

File tree

8 files changed

+89
-84
lines changed

8 files changed

+89
-84
lines changed

jetstream/consumer.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import {
2020
Timeout,
2121
timeout,
2222
} from "../nats-base-client/util.ts";
23-
import { ConsumerAPI, ConsumerAPIImpl } from "./jsmconsumer_api.ts";
23+
import { ConsumerAPIImpl } from "./jsmconsumer_api.ts";
2424
import { nuid } from "../nats-base-client/nuid.ts";
2525
import { isHeartbeatMsg } from "./jsutil.ts";
2626
import { QueuedIteratorImpl } from "../nats-base-client/queued_iterator.ts";
@@ -47,6 +47,7 @@ import {
4747
ConsumeMessages,
4848
ConsumeOptions,
4949
Consumer,
50+
ConsumerAPI,
5051
ConsumerCallbackFn,
5152
ConsumerDebugEvents,
5253
ConsumerEvents,

jetstream/internal_mod.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ export type {
4545
ConsumeMessages,
4646
ConsumeOptions,
4747
Consumer,
48+
ConsumerAPI,
4849
ConsumerCallbackFn,
4950
ConsumerInfoable,
5051
ConsumerMessages,
@@ -78,6 +79,7 @@ export type {
7879
KvPutOptions,
7980
KvStatus,
8081
KvWatchOptions,
82+
Lister,
8183
MaxBytes,
8284
MaxMessages,
8385
NextOptions,
@@ -142,7 +144,6 @@ export type {
142144
} from "./jsapi_types.ts";
143145

144146
export type { JsMsg } from "./jsmsg.ts";
145-
export type { Lister } from "./jslister.ts";
146147

147148
export {
148149
AckPolicy,
@@ -154,6 +155,5 @@ export {
154155
StoreCompression,
155156
} from "./jsapi_types.ts";
156157

157-
export type { ConsumerAPI } from "./jsmconsumer_api.ts";
158158
export type { DeliveryInfo, StreamInfoRequestOptions } from "./jsapi_types.ts";
159159
export { jetstreamManager } from "./jsclient.ts";

jetstream/jsclient.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import {
2525
validateDurableName,
2626
validateStreamName,
2727
} from "./jsutil.ts";
28-
import { ConsumerAPI, ConsumerAPIImpl } from "./jsmconsumer_api.ts";
28+
import { ConsumerAPIImpl } from "./jsmconsumer_api.ts";
2929
import { JsMsg, JsMsgImpl, toJsMsg } from "./jsmsg.ts";
3030
import {
3131
MsgAdapter,
@@ -53,6 +53,7 @@ import { ConsumersImpl, StreamAPIImpl, StreamsImpl } from "./jsmstream_api.ts";
5353
import {
5454
Advisory,
5555
AdvisoryKind,
56+
ConsumerAPI,
5657
ConsumerInfoable,
5758
ConsumerOpts,
5859
consumerOpts,

jetstream/jslister.ts

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,7 @@ import {
2020
ConsumerListResponse,
2121
StreamListResponse,
2222
} from "./jsapi_types.ts";
23-
24-
/**
25-
* An interface for listing. Returns a promise with typed list.
26-
*/
27-
export interface Lister<T> {
28-
[Symbol.asyncIterator](): AsyncIterator<T>;
29-
30-
next(): Promise<T[]>;
31-
}
32-
33-
export type ListerFieldFilter<T> = (v: unknown) => T[];
23+
import { Lister, ListerFieldFilter } from "./types.ts";
3424

3525
export class ListerImpl<T> implements Lister<T>, AsyncIterable<T> {
3626
err?: Error;
@@ -85,8 +75,7 @@ export class ListerImpl<T> implements Lister<T>, AsyncIterable<T> {
8575
return [];
8676
}
8777
this.offset += count;
88-
const a = this.filter(r);
89-
return a;
78+
return this.filter(r);
9079
} catch (err) {
9180
this.err = err;
9281
throw err;
@@ -107,7 +96,6 @@ export class ListerImpl<T> implements Lister<T>, AsyncIterable<T> {
10796
// has to be a stream...
10897
return (r as StreamListResponse).streams?.length || 0;
10998
}
110-
return 0;
11199
}
112100

113101
async *[Symbol.asyncIterator]() {

jetstream/jsmconsumer_api.ts

Lines changed: 8 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2023 The NATS Authors
2+
* Copyright 2021-2024 The NATS Authors
33
* Licensed under the Apache License, Version 2.0 (the "License");
44
* you may not use this file except in compliance with the License.
55
* You may obtain a copy of the License at
@@ -13,7 +13,7 @@
1313
* limitations under the License.
1414
*/
1515
import { BaseApiClientImpl } from "./jsbaseclient_api.ts";
16-
import { Lister, ListerFieldFilter, ListerImpl } from "./jslister.ts";
16+
import { ListerImpl } from "./jslister.ts";
1717
import {
1818
minValidation,
1919
validateDurableName,
@@ -32,60 +32,12 @@ import {
3232
SuccessResponse,
3333
} from "./jsapi_types.ts";
3434

35-
import { JetStreamOptions } from "./types.ts";
36-
37-
export interface ConsumerAPI {
38-
/**
39-
* Returns the ConsumerInfo for the specified consumer in the specified stream.
40-
* @param stream
41-
* @param consumer
42-
*/
43-
info(stream: string, consumer: string): Promise<ConsumerInfo>;
44-
45-
/**
46-
* Adds a new consumer to the specified stream with the specified consumer options.
47-
* @param stream
48-
* @param cfg
49-
*/
50-
add(stream: string, cfg: Partial<ConsumerConfig>): Promise<ConsumerInfo>;
51-
52-
/**
53-
* Updates the consumer configuration for the specified consumer on the specified
54-
* stream that has the specified durable name.
55-
* @param stream
56-
* @param durable
57-
* @param cfg
58-
*/
59-
update(
60-
stream: string,
61-
durable: string,
62-
cfg: Partial<ConsumerUpdateConfig>,
63-
): Promise<ConsumerInfo>;
64-
65-
/**
66-
* Deletes the specified consumer name/durable from the specified stream.
67-
* @param stream
68-
* @param consumer
69-
*/
70-
delete(stream: string, consumer: string): Promise<boolean>;
71-
72-
/**
73-
* Lists all the consumers on the specfied streams
74-
* @param stream
75-
*/
76-
list(stream: string): Lister<ConsumerInfo>;
77-
78-
pause(
79-
stream: string,
80-
name: string,
81-
until?: Date,
82-
): Promise<{ paused: boolean; pause_until?: string }>;
83-
84-
resume(
85-
stream: string,
86-
name: string,
87-
): Promise<{ paused: boolean; pause_until?: string }>;
88-
}
35+
import {
36+
ConsumerAPI,
37+
JetStreamOptions,
38+
Lister,
39+
ListerFieldFilter,
40+
} from "./types.ts";
8941

9042
export class ConsumerAPIImpl extends BaseApiClientImpl implements ConsumerAPI {
9143
constructor(nc: NatsConnection, opts?: JetStreamOptions) {

jetstream/jsmstream_api.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import { Empty, MsgHdrs } from "../nats-base-client/types.ts";
1717
import { BaseApiClientImpl, StreamNames } from "./jsbaseclient_api.ts";
18-
import { Lister, ListerFieldFilter, ListerImpl } from "./jslister.ts";
18+
import { ListerImpl } from "./jslister.ts";
1919
import { validateStreamName } from "./jsutil.ts";
2020
import { headers, MsgHdrsImpl } from "../nats-base-client/headers.ts";
2121
import { KvStatusImpl } from "./kv.ts";
@@ -26,10 +26,13 @@ import { Feature } from "../nats-base-client/semver.ts";
2626
import { NatsConnectionImpl } from "../nats-base-client/nats.ts";
2727
import {
2828
Consumer,
29+
ConsumerAPI,
2930
Consumers,
3031
JetStreamOptions,
3132
kvPrefix,
3233
KvStatus,
34+
Lister,
35+
ListerFieldFilter,
3336
ObjectStoreStatus,
3437
OrderedConsumerOptions,
3538
StoredMsg,
@@ -58,7 +61,7 @@ import {
5861
SuccessResponse,
5962
} from "./jsapi_types.ts";
6063
import { OrderedPullConsumerImpl, PullConsumerImpl } from "./consumer.ts";
61-
import { ConsumerAPI, ConsumerAPIImpl } from "./jsmconsumer_api.ts";
64+
import { ConsumerAPIImpl } from "./jsmconsumer_api.ts";
6265

6366
export function convertStreamSourceDomain(s?: StreamSource) {
6467
if (s === undefined) {
@@ -463,10 +466,9 @@ export class StreamAPIImpl extends BaseApiClientImpl implements StreamAPI {
463466
if (kvStreams.length) {
464467
cluster = this.nc.info?.cluster ?? "";
465468
}
466-
const status = kvStreams.map((si) => {
469+
return kvStreams.map((si) => {
467470
return new KvStatusImpl(si, cluster);
468471
});
469-
return status;
470472
};
471473
const subj = `${this.prefix}.STREAM.LIST`;
472474
return new ListerImpl<KvStatus>(subj, filter, this);
@@ -483,10 +485,9 @@ export class StreamAPIImpl extends BaseApiClientImpl implements StreamAPI {
483485
objStreams.forEach((si) => {
484486
this._fixInfo(si);
485487
});
486-
const status = objStreams.map((si) => {
488+
return objStreams.map((si) => {
487489
return new ObjectStoreStatusImpl(si);
488490
});
489-
return status;
490491
};
491492
const subj = `${this.prefix}.STREAM.LIST`;
492493
return new ListerImpl<ObjectStoreStatus>(subj, filter, this);

jetstream/kv.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -755,7 +755,7 @@ export class Bucket implements KV, KvRemove {
755755
}
756756

757757
canSetWatcherName(): boolean {
758-
//@ts-ignore: don't import JetStreamClientImpl
758+
//@ts-ignore: avoiding circular dependencies
759759
const nci = this.js.nc as NatsConnectionImpl;
760760
const { ok } = nci.features.get(
761761
Feature.JS_NEW_CONSUMER_CREATE_API,

jetstream/types.ts

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import {
3030
AckPolicy,
3131
ConsumerConfig,
3232
ConsumerInfo,
33+
ConsumerUpdateConfig,
3334
defaultConsumer,
3435
DeliverPolicy,
3536
DirectBatchOptions,
@@ -51,10 +52,7 @@ import {
5152
StreamUpdateConfig,
5253
} from "./jsapi_types.ts";
5354
import { JsMsg } from "./jsmsg.ts";
54-
// import { BaseApiClientImpl } from "./jsbaseclient_api.ts";
55-
import { ConsumerAPI } from "./jsmconsumer_api.ts";
5655
import { validateDurableName } from "./jsutil.ts";
57-
import { Lister } from "./jslister.ts";
5856
import { nanos } from "../nats-base-client/util.ts";
5957
import { NatsConnectionImpl } from "../nats-base-client/nats.ts";
6058
import { Codec } from "../nats-base-client/codec.ts";
@@ -241,6 +239,17 @@ export interface Views {
241239
) => Promise<ObjectStore>;
242240
}
243241

242+
/**
243+
* An interface for listing. Returns a promise with typed list.
244+
*/
245+
export interface Lister<T> {
246+
[Symbol.asyncIterator](): AsyncIterator<T>;
247+
248+
next(): Promise<T[]>;
249+
}
250+
251+
export type ListerFieldFilter<T> = (v: unknown) => T[];
252+
244253
export interface StreamAPI {
245254
/**
246255
* Returns the information about the specified stream
@@ -332,6 +341,59 @@ export interface StreamAPI {
332341
get(name: string): Promise<Stream>;
333342
}
334343

344+
export interface ConsumerAPI {
345+
/**
346+
* Returns the ConsumerInfo for the specified consumer in the specified stream.
347+
* @param stream
348+
* @param consumer
349+
*/
350+
info(stream: string, consumer: string): Promise<ConsumerInfo>;
351+
352+
/**
353+
* Adds a new consumer to the specified stream with the specified consumer options.
354+
* @param stream
355+
* @param cfg
356+
*/
357+
add(stream: string, cfg: Partial<ConsumerConfig>): Promise<ConsumerInfo>;
358+
359+
/**
360+
* Updates the consumer configuration for the specified consumer on the specified
361+
* stream that has the specified durable name.
362+
* @param stream
363+
* @param durable
364+
* @param cfg
365+
*/
366+
update(
367+
stream: string,
368+
durable: string,
369+
cfg: Partial<ConsumerUpdateConfig>,
370+
): Promise<ConsumerInfo>;
371+
372+
/**
373+
* Deletes the specified consumer name/durable from the specified stream.
374+
* @param stream
375+
* @param consumer
376+
*/
377+
delete(stream: string, consumer: string): Promise<boolean>;
378+
379+
/**
380+
* Lists all the consumers on the specfied streams
381+
* @param stream
382+
*/
383+
list(stream: string): Lister<ConsumerInfo>;
384+
385+
pause(
386+
stream: string,
387+
name: string,
388+
until?: Date,
389+
): Promise<{ paused: boolean; pause_until?: string }>;
390+
391+
resume(
392+
stream: string,
393+
name: string,
394+
): Promise<{ paused: boolean; pause_until?: string }>;
395+
}
396+
335397
/**
336398
* The API for interacting with JetStream resources
337399
*/

0 commit comments

Comments
 (0)