From 4a51ae5bb3ac82112dd2dbbe0010fb87bc66a6eb Mon Sep 17 00:00:00 2001 From: Idan Asulin <74712806+idanasulinmemphis@users.noreply.github.com> Date: Mon, 18 Jul 2022 10:41:06 +0300 Subject: [PATCH] Staging (#44) * producer example bugfix * version update * update readme * update readme * add the client ip address of a producer * remove the IP from the msg headers * add max msg deliveries option * add the connection id as a header * remove the jenkins file * update readme file * add missing files * typo * pull consumer bugfix * defaults consumer group to consumer name * readme update * async listener for dlq events * nats connection bugfix * nats connection bugfix * version update --- index.js | 140 ++++++++++++++++++++++++++++----------------------- package.json | 2 +- src/index.ts | 28 +++++++---- 3 files changed, 96 insertions(+), 74 deletions(-) diff --git a/index.js b/index.js index 416ca45..b13dcc8 100644 --- a/index.js +++ b/index.js @@ -119,6 +119,7 @@ var Memphis = /** @class */ (function () { this.brokerConnection = null; this.brokerManager = null; this.brokerStats = null; + this.natsConnection = false; this.client.on('error', function (error) { console.error(error); }); @@ -160,7 +161,6 @@ var Memphis = /** @class */ (function () { broker_creds: connectionToken, connection_id: _this.connectionId })); - var connected = false; _this.client.on('data', function (data) { return __awaiter(_this, void 0, void 0, function () { var message, _a, _b, ex_1; return __generator(this, function (_c) { @@ -181,7 +181,7 @@ var Memphis = /** @class */ (function () { } if (message.ping_interval_ms) this._pingServer(message.ping_interval_ms); - if (!!connected) return [3 /*break*/, 5]; + if (!!this.natsConnection) return [3 /*break*/, 5]; _c.label = 1; case 1: _c.trys.push([1, 4, , 5]); @@ -201,7 +201,7 @@ var Memphis = /** @class */ (function () { return [4 /*yield*/, this.brokerManager.jetstreamManager()]; case 3: _b.brokerStats = _c.sent(); - connected = true; + this.natsConnection = true; return [2 /*return*/, resolve()]; case 4: ex_1 = _c.sent(); @@ -383,15 +383,16 @@ var Memphis = /** @class */ (function () { * @param {Number} maxMsgDeliveries - max number of message deliveries, by default is 10 */ Memphis.prototype.consumer = function (_a) { - var stationName = _a.stationName, consumerName = _a.consumerName, _b = _a.consumerGroup, consumerGroup = _b === void 0 ? consumerName : _b, _c = _a.pullIntervalMs, pullIntervalMs = _c === void 0 ? 1000 : _c, _d = _a.batchSize, batchSize = _d === void 0 ? 10 : _d, _e = _a.batchMaxTimeToWaitMs, batchMaxTimeToWaitMs = _e === void 0 ? 5000 : _e, _f = _a.maxAckTimeMs, maxAckTimeMs = _f === void 0 ? 30000 : _f, _g = _a.maxMsgDeliveries, maxMsgDeliveries = _g === void 0 ? 10 : _g; + var stationName = _a.stationName, consumerName = _a.consumerName, consumerGroup = _a.consumerGroup, _b = _a.pullIntervalMs, pullIntervalMs = _b === void 0 ? 1000 : _b, _c = _a.batchSize, batchSize = _c === void 0 ? 10 : _c, _d = _a.batchMaxTimeToWaitMs, batchMaxTimeToWaitMs = _d === void 0 ? 5000 : _d, _e = _a.maxAckTimeMs, maxAckTimeMs = _e === void 0 ? 30000 : _e, _f = _a.maxMsgDeliveries, maxMsgDeliveries = _f === void 0 ? 10 : _f; return __awaiter(this, void 0, void 0, function () { var ex_5; - return __generator(this, function (_h) { - switch (_h.label) { + return __generator(this, function (_g) { + switch (_g.label) { case 0: - _h.trys.push([0, 2, , 3]); + _g.trys.push([0, 2, , 3]); if (!this.isConnectionActive) throw new Error("Connection is dead"); + consumerGroup = consumerGroup || consumerName; return [4 /*yield*/, (0, httpRequest_1.httpRequest)({ method: "POST", url: "http://".concat(this.host, ":").concat(this.managementPort, "/api/consumers/createConsumer"), @@ -409,10 +410,10 @@ var Memphis = /** @class */ (function () { }, })]; case 1: - _h.sent(); + _g.sent(); return [2 /*return*/, new Consumer(this, stationName, consumerName, consumerGroup, pullIntervalMs, batchSize, batchMaxTimeToWaitMs, maxAckTimeMs, maxMsgDeliveries)]; case 2: - ex_5 = _h.sent(); + ex_5 = _g.sent(); throw ex_5; case 3: return [2 /*return*/]; } @@ -588,66 +589,76 @@ var Consumer = /** @class */ (function () { durable_name: this.consumerGroup ? this.consumerGroup : this.consumerName, ack_wait: this.maxAckTimeMs, }, - }).then(function (psub) { var psub_1, psub_1_1; return __awaiter(_this, void 0, void 0, function () { - var m, e_1_1; + }).then(function (psub) { return __awaiter(_this, void 0, void 0, function () { + var sub; var _this = this; - var e_1, _a; - return __generator(this, function (_b) { - switch (_b.label) { - case 0: - psub.pull({ batch: this.batchSize, expires: this.batchMaxTimeToWaitMs }); - this.pullInterval = setInterval(function () { - if (!_this.connection.brokerManager.isClosed()) - psub.pull({ batch: _this.batchSize, expires: _this.batchMaxTimeToWaitMs }); - else - clearInterval(_this.pullInterval); - }, this.pullIntervalMs); - this.pingConsumerInvterval = setInterval(function () { return __awaiter(_this, void 0, void 0, function () { - return __generator(this, function (_a) { - if (!this.connection.brokerManager.isClosed()) { - this._pingConsumer(); - } - else - clearInterval(this.pingConsumerInvterval); - return [2 /*return*/]; - }); - }); }, this.pingConsumerInvtervalMs); - _b.label = 1; - case 1: - _b.trys.push([1, 6, 7, 12]); - psub_1 = __asyncValues(psub); - _b.label = 2; - case 2: return [4 /*yield*/, psub_1.next()]; - case 3: - if (!(psub_1_1 = _b.sent(), !psub_1_1.done)) return [3 /*break*/, 5]; - m = psub_1_1.value; - this.eventEmitter.emit("message", new Message(m)); - _b.label = 4; - case 4: return [3 /*break*/, 2]; - case 5: return [3 /*break*/, 12]; - case 6: - e_1_1 = _b.sent(); - e_1 = { error: e_1_1 }; - return [3 /*break*/, 12]; - case 7: - _b.trys.push([7, , 10, 11]); - if (!(psub_1_1 && !psub_1_1.done && (_a = psub_1.return))) return [3 /*break*/, 9]; - return [4 /*yield*/, _a.call(psub_1)]; - case 8: - _b.sent(); - _b.label = 9; - case 9: return [3 /*break*/, 11]; - case 10: - if (e_1) throw e_1.error; - return [7 /*endfinally*/]; - case 11: return [7 /*endfinally*/]; - case 12: return [2 /*return*/]; - } + return __generator(this, function (_a) { + psub.pull({ batch: this.batchSize, expires: this.batchMaxTimeToWaitMs }); + this.pullInterval = setInterval(function () { + if (!_this.connection.brokerManager.isClosed()) + psub.pull({ batch: _this.batchSize, expires: _this.batchMaxTimeToWaitMs }); + else + clearInterval(_this.pullInterval); + }, this.pullIntervalMs); + this.pingConsumerInvterval = setInterval(function () { return __awaiter(_this, void 0, void 0, function () { + return __generator(this, function (_a) { + if (!this.connection.brokerManager.isClosed()) { + this._pingConsumer(); + } + else + clearInterval(this.pingConsumerInvterval); + return [2 /*return*/]; + }); + }); }, this.pingConsumerInvtervalMs); + sub = this.connection.brokerManager.subscribe("$memphis_dlq_".concat(this.stationName, "_").concat(this.consumerGroup), { queue: "$memphis_".concat(this.stationName, "_").concat(this.consumerGroup) }); + this._handleAsyncIterableSubscriber(psub); + this._handleAsyncIterableSubscriber(sub); + return [2 /*return*/]; }); }); }).catch(function (error) { return _this.eventEmitter.emit("error", error); }); } this.eventEmitter.on(event, cb); }; + Consumer.prototype._handleAsyncIterableSubscriber = function (iter) { + var iter_1, iter_1_1; + var e_1, _a; + return __awaiter(this, void 0, void 0, function () { + var m, e_1_1; + return __generator(this, function (_b) { + switch (_b.label) { + case 0: + _b.trys.push([0, 5, 6, 11]); + iter_1 = __asyncValues(iter); + _b.label = 1; + case 1: return [4 /*yield*/, iter_1.next()]; + case 2: + if (!(iter_1_1 = _b.sent(), !iter_1_1.done)) return [3 /*break*/, 4]; + m = iter_1_1.value; + this.eventEmitter.emit("message", new Message(m)); + _b.label = 3; + case 3: return [3 /*break*/, 1]; + case 4: return [3 /*break*/, 11]; + case 5: + e_1_1 = _b.sent(); + e_1 = { error: e_1_1 }; + return [3 /*break*/, 11]; + case 6: + _b.trys.push([6, , 9, 10]); + if (!(iter_1_1 && !iter_1_1.done && (_a = iter_1.return))) return [3 /*break*/, 8]; + return [4 /*yield*/, _a.call(iter_1)]; + case 7: + _b.sent(); + _b.label = 8; + case 8: return [3 /*break*/, 10]; + case 9: + if (e_1) throw e_1.error; + return [7 /*endfinally*/]; + case 10: return [7 /*endfinally*/]; + case 11: return [2 /*return*/]; + } + }); + }); + }; Consumer.prototype._pingConsumer = function () { return __awaiter(this, void 0, void 0, function () { var durableName, ex_8; @@ -717,7 +728,8 @@ var Message = /** @class */ (function () { * Ack a message is done processing. */ Message.prototype.ack = function () { - this.message.ack(); + if (this.message.ack) // for dlq events which are unackable (core NATS messages) + this.message.ack(); }; Message.prototype.getData = function () { return this.message.data; diff --git a/package.json b/package.json index e33ed86..00ff075 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "memphis-dev", - "version": "0.3.3", + "version": "0.3.4", "description": "A dev-first event processing platform", "main": "index.js", "scripts": { diff --git a/src/index.ts b/src/index.ts index add51da..41f0540 100644 --- a/src/index.ts +++ b/src/index.ts @@ -65,6 +65,7 @@ class Memphis { private maxReconnect: number; private reconnectIntervalMs: number; private timeoutMs: number; + private natsConnection: boolean; public brokerConnection: any; public brokerManager: any; public brokerStats: any; @@ -90,8 +91,9 @@ class Memphis { this.brokerConnection = null; this.brokerManager = null; this.brokerStats = null; + this.natsConnection = false; - this.client.on('error', error => { + this.client.on('error', (error: any) => { console.error(error); }); @@ -137,7 +139,6 @@ class Memphis { broker_creds: connectionToken, connection_id: this.connectionId })); - let connected = false; this.client.on('data', async data => { let message: IMessage; @@ -158,7 +159,7 @@ class Memphis { if (message.ping_interval_ms) this._pingServer(message.ping_interval_ms); - if (!connected) { + if (!this.natsConnection) { try { this.brokerManager = await broker.connect({ servers: `${this.host}:${this.dataPort}`, @@ -171,7 +172,7 @@ class Memphis { this.brokerConnection = this.brokerManager.jetstream(); this.brokerStats = await this.brokerManager.jetstreamManager(); - connected = true; + this.natsConnection = true; return resolve(); } catch (ex) { return reject(ex); @@ -330,7 +331,7 @@ class Memphis { * @param {Number} maxAckTimeMs - max time for ack a message in miliseconds, in case a message not acked in this time period the Memphis broker will resend it untill reaches the maxMsgDeliveries value * @param {Number} maxMsgDeliveries - max number of message deliveries, by default is 10 */ - async consumer({ stationName, consumerName, consumerGroup = consumerName, pullIntervalMs = 1000, batchSize = 10, + async consumer({ stationName, consumerName, consumerGroup, pullIntervalMs = 1000, batchSize = 10, batchMaxTimeToWaitMs = 5000, maxAckTimeMs = 30000, maxMsgDeliveries = 10 }: { stationName: string, consumerName: string, consumerGroup: string, pullIntervalMs: number, @@ -340,6 +341,8 @@ class Memphis { if (!this.isConnectionActive) throw new Error("Connection is dead"); + consumerGroup = consumerGroup || consumerName; + await httpRequest({ method: "POST", url: `http://${this.host}:${this.managementPort}/api/consumers/createConsumer`, @@ -532,15 +535,21 @@ class Consumer { clearInterval(this.pingConsumerInvterval) }, this.pingConsumerInvtervalMs); - for await (const m of psub) { - this.eventEmitter.emit("message", new Message(m)); - } + const sub = this.connection.brokerManager.subscribe(`$memphis_dlq_${this.stationName}_${this.consumerGroup}`, { queue: `$memphis_${this.stationName}_${this.consumerGroup}` }); + this._handleAsyncIterableSubscriber(psub) + this._handleAsyncIterableSubscriber(sub) }).catch((error: any) => this.eventEmitter.emit("error", error)); } this.eventEmitter.on(event, cb); } + private async _handleAsyncIterableSubscriber(iter: any) { + for await (const m of iter) { + this.eventEmitter.emit("message", new Message(m)); + } + } + private async _pingConsumer() { try { const durableName = this.consumerGroup || this.consumerName; @@ -585,7 +594,8 @@ class Message { * Ack a message is done processing. */ ack() { - this.message.ack(); + if (this.message.ack) // for dlq events which are unackable (core NATS messages) + this.message.ack(); } getData() {