Skip to content

Commit

Permalink
Staging (#45)
Browse files Browse the repository at this point in the history
* producer example bugfix

* version update

* update readme

* update readme

* add  the client ip address of a producer

* remove the IP from the msg headers

* add max msg deliveries option

* add the connection id as a header

* remove the jenkins file

* update readme file

* add missing files

* typo

* pull consumer bugfix

* defaults consumer group to consumer name

* readme update

* async listener for dlq events

* nats connection bugfix

* nats connection bugfix

* version update

* close connection bugfix

* version update
  • Loading branch information
idanasulin2706 authored Jul 18, 2022
1 parent 4a51ae5 commit 18ee812
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 35 deletions.
34 changes: 17 additions & 17 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ var Memphis = /** @class */ (function () {
};
Memphis.prototype._close = function () {
var _this = this;
var _a, _b, _c, _d;
if (this.reconnect && this.reconnectAttempts < this.maxReconnect) {
this.reconnectAttempts++;
setTimeout(function () { return __awaiter(_this, void 0, void 0, function () {
Expand Down Expand Up @@ -455,11 +456,11 @@ var Memphis = /** @class */ (function () {
});
}); }, this.reconnectIntervalMs);
}
else if (this.isConnectionActive) {
this.client.removeAllListeners("data");
this.client.removeAllListeners("error");
this.client.removeAllListeners("close");
this.client.destroy();
else {
(_a = this.client) === null || _a === void 0 ? void 0 : _a.removeAllListeners("data");
(_b = this.client) === null || _b === void 0 ? void 0 : _b.removeAllListeners("error");
(_c = this.client) === null || _c === void 0 ? void 0 : _c.removeAllListeners("close");
(_d = this.client) === null || _d === void 0 ? void 0 : _d.destroy();
clearTimeout(this.accessTokenTimeout);
clearTimeout(this.pingTimeout);
this.reconnectAttempts = 0;
Expand All @@ -473,18 +474,17 @@ var Memphis = /** @class */ (function () {
*/
Memphis.prototype.close = function () {
var _this = this;
if (this.isConnectionActive) {
this.client.removeAllListeners("data");
this.client.removeAllListeners("error");
this.client.removeAllListeners("close");
this.client.destroy();
clearTimeout(this.accessTokenTimeout);
clearTimeout(this.pingTimeout);
this.reconnectAttempts = 0;
setTimeout(function () {
_this.brokerManager && _this.brokerManager.close();
}, 500);
}
var _a, _b, _c, _d;
(_a = this.client) === null || _a === void 0 ? void 0 : _a.removeAllListeners("data");
(_b = this.client) === null || _b === void 0 ? void 0 : _b.removeAllListeners("error");
(_c = this.client) === null || _c === void 0 ? void 0 : _c.removeAllListeners("close");
(_d = this.client) === null || _d === void 0 ? void 0 : _d.destroy();
clearTimeout(this.accessTokenTimeout);
clearTimeout(this.pingTimeout);
this.reconnectAttempts = 0;
setTimeout(function () {
_this.brokerManager && _this.brokerManager.close();
}, 500);
};
return Memphis;
}());
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "memphis-dev",
"version": "0.3.4",
"version": "0.3.5",
"description": "A dev-first event processing platform",
"main": "index.js",
"scripts": {
Expand Down
32 changes: 15 additions & 17 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -390,11 +390,11 @@ class Memphis {
}
}, this.reconnectIntervalMs);
}
else if (this.isConnectionActive) {
this.client.removeAllListeners("data");
this.client.removeAllListeners("error");
this.client.removeAllListeners("close");
this.client.destroy();
else {
this.client?.removeAllListeners("data");
this.client?.removeAllListeners("error");
this.client?.removeAllListeners("close");
this.client?.destroy();
clearTimeout(this.accessTokenTimeout);
clearTimeout(this.pingTimeout);
this.reconnectAttempts = 0;
Expand All @@ -408,18 +408,16 @@ class Memphis {
* Close Memphis connection.
*/
close() {
if (this.isConnectionActive) {
this.client.removeAllListeners("data");
this.client.removeAllListeners("error");
this.client.removeAllListeners("close");
this.client.destroy();
clearTimeout(this.accessTokenTimeout);
clearTimeout(this.pingTimeout);
this.reconnectAttempts = 0;
setTimeout(() => {
this.brokerManager && this.brokerManager.close();
}, 500);
}
this.client?.removeAllListeners("data");
this.client?.removeAllListeners("error");
this.client?.removeAllListeners("close");
this.client?.destroy();
clearTimeout(this.accessTokenTimeout);
clearTimeout(this.pingTimeout);
this.reconnectAttempts = 0;
setTimeout(() => {
this.brokerManager && this.brokerManager.close();
}, 500);
}
}

Expand Down

0 comments on commit 18ee812

Please sign in to comment.