Skip to content

Commit

Permalink
Staging (#44)
Browse files Browse the repository at this point in the history
* producer example bugfix

* version update

* update readme

* update readme

* add  the client ip address of a producer

* remove the IP from the msg headers

* add max msg deliveries option

* add the connection id as a header

* remove the jenkins file

* update readme file

* add missing files

* typo

* pull consumer bugfix

* defaults consumer group to consumer name

* readme update

* async listener for dlq events

* nats connection bugfix

* nats connection bugfix

* version update
  • Loading branch information
idanasulin2706 authored Jul 18, 2022
1 parent 1ccee2a commit 4a51ae5
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 74 deletions.
140 changes: 76 additions & 64 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ var Memphis = /** @class */ (function () {
this.brokerConnection = null;
this.brokerManager = null;
this.brokerStats = null;
this.natsConnection = false;
this.client.on('error', function (error) {
console.error(error);
});
Expand Down Expand Up @@ -160,7 +161,6 @@ var Memphis = /** @class */ (function () {
broker_creds: connectionToken,
connection_id: _this.connectionId
}));
var connected = false;
_this.client.on('data', function (data) { return __awaiter(_this, void 0, void 0, function () {
var message, _a, _b, ex_1;
return __generator(this, function (_c) {
Expand All @@ -181,7 +181,7 @@ var Memphis = /** @class */ (function () {
}
if (message.ping_interval_ms)
this._pingServer(message.ping_interval_ms);
if (!!connected) return [3 /*break*/, 5];
if (!!this.natsConnection) return [3 /*break*/, 5];
_c.label = 1;
case 1:
_c.trys.push([1, 4, , 5]);
Expand All @@ -201,7 +201,7 @@ var Memphis = /** @class */ (function () {
return [4 /*yield*/, this.brokerManager.jetstreamManager()];
case 3:
_b.brokerStats = _c.sent();
connected = true;
this.natsConnection = true;
return [2 /*return*/, resolve()];
case 4:
ex_1 = _c.sent();
Expand Down Expand Up @@ -383,15 +383,16 @@ var Memphis = /** @class */ (function () {
* @param {Number} maxMsgDeliveries - max number of message deliveries, by default is 10
*/
Memphis.prototype.consumer = function (_a) {
var stationName = _a.stationName, consumerName = _a.consumerName, _b = _a.consumerGroup, consumerGroup = _b === void 0 ? consumerName : _b, _c = _a.pullIntervalMs, pullIntervalMs = _c === void 0 ? 1000 : _c, _d = _a.batchSize, batchSize = _d === void 0 ? 10 : _d, _e = _a.batchMaxTimeToWaitMs, batchMaxTimeToWaitMs = _e === void 0 ? 5000 : _e, _f = _a.maxAckTimeMs, maxAckTimeMs = _f === void 0 ? 30000 : _f, _g = _a.maxMsgDeliveries, maxMsgDeliveries = _g === void 0 ? 10 : _g;
var stationName = _a.stationName, consumerName = _a.consumerName, consumerGroup = _a.consumerGroup, _b = _a.pullIntervalMs, pullIntervalMs = _b === void 0 ? 1000 : _b, _c = _a.batchSize, batchSize = _c === void 0 ? 10 : _c, _d = _a.batchMaxTimeToWaitMs, batchMaxTimeToWaitMs = _d === void 0 ? 5000 : _d, _e = _a.maxAckTimeMs, maxAckTimeMs = _e === void 0 ? 30000 : _e, _f = _a.maxMsgDeliveries, maxMsgDeliveries = _f === void 0 ? 10 : _f;
return __awaiter(this, void 0, void 0, function () {
var ex_5;
return __generator(this, function (_h) {
switch (_h.label) {
return __generator(this, function (_g) {
switch (_g.label) {
case 0:
_h.trys.push([0, 2, , 3]);
_g.trys.push([0, 2, , 3]);
if (!this.isConnectionActive)
throw new Error("Connection is dead");
consumerGroup = consumerGroup || consumerName;
return [4 /*yield*/, (0, httpRequest_1.httpRequest)({
method: "POST",
url: "http://".concat(this.host, ":").concat(this.managementPort, "/api/consumers/createConsumer"),
Expand All @@ -409,10 +410,10 @@ var Memphis = /** @class */ (function () {
},
})];
case 1:
_h.sent();
_g.sent();
return [2 /*return*/, new Consumer(this, stationName, consumerName, consumerGroup, pullIntervalMs, batchSize, batchMaxTimeToWaitMs, maxAckTimeMs, maxMsgDeliveries)];
case 2:
ex_5 = _h.sent();
ex_5 = _g.sent();
throw ex_5;
case 3: return [2 /*return*/];
}
Expand Down Expand Up @@ -588,66 +589,76 @@ var Consumer = /** @class */ (function () {
durable_name: this.consumerGroup ? this.consumerGroup : this.consumerName,
ack_wait: this.maxAckTimeMs,
},
}).then(function (psub) { var psub_1, psub_1_1; return __awaiter(_this, void 0, void 0, function () {
var m, e_1_1;
}).then(function (psub) { return __awaiter(_this, void 0, void 0, function () {
var sub;
var _this = this;
var e_1, _a;
return __generator(this, function (_b) {
switch (_b.label) {
case 0:
psub.pull({ batch: this.batchSize, expires: this.batchMaxTimeToWaitMs });
this.pullInterval = setInterval(function () {
if (!_this.connection.brokerManager.isClosed())
psub.pull({ batch: _this.batchSize, expires: _this.batchMaxTimeToWaitMs });
else
clearInterval(_this.pullInterval);
}, this.pullIntervalMs);
this.pingConsumerInvterval = setInterval(function () { return __awaiter(_this, void 0, void 0, function () {
return __generator(this, function (_a) {
if (!this.connection.brokerManager.isClosed()) {
this._pingConsumer();
}
else
clearInterval(this.pingConsumerInvterval);
return [2 /*return*/];
});
}); }, this.pingConsumerInvtervalMs);
_b.label = 1;
case 1:
_b.trys.push([1, 6, 7, 12]);
psub_1 = __asyncValues(psub);
_b.label = 2;
case 2: return [4 /*yield*/, psub_1.next()];
case 3:
if (!(psub_1_1 = _b.sent(), !psub_1_1.done)) return [3 /*break*/, 5];
m = psub_1_1.value;
this.eventEmitter.emit("message", new Message(m));
_b.label = 4;
case 4: return [3 /*break*/, 2];
case 5: return [3 /*break*/, 12];
case 6:
e_1_1 = _b.sent();
e_1 = { error: e_1_1 };
return [3 /*break*/, 12];
case 7:
_b.trys.push([7, , 10, 11]);
if (!(psub_1_1 && !psub_1_1.done && (_a = psub_1.return))) return [3 /*break*/, 9];
return [4 /*yield*/, _a.call(psub_1)];
case 8:
_b.sent();
_b.label = 9;
case 9: return [3 /*break*/, 11];
case 10:
if (e_1) throw e_1.error;
return [7 /*endfinally*/];
case 11: return [7 /*endfinally*/];
case 12: return [2 /*return*/];
}
return __generator(this, function (_a) {
psub.pull({ batch: this.batchSize, expires: this.batchMaxTimeToWaitMs });
this.pullInterval = setInterval(function () {
if (!_this.connection.brokerManager.isClosed())
psub.pull({ batch: _this.batchSize, expires: _this.batchMaxTimeToWaitMs });
else
clearInterval(_this.pullInterval);
}, this.pullIntervalMs);
this.pingConsumerInvterval = setInterval(function () { return __awaiter(_this, void 0, void 0, function () {
return __generator(this, function (_a) {
if (!this.connection.brokerManager.isClosed()) {
this._pingConsumer();
}
else
clearInterval(this.pingConsumerInvterval);
return [2 /*return*/];
});
}); }, this.pingConsumerInvtervalMs);
sub = this.connection.brokerManager.subscribe("$memphis_dlq_".concat(this.stationName, "_").concat(this.consumerGroup), { queue: "$memphis_".concat(this.stationName, "_").concat(this.consumerGroup) });
this._handleAsyncIterableSubscriber(psub);
this._handleAsyncIterableSubscriber(sub);
return [2 /*return*/];
});
}); }).catch(function (error) { return _this.eventEmitter.emit("error", error); });
}
this.eventEmitter.on(event, cb);
};
Consumer.prototype._handleAsyncIterableSubscriber = function (iter) {
var iter_1, iter_1_1;
var e_1, _a;
return __awaiter(this, void 0, void 0, function () {
var m, e_1_1;
return __generator(this, function (_b) {
switch (_b.label) {
case 0:
_b.trys.push([0, 5, 6, 11]);
iter_1 = __asyncValues(iter);
_b.label = 1;
case 1: return [4 /*yield*/, iter_1.next()];
case 2:
if (!(iter_1_1 = _b.sent(), !iter_1_1.done)) return [3 /*break*/, 4];
m = iter_1_1.value;
this.eventEmitter.emit("message", new Message(m));
_b.label = 3;
case 3: return [3 /*break*/, 1];
case 4: return [3 /*break*/, 11];
case 5:
e_1_1 = _b.sent();
e_1 = { error: e_1_1 };
return [3 /*break*/, 11];
case 6:
_b.trys.push([6, , 9, 10]);
if (!(iter_1_1 && !iter_1_1.done && (_a = iter_1.return))) return [3 /*break*/, 8];
return [4 /*yield*/, _a.call(iter_1)];
case 7:
_b.sent();
_b.label = 8;
case 8: return [3 /*break*/, 10];
case 9:
if (e_1) throw e_1.error;
return [7 /*endfinally*/];
case 10: return [7 /*endfinally*/];
case 11: return [2 /*return*/];
}
});
});
};
Consumer.prototype._pingConsumer = function () {
return __awaiter(this, void 0, void 0, function () {
var durableName, ex_8;
Expand Down Expand Up @@ -717,7 +728,8 @@ var Message = /** @class */ (function () {
* Ack a message is done processing.
*/
Message.prototype.ack = function () {
this.message.ack();
if (this.message.ack) // for dlq events which are unackable (core NATS messages)
this.message.ack();
};
Message.prototype.getData = function () {
return this.message.data;
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "memphis-dev",
"version": "0.3.3",
"version": "0.3.4",
"description": "A dev-first event processing platform",
"main": "index.js",
"scripts": {
Expand Down
28 changes: 19 additions & 9 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class Memphis {
private maxReconnect: number;
private reconnectIntervalMs: number;
private timeoutMs: number;
private natsConnection: boolean;
public brokerConnection: any;
public brokerManager: any;
public brokerStats: any;
Expand All @@ -90,8 +91,9 @@ class Memphis {
this.brokerConnection = null;
this.brokerManager = null;
this.brokerStats = null;
this.natsConnection = false;

this.client.on('error', error => {
this.client.on('error', (error: any) => {
console.error(error);
});

Expand Down Expand Up @@ -137,7 +139,6 @@ class Memphis {
broker_creds: connectionToken,
connection_id: this.connectionId
}));
let connected = false;

this.client.on('data', async data => {
let message: IMessage;
Expand All @@ -158,7 +159,7 @@ class Memphis {
if (message.ping_interval_ms)
this._pingServer(message.ping_interval_ms);

if (!connected) {
if (!this.natsConnection) {
try {
this.brokerManager = await broker.connect({
servers: `${this.host}:${this.dataPort}`,
Expand All @@ -171,7 +172,7 @@ class Memphis {

this.brokerConnection = this.brokerManager.jetstream();
this.brokerStats = await this.brokerManager.jetstreamManager();
connected = true;
this.natsConnection = true;
return resolve();
} catch (ex) {
return reject(ex);
Expand Down Expand Up @@ -330,7 +331,7 @@ class Memphis {
* @param {Number} maxAckTimeMs - max time for ack a message in miliseconds, in case a message not acked in this time period the Memphis broker will resend it untill reaches the maxMsgDeliveries value
* @param {Number} maxMsgDeliveries - max number of message deliveries, by default is 10
*/
async consumer({ stationName, consumerName, consumerGroup = consumerName, pullIntervalMs = 1000, batchSize = 10,
async consumer({ stationName, consumerName, consumerGroup, pullIntervalMs = 1000, batchSize = 10,
batchMaxTimeToWaitMs = 5000, maxAckTimeMs = 30000, maxMsgDeliveries = 10 }:
{
stationName: string, consumerName: string, consumerGroup: string, pullIntervalMs: number,
Expand All @@ -340,6 +341,8 @@ class Memphis {
if (!this.isConnectionActive)
throw new Error("Connection is dead");

consumerGroup = consumerGroup || consumerName;

await httpRequest({
method: "POST",
url: `http://${this.host}:${this.managementPort}/api/consumers/createConsumer`,
Expand Down Expand Up @@ -532,15 +535,21 @@ class Consumer {
clearInterval(this.pingConsumerInvterval)
}, this.pingConsumerInvtervalMs);

for await (const m of psub) {
this.eventEmitter.emit("message", new Message(m));
}
const sub = this.connection.brokerManager.subscribe(`$memphis_dlq_${this.stationName}_${this.consumerGroup}`, { queue: `$memphis_${this.stationName}_${this.consumerGroup}` });
this._handleAsyncIterableSubscriber(psub)
this._handleAsyncIterableSubscriber(sub)
}).catch((error: any) => this.eventEmitter.emit("error", error));
}

this.eventEmitter.on(<string>event, cb);
}

private async _handleAsyncIterableSubscriber(iter: any) {
for await (const m of iter) {
this.eventEmitter.emit("message", new Message(m));
}
}

private async _pingConsumer() {
try {
const durableName = this.consumerGroup || this.consumerName;
Expand Down Expand Up @@ -585,7 +594,8 @@ class Message {
* Ack a message is done processing.
*/
ack() {
this.message.ack();
if (this.message.ack) // for dlq events which are unackable (core NATS messages)
this.message.ack();
}

getData() {
Expand Down

0 comments on commit 4a51ae5

Please sign in to comment.