diff --git a/core/deno.json b/core/deno.json index ddbc0ad0..b2bf9f59 100644 --- a/core/deno.json +++ b/core/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/nats-core", - "version": "3.0.0-34", + "version": "3.0.0-35", "exports": { ".": "./src/mod.ts", "./internal": "./src/internal_mod.ts" diff --git a/core/package.json b/core/package.json index 27b1de88..c98422c8 100644 --- a/core/package.json +++ b/core/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/nats-core", - "version": "3.0.0-34", + "version": "3.0.0-35", "files": [ "lib/", "LICENSE", diff --git a/core/src/version.ts b/core/src/version.ts index 837c22a5..dd48dd97 100644 --- a/core/src/version.ts +++ b/core/src/version.ts @@ -1,2 +1,2 @@ // This file is generated - do not edit -export const version = "3.0.0-34"; +export const version = "3.0.0-35"; diff --git a/jetstream/deno.json b/jetstream/deno.json index 63b305f2..6c677a8b 100644 --- a/jetstream/deno.json +++ b/jetstream/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/jetstream", - "version": "3.0.0-22", + "version": "3.0.0-23", "exports": { ".": "./src/mod.ts", "./internal": "./src/internal_mod.ts" @@ -33,6 +33,6 @@ "test": "deno test -A --parallel --reload --trace-leaks --quiet tests/ --import-map=import_map.json" }, "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34" + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-35" } } diff --git a/jetstream/import_map.json b/jetstream/import_map.json index 3d1a7d73..33ac3f28 100644 --- a/jetstream/import_map.json +++ b/jetstream/import_map.json @@ -2,8 +2,8 @@ "imports": { "@nats-io/nkeys": "jsr:@nats-io/nkeys@1.2.0-4", "@nats-io/nuid": "jsr:@nats-io/nuid@2.0.1-2", - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34", - "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-34/internal", + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-35", + "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-35/internal", "test_helpers": "../test_helpers/mod.ts", "@std/io": "jsr:@std/io@0.224.0" } diff --git a/jetstream/package.json b/jetstream/package.json index 36b72f14..ede82a3f 100644 --- a/jetstream/package.json +++ b/jetstream/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/jetstream", - "version": "3.0.0-22", + "version": "3.0.0-23", "files": [ "lib/", "LICENSE", @@ -34,7 +34,7 @@ }, "description": "jetstream library - this library implements all the base functionality for NATS JetStream for javascript clients", "dependencies": { - "@nats-io/nats-core": "3.0.0-34" + "@nats-io/nats-core": "3.0.0-35" }, "devDependencies": { "@types/node": "^22.7.6", diff --git a/jetstream/src/consumer.ts b/jetstream/src/consumer.ts index 559673ce..a2b38cbd 100644 --- a/jetstream/src/consumer.ts +++ b/jetstream/src/consumer.ts @@ -29,6 +29,7 @@ import { delay, errors, Events, + Feature, IdleHeartbeatMonitor, nanos, nuid, @@ -43,6 +44,7 @@ import { toJsMsg } from "./jsmsg.ts"; import type { ConsumerConfig, ConsumerInfo, + OverflowMinPendingAndMinAck, PullOptions, } from "./jsapi_types.ts"; import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts"; @@ -54,14 +56,21 @@ import type { ConsumerCallbackFn, ConsumerMessages, ConsumerStatus, + Expires, FetchMessages, FetchOptions, + IdleHeartbeat, + MaxBytes, + MaxMessages, NextOptions, OrderedConsumerOptions, PullConsumerOptions, + ThresholdBytes, + ThresholdMessages, } from "./types.ts"; import { ConsumerDebugEvents, ConsumerEvents } from "./types.ts"; import { JetStreamStatus } from "./jserrors.ts"; +import { minValidation } from "./jsutil.ts"; enum PullConsumerType { Unset = -1, @@ -85,10 +94,28 @@ export type PullConsumerInternalOptions = { ordered?: OrderedConsumerOptions; }; +type InternalPullOptions = + & MaxMessages + & MaxBytes + & Expires + & IdleHeartbeat + & ThresholdMessages + & OverflowMinPendingAndMinAck + & ThresholdBytes; + +export function isOverflowOptions( + opts: unknown, +): opts is OverflowMinPendingAndMinAck { + const oo = opts as OverflowMinPendingAndMinAck; + return oo && typeof oo.group === "string" || + typeof oo.min_pending === "number" || + typeof oo.min_ack_pending === "number"; +} + export class PullConsumerMessagesImpl extends QueuedIteratorImpl implements ConsumerMessages { consumer: PullConsumerImpl; - opts: Record; + opts: InternalPullOptions; sub!: Subscription; monitor: IdleHeartbeatMonitor | null; pending: { msgs: number; bytes: number; requests: number }; @@ -117,6 +144,13 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl this.inbox = `${this.inboxPrefix}.${this.consumer.serial}`; if (this.consumer.ordered) { + if (isOverflowOptions(opts)) { + throw errors.InvalidArgumentError.format([ + "group", + "min_pending", + "min_ack_pending", + ], "cannot be specified for ordered consumers"); + } if (this.consumer.orderedConsumerState === undefined) { // initialize the state for the order consumer const ocs = {} as OrderedConsumerState; @@ -564,9 +598,21 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl pullOptions(): Partial { const batch = this.opts.max_messages - this.pending.msgs; const max_bytes = this.opts.max_bytes - this.pending.bytes; - const idle_heartbeat = nanos(this.opts.idle_heartbeat); - const expires = nanos(this.opts.expires); - return { batch, max_bytes, idle_heartbeat, expires }; + const idle_heartbeat = nanos(this.opts.idle_heartbeat!); + const expires = nanos(this.opts.expires!); + + const opts = { batch, max_bytes, idle_heartbeat, expires } as PullOptions; + + if (isOverflowOptions(this.opts)) { + opts.group = this.opts.group; + if (this.opts.min_pending) { + opts.min_pending = this.opts.min_pending; + } + if (this.opts.min_ack_pending) { + opts.min_ack_pending = this.opts.min_ack_pending; + } + } + return opts; } trackTimeout(t: Timeout) { @@ -608,14 +654,15 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl parseOptions( opts: PullConsumerOptions, refilling = false, - ): Record { - const args = (opts || {}) as Record; + ): InternalPullOptions { + const args = (opts || {}) as InternalPullOptions; args.max_messages = args.max_messages || 0; args.max_bytes = args.max_bytes || 0; if (args.max_messages !== 0 && args.max_bytes !== 0) { - throw new Error( - `only specify one of max_messages or max_bytes`, + throw errors.InvalidArgumentError.format( + ["max_messages", "max_bytes"], + "are mutually exclusive", ); } @@ -654,6 +701,25 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl args.threshold_bytes = args.threshold_bytes || minBytes; } + if (isOverflowOptions(opts)) { + const { min, ok } = this.consumer.api.nc.features.get( + Feature.JS_PRIORITY_GROUPS, + ); + if (!ok) { + throw new Error(`priority_groups require server ${min}`); + } + validateOverflowPullOptions(opts); + if (opts.group) { + args.group = opts.group; + } + if (opts.min_ack_pending) { + args.min_ack_pending = opts.min_ack_pending; + } + if (opts.min_pending) { + args.min_pending = opts.min_pending; + } + } + return args; } @@ -784,7 +850,7 @@ export class PullConsumerImpl implements Consumer { this.messages = m; } // FIXME: need some way to pad this correctly - const to = Math.round(m.opts.expires * 1.05); + const to = Math.round(m.opts.expires! * 1.05); const timer = timeout(to); m.closed().catch((err) => { console.log(err); @@ -857,3 +923,37 @@ export class PullConsumerImpl implements Consumer { return this._info; } } + +export function validateOverflowPullOptions(opts: unknown) { + if (isOverflowOptions(opts)) { + minValidation("group", opts.group); + if (opts.group.length > 16) { + throw errors.InvalidArgumentError.format( + "group", + "must be 16 characters or less", + ); + } + + const { min_pending, min_ack_pending } = opts; + if (!min_pending && !min_ack_pending) { + throw errors.InvalidArgumentError.format( + ["min_pending", "min_ack_pending"], + "at least one must be specified", + ); + } + + if (min_pending && typeof min_pending !== "number") { + throw errors.InvalidArgumentError.format( + ["min_pending"], + "must be a number", + ); + } + + if (min_ack_pending && typeof min_ack_pending !== "number") { + throw errors.InvalidArgumentError.format( + ["min_ack_pending"], + "must be a number", + ); + } + } +} diff --git a/jetstream/src/jsapi_types.ts b/jetstream/src/jsapi_types.ts index 0f8097be..85ceb80f 100644 --- a/jetstream/src/jsapi_types.ts +++ b/jetstream/src/jsapi_types.ts @@ -895,7 +895,12 @@ export interface JetStreamApiStats { export interface AccountInfoResponse extends ApiResponse, JetStreamAccountStats {} -export interface ConsumerConfig extends ConsumerUpdateConfig { +export type PriorityGroups = { + priority_groups?: string[]; + priority_policy?: PriorityPolicy; +}; + +export type ConsumerConfig = ConsumerUpdateConfig & { /** * The type of acknowledgment required by the Consumer */ @@ -952,9 +957,9 @@ export interface ConsumerConfig extends ConsumerUpdateConfig { * Specified as an ISO date time string (Date#toISOString()). */ "pause_until"?: string; -} +}; -export interface ConsumerUpdateConfig { +export type ConsumerUpdateConfig = PriorityGroups & { /** * A short description of the purpose of this consume */ @@ -1037,6 +1042,11 @@ export interface ConsumerUpdateConfig { * 2.10.x and better. */ metadata?: Record; +}; + +export enum PriorityPolicy { + None = "none", + Overflow = "overflow", } export function defaultConsumer( @@ -1052,12 +1062,54 @@ export function defaultConsumer( }, opts); } +export type OverflowMinPending = { + /** + * The name of the priority_group + */ + group: string; + /** + * Only deliver messages when num_pending for the consumer is greater than this value + */ + min_pending: number; +}; + +export type OverflowMinAckPending = { + /** + * The name of the priority_group + */ + group: string; + /** + * Only deliver messages when num_ack_pending for the consumer is greater than this value + */ + min_ack_pending: number; +}; + +export type OverflowMinPendingAndMinAck = { + /** + * The name of the priority_group + */ + group: string; + /** + * Only deliver messages when num_pending for the consumer is greater than this value + */ + min_pending: number; + /** + * Only deliver messages when num_ack_pending for the consumer is greater than this value + */ + min_ack_pending: number; +}; + +export type OverflowOptions = + | OverflowMinPending + | OverflowMinAckPending + | OverflowMinPendingAndMinAck; + /** * Options for a JetStream pull subscription which define how long * the pull request will remain open and limits the amount of data * that the server could return. */ -export interface PullOptions { +export type PullOptions = Partial & { /** * Max number of messages to retrieve in a pull. */ @@ -1076,8 +1128,12 @@ export interface PullOptions { * number of messages in the batch to fit within this setting. */ "max_bytes": number; + + /** + * Number of nanos between messages for the server to emit an idle_heartbeat + */ "idle_heartbeat": number; -} +}; export interface DeliveryInfo { /** diff --git a/jetstream/src/jsclient.ts b/jetstream/src/jsclient.ts index af1a2345..4f6c0ab4 100644 --- a/jetstream/src/jsclient.ts +++ b/jetstream/src/jsclient.ts @@ -15,7 +15,12 @@ import { BaseApiClientImpl } from "./jsbaseclient_api.ts"; import { ConsumerAPIImpl } from "./jsmconsumer_api.ts"; -import { delay, Empty, QueuedIteratorImpl } from "@nats-io/nats-core/internal"; +import { + backoff, + delay, + Empty, + QueuedIteratorImpl, +} from "@nats-io/nats-core/internal"; import { ConsumersImpl, StreamAPIImpl, StreamsImpl } from "./jsmstream_api.ts"; @@ -34,7 +39,7 @@ import type { StreamAPI, Streams, } from "./types.ts"; -import { errors, headers } from "@nats-io/nats-core/internal"; +import { errors, headers, RequestError } from "@nats-io/nats-core/internal"; import type { Msg, @@ -49,7 +54,7 @@ import type { JetStreamAccountStats, } from "./jsapi_types.ts"; import { DirectStreamAPIImpl } from "./jsm.ts"; -import { JetStreamError } from "./jserrors.ts"; +import { JetStreamError, JetStreamNotEnabled } from "./jserrors.ts"; export function toJetStreamClient( nc: NatsConnection | JetStreamClient, @@ -205,29 +210,35 @@ export class JetStreamClientImpl extends BaseApiClientImpl ro.headers = mh; } - let { retries, retry_delay } = opts as { + let { retries } = opts as { retries: number; - retry_delay: number; }; retries = retries || 1; - retry_delay = retry_delay || 250; + const bo = backoff(); - let r: Msg; + let r: Msg | null = null; for (let i = 0; i < retries; i++) { try { r = await this.nc.request(subj, data, ro); // if here we succeeded break; } catch (err) { + const re = err instanceof RequestError ? err as RequestError : null; if ( - err instanceof errors.RequestError && err.isNoResponders() + err instanceof errors.TimeoutError || + re?.isNoResponders() && i + 1 < retries ) { - await delay(retry_delay); + await delay(bo.backoff(i)); } else { - throw err; + throw re?.isNoResponders() + ? new JetStreamNotEnabled(`jetstream is not enabled`, { + cause: err, + }) + : err; } } } + const pa = this.parseJsResponse(r!) as PubAck; if (pa.stream === "") { throw new JetStreamError("invalid ack response"); diff --git a/jetstream/src/jsmconsumer_api.ts b/jetstream/src/jsmconsumer_api.ts index e507ca24..77c4f21f 100644 --- a/jetstream/src/jsmconsumer_api.ts +++ b/jetstream/src/jsmconsumer_api.ts @@ -24,17 +24,21 @@ import type { NatsConnection, NatsConnectionImpl, } from "@nats-io/nats-core/internal"; -import { Feature, InvalidArgumentError } from "@nats-io/nats-core/internal"; -import { ConsumerApiAction } from "./jsapi_types.ts"; - +import { + errors, + Feature, + InvalidArgumentError, +} from "@nats-io/nats-core/internal"; import type { ConsumerConfig, ConsumerInfo, ConsumerListResponse, ConsumerUpdateConfig, CreateConsumerRequest, + PriorityGroups, SuccessResponse, } from "./jsapi_types.ts"; +import { ConsumerApiAction, PriorityPolicy } from "./jsapi_types.ts"; import type { ConsumerAPI, @@ -68,6 +72,20 @@ export class ConsumerAPIImpl extends BaseApiClientImpl implements ConsumerAPI { ); } + if (isPriorityGroup(cfg)) { + const { min, ok } = this.nc.features.get(Feature.JS_PRIORITY_GROUPS); + if (!ok) { + throw new Error(`priority_groups require server ${min}`); + } + if (cfg.deliver_subject) { + throw InvalidArgumentError.format( + "deliver_subject", + "cannot be set when using priority groups", + ); + } + validatePriorityGroups(cfg); + } + const cr = {} as CreateConsumerRequest; cr.config = cfg; cr.stream_name = stream; @@ -214,3 +232,44 @@ export class ConsumerAPIImpl extends BaseApiClientImpl implements ConsumerAPI { >; } } + +function isPriorityGroup(config: unknown): config is PriorityGroups { + const pg = config as PriorityGroups; + return pg && pg.priority_groups !== undefined || + pg.priority_policy !== undefined; +} + +function validatePriorityGroups(pg: unknown): void { + if (isPriorityGroup(pg)) { + if (!Array.isArray(pg.priority_groups)) { + throw InvalidArgumentError.format( + ["priority_groups"], + "must be an array", + ); + } + if (pg.priority_groups.length === 0) { + throw InvalidArgumentError.format( + ["priority_groups"], + "must have at least one group", + ); + } + pg.priority_groups.forEach((g) => { + minValidation("priority_group", g); + if (g.length > 16) { + throw errors.InvalidArgumentError.format( + "group", + "must be 16 characters or less", + ); + } + }); + if ( + pg.priority_policy !== PriorityPolicy.None && + pg.priority_policy !== PriorityPolicy.Overflow + ) { + throw InvalidArgumentError.format( + ["priority_policy"], + "must be 'none' or 'overflow'", + ); + } + } +} diff --git a/jetstream/src/types.ts b/jetstream/src/types.ts index 957cf574..37022700 100644 --- a/jetstream/src/types.ts +++ b/jetstream/src/types.ts @@ -31,6 +31,7 @@ import type { DirectMsgRequest, JetStreamAccountStats, MsgRequest, + OverflowOptions, PurgeOpts, PurgeResponse, StreamAlternate, @@ -350,7 +351,8 @@ export type ConsumeBytes = & IdleHeartbeat & ConsumeCallback & AbortOnMissingResource - & Bind; + & Bind + & Partial; export type ConsumeMessages = & Partial & Partial @@ -358,7 +360,8 @@ export type ConsumeMessages = & IdleHeartbeat & ConsumeCallback & AbortOnMissingResource - & Bind; + & Bind + & Partial; export type ConsumeOptions = | ConsumeBytes | ConsumeMessages; @@ -370,7 +373,8 @@ export type FetchBytes = & Partial & Expires & IdleHeartbeat - & Bind; + & Bind + & Partial; /** * Options for fetching messages */ @@ -378,7 +382,9 @@ export type FetchMessages = & Partial & Expires & IdleHeartbeat - & Bind; + & Bind + & Partial; + export type FetchOptions = FetchBytes | FetchMessages; export type PullConsumerOptions = FetchOptions | ConsumeOptions; export type MaxMessages = { diff --git a/jetstream/tests/consumers_test.ts b/jetstream/tests/consumers_test.ts index f1029bda..d3a4401c 100644 --- a/jetstream/tests/consumers_test.ts +++ b/jetstream/tests/consumers_test.ts @@ -250,7 +250,7 @@ Deno.test("consumers - bad options", async () => { await c.consume({ max_messages: 100, max_bytes: 100 }); }, Error, - "only specify one of max_messages or max_bytes", + "'max_messages','max_bytes' are mutually exclusive", ); await assertRejects( diff --git a/jetstream/tests/jetstream_pullconsumer_test.ts b/jetstream/tests/jetstream_pullconsumer_test.ts index af28e70c..2cf70a32 100644 --- a/jetstream/tests/jetstream_pullconsumer_test.ts +++ b/jetstream/tests/jetstream_pullconsumer_test.ts @@ -18,14 +18,24 @@ import { connect, jetstreamExportServerConf, jetstreamServerConf, + notCompatible, setup, } from "test_helpers"; import { initStream } from "./jstest_util.ts"; -import type { ConsumerConfig } from "../src/jsapi_types.ts"; -import { AckPolicy, DeliverPolicy } from "../src/jsapi_types.ts"; +import { + AckPolicy, + type ConsumerConfig, + DeliverPolicy, + type OverflowMinPendingAndMinAck, + PriorityPolicy, +} from "../src/jsapi_types.ts"; import { assertEquals, assertExists } from "jsr:@std/assert"; -import { Empty, nanos, nuid } from "@nats-io/nats-core"; -import { jetstream, jetstreamManager } from "../src/mod.ts"; +import { deferred, Empty, type Msg, nanos, nuid } from "@nats-io/nats-core"; +import { + type ConsumeOptions, + jetstream, + jetstreamManager, +} from "../src/mod.ts"; Deno.test("jetstream - pull consumer options", async () => { const { ns, nc } = await setup(jetstreamServerConf({})); @@ -120,3 +130,171 @@ Deno.test("jetstream - last of", async () => { await cleanup(ns, nc); }); + +Deno.test("jetstream - priority group", async (t) => { + const { ns, nc } = await setup(jetstreamServerConf({})); + if (await notCompatible(ns, nc, "2.11.0")) { + return; + } + const jsm = await jetstreamManager(nc); + await jsm.streams.add({ + name: "A", + subjects: [`a`], + }); + + const js = jetstream(nc); + + const buf = []; + for (let i = 0; i < 100; i++) { + buf.push(js.publish("a", Empty)); + } + + await Promise.all(buf); + + const opts = { + durable_name: "a", + deliver_policy: DeliverPolicy.All, + ack_policy: AckPolicy.Explicit, + priority_groups: ["overflow"], + priority_policy: PriorityPolicy.Overflow, + }; + + await jsm.consumers.add("A", opts); + + function spyPull(): Promise { + const d = deferred(); + nc.subscribe(`$JS.API.CONSUMER.MSG.NEXT.A.a`, { + callback: (err, msg) => { + if (err) { + d.reject(err); + } + d.resolve(msg); + }, + }); + + return d; + } + + await t.step("consume", async () => { + async function check(opts: ConsumeOptions): Promise { + const c = await js.consumers.get("A", "a"); + + const d = spyPull(); + const c1 = await c.consume(opts); + const done = (async () => { + for await (const m of c1) { + m.ack(); + } + })(); + + const m = await d; + c1.stop(); + await done; + + const po = m.json(); + const oopts = opts as OverflowMinPendingAndMinAck; + assertEquals(po.group, opts.group); + assertEquals(po.min_ack_pending, oopts.min_ack_pending); + assertEquals(po.min_pending, oopts.min_pending); + } + + await check({ + max_messages: 2, + group: "overflow", + min_ack_pending: 2, + }); + + await check({ + max_messages: 2, + group: "overflow", + min_pending: 10, + }); + + await check({ + max_messages: 2, + group: "overflow", + min_pending: 10, + min_ack_pending: 100, + }); + }); + + await t.step("fetch", async () => { + async function check(opts: ConsumeOptions): Promise { + const c = await js.consumers.get("A", "a"); + + const d = spyPull(); + const iter = await c.fetch(opts); + for await (const m of iter) { + m.ack(); + } + + const m = await d; + const po = m.json(); + const oopts = opts as OverflowMinPendingAndMinAck; + assertEquals(po.group, opts.group); + assertEquals(po.min_ack_pending, oopts.min_ack_pending); + assertEquals(po.min_pending, oopts.min_pending); + } + + await check({ + max_messages: 2, + group: "overflow", + min_ack_pending: 2, + expires: 1000, + }); + + await check({ + max_messages: 2, + group: "overflow", + min_pending: 10, + expires: 1000, + }); + + await check({ + max_messages: 2, + group: "overflow", + min_pending: 10, + min_ack_pending: 100, + expires: 1000, + }); + }); + + await t.step("next", async () => { + async function check(opts: ConsumeOptions): Promise { + const c = await js.consumers.get("A", "a"); + const d = spyPull(); + await c.next(opts); + + const m = await d; + const po = m.json(); + const oopts = opts as OverflowMinPendingAndMinAck; + assertEquals(po.group, opts.group); + assertEquals(po.min_ack_pending, oopts.min_ack_pending); + assertEquals(po.min_pending, oopts.min_pending); + } + + await check({ + max_messages: 2, + group: "overflow", + min_ack_pending: 2, + expires: 1000, + }); + + await check({ + max_messages: 2, + group: "overflow", + min_pending: 10, + expires: 1000, + }); + + await check({ + max_messages: 2, + group: "overflow", + min_pending: 10, + min_ack_pending: 100, + expires: 1000, + }); + }); + + await cleanup(ns, nc); +}); diff --git a/jetstream/tests/jetstream_pushconsumer_test.ts b/jetstream/tests/jetstream_pushconsumer_test.ts index fcf69781..9d5146b5 100644 --- a/jetstream/tests/jetstream_pushconsumer_test.ts +++ b/jetstream/tests/jetstream_pushconsumer_test.ts @@ -35,15 +35,20 @@ import { nuid, syncIterator, } from "@nats-io/nats-core"; -import { ConsumerDebugEvents, ConsumerEvents } from "../src/types.ts"; import type { BoundPushConsumerOptions, PubAck } from "../src/types.ts"; +import { ConsumerDebugEvents, ConsumerEvents } from "../src/types.ts"; import { assert, assertEquals, assertExists, assertRejects, } from "jsr:@std/assert"; -import { AckPolicy, DeliverPolicy, StorageType } from "../src/jsapi_types.ts"; +import { + AckPolicy, + DeliverPolicy, + PriorityPolicy, + StorageType, +} from "../src/jsapi_types.ts"; import type { JsMsg } from "../src/jsmsg.ts"; import { jetstream, jetstreamManager } from "../src/mod.ts"; import type { @@ -989,3 +994,26 @@ Deno.test("jetstream - ordered push consumer honors inbox prefix", async () => { await cleanup(ns, nc); }); + +Deno.test("jetstream - push consumer doesn't support priority groups", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + if (await notCompatible(ns, nc, "2.12.0")) { + return; + } + const jsm = await jetstreamManager(nc); + await jsm.streams.add({ name: "A", subjects: ["test"] }); + + await assertRejects( + () => { + return jsm.consumers.add("A", { + ack_policy: AckPolicy.None, + deliver_subject: "foo", + priority_groups: ["hello"], + priority_policy: PriorityPolicy.Overflow, + }); + }, + Error, + "'deliver_subject' cannot be set when using priority groups", + ); + await cleanup(ns, nc); +}); diff --git a/jetstream/tests/jetstream_test.ts b/jetstream/tests/jetstream_test.ts index 90f0dc00..aa3ecf59 100644 --- a/jetstream/tests/jetstream_test.ts +++ b/jetstream/tests/jetstream_test.ts @@ -32,12 +32,15 @@ import { Empty, headers, nanos, + NoRespondersError, nuid, + RequestError, } from "@nats-io/nats-core"; import { assert, assertEquals, assertExists, + assertInstanceOf, assertRejects, assertThrows, } from "jsr:@std/assert"; @@ -52,7 +55,7 @@ import { setup, } from "test_helpers"; import { PubHeaders } from "../src/jsapi_types.ts"; -import { JetStreamApiError } from "../src/jserrors.ts"; +import { JetStreamApiError, JetStreamNotEnabled } from "../src/jserrors.ts"; Deno.test("jetstream - default options", () => { const opts = defaultJsOptions(); @@ -1076,3 +1079,39 @@ Deno.test("jetstream - term reason", async () => { await cleanup(ns, nc); }); + +Deno.test("jetstream - publish no responder", async (t) => { + await t.step("not a jetstream server", async () => { + const { ns, nc } = await setup(); + const js = jetstream(nc); + const err = await assertRejects( + () => { + return js.publish("hello"); + }, + JetStreamNotEnabled, + ); + + assertInstanceOf(err.cause, RequestError); + assertInstanceOf(err.cause?.cause, NoRespondersError); + + await cleanup(ns, nc); + }); + + await t.step("jetstream not listening for subject", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const jsm = await jetstreamManager(nc); + await jsm.streams.add({ name: "s", subjects: ["a", "b"] }); + const js = jetstream(nc); + const err = await assertRejects( + () => { + return js.publish("c"); + }, + JetStreamNotEnabled, + ); + + assertInstanceOf(err.cause, RequestError); + assertInstanceOf(err.cause?.cause, NoRespondersError); + + await cleanup(ns, nc); + }); +}); diff --git a/jetstream/tests/jsm_test.ts b/jetstream/tests/jsm_test.ts index 923178d6..db8b587b 100644 --- a/jetstream/tests/jsm_test.ts +++ b/jetstream/tests/jsm_test.ts @@ -72,7 +72,11 @@ import { } from "jsr:@nats-io/jwt@0.0.9-3"; import { convertStreamSourceDomain } from "../src/jsmstream_api.ts"; import type { ConsumerAPIImpl } from "../src/jsmconsumer_api.ts"; -import { ConsumerApiAction, StoreCompression } from "../src/jsapi_types.ts"; +import { + ConsumerApiAction, + PriorityPolicy, + StoreCompression, +} from "../src/jsapi_types.ts"; import type { JetStreamManagerImpl } from "../src/jsclient.ts"; import { stripNatsMetadata } from "./util.ts"; import { jserrors } from "../src/jserrors.ts"; @@ -2754,3 +2758,88 @@ Deno.test("jsm - storage", async () => { await cleanup(ns, nc); }); + +Deno.test("jsm - pull consumer priority groups", async (t) => { + const { ns, nc } = await setup(jetstreamServerConf({})); + if (await notCompatible(ns, nc, "2.11.0")) { + return; + } + const jsm = await jetstreamManager(nc); + await jsm.streams.add({ + name: "A", + subjects: [`a`], + }); + + await t.step("priority group is not an array", async () => { + await assertRejects( + () => { + return jsm.consumers.add("A", { + name: "a", + ack_policy: AckPolicy.None, + //@ts-ignore: testing + priority_groups: "hello", + }); + }, + Error, + "'priority_groups' must be an array", + ); + }); + + await t.step("priority_group empty array", async () => { + await assertRejects( + () => { + return jsm.consumers.add("A", { + name: "a", + ack_policy: AckPolicy.None, + //@ts-ignore: testing + priority_groups: [], + }); + }, + Error, + "'priority_groups' must have at least one group", + ); + }); + + await t.step("missing priority_policy ", async () => { + await assertRejects( + () => { + return jsm.consumers.add("A", { + name: "a", + ack_policy: AckPolicy.None, + priority_groups: ["hello"], + }); + }, + Error, + "'priority_policy' must be 'none' or 'overflow'", + ); + }); + + await t.step("bad priority_policy ", async () => { + await assertRejects( + () => { + return jsm.consumers.add("A", { + name: "a", + ack_policy: AckPolicy.None, + priority_groups: ["hello"], + //@ts-ignore: test + priority_policy: "hello", + }); + }, + Error, + "'priority_policy' must be 'none' or 'overflow'", + ); + }); + + await t.step("check config", async () => { + const ci = await jsm.consumers.add("A", { + name: "a", + ack_policy: AckPolicy.None, + priority_groups: ["hello"], + priority_policy: PriorityPolicy.Overflow, + }); + assertEquals(ci.config.priority_policy, PriorityPolicy.Overflow); + assertEquals(ci.config.priority_groups, ["hello"]); + }); + + await cleanup(ns, nc); +}); diff --git a/kv/deno.json b/kv/deno.json index c39bce2f..716599cd 100644 --- a/kv/deno.json +++ b/kv/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/kv", - "version": "3.0.0-16", + "version": "3.0.0-17", "exports": { ".": "./src/mod.ts", "./internal": "./src/internal_mod.ts" @@ -33,7 +33,7 @@ "test": "deno test -A --parallel --reload --quiet tests/ --import-map=import_map.json" }, "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-22" + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-35", + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-23" } } diff --git a/kv/import_map.json b/kv/import_map.json index 541bde98..a74f64ac 100644 --- a/kv/import_map.json +++ b/kv/import_map.json @@ -1,9 +1,9 @@ { "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34", - "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-34/internal", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-22", - "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-22/internal", + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-35", + "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-35/internal", + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-23", + "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-23/internal", "test_helpers": "../test_helpers/mod.ts", "@nats-io/nkeys": "jsr:@nats-io/nkeys@1.2.0-4", "@nats-io/nuid": "jsr:@nats-io/nuid@2.0.1-2", diff --git a/kv/package.json b/kv/package.json index 2824b019..a40c7116 100644 --- a/kv/package.json +++ b/kv/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/kv", - "version": "3.0.0-16", + "version": "3.0.0-17", "files": [ "lib/", "LICENSE", @@ -34,8 +34,8 @@ }, "description": "kv library - this library implements all the base functionality for NATS KV javascript clients", "dependencies": { - "@nats-io/jetstream": "3.0.0-22", - "@nats-io/nats-core": "3.0.0-34" + "@nats-io/jetstream": "3.0.0-23", + "@nats-io/nats-core": "3.0.0-35" }, "devDependencies": { "@types/node": "^22.7.6", diff --git a/obj/deno.json b/obj/deno.json index f7555bac..8613a5ca 100644 --- a/obj/deno.json +++ b/obj/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/obj", - "version": "3.0.0-17", + "version": "3.0.0-18", "exports": { ".": "./src/mod.ts", "./internal": "./src/internal_mod.ts" @@ -33,7 +33,7 @@ "test": "deno test -A --parallel --reload --quiet tests/ --import-map=import_map.json" }, "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-22" + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-35", + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-23" } } diff --git a/obj/import_map.json b/obj/import_map.json index 541bde98..a74f64ac 100644 --- a/obj/import_map.json +++ b/obj/import_map.json @@ -1,9 +1,9 @@ { "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34", - "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-34/internal", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-22", - "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-22/internal", + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-35", + "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-35/internal", + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-23", + "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-23/internal", "test_helpers": "../test_helpers/mod.ts", "@nats-io/nkeys": "jsr:@nats-io/nkeys@1.2.0-4", "@nats-io/nuid": "jsr:@nats-io/nuid@2.0.1-2", diff --git a/obj/package.json b/obj/package.json index 39395c1d..9d990f95 100644 --- a/obj/package.json +++ b/obj/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/obj", - "version": "3.0.0-17", + "version": "3.0.0-18", "files": [ "lib/", "LICENSE", @@ -34,8 +34,8 @@ }, "description": "obj library - this library implements all the base functionality for NATS objectstore for javascript clients", "dependencies": { - "@nats-io/jetstream": "3.0.0-22", - "@nats-io/nats-core": "3.0.0-34" + "@nats-io/jetstream": "3.0.0-23", + "@nats-io/nats-core": "3.0.0-35" }, "devDependencies": { "@types/node": "^22.7.6", diff --git a/services/deno.json b/services/deno.json index acfb300e..6ae63eba 100644 --- a/services/deno.json +++ b/services/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/services", - "version": "3.0.0-12", + "version": "3.0.0-13", "exports": { ".": "./src/mod.ts", "./internal": "./src/internal_mod.ts" @@ -33,6 +33,6 @@ "test": "deno test -A --parallel --reload --quiet tests/ --import-map=import_map.json" }, "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34" + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-35" } } diff --git a/services/import_map.json b/services/import_map.json index 6c3ae4d4..6228967f 100644 --- a/services/import_map.json +++ b/services/import_map.json @@ -1,7 +1,7 @@ { "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34", - "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-34/internal", + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-35", + "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-35/internal", "test_helpers": "../test_helpers/mod.ts", "@nats-io/nkeys": "jsr:@nats-io/nkeys@1.2.0-4", "@nats-io/nuid": "jsr:@nats-io/nuid@2.0.1-2", diff --git a/services/package.json b/services/package.json index fc3efdb4..1c43c744 100644 --- a/services/package.json +++ b/services/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/services", - "version": "3.0.0-12", + "version": "3.0.0-13", "files": [ "lib/", "LICENSE", @@ -34,7 +34,7 @@ }, "description": "services library - this library implements all the base functionality for NATS services for javascript clients", "dependencies": { - "@nats-io/nats-core": "3.0.0-34" + "@nats-io/nats-core": "3.0.0-35" }, "devDependencies": { "@types/node": "^22.7.6", diff --git a/services/src/service.ts b/services/src/service.ts index c6d0e1b2..f20c7807 100644 --- a/services/src/service.ts +++ b/services/src/service.ts @@ -162,7 +162,7 @@ export class ServiceGroupImpl implements ServiceGroup { } else if (parent instanceof ServiceGroupImpl) { const sg = parent as ServiceGroupImpl; this.srv = sg.srv; - if (queue === "" && sg.queue !== "") { + if (queue === undefined) { queue = sg.queue; } root = sg.subject; @@ -198,7 +198,10 @@ export class ServiceGroupImpl implements ServiceGroup { return this.srv._addEndpoint(ne); } - addGroup(name = "", queue = ""): ServiceGroup { + addGroup(name = "", queue?: string): ServiceGroup { + if (queue === undefined) { + queue = this.queue; + } return new ServiceGroupImpl(this, name, queue); } } @@ -286,13 +289,18 @@ export class ServiceImpl implements Service { ) { this.nc = nc; this.config = Object.assign({}, config); - if (!this.config.queue) { + if (this.config.queue === undefined) { this.config.queue = "q"; } + // don't allow changing metadata + config.metadata = Object.freeze(config.metadata || {}); + // this will throw if no name validateName("name", this.config.name); - validateName("queue", this.config.queue); + if (this.config.queue) { + validateName("queue", this.config.queue); + } // this will throw if not semver parseSemVer(this.config.version); diff --git a/services/src/types.ts b/services/src/types.ts index ae9cbb7e..39d6ed74 100644 --- a/services/src/types.ts +++ b/services/src/types.ts @@ -51,7 +51,8 @@ export type Endpoint = { metadata?: Record; /** * Optional queue group to run this particular endpoint in. The service's configuration - * queue configuration will be used. See {@link ServiceConfig}. + * queue configuration will be used. See {@link ServiceConfig}. Note that if the queue + * is set to an empty string, it will not be run in a queue. */ queue?: string; }; @@ -81,6 +82,8 @@ export interface ServiceGroup { * without requiring editing of the service. * Note that an optional queue can be specified, all endpoints added to * the group, will use the specified queue unless the endpoint overrides it. + * When not set, it uses the parent group configuration. An empty string + * means no queue. * see {@link EndpointOptions} and {@link ServiceConfig}. * @param subject * @param queue @@ -208,9 +211,11 @@ export type ServiceConfig = { */ metadata?: Record; /** - * Optional queue group to run the service in. By default, - * then queue name is "q". Note that this configuration will - * be the default for all endpoints and groups. + * Optional queue group to run the service in. If not set, default, + * is set to "q". Note that this configuration will + * be the default for all endpoints and groups. If set to an empty + * string, the service subscription will NOT have queue, and will + * not be run in a queue. */ queue?: string; }; diff --git a/services/tests/service_test.ts b/services/tests/service_test.ts index 1ed9058e..35f5f958 100644 --- a/services/tests/service_test.ts +++ b/services/tests/service_test.ts @@ -1047,3 +1047,61 @@ Deno.test("service - endpoint default queue group", async () => { await cleanup(ns, nc); }); + +Deno.test("service - endpoint no queue group", async () => { + const { ns, nc } = await setup(); + + const svc = new Svc(nc); + const srv = await svc.add({ + name: "example", + version: "0.0.1", + metadata: { service: "1" }, + // no queue + queue: "", + }) as ServiceImpl; + + // svc config doesn't specify a queue group so we expect q + srv.addEndpoint("a"); + checkQueueGroup(srv, "a", ""); + + // we add another group, no queue + const dg = srv.addGroup("G"); + dg.addEndpoint("a"); + checkQueueGroup(srv, "G.a", ""); + + // the above have no queue, no override, and set a queue + const g = srv.addGroup("g", "qq"); + g.addEndpoint("a"); + checkQueueGroup(srv, "g.a", "qq"); + // override + g.addEndpoint("b", { queue: "bb" }); + checkQueueGroup(srv, "g.b", "bb"); + // add a subgroup without, should inherit + const g2 = g.addGroup("g"); + g2.addEndpoint("a"); + checkQueueGroup(srv, "g.g.a", "qq"); + + await cleanup(ns, nc); +}); + +Deno.test("service - metadata is not editable", async () => { + const { ns, nc } = await setup(); + + const svc = new Svc(nc); + const srv = await svc.add({ + name: "example", + version: "0.0.1", + metadata: { service: "1", hello: "world" }, + queue: "", + }) as ServiceImpl; + + assertThrows( + () => { + srv.config.metadata!.hello = "hello"; + }, + TypeError, + "Cannot assign to read only property", + ); + + await cleanup(ns, nc); +}); diff --git a/transport-deno/deno.json b/transport-deno/deno.json index 857603ab..10907b61 100644 --- a/transport-deno/deno.json +++ b/transport-deno/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/transport-deno", - "version": "3.0.0-9", + "version": "3.0.0-10", "exports": { ".": "./src/mod.ts" }, @@ -20,7 +20,7 @@ }, "imports": { "@std/io": "jsr:@std/io@0.225.0", - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34", + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-35", "@nats-io/nkeys": "jsr:@nats-io/nkeys@1.2.0-4", "@nats-io/nuid": "jsr:@nats-io/nuid@2.0.1-2" } diff --git a/transport-deno/src/version.ts b/transport-deno/src/version.ts index 75951750..0cc26851 100644 --- a/transport-deno/src/version.ts +++ b/transport-deno/src/version.ts @@ -1,2 +1,2 @@ // This file is generated - do not edit -export const version = "3.0.0-9"; +export const version = "3.0.0-10"; diff --git a/transport-node/package.json b/transport-node/package.json index 2e19531c..dfec3ea5 100644 --- a/transport-node/package.json +++ b/transport-node/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/transport-node", - "version": "3.0.0-20", + "version": "3.0.0-21", "description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system", "keywords": [ "nats", @@ -54,7 +54,7 @@ "node": ">= 18.0.0" }, "dependencies": { - "@nats-io/nats-core": "3.0.0-34", + "@nats-io/nats-core": "3.0.0-35", "@nats-io/nkeys": "1.2.0-7", "@nats-io/nuid": "2.0.1-2" }, @@ -64,8 +64,8 @@ "nats-jwt": "^0.0.9", "shx": "^0.3.3", "typescript": "5.6.3", - "@nats-io/jetstream": "3.0.0-22", - "@nats-io/kv": "3.0.0-16", - "@nats-io/obj": "3.0.0-17" + "@nats-io/jetstream": "3.0.0-23", + "@nats-io/kv": "3.0.0-17", + "@nats-io/obj": "3.0.0-18" } } diff --git a/transport-node/src/version.ts b/transport-node/src/version.ts index b98255d5..e7c5dc48 100644 --- a/transport-node/src/version.ts +++ b/transport-node/src/version.ts @@ -1,2 +1,2 @@ // This file is generated - do not edit -export const version = "3.0.0-20"; +export const version = "3.0.0-21";