Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConsumerStatus changed to simple type ConsumerNotification #151

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion jetstream/deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/jetstream",
"version": "3.0.0-29",
"version": "3.0.0-31",
"exports": {
".": "./src/mod.ts",
"./internal": "./src/internal_mod.ts"
Expand Down
2 changes: 1 addition & 1 deletion jetstream/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/jetstream",
"version": "3.0.0-29",
"version": "3.0.0-31",
"files": [
"lib/",
"LICENSE",
Expand Down
64 changes: 36 additions & 28 deletions jetstream/src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import type {
PullOptions,
} from "./jsapi_types.ts";
import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts";
import { ConsumerDebugEvents, ConsumerEvents } from "./types.ts";

import type {
ConsumeMessages,
Expand All @@ -54,7 +53,7 @@ import type {
ConsumerAPI,
ConsumerCallbackFn,
ConsumerMessages,
ConsumerStatus,
ConsumerNotification,
Expires,
FetchMessages,
FetchOptions,
Expand Down Expand Up @@ -119,7 +118,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
pending: { msgs: number; bytes: number; requests: number };
isConsume: boolean;
callback: ConsumerCallbackFn | null;
listeners: QueuedIterator<ConsumerStatus>[];
listeners: QueuedIterator<ConsumerNotification>[];
statusIterator?: QueuedIteratorImpl<Status>;
abortOnMissingResource?: boolean;
bind: boolean;
Expand Down Expand Up @@ -207,8 +206,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
const status = new JetStreamStatus(msg);

if (status.isIdleHeartbeat()) {
this.notify(ConsumerDebugEvents.Heartbeat, status.parseHeartbeat());
const hb = status.parseHeartbeat();
if (hb) {
this.notify(hb);
return;
}
const code = status.code;
Expand All @@ -219,7 +219,11 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
this.pending.msgs -= msgsLeft;
this.pending.bytes -= bytesLeft;
this.pending.requests--;
this.notify(ConsumerDebugEvents.Discard, { msgsLeft, bytesLeft });
this.notify({
type: "discard",
messagesLeft: msgsLeft,
bytesLeft: bytesLeft,
});
} else {
// Examine the error codes
// FIXME: 408 can be a Timeout or bad request,
Expand Down Expand Up @@ -247,8 +251,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
default:
this.notify(
ConsumerDebugEvents.DebugEvent,
{ code, description },
{ type: "debug", code, description },
);
}
}
Expand Down Expand Up @@ -319,11 +322,11 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
if (idle_heartbeat) {
this.monitor = new IdleHeartbeatMonitor(
idle_heartbeat,
(data): boolean => {
(count): boolean => {
// for the pull consumer - missing heartbeats may be corrected
// on the next pull etc - the only assumption here is we should
// reset and check if the consumer was deleted from under us
this.notify(ConsumerEvents.HeartbeatsMissed, data);
this.notify({ type: "heartbeats_missed", count });
if (!this.isConsume && !this.consumer.ordered) {
// if we are not a consume, give up - this was masked by an
// external timer on fetch - the hb is a more reliable timeout
Expand Down Expand Up @@ -390,9 +393,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
handle409(status: JetStreamStatus): Error | null {
const { code, description } = status;
if (status.isConsumerDeleted()) {
this.notify(ConsumerEvents.ConsumerDeleted, { code, description });
this.notify({ type: "consumer_deleted", code, description });
} else if (status.isExceededLimit()) {
this.notify(ConsumerEvents.ExceededLimit, { code, description });
this.notify({ type: "exceeded_limits", code, description });
}
if (!this.isConsume) {
return status.toError();
Expand All @@ -410,7 +413,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
const ocs = this.consumer.orderedConsumerState!;
const { name } = this.consumer._info?.config;
if (name) {
this.notify(ConsumerDebugEvents.Reset, name);
this.notify({ type: "reset", name });
this.consumer.api.delete(this.consumer.stream, name)
.catch(() => {
// ignored
Expand All @@ -434,16 +437,17 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
).then((ci) => {
ocs.createFails = 0;
this.consumer._info = ci;
this.notify(ConsumerEvents.OrderedConsumerRecreated, ci.name);
this.notify({ type: "ordered_consumer_recreated", name: ci.name });
this.monitor?.restart();
this.pull(this.pullOptions());
}).catch((err) => {
ocs.createFails++;
if (err.message === "stream not found") {
this.notify(
ConsumerEvents.StreamNotFound,
ocs.createFails,
);
this.notify({
type: "stream_not_found",
consumerCreateFails: ocs.createFails,
name: this.consumer.stream,
});
if (this.abortOnMissingResource) {
this.stop(err);
return;
Expand Down Expand Up @@ -494,13 +498,13 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
}

notify(type: ConsumerEvents | ConsumerDebugEvents, data: unknown) {
notify(n: ConsumerNotification) {
if (this.listeners.length > 0) {
(() => {
this.listeners.forEach((l) => {
const qi = l as QueuedIteratorImpl<ConsumerStatus>;
const qi = l as QueuedIteratorImpl<ConsumerNotification>;
if (!qi.done) {
qi.push({ type, data });
qi.push(n);
}
});
})();
Expand Down Expand Up @@ -549,14 +553,18 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
// game over
if ((err as Error).message === "stream not found") {
streamNotFound++;
this.notify(ConsumerEvents.StreamNotFound, streamNotFound);
this.notify({ type: "stream_not_found", name: this.consumer.stream });
if (!this.isConsume || this.abortOnMissingResource) {
this.stop(err as Error);
return false;
}
} else if ((err as Error).message === "consumer not found") {
notFound++;
this.notify(ConsumerEvents.ConsumerNotFound, notFound);
this.notify({
type: "consumer_not_found",
name: this.consumer.name,
count: notFound,
});
if (!this.isConsume || this.abortOnMissingResource) {
if (this.consumer.ordered) {
const ocs = this.consumer.orderedConsumerState!;
Expand Down Expand Up @@ -598,7 +606,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
JSON.stringify(opts),
{ reply: this.inbox },
);
this.notify(ConsumerDebugEvents.Next, opts);
this.notify({ type: "next", options: opts as PullOptions });
});
}

Expand Down Expand Up @@ -646,8 +654,8 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
//@ts-ignore: fn
this._push(() => {
super.stop(err);
this.listeners.forEach((n) => {
n.stop();
this.listeners.forEach((iter) => {
iter.stop();
});
});
}
Expand Down Expand Up @@ -728,8 +736,8 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
return args;
}

status(): AsyncIterable<ConsumerStatus> {
const iter = new QueuedIteratorImpl<ConsumerStatus>();
status(): AsyncIterable<ConsumerNotification> {
const iter = new QueuedIteratorImpl<ConsumerNotification>();
this.listeners.push(iter);
return iter;
}
Expand Down
4 changes: 1 addition & 3 deletions jetstream/src/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

export {
AdvisoryKind,
ConsumerDebugEvents,
ConsumerEvents,
DirectMsgHeaders,
isBoundPushConsumerOptions,
isOrderedPushConsumerOptions,
Expand Down Expand Up @@ -48,8 +46,8 @@ export type {
ConsumerCallbackFn,
ConsumerKind,
ConsumerMessages,
ConsumerNotification,
Consumers,
ConsumerStatus,
DeleteableConsumer,
Destroyable,
DirectStreamAPI,
Expand Down
9 changes: 5 additions & 4 deletions jetstream/src/jserrors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/

import type { Msg } from "@nats-io/nats-core";
import { JsHeaders } from "./types.ts";
import { type Heartbeat, JsHeaders } from "./types.ts";
import type { ApiError } from "./jsapi_types.ts";

export class JetStreamNotEnabled extends Error {
Expand Down Expand Up @@ -89,14 +89,15 @@ export class JetStreamStatus {
}

parseHeartbeat():
| { natsLastConsumer: number; natsLastStream: number }
| Heartbeat
| null {
if (this.isIdleHeartbeat()) {
return {
natsLastConsumer: parseInt(
type: "heartbeat",
lastConsumerSequence: parseInt(
this.msg.headers?.get("Nats-Last-Consumer") || "0",
),
natsLastStream: parseInt(
lastStreamSequence: parseInt(
this.msg.headers?.get("Nats-Last-Stream") || "0",
),
};
Expand Down
4 changes: 1 addition & 3 deletions jetstream/src/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ export { jetstream, jetstreamManager } from "./internal_mod.ts";
export {
AckPolicy,
AdvisoryKind,
ConsumerDebugEvents,
ConsumerEvents,
DeliverPolicy,
DirectMsgHeaders,
DiscardPolicy,
Expand Down Expand Up @@ -56,8 +54,8 @@ export type {
ConsumerInfo,
ConsumerKind,
ConsumerMessages,
ConsumerNotification,
Consumers,
ConsumerStatus,
ConsumerUpdateConfig,
DeleteableConsumer,
DeliveryInfo,
Expand Down
49 changes: 27 additions & 22 deletions jetstream/src/pushconsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ import { toJsMsg } from "./jsmsg.ts";
import type { JsMsg } from "./jsmsg.ts";
import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts";
import type { ConsumerConfig, ConsumerInfo } from "./jsapi_types.ts";
import { ConsumerDebugEvents, ConsumerEvents } from "./types.ts";
import type { ConsumerNotification } from "./types.ts";

import type {
ConsumerAPI,
ConsumerCallbackFn,
ConsumerMessages,
ConsumerStatus,
PushConsumer,
PushConsumerOptions,
} from "./types.ts";
Expand Down Expand Up @@ -54,7 +53,7 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
consumer: PushConsumerImpl;
sub!: Subscription;
monitor: IdleHeartbeatMonitor | null;
listeners: QueuedIterator<ConsumerStatus>[];
listeners: QueuedIterator<ConsumerNotification>[];
abortOnMissingResource: boolean;
callback: ConsumerCallbackFn | null;
ordered: boolean;
Expand Down Expand Up @@ -123,11 +122,15 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
).then((ci) => {
this.createFails = 0;
this.consumer._info = ci;
this.notify(ConsumerEvents.OrderedConsumerRecreated, ci.name);
this.notify({ type: "ordered_consumer_recreated", name: ci.name });
}).catch((err) => {
this.createFails++;
if (err.message === "stream not found") {
this.notify(ConsumerEvents.StreamNotFound, this.createFails);
this.notify({
type: "stream_not_found",
name: this.consumer.stream,
consumerCreateFails: this.createFails,
});
if (this.abortOnMissingResource) {
this.stop(err);
return;
Expand Down Expand Up @@ -226,8 +229,8 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
}
}
status(): AsyncIterable<ConsumerStatus> {
const iter = new QueuedIteratorImpl<ConsumerStatus>();
status(): AsyncIterable<ConsumerNotification> {
const iter = new QueuedIteratorImpl<ConsumerNotification>();
this.listeners.push(iter);
return iter;
}
Expand All @@ -247,8 +250,8 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
const ms = millis(hbNanos);
this.monitor = new IdleHeartbeatMonitor(
ms,
(data): boolean => {
this.notify(ConsumerEvents.HeartbeatsMissed, data);
(count): boolean => {
this.notify({ type: "heartbeats_missed", count });
return false;
},
{ maxOut: 2 },
Expand Down Expand Up @@ -296,17 +299,22 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
if (status.isFlowControlRequest()) {
this._push(() => {
msg.respond();
this.notify(ConsumerDebugEvents.FlowControl, null);
this.notify({ type: "flow_control" });
});
return;
}

if (status.isIdleHeartbeat()) {
const natsLastConsumer = msg.headers?.get("Nats-Last-Consumer");
const natsLastStream = msg.headers?.get("Nats-Last-Stream");
this.notify(ConsumerDebugEvents.Heartbeat, {
natsLastConsumer,
natsLastStream,
const lastConsumerSequence = parseInt(
msg.headers?.get("Nats-Last-Consumer") || "0",
);
const lastStreamSequence = parseInt(
msg.headers?.get("Nats-Last-Stream") ?? "0",
);
this.notify({
type: "heartbeat",
lastStreamSequence,
lastConsumerSequence,
});
return;
}
Expand All @@ -315,10 +323,7 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
const description = status.description;

if (status.isConsumerDeleted()) {
this.notify(
ConsumerEvents.ConsumerDeleted,
`${code} ${description}`,
);
this.notify({ type: "consumer_deleted", code, description });
}
if (this.abortOnMissingResource) {
this._push(() => {
Expand Down Expand Up @@ -354,13 +359,13 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
});
}

notify(type: ConsumerEvents | ConsumerDebugEvents, data: unknown) {
notify(n: ConsumerNotification) {
if (this.listeners.length > 0) {
(() => {
this.listeners.forEach((l) => {
const qi = l as QueuedIteratorImpl<ConsumerStatus>;
const qi = l as QueuedIteratorImpl<ConsumerNotification>;
if (!qi.done) {
qi.push({ type, data });
qi.push(n);
}
});
})();
Expand Down
Loading