From 1749d581f17ecec06a2919dc99d042d8d9f73578 Mon Sep 17 00:00:00 2001 From: Idan Asulin <74712806+idanasulinmemphis@users.noreply.github.com> Date: Wed, 8 Jun 2022 13:55:16 +0300 Subject: [PATCH 1/5] Update README.md --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index ec9987b..215d169 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,8 @@ For Memphis node.js SDK - [Contact](#contact) ## Current SDKs -- [memphis-js](https://github.com/Memphisdev/memphis.js "Node.js") +- [memphis-js](https://github.com/Memphisdev/memphis.js 'Node.js') +- [memphis-py](https://github.com/Memphisdev/memphis.py 'Python') ## Installation From 3a103dece6a9186f709d17399ea269001d3c9c0a Mon Sep 17 00:00:00 2001 From: tgoldberg Date: Tue, 14 Jun 2022 16:01:37 +0300 Subject: [PATCH 2/5] Refactor for TS --- httpRequest.ts | 45 ++++ index.ts | 635 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 680 insertions(+) create mode 100644 httpRequest.ts create mode 100644 index.ts diff --git a/httpRequest.ts b/httpRequest.ts new file mode 100644 index 0000000..6da95ad --- /dev/null +++ b/httpRequest.ts @@ -0,0 +1,45 @@ +// Copyright 2021-2022 The Memphis Authors +// Licensed under the GNU General Public License v3.0 (the “License”); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.gnu.org/licenses/gpl-3.0.en.html +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an “AS IS” BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import axios from 'axios'; + +export async function httpRequest({ method, url, headers = {}, bodyParams = {}, queryParams = {}, timeout = 0 } : + {method: string, url: string, headers?: any, bodyParams?: any, queryParams?: any, timeout?: number }): Promise { + if (method !== 'GET' && method !== 'POST' && method !== 'PUT' && method !== 'DELETE') + throw { + status: 400, + message: `Invalid HTTP method` + }; + + headers['content-type'] = 'application/json'; + + try { + const response = await axios({ + method, + url, + headers, + timeout, + data: bodyParams, + params: queryParams, + maxBodyLength: 1000000000, + maxContentLength: 1000000000 + }); + const results = response.data; + return results; + } catch (ex: any) { + if (ex?.response?.data) + throw ex.response.data.message; + + throw ex; + } +}; \ No newline at end of file diff --git a/index.ts b/index.ts new file mode 100644 index 0000000..7ff3920 --- /dev/null +++ b/index.ts @@ -0,0 +1,635 @@ +// Copyright 2021-2022 The Memphis Authors +// Licensed under the GNU General Public License v3.0 (the “License”); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.gnu.org/licenses/gpl-3.0.en.html +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an “AS IS” BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import net from 'net'; +import events from 'events'; +import broker from 'nats'; +import { headers } from "nats"; +import { v4 as uuidv4 } from 'uuid'; +import { httpRequest } from './httpRequest'; + +interface IRetentionTypes { + MAX_MESSAGE_AGE_SECONDS: string; + MESSAGES: string; + BYTES: string; +} + +const retentionTypes: IRetentionTypes = { + MAX_MESSAGE_AGE_SECONDS: "message_age_sec", + MESSAGES: "messages", + BYTES: "bytes" +}; + +const storageTypes = { + FILE: "file", + MEMORY: "memory" +}; + +interface IConnectionData { + connection_id: string; + access_token: string; + access_token_exp: number; + ping_interval_ms: number +} + +class Memphis { + private isConnectionActive: boolean; + private connectionId: string; + public accessToken: string; + public host: string; + public managementPort: number; + private tcpPort: number; + private dataPort: number; + private username: string; + private connectionToken: string; + private accessTokenTimeout: NodeJS.Timeout; + private pingTimeout: NodeJS.Timeout; + private client: net.Socket; + private reconnectAttempts: number; + private reconnect: boolean; + private maxReconnect: number; + private reconnectIntervalMs: number; + private timeoutMs: number; + public brokerConnection: broker.JetStreamClient; + public brokerManager: broker.NatsConnection; + public brokerStats: broker.JetStreamManager; + + constructor() { + this.isConnectionActive = false; + this.managementPort = 5555; + this.tcpPort = 6666; + this.dataPort = 7766 + this.username = ""; + this.client = new net.Socket(); + this.reconnectAttempts = 0; + this.reconnect = true; + this.maxReconnect = 3; + this.reconnectIntervalMs = 200; + this.timeoutMs = 15000; + + this.client.on('error', error => { + console.error(error); + }); + + this.client.on('close', () => { + this.isConnectionActive = false; + this._close(); + }); + } + + /** + * Creates connection with Memphis. + * @param {String} host - memphis host. + * @param {Number} managementPort - management port, default is 5555. + * @param {Number} tcpPort - tcp port, default is 6666. + * @param {Number} dataPort - data port, default is 7766 . + * @param {String} username - user of type root/application. + * @param {String} connectionToken - broker token. + * @param {Boolean} reconnect - whether to do reconnect while connection is lost. + * @param {Number} maxReconnect - The reconnect attempts. + * @param {Number} reconnectIntervalMs - Interval in miliseconds between reconnect attempts. + * @param {Number} timeoutMs - connection timeout in miliseconds. + */ + connect({ host, managementPort = 5555, tcpPort = 6666, dataPort = 7766, username, connectionToken, reconnect = true, maxReconnect = 3, reconnectIntervalMs = 200, timeoutMs = 15000 }: + { + host: string, managementPort: number, tcpPort: number, dataPort: number, username: string, connectionToken: string, reconnect: boolean, maxReconnect: number, + reconnectIntervalMs: number, timeoutMs: number + }): Promise { + return new Promise((resolve, reject) => { + this.host = this._normalizeHost(host); + this.managementPort = managementPort; + this.tcpPort = tcpPort; + this.dataPort = dataPort; + this.username = username; + this.connectionToken = connectionToken; + this.reconnect = reconnect; + this.maxReconnect = maxReconnect > 9 ? 9 : maxReconnect; + this.reconnectIntervalMs = reconnectIntervalMs; + this.timeoutMs = timeoutMs; + + this.client.connect(this.tcpPort, this.host, () => { + this.client.write(JSON.stringify({ + username: username, + broker_creds: connectionToken, + connection_id: this.connectionId + })); + let connected = false; + + this.client.on('data', async data => { + let newData: IConnectionData; + try { + newData = JSON.parse(data.toString()) + } catch (ex) { + return reject(data.toString()); + } + this.connectionId = newData.connection_id; + this.isConnectionActive = true; + this.reconnectAttempts = 0; + + if (newData.access_token) { + this.accessToken = newData.access_token; + this._keepAcessTokenFresh(newData.access_token_exp); + } + + if (newData.ping_interval_ms) + this._pingServer(newData.ping_interval_ms); + + if (!connected) { + try { + this.brokerManager = await broker.connect({ + servers: `${this.host}:${this.dataPort}`, + reconnect: this.reconnect, + maxReconnectAttempts: this.reconnect ? this.maxReconnect : 0, + reconnectTimeWait: this.reconnectIntervalMs, + timeout: this.timeoutMs, + token: this.connectionToken + }); + + this.brokerConnection = this.brokerManager.jetstream(); + this.brokerStats = await this.brokerManager.jetstreamManager(); + connected = true; + return resolve(); + } catch (ex) { + return reject(ex); + } + } + }); + }); + + setTimeout(() => { + if (!reconnect || this.reconnectAttempts === maxReconnect || !this.isConnectionActive) + reject(new Error("Connection timeout has reached")); + }, timeoutMs); + }); + } + + private _normalizeHost(host: string): string { + if (host.startsWith("http://")) + return host.split("http://")[1]; + else if (host.startsWith("https://")) + return host.split("https://")[1]; + else + return host; + } + + private _keepAcessTokenFresh(expiresIn: number) { + this.accessTokenTimeout = setTimeout(() => { + if (this.isConnectionActive) + this.client.write(JSON.stringify({ + resend_access_token: true + })); + }, expiresIn) + } + + private _pingServer(interval: number) { + this.pingTimeout = setTimeout(() => { + if (this.isConnectionActive) + this.client.write(JSON.stringify({ + ping: true + })); + }, interval); + } + + /** + * Creates a factory. + * @param {String} name - factory name. + * @param {String} description - factory description (optional). + */ + async factory({ name, description = "" }: { name: string, description: string }): Promise { + try { + if (!this.isConnectionActive) + throw new Error("Connection is dead"); + + const response = await httpRequest({ + method: "POST", + url: `http://${this.host}:${this.managementPort}/api/factories/createFactory`, + headers: { + Authorization: "Bearer " + this.accessToken + }, + bodyParams: { name, description }, + }); + + return new Factory(this, response.name); + } catch (ex) { + if (typeof (ex) == "string") { + return new Factory(this, name.toLowerCase()); + } + throw ex; + } + } + + /** + * Creates a station. + * @param {String} name - station name. + * @param {String} factoryName - factory name to link the station with. + * @param {Memphis.retentionTypes} retentionType - retention type, default is MAX_MESSAGE_AGE_SECONDS. + * @param {Number} retentionValue - number which represents the retention based on the retentionType, default is 604800. + * @param {Memphis.storageTypes} storageType - persistance storage for messages of the station, default is storageTypes.FILE. + * @param {Number} replicas - number of replicas for the messages of the data, default is 1. + * @param {Boolean} dedupEnabled - whether to allow dedup mecanism, dedup happens based on message ID, default is false. + * @param {Number} dedupWindowMs - time frame in which dedup track messages, default is 0. + */ + async station({ name, factoryName, retentionType = retentionTypes.MAX_MESSAGE_AGE_SECONDS, retentionValue = 604800, + storageType = storageTypes.FILE, replicas = 1, dedupEnabled = false, dedupWindowMs = 0 }: + { + name: string, factoryName: string, retentionType: string, retentionValue: number, storageType: string, + replicas: number, dedupEnabled: boolean, dedupWindowMs: number + }): Promise { + try { + if (!this.isConnectionActive) + throw new Error("Connection is dead"); + const response = await httpRequest({ + method: "POST", + url: `http://${this.host}:${this.managementPort}/api/stations/createStation`, + headers: { + Authorization: "Bearer " + this.accessToken + }, + bodyParams: { + name: name, + factory_name: factoryName, + retention_type: retentionType, + retention_value: retentionValue, + storage_type: storageType, + replicas: replicas, + dedup_enabled: dedupEnabled, + dedup_window_in_ms: dedupWindowMs + } + }); + + return new Station(this, response.name); + } catch (ex) { + if (typeof (ex) == "string") { + return new Station(this, name.toLowerCase()); + } + throw ex; + } + } + + /** + * Creates a producer. + * @param {String} stationName - station name to produce messages into. + * @param {String} producerName - name for the producer. + */ + async producer({ stationName, producerName }: { stationName: string, producerName: string }): Promise { + try { + if (!this.isConnectionActive) + throw new Error("Connection is dead"); + + await httpRequest({ + method: "POST", + url: `http://${this.host}:${this.managementPort}/api/producers/createProducer`, + headers: { + Authorization: "Bearer " + this.accessToken + }, + bodyParams: { + name: producerName, + station_name: stationName, + connection_id: this.connectionId, + producer_type: "application" + }, + }); + + return new Producer(this, producerName, stationName); + } catch (ex) { + throw ex; + } + } + + /** + * Creates a consumer. + * @param {String} stationName - station name to consume messages from. + * @param {String} consumerName - name for the consumer. + * @param {String} consumerGroup - consumer group name, default is "". + * @param {Number} pullIntervalMs - interval in miliseconds between pulls, default is 1000. + * @param {Number} batchSize - pull batch size. + * @param {Number} batchMaxTimeToWaitMs - max time in miliseconds to wait between pulls, defauls is 5000. + * @param {Number} maxAckTimeMs - max time for ack a message in miliseconds, in case a message not acked in this time period the Memphis broker will resend it + */ + async consumer({ stationName, consumerName, consumerGroup = "", pullIntervalMs = 1000, batchSize = 10, + batchMaxTimeToWaitMs = 5000, maxAckTimeMs = 30000 }: + { + stationName: string, consumerName: string, consumerGroup: string, pullIntervalMs: number, + batchSize: number, batchMaxTimeToWaitMs: number, maxAckTimeMs: number + }): Promise { + try { + if (!this.isConnectionActive) + throw new Error("Connection is dead"); + + await httpRequest({ + method: "POST", + url: `http://${this.host}:${this.managementPort}/api/consumers/createConsumer`, + headers: { + Authorization: "Bearer " + this.accessToken + }, + bodyParams: { + name: consumerName, + station_name: stationName, + connection_id: this.connectionId, + consumer_type: "application", + consumers_group: consumerGroup, + max_ack_time_ms: maxAckTimeMs + }, + }); + + return new Consumer(this, stationName, consumerName, consumerGroup, pullIntervalMs, batchSize, batchMaxTimeToWaitMs, maxAckTimeMs); + } catch (ex) { + throw ex; + } + } + + private _close() { + if (this.reconnect && this.reconnectAttempts < this.maxReconnect) { + this.reconnectAttempts++; + setTimeout(async () => { + try { + await this.connect({ + host: this.host, + managementPort: this.managementPort, + tcpPort: this.tcpPort, + dataPort: this.dataPort, + username: this.username, + connectionToken: this.connectionToken, + reconnect: this.reconnect, + maxReconnect: this.maxReconnect, + reconnectIntervalMs: this.reconnectIntervalMs, + timeoutMs: this.timeoutMs + }); + console.log("Reconnect to memphis has been succeeded"); + } catch (ex) { + console.error("Failed reconnect to memphis"); + return; + } + }, this.reconnectIntervalMs); + } + else if (this.isConnectionActive) { + this.client.removeAllListeners("data"); + this.client.removeAllListeners("error"); + this.client.removeAllListeners("close"); + this.client.destroy(); + clearTimeout(this.accessTokenTimeout); + clearTimeout(this.pingTimeout); + this.reconnectAttempts = 0; + setTimeout(() => { + this.brokerManager && this.brokerManager.close(); + }, 500); + } + } + + /** + * Close Memphis connection. + */ + close() { + if (this.isConnectionActive) { + this.client.removeAllListeners("data"); + this.client.removeAllListeners("error"); + this.client.removeAllListeners("close"); + this.client.destroy(); + clearTimeout(this.accessTokenTimeout); + clearTimeout(this.pingTimeout); + this.reconnectAttempts = 0; + setTimeout(() => { + this.brokerManager && this.brokerManager.close(); + }, 500); + } + } +} + +class Producer { + private connection: Memphis; + private producerName: string; + private stationName: string; + + constructor(connection: Memphis, producerName: string, stationName: string) { + this.connection = connection; + this.producerName = producerName.toLowerCase(); + this.stationName = stationName.toLowerCase(); + } + + /** + * Produces a message into a station. + * @param {Uint8Array} message - message to send into the station. + * @param {Number} ackWaitSec - max time in seconds to wait for an ack from memphis. + */ + async produce({ message, ackWaitSec = 15 }: { message: Uint8Array, ackWaitSec: number }): Promise { + try { + const h = headers(); + h.append("producedBy", this.producerName); + await this.connection.brokerConnection.publish(`${this.stationName}.final`, message, { msgID: uuidv4(), headers: h, ackWait: ackWaitSec * 1000 * 1000000 }); + } catch (ex: any) { + if (ex.code === '503') { + throw new Error("Produce operation has failed, please check wheether Station/Producer are still exist"); + } + throw ex; + } + } + + /** + * Destroy the producer. + */ + async destroy(): Promise { + try { + await httpRequest({ + method: "DELETE", + url: `http://${this.connection.host}:${this.connection.managementPort}/api/producers/destroyProducer`, + headers: { + Authorization: "Bearer " + this.connection.accessToken + }, + bodyParams: { + name: this.producerName, + station_name: this.stationName + }, + }); + } catch (_) { } + } +} + +class Consumer { + private connection: Memphis; + private stationName: string; + private consumerName: string; + private consumerGroup: string; + private pullIntervalMs: number; + private batchSize: number; + private batchMaxTimeToWaitMs: number; + private maxAckTimeMs: number; + private eventEmitter: events.EventEmitter; + private pullInterval: NodeJS.Timeout; + private pingConsumerInvtervalMs: number; + private pingConsumerInvterval: NodeJS.Timeout; + + constructor(connection: Memphis, stationName: string, consumerName: string, consumerGroup: string, pullIntervalMs: number, + batchSize: number, batchMaxTimeToWaitMs: number, maxAckTimeMs: number) { + this.connection = connection; + this.stationName = stationName.toLowerCase(); + this.consumerName = consumerName.toLowerCase(); + this.consumerGroup = consumerGroup.toLowerCase(); + this.pullIntervalMs = pullIntervalMs; + this.batchSize = batchSize; + this.batchMaxTimeToWaitMs = batchMaxTimeToWaitMs; + this.maxAckTimeMs = maxAckTimeMs; + this.eventEmitter = new events.EventEmitter(); + this.pingConsumerInvtervalMs = 30000; + + this.connection.brokerConnection.pullSubscribe(`${this.stationName}.final`, { + mack: true, + config: { + durable_name: this.consumerGroup ? this.consumerGroup : this.consumerName, + ack_wait: this.maxAckTimeMs, + }, + }).then(async psub => { + psub.pull({ batch: this.batchSize, expires: this.batchMaxTimeToWaitMs }); + this.pullInterval = setInterval(() => { + if (!this.connection.brokerManager.isClosed()) + psub.pull({ batch: this.batchSize, expires: this.batchMaxTimeToWaitMs }); + else + clearInterval(this.pullInterval) + }, this.pullIntervalMs); + + this.pingConsumerInvterval = setInterval(async () => { + if (!this.connection.brokerManager.isClosed()) { + this._pingConsumer() + } + else + clearInterval(this.pingConsumerInvterval) + }, this.pingConsumerInvtervalMs); + + for await (const m of psub) { + this.eventEmitter.emit("message", new Message(m)); + } + }).catch(error => this.eventEmitter.emit("error", error)); + } + + /** + * Creates an event listener. + * @param {String} event - the event to listen to. + * @param {Function} cb - a callback function. + */ + on(event: String, cb: (...args: any[]) => void) { + this.eventEmitter.on(event, cb); + } + + private async _pingConsumer() { + try { + const durableName = this.consumerGroup || this.consumerName; + await this.connection.brokerStats.consumers.info(this.stationName, durableName) + } catch (ex) { + this.eventEmitter.emit("error", "station/consumer were not found"); + } + } + + /** + * Destroy the consumer. + */ + async destroy(): Promise { + this.eventEmitter.removeAllListeners("message"); + this.eventEmitter.removeAllListeners("error"); + clearInterval(this.pullInterval); + clearInterval(this.pingConsumerInvterval); + try { + await httpRequest({ + method: "DELETE", + url: `http://${this.connection.host}:${this.connection.managementPort}/api/consumers/destroyConsumer`, + headers: { + Authorization: "Bearer " + this.connection.accessToken + }, + bodyParams: { + name: this.consumerName, + station_name: this.stationName + }, + }); + } catch (_) { } + } +} + +class Message { + private message: broker.JsMsg; + + constructor(message: broker.JsMsg) { + this.message = message; + } + + /** + * Ack a message is done processing. + */ + ack() { + this.message.ack(); + } + + getData() { + return this.message.data; + } +} + +class Factory { + private connection: Memphis; + private name: string; + + constructor(connection: Memphis, name: string) { + this.connection = connection; + this.name = name.toLowerCase(); + } + + /** + * Destroy the factory. + */ + async destroy(): Promise { + try { + await httpRequest({ + method: "DELETE", + url: `http://${this.connection.host}:${this.connection.managementPort}/api/factories/removeFactory`, + headers: { + Authorization: "Bearer " + this.connection.accessToken + }, + bodyParams: { + factory_name: this.name + }, + }); + } catch (ex) { + throw ex; + } + } +} + +class Station { + private connection: Memphis; + private name: string; + + constructor(connection: Memphis, name: string) { + this.connection = connection; + this.name = name.toLowerCase(); + } + + /** + * Destroy the station. + */ + async destroy(): Promise { + try { + await httpRequest({ + method: "DELETE", + url: `http://${this.connection.host}:${this.connection.managementPort}/api/stations/removeStation`, + headers: { + Authorization: "Bearer " + this.connection.accessToken + }, + bodyParams: { + station_name: this.name + }, + }); + } catch (ex) { + throw ex; + } + } +} + +module.exports = new Memphis(); +module.exports.retentionTypes = retentionTypes; +module.exports.storageTypes = storageTypes; From 45ec6118840691de12c27de12f099b17b677b361 Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Tue, 14 Jun 2022 17:28:45 +0300 Subject: [PATCH 3/5] typescript compile adjustments --- httpRequest.d.ts | 8 + httpRequest.js | 121 ++- index.d.ts | 1 + index.js | 1025 ++++++++++++++++---------- package.json | 12 +- src/httpRequest.js | 51 ++ httpRequest.ts => src/httpRequest.ts | 4 +- src/index.js | 586 +++++++++++++++ index.ts => src/index.ts | 56 +- tsconfig.json | 18 + 10 files changed, 1415 insertions(+), 467 deletions(-) create mode 100644 httpRequest.d.ts create mode 100644 index.d.ts create mode 100644 src/httpRequest.js rename httpRequest.ts => src/httpRequest.ts (89%) create mode 100644 src/index.js rename index.ts => src/index.ts (94%) create mode 100644 tsconfig.json diff --git a/httpRequest.d.ts b/httpRequest.d.ts new file mode 100644 index 0000000..7c83e91 --- /dev/null +++ b/httpRequest.d.ts @@ -0,0 +1,8 @@ +export declare function httpRequest({ method, url, headers, bodyParams, queryParams, timeout }: { + method: string; + url: string; + headers?: any; + bodyParams?: any; + queryParams?: any; + timeout?: number; +}): Promise; diff --git a/httpRequest.js b/httpRequest.js index 7662926..557946a 100644 --- a/httpRequest.js +++ b/httpRequest.js @@ -1,3 +1,4 @@ +"use strict"; // Copyright 2021-2022 The Memphis Authors // Licensed under the GNU General Public License v3.0 (the “License”); // you may not use this file except in compliance with the License. @@ -10,42 +11,88 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - -const axios = require('axios'); - -const httpRequest = async ({ method, url, headers = {}, bodyParams = {}, queryParams = {}, file = null, timeout = 0 }) => { - if (method !== 'GET' && method !== 'POST' && method !== 'PUT' && method !== 'DELETE') - throw { - status: 400, - message: `Invalid HTTP method`, - data: { method, url, data } - }; - - if (file) { - bodyParams.file = file; - } - - headers['content-type'] = 'application/json'; - - try { - const response = await axios({ - method, - url, - headers, - timeout, - data: bodyParams, - params: queryParams, - maxBodyLength: 1000000000, - maxContentLength: 1000000000 - }); - const results = response.data; - return results; - } catch (ex) { - if (ex?.response?.data) - throw ex.response.data.message; - - throw ex; +var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { + function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } + return new (P || (P = Promise))(function (resolve, reject) { + function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } + function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } + function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } + step((generator = generator.apply(thisArg, _arguments || [])).next()); + }); +}; +var __generator = (this && this.__generator) || function (thisArg, body) { + var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g; + return g = { next: verb(0), "throw": verb(1), "return": verb(2) }, typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g; + function verb(n) { return function (v) { return step([n, v]); }; } + function step(op) { + if (f) throw new TypeError("Generator is already executing."); + while (_) try { + if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t; + if (y = 0, t) op = [op[0] & 2, t.value]; + switch (op[0]) { + case 0: case 1: t = op; break; + case 4: _.label++; return { value: op[1], done: false }; + case 5: _.label++; y = op[1]; op = [0]; continue; + case 7: op = _.ops.pop(); _.trys.pop(); continue; + default: + if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; } + if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; } + if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; } + if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; } + if (t[2]) _.ops.pop(); + _.trys.pop(); continue; + } + op = body.call(thisArg, _); + } catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; } + if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true }; } }; - -module.exports = httpRequest; +var __importDefault = (this && this.__importDefault) || function (mod) { + return (mod && mod.__esModule) ? mod : { "default": mod }; +}; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.httpRequest = void 0; +var axios_1 = __importDefault(require("axios")); +function httpRequest(_a) { + var _b; + var method = _a.method, url = _a.url, _c = _a.headers, headers = _c === void 0 ? {} : _c, _d = _a.bodyParams, bodyParams = _d === void 0 ? {} : _d, _e = _a.queryParams, queryParams = _e === void 0 ? {} : _e, _f = _a.timeout, timeout = _f === void 0 ? 0 : _f; + return __awaiter(this, void 0, void 0, function () { + var response, results, ex_1; + return __generator(this, function (_g) { + switch (_g.label) { + case 0: + if (method !== 'GET' && method !== 'POST' && method !== 'PUT' && method !== 'DELETE') + throw { + status: 400, + message: "Invalid HTTP method" + }; + headers['content-type'] = 'application/json'; + _g.label = 1; + case 1: + _g.trys.push([1, 3, , 4]); + return [4 /*yield*/, (0, axios_1.default)({ + method: method, + url: url, + headers: headers, + timeout: timeout, + data: bodyParams, + params: queryParams, + maxBodyLength: 1000000000, + maxContentLength: 1000000000 + })]; + case 2: + response = _g.sent(); + results = response.data; + return [2 /*return*/, results]; + case 3: + ex_1 = _g.sent(); + if ((_b = ex_1 === null || ex_1 === void 0 ? void 0 : ex_1.response) === null || _b === void 0 ? void 0 : _b.data) + throw ex_1.response.data.message; + throw ex_1; + case 4: return [2 /*return*/]; + } + }); + }); +} +exports.httpRequest = httpRequest; +; diff --git a/index.d.ts b/index.d.ts new file mode 100644 index 0000000..cb0ff5c --- /dev/null +++ b/index.d.ts @@ -0,0 +1 @@ +export {}; diff --git a/index.js b/index.js index ab74641..e531536 100644 --- a/index.js +++ b/index.js @@ -1,3 +1,4 @@ +"use strict"; // Copyright 2021-2022 The Memphis Authors // Licensed under the GNU General Public License v3.0 (the “License”); // you may not use this file except in compliance with the License. @@ -10,39 +11,106 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - -const net = require('net'); -const events = require('events'); -const broker = require('nats'); -const { headers } = require("nats"); -const { v4: uuidv4 } = require('uuid'); -const httpRequest = require('./httpRequest'); - -const retentionTypes = { +var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { + if (k2 === undefined) k2 = k; + var desc = Object.getOwnPropertyDescriptor(m, k); + if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) { + desc = { enumerable: true, get: function() { return m[k]; } }; + } + Object.defineProperty(o, k2, desc); +}) : (function(o, m, k, k2) { + if (k2 === undefined) k2 = k; + o[k2] = m[k]; +})); +var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) { + Object.defineProperty(o, "default", { enumerable: true, value: v }); +}) : function(o, v) { + o["default"] = v; +}); +var __importStar = (this && this.__importStar) || function (mod) { + if (mod && mod.__esModule) return mod; + var result = {}; + if (mod != null) for (var k in mod) if (k !== "default" && Object.prototype.hasOwnProperty.call(mod, k)) __createBinding(result, mod, k); + __setModuleDefault(result, mod); + return result; +}; +var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { + function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } + return new (P || (P = Promise))(function (resolve, reject) { + function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } + function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } + function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } + step((generator = generator.apply(thisArg, _arguments || [])).next()); + }); +}; +var __generator = (this && this.__generator) || function (thisArg, body) { + var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g; + return g = { next: verb(0), "throw": verb(1), "return": verb(2) }, typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g; + function verb(n) { return function (v) { return step([n, v]); }; } + function step(op) { + if (f) throw new TypeError("Generator is already executing."); + while (_) try { + if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t; + if (y = 0, t) op = [op[0] & 2, t.value]; + switch (op[0]) { + case 0: case 1: t = op; break; + case 4: _.label++; return { value: op[1], done: false }; + case 5: _.label++; y = op[1]; op = [0]; continue; + case 7: op = _.ops.pop(); _.trys.pop(); continue; + default: + if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; } + if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; } + if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; } + if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; } + if (t[2]) _.ops.pop(); + _.trys.pop(); continue; + } + op = body.call(thisArg, _); + } catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; } + if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true }; + } +}; +var __asyncValues = (this && this.__asyncValues) || function (o) { + if (!Symbol.asyncIterator) throw new TypeError("Symbol.asyncIterator is not defined."); + var m = o[Symbol.asyncIterator], i; + return m ? m.call(o) : (o = typeof __values === "function" ? __values(o) : o[Symbol.iterator](), i = {}, verb("next"), verb("throw"), verb("return"), i[Symbol.asyncIterator] = function () { return this; }, i); + function verb(n) { i[n] = o[n] && function (v) { return new Promise(function (resolve, reject) { v = o[n](v), settle(resolve, reject, v.done, v.value); }); }; } + function settle(resolve, reject, d, v) { Promise.resolve(v).then(function(v) { resolve({ value: v, done: d }); }, reject); } +}; +var __importDefault = (this && this.__importDefault) || function (mod) { + return (mod && mod.__esModule) ? mod : { "default": mod }; +}; +Object.defineProperty(exports, "__esModule", { value: true }); +var net_1 = __importDefault(require("net")); +var events_1 = __importDefault(require("events")); +var broker = __importStar(require("nats")); +var nats_1 = require("nats"); +var uuid_1 = require("uuid"); +var httpRequest_1 = require("./httpRequest"); +var retentionTypes = { MAX_MESSAGE_AGE_SECONDS: "message_age_sec", MESSAGES: "messages", BYTES: "bytes" }; - -const storageTypes = { +var storageTypes = { FILE: "file", MEMORY: "memory" }; - -class Memphis { - constructor() { +var Memphis = /** @class */ (function () { + function Memphis() { + var _this = this; this.isConnectionActive = false; - this.connectionId = null; - this.accessToken = null; - this.host = null; + this.connectionId = ""; + this.accessToken = ""; + this.host = ""; this.managementPort = 5555; this.tcpPort = 6666; - this.dataPort = 7766 - this.username = null; - this.connectionToken = null; + this.dataPort = 7766; + this.username = ""; + this.connectionToken = ""; this.accessTokenTimeout = null; this.pingTimeout = null; - this.client = new net.Socket(); + this.client = new net_1.default.Socket(); this.reconnectAttempts = 0; this.reconnect = true; this.maxReconnect = 3; @@ -51,19 +119,16 @@ class Memphis { this.brokerConnection = null; this.brokerManager = null; this.brokerStats = null; - - this.client.on('error', error => { + this.client.on('error', function (error) { console.error(error); }); - - this.client.on('close', () => { - this.isConnectionActive = false; - this._close(); + this.client.on('close', function () { + _this.isConnectionActive = false; + _this._close(); }); } - /** - * Creates connection with Memphis. + * Creates connection with Memphis. * @param {String} host - memphis host. * @param {Number} managementPort - management port, default is 5555. * @param {Number} tcpPort - tcp port, default is 6666. @@ -75,131 +140,148 @@ class Memphis { * @param {Number} reconnectIntervalMs - Interval in miliseconds between reconnect attempts. * @param {Number} timeoutMs - connection timeout in miliseconds. */ - connect({ host, managementPort = 5555, tcpPort = 6666, dataPort = 7766, username, connectionToken, reconnect = true, maxReconnect = 3, reconnectIntervalMs = 200, timeoutMs = 15000 }) { - return new Promise((resolve, reject) => { - this.host = this._normalizeHost(host); - this.managementPort = managementPort; - this.tcpPort = tcpPort; - this.dataPort = dataPort; - this.username = username; - this.connectionToken = connectionToken; - this.reconnect = reconnect; - this.maxReconnect = maxReconnect > 9 ? 9 : maxReconnect; - this.reconnectIntervalMs = reconnectIntervalMs; - this.timeoutMs = timeoutMs; - - this.client.connect(this.tcpPort, this.host, () => { - this.client.write(JSON.stringify({ + Memphis.prototype.connect = function (_a) { + var _this = this; + var host = _a.host, _b = _a.managementPort, managementPort = _b === void 0 ? 5555 : _b, _c = _a.tcpPort, tcpPort = _c === void 0 ? 6666 : _c, _d = _a.dataPort, dataPort = _d === void 0 ? 7766 : _d, username = _a.username, connectionToken = _a.connectionToken, _e = _a.reconnect, reconnect = _e === void 0 ? true : _e, _f = _a.maxReconnect, maxReconnect = _f === void 0 ? 3 : _f, _g = _a.reconnectIntervalMs, reconnectIntervalMs = _g === void 0 ? 200 : _g, _h = _a.timeoutMs, timeoutMs = _h === void 0 ? 15000 : _h; + return new Promise(function (resolve, reject) { + _this.host = _this._normalizeHost(host); + _this.managementPort = managementPort; + _this.tcpPort = tcpPort; + _this.dataPort = dataPort; + _this.username = username; + _this.connectionToken = connectionToken; + _this.reconnect = reconnect; + _this.maxReconnect = maxReconnect > 9 ? 9 : maxReconnect; + _this.reconnectIntervalMs = reconnectIntervalMs; + _this.timeoutMs = timeoutMs; + _this.client.connect(_this.tcpPort, _this.host, function () { + _this.client.write(JSON.stringify({ username: username, broker_creds: connectionToken, - connection_id: this.connectionId + connection_id: _this.connectionId })); - let connected = false; - - this.client.on('data', async data => { - try { - data = JSON.parse(data.toString()) - } catch (ex) { - return reject(data.toString()); - } - this.connectionId = data.connection_id; - this.isConnectionActive = true; - this.reconnectAttempts = 0; - - if (data.access_token) { - this.accessToken = data.access_token; - this._keepAcessTokenFresh(data.access_token_exp); - } - - if (data.ping_interval_ms) - this._pingServer(data.ping_interval_ms); - - if (!connected) { - try { - this.brokerManager = await broker.connect({ - servers: `${this.host}:${this.dataPort}`, - reconnect: this.reconnect, - maxReconnectAttempts: this.reconnect ? this.maxReconnect : 0, - reconnectTimeWait: this.reconnectIntervalMs, - timeout: this.timeoutMs, - token: this.connectionToken - }); - - this.brokerConnection = this.brokerManager.jetstream(); - this.brokerStats = await this.brokerManager.jetstreamManager(); - connected = true; - return resolve(); - } catch (ex) { - return reject(ex); + var connected = false; + _this.client.on('data', function (data) { return __awaiter(_this, void 0, void 0, function () { + var message, _a, _b, ex_1; + return __generator(this, function (_c) { + switch (_c.label) { + case 0: + try { + message = JSON.parse(data.toString()); + } + catch (ex) { + return [2 /*return*/, reject(data.toString())]; + } + this.connectionId = message.connection_id; + this.isConnectionActive = true; + this.reconnectAttempts = 0; + if (message.access_token) { + this.accessToken = message.access_token; + this._keepAcessTokenFresh(message.access_token_exp); + } + if (message.ping_interval_ms) + this._pingServer(message.ping_interval_ms); + if (!!connected) return [3 /*break*/, 5]; + _c.label = 1; + case 1: + _c.trys.push([1, 4, , 5]); + _a = this; + return [4 /*yield*/, broker.connect({ + servers: "".concat(this.host, ":").concat(this.dataPort), + reconnect: this.reconnect, + maxReconnectAttempts: this.reconnect ? this.maxReconnect : 0, + reconnectTimeWait: this.reconnectIntervalMs, + timeout: this.timeoutMs, + token: this.connectionToken + })]; + case 2: + _a.brokerManager = _c.sent(); + this.brokerConnection = this.brokerManager.jetstream(); + _b = this; + return [4 /*yield*/, this.brokerManager.jetstreamManager()]; + case 3: + _b.brokerStats = _c.sent(); + connected = true; + return [2 /*return*/, resolve()]; + case 4: + ex_1 = _c.sent(); + return [2 /*return*/, reject(ex_1)]; + case 5: return [2 /*return*/]; } - } - }); + }); + }); }); }); - - setTimeout(() => { - if (!reconnect || this.reconnectAttempts === maxReconnect || !this.isConnectionActive) + setTimeout(function () { + if (!reconnect || _this.reconnectAttempts === maxReconnect || !_this.isConnectionActive) reject(new Error("Connection timeout has reached")); }, timeoutMs); }); - } - - _normalizeHost(host) { + }; + Memphis.prototype._normalizeHost = function (host) { if (host.startsWith("http://")) return host.split("http://")[1]; else if (host.startsWith("https://")) return host.split("https://")[1]; else return host; - } - - _keepAcessTokenFresh(expiresIn) { - this.accessTokenTimeout = setTimeout(() => { - if (this.isConnectionActive) - this.client.write(JSON.stringify({ + }; + Memphis.prototype._keepAcessTokenFresh = function (expiresIn) { + var _this = this; + this.accessTokenTimeout = setTimeout(function () { + if (_this.isConnectionActive) + _this.client.write(JSON.stringify({ resend_access_token: true })); - }, expiresIn) - } - - _pingServer(interval) { - this.pingTimeout = setTimeout(() => { - if (this.isConnectionActive) - this.client.write(JSON.stringify({ + }, expiresIn); + }; + Memphis.prototype._pingServer = function (interval) { + var _this = this; + this.pingTimeout = setTimeout(function () { + if (_this.isConnectionActive) + _this.client.write(JSON.stringify({ ping: true })); }, interval); - } - + }; /** - * Creates a factory. + * Creates a factory. * @param {String} name - factory name. * @param {String} description - factory description (optional). */ - async factory({ name, description = "" }) { - try { - if (!this.isConnectionActive) - throw new Error("Connection is dead"); - - const response = await httpRequest({ - method: "POST", - url: `http://${this.host}:${this.managementPort}/api/factories/createFactory`, - headers: { - Authorization: "Bearer " + this.accessToken - }, - bodyParams: { name, description }, + Memphis.prototype.factory = function (_a) { + var name = _a.name, _b = _a.description, description = _b === void 0 ? "" : _b; + return __awaiter(this, void 0, void 0, function () { + var response, ex_2; + return __generator(this, function (_c) { + switch (_c.label) { + case 0: + _c.trys.push([0, 2, , 3]); + if (!this.isConnectionActive) + throw new Error("Connection is dead"); + return [4 /*yield*/, (0, httpRequest_1.httpRequest)({ + method: "POST", + url: "http://".concat(this.host, ":").concat(this.managementPort, "/api/factories/createFactory"), + headers: { + Authorization: "Bearer " + this.accessToken + }, + bodyParams: { name: name, description: description }, + })]; + case 1: + response = _c.sent(); + return [2 /*return*/, new Factory(this, response.name)]; + case 2: + ex_2 = _c.sent(); + if (typeof (ex_2) == "string") { + return [2 /*return*/, new Factory(this, name.toLowerCase())]; + } + throw ex_2; + case 3: return [2 /*return*/]; + } }); - - return new Factory(this, response.name); - } catch (ex) { - if (typeof (ex) == "string") { - return new Factory(this, name.toLowerCase()); - } - throw ex; - } - } - + }); + }; /** - * Creates a station. + * Creates a station. * @param {String} name - station name. * @param {String} factoryName - factory name to link the station with. * @param {Memphis.retentionTypes} retentionType - retention type, default is MAX_MESSAGE_AGE_SECONDS. @@ -209,70 +291,88 @@ class Memphis { * @param {Boolean} dedupEnabled - whether to allow dedup mecanism, dedup happens based on message ID, default is false. * @param {Number} dedupWindowMs - time frame in which dedup track messages, default is 0. */ - async station({ name, factoryName, retentionType = retentionTypes.MAX_MESSAGE_AGE_SECONDS, retentionValue = 604800, storageType = storageTypes.FILE, replicas = 1, dedupEnabled = false, dedupWindowMs = 0 }) { - try { - if (!this.isConnectionActive) - throw new Error("Connection is dead"); - - const response = await httpRequest({ - method: "POST", - url: `http://${this.host}:${this.managementPort}/api/stations/createStation`, - headers: { - Authorization: "Bearer " + this.accessToken - }, - bodyParams: { - name: name, - factory_name: factoryName, - retention_type: retentionType, - retention_value: retentionValue, - storage_type: storageType, - replicas: replicas, - dedup_enabled: dedupEnabled, - dedup_window_in_ms: dedupWindowMs - }, + Memphis.prototype.station = function (_a) { + var name = _a.name, factoryName = _a.factoryName, _b = _a.retentionType, retentionType = _b === void 0 ? retentionTypes.MAX_MESSAGE_AGE_SECONDS : _b, _c = _a.retentionValue, retentionValue = _c === void 0 ? 604800 : _c, _d = _a.storageType, storageType = _d === void 0 ? storageTypes.FILE : _d, _e = _a.replicas, replicas = _e === void 0 ? 1 : _e, _f = _a.dedupEnabled, dedupEnabled = _f === void 0 ? false : _f, _g = _a.dedupWindowMs, dedupWindowMs = _g === void 0 ? 0 : _g; + return __awaiter(this, void 0, void 0, function () { + var response, ex_3; + return __generator(this, function (_h) { + switch (_h.label) { + case 0: + _h.trys.push([0, 2, , 3]); + if (!this.isConnectionActive) + throw new Error("Connection is dead"); + return [4 /*yield*/, (0, httpRequest_1.httpRequest)({ + method: "POST", + url: "http://".concat(this.host, ":").concat(this.managementPort, "/api/stations/createStation"), + headers: { + Authorization: "Bearer " + this.accessToken + }, + bodyParams: { + name: name, + factory_name: factoryName, + retention_type: retentionType, + retention_value: retentionValue, + storage_type: storageType, + replicas: replicas, + dedup_enabled: dedupEnabled, + dedup_window_in_ms: dedupWindowMs + } + })]; + case 1: + response = _h.sent(); + return [2 /*return*/, new Station(this, response.name)]; + case 2: + ex_3 = _h.sent(); + if (typeof (ex_3) == "string") { + return [2 /*return*/, new Station(this, name.toLowerCase())]; + } + throw ex_3; + case 3: return [2 /*return*/]; + } }); - - return new Station(this, response.name); - } catch (ex) { - if (typeof (ex) == "string") { - return new Station(this, name.toLowerCase()); - } - throw ex; - } - } - + }); + }; /** - * Creates a producer. + * Creates a producer. * @param {String} stationName - station name to produce messages into. - * @param {Number} producerName - name for the producer. + * @param {String} producerName - name for the producer. */ - async producer({ stationName, producerName }) { - try { - if (!this.isConnectionActive) - throw new Error("Connection is dead"); - - await httpRequest({ - method: "POST", - url: `http://${this.host}:${this.managementPort}/api/producers/createProducer`, - headers: { - Authorization: "Bearer " + this.accessToken - }, - bodyParams: { - name: producerName, - station_name: stationName, - connection_id: this.connectionId, - producer_type: "application" - }, + Memphis.prototype.producer = function (_a) { + var stationName = _a.stationName, producerName = _a.producerName; + return __awaiter(this, void 0, void 0, function () { + var ex_4; + return __generator(this, function (_b) { + switch (_b.label) { + case 0: + _b.trys.push([0, 2, , 3]); + if (!this.isConnectionActive) + throw new Error("Connection is dead"); + return [4 /*yield*/, (0, httpRequest_1.httpRequest)({ + method: "POST", + url: "http://".concat(this.host, ":").concat(this.managementPort, "/api/producers/createProducer"), + headers: { + Authorization: "Bearer " + this.accessToken + }, + bodyParams: { + name: producerName, + station_name: stationName, + connection_id: this.connectionId, + producer_type: "application" + }, + })]; + case 1: + _b.sent(); + return [2 /*return*/, new Producer(this, producerName, stationName)]; + case 2: + ex_4 = _b.sent(); + throw ex_4; + case 3: return [2 /*return*/]; + } }); - - return new Producer(this, producerName, stationName); - } catch (ex) { - throw ex; - } - } - + }); + }; /** - * Creates a consumer. + * Creates a consumer. * @param {String} stationName - station name to consume messages from. * @param {String} consumerName - name for the consumer. * @param {String} consumerGroup - consumer group name, default is "". @@ -281,56 +381,76 @@ class Memphis { * @param {Number} batchMaxTimeToWaitMs - max time in miliseconds to wait between pulls, defauls is 5000. * @param {Number} maxAckTimeMs - max time for ack a message in miliseconds, in case a message not acked in this time period the Memphis broker will resend it */ - async consumer({ stationName, consumerName, consumerGroup = "", pullIntervalMs = 1000, batchSize = 10, batchMaxTimeToWaitMs = 5000, maxAckTimeMs = 30000 }) { - try { - if (!this.isConnectionActive) - throw new Error("Connection is dead"); - - await httpRequest({ - method: "POST", - url: `http://${this.host}:${this.managementPort}/api/consumers/createConsumer`, - headers: { - Authorization: "Bearer " + this.accessToken - }, - bodyParams: { - name: consumerName, - station_name: stationName, - connection_id: this.connectionId, - consumer_type: "application", - consumers_group: consumerGroup, - max_ack_time_ms: maxAckTimeMs - }, + Memphis.prototype.consumer = function (_a) { + var stationName = _a.stationName, consumerName = _a.consumerName, _b = _a.consumerGroup, consumerGroup = _b === void 0 ? "" : _b, _c = _a.pullIntervalMs, pullIntervalMs = _c === void 0 ? 1000 : _c, _d = _a.batchSize, batchSize = _d === void 0 ? 10 : _d, _e = _a.batchMaxTimeToWaitMs, batchMaxTimeToWaitMs = _e === void 0 ? 5000 : _e, _f = _a.maxAckTimeMs, maxAckTimeMs = _f === void 0 ? 30000 : _f; + return __awaiter(this, void 0, void 0, function () { + var ex_5; + return __generator(this, function (_g) { + switch (_g.label) { + case 0: + _g.trys.push([0, 2, , 3]); + if (!this.isConnectionActive) + throw new Error("Connection is dead"); + return [4 /*yield*/, (0, httpRequest_1.httpRequest)({ + method: "POST", + url: "http://".concat(this.host, ":").concat(this.managementPort, "/api/consumers/createConsumer"), + headers: { + Authorization: "Bearer " + this.accessToken + }, + bodyParams: { + name: consumerName, + station_name: stationName, + connection_id: this.connectionId, + consumer_type: "application", + consumers_group: consumerGroup, + max_ack_time_ms: maxAckTimeMs + }, + })]; + case 1: + _g.sent(); + return [2 /*return*/, new Consumer(this, stationName, consumerName, consumerGroup, pullIntervalMs, batchSize, batchMaxTimeToWaitMs, maxAckTimeMs)]; + case 2: + ex_5 = _g.sent(); + throw ex_5; + case 3: return [2 /*return*/]; + } }); - - return new Consumer(this, stationName, consumerName, consumerGroup, pullIntervalMs, batchSize, batchMaxTimeToWaitMs, maxAckTimeMs); - } catch (ex) { - throw ex; - } - } - - _close() { + }); + }; + Memphis.prototype._close = function () { + var _this = this; if (this.reconnect && this.reconnectAttempts < this.maxReconnect) { this.reconnectAttempts++; - setTimeout(async () => { - try { - await this.connect({ - host: this.host, - managementPort: this.managementPort, - tcpPort: this.tcpPort, - dataPort: this.dataPort, - username: this.username, - connectionToken: this.connectionToken, - reconnect: this.reconnect, - maxReconnect: this.maxReconnect, - reconnectIntervalMs: this.reconnectIntervalMs, - timeoutMs: this.timeoutMs - }); - console.log("Reconnect to memphis has been succeeded"); - } catch (ex) { - console.error("Failed reconnect to memphis"); - return; - } - }, this.reconnectIntervalMs); + setTimeout(function () { return __awaiter(_this, void 0, void 0, function () { + var ex_6; + return __generator(this, function (_a) { + switch (_a.label) { + case 0: + _a.trys.push([0, 2, , 3]); + return [4 /*yield*/, this.connect({ + host: this.host, + managementPort: this.managementPort, + tcpPort: this.tcpPort, + dataPort: this.dataPort, + username: this.username, + connectionToken: this.connectionToken, + reconnect: this.reconnect, + maxReconnect: this.maxReconnect, + reconnectIntervalMs: this.reconnectIntervalMs, + timeoutMs: this.timeoutMs + })]; + case 1: + _a.sent(); + console.log("Reconnect to memphis has been succeeded"); + return [3 /*break*/, 3]; + case 2: + ex_6 = _a.sent(); + console.error("Failed reconnect to memphis"); + return [2 /*return*/]; + case 3: return [2 /*return*/]; + } + }); + }); }, this.reconnectIntervalMs); } else if (this.isConnectionActive) { this.client.removeAllListeners("data"); @@ -339,22 +459,17 @@ class Memphis { this.client.destroy(); clearTimeout(this.accessTokenTimeout); clearTimeout(this.pingTimeout); - this.accessToken = null; - this.connectionId = null; - this.isConnectionActive = false; - this.accessTokenTimeout = null; - this.pingTimeout = null; this.reconnectAttempts = 0; - setTimeout(() => { - this.brokerManager && this.brokerManager.close(); + setTimeout(function () { + _this.brokerManager && _this.brokerManager.close(); }, 500); } - } - + }; /** - * Close Memphis connection. + * Close Memphis connection. */ - close() { + Memphis.prototype.close = function () { + var _this = this; if (this.isConnectionActive) { this.client.removeAllListeners("data"); this.client.removeAllListeners("error"); @@ -362,68 +477,87 @@ class Memphis { this.client.destroy(); clearTimeout(this.accessTokenTimeout); clearTimeout(this.pingTimeout); - this.accessToken = null; - this.connectionId = null; - this.isConnectionActive = false; - this.accessTokenTimeout = null; - this.pingTimeout = null; this.reconnectAttempts = 0; - setTimeout(() => { - this.brokerManager && this.brokerManager.close(); + setTimeout(function () { + _this.brokerManager && _this.brokerManager.close(); }, 500); } - } -} - -class Producer { - constructor(connection, producerName, stationName) { + }; + return Memphis; +}()); +var Producer = /** @class */ (function () { + function Producer(connection, producerName, stationName) { this.connection = connection; this.producerName = producerName.toLowerCase(); this.stationName = stationName.toLowerCase(); } - /** - * Produces a message into a station. + * Produces a message into a station. * @param {Uint8Array} message - message to send into the station. * @param {Number} ackWaitSec - max time in seconds to wait for an ack from memphis. */ - async produce({ message, ackWaitSec = 15 }) { - try { - const h = headers(); - h.append("producedBy", this.producerName); - await this.connection.brokerConnection.publish(`${this.stationName}.final`, message, { msgID: uuidv4(), headers: h, ackWait: ackWaitSec * 1000 * 1000000 }); - } catch (ex) { - if (ex.code === '503') { - throw new Error("Produce operation has failed, please check wheether Station/Producer are still exist"); - } - throw ex; - } - } - + Producer.prototype.produce = function (_a) { + var message = _a.message, _b = _a.ackWaitSec, ackWaitSec = _b === void 0 ? 15 : _b; + return __awaiter(this, void 0, void 0, function () { + var h, ex_7; + return __generator(this, function (_c) { + switch (_c.label) { + case 0: + _c.trys.push([0, 2, , 3]); + h = (0, nats_1.headers)(); + h.append("producedBy", this.producerName); + return [4 /*yield*/, this.connection.brokerConnection.publish("".concat(this.stationName, ".final"), message, { msgID: (0, uuid_1.v4)(), headers: h, ackWait: ackWaitSec * 1000 * 1000000 })]; + case 1: + _c.sent(); + return [3 /*break*/, 3]; + case 2: + ex_7 = _c.sent(); + if (ex_7.code === '503') { + throw new Error("Produce operation has failed, please check wheether Station/Producer are still exist"); + } + throw ex_7; + case 3: return [2 /*return*/]; + } + }); + }); + }; /** - * Destroy the producer. + * Destroy the producer. */ - async destroy() { - try { - await httpRequest({ - method: "DELETE", - url: `http://${this.connection.host}:${this.connection.managementPort}/api/producers/destroyProducer`, - headers: { - Authorization: "Bearer " + this.connection.accessToken - }, - bodyParams: { - name: this.producerName, - station_name: this.stationName - }, + Producer.prototype.destroy = function () { + return __awaiter(this, void 0, void 0, function () { + var _1; + return __generator(this, function (_a) { + switch (_a.label) { + case 0: + _a.trys.push([0, 2, , 3]); + return [4 /*yield*/, (0, httpRequest_1.httpRequest)({ + method: "DELETE", + url: "http://".concat(this.connection.host, ":").concat(this.connection.managementPort, "/api/producers/destroyProducer"), + headers: { + Authorization: "Bearer " + this.connection.accessToken + }, + bodyParams: { + name: this.producerName, + station_name: this.stationName + }, + })]; + case 1: + _a.sent(); + return [3 /*break*/, 3]; + case 2: + _1 = _a.sent(); + return [3 /*break*/, 3]; + case 3: return [2 /*return*/]; + } }); - } catch (ex) { - return; // ignoring unsuccessful destroy calls - } - } -} - -class Consumer { - constructor(connection, stationName, consumerName, consumerGroup, pullIntervalMs, batchSize, batchMaxTimeToWaitMs, maxAckTimeMs) { + }); + }; + return Producer; +}()); +var Consumer = /** @class */ (function () { + function Consumer(connection, stationName, consumerName, consumerGroup, pullIntervalMs, batchSize, batchMaxTimeToWaitMs, maxAckTimeMs) { + var _this = this; this.connection = connection; this.stationName = stationName.toLowerCase(); this.consumerName = consumerName.toLowerCase(); @@ -432,155 +566,234 @@ class Consumer { this.batchSize = batchSize; this.batchMaxTimeToWaitMs = batchMaxTimeToWaitMs; this.maxAckTimeMs = maxAckTimeMs; - this.eventEmitter = new events.EventEmitter(); + this.eventEmitter = new events_1.default.EventEmitter(); this.pullInterval = null; this.pingConsumerInvtervalMs = 30000; this.pingConsumerInvterval = null; - - this.connection.brokerConnection.pullSubscribe(`${this.stationName}.final`, { + this.connection.brokerConnection.pullSubscribe("".concat(this.stationName, ".final"), { mack: true, config: { durable_name: this.consumerGroup ? this.consumerGroup : this.consumerName, ack_wait: this.maxAckTimeMs, }, - }).then(async psub => { - psub.pull({ batch: this.batchSize, expires: this.batchMaxTimeToWaitMs }); - this.pullInterval = setInterval(() => { - if (!this.connection.brokerManager.isClosed()) - psub.pull({ batch: this.batchSize, expires: this.batchMaxTimeToWaitMs }); - else - clearInterval(this.pullInterval) - }, this.pullIntervalMs); - - this.pingConsumerInvterval = setInterval(async () => { - if (!this.connection.brokerManager.isClosed()) { - this._pingConsumer() + }).then(function (psub) { var psub_1, psub_1_1; return __awaiter(_this, void 0, void 0, function () { + var m, e_1_1; + var _this = this; + var e_1, _a; + return __generator(this, function (_b) { + switch (_b.label) { + case 0: + psub.pull({ batch: this.batchSize, expires: this.batchMaxTimeToWaitMs }); + this.pullInterval = setInterval(function () { + if (!_this.connection.brokerManager.isClosed()) + psub.pull({ batch: _this.batchSize, expires: _this.batchMaxTimeToWaitMs }); + else + clearInterval(_this.pullInterval); + }, this.pullIntervalMs); + this.pingConsumerInvterval = setInterval(function () { return __awaiter(_this, void 0, void 0, function () { + return __generator(this, function (_a) { + if (!this.connection.brokerManager.isClosed()) { + this._pingConsumer(); + } + else + clearInterval(this.pingConsumerInvterval); + return [2 /*return*/]; + }); + }); }, this.pingConsumerInvtervalMs); + _b.label = 1; + case 1: + _b.trys.push([1, 6, 7, 12]); + psub_1 = __asyncValues(psub); + _b.label = 2; + case 2: return [4 /*yield*/, psub_1.next()]; + case 3: + if (!(psub_1_1 = _b.sent(), !psub_1_1.done)) return [3 /*break*/, 5]; + m = psub_1_1.value; + this.eventEmitter.emit("message", new Message(m)); + _b.label = 4; + case 4: return [3 /*break*/, 2]; + case 5: return [3 /*break*/, 12]; + case 6: + e_1_1 = _b.sent(); + e_1 = { error: e_1_1 }; + return [3 /*break*/, 12]; + case 7: + _b.trys.push([7, , 10, 11]); + if (!(psub_1_1 && !psub_1_1.done && (_a = psub_1.return))) return [3 /*break*/, 9]; + return [4 /*yield*/, _a.call(psub_1)]; + case 8: + _b.sent(); + _b.label = 9; + case 9: return [3 /*break*/, 11]; + case 10: + if (e_1) throw e_1.error; + return [7 /*endfinally*/]; + case 11: return [7 /*endfinally*/]; + case 12: return [2 /*return*/]; } - else - clearInterval(this.pingConsumerInvterval) - }, this.pingConsumerInvtervalMs); - - for await (const m of psub) { - this.eventEmitter.emit("message", new Message(m)); - } - }).catch(error => this.eventEmitter.emit("error", error)); + }); + }); }).catch(function (error) { return _this.eventEmitter.emit("error", error); }); } - /** - * Creates an event listener. + * Creates an event listener. * @param {String} event - the event to listen to. * @param {Function} cb - a callback function. */ - on(event, cb) { + Consumer.prototype.on = function (event, cb) { this.eventEmitter.on(event, cb); - } - - async _pingConsumer() { - try { - const durableName = this.consumerGroup || this.consumerName; - await this.connection.brokerStats.consumers.info(this.stationName, durableName) - } catch (ex) { - this.eventEmitter.emit("error", "station/consumer were not found"); - } - } - + }; + Consumer.prototype._pingConsumer = function () { + return __awaiter(this, void 0, void 0, function () { + var durableName, ex_8; + return __generator(this, function (_a) { + switch (_a.label) { + case 0: + _a.trys.push([0, 2, , 3]); + durableName = this.consumerGroup || this.consumerName; + return [4 /*yield*/, this.connection.brokerStats.consumers.info(this.stationName, durableName)]; + case 1: + _a.sent(); + return [3 /*break*/, 3]; + case 2: + ex_8 = _a.sent(); + this.eventEmitter.emit("error", "station/consumer were not found"); + return [3 /*break*/, 3]; + case 3: return [2 /*return*/]; + } + }); + }); + }; /** - * Destroy the consumer. + * Destroy the consumer. */ - async destroy() { - this.eventEmitter.removeAllListeners("message"); - this.eventEmitter.removeAllListeners("error"); - clearInterval(this.pullInterval); - clearInterval(this.pingConsumerInvterval); - try { - await httpRequest({ - method: "DELETE", - url: `http://${this.connection.host}:${this.connection.managementPort}/api/consumers/destroyConsumer`, - headers: { - Authorization: "Bearer " + this.connection.accessToken - }, - bodyParams: { - name: this.consumerName, - station_name: this.stationName - }, + Consumer.prototype.destroy = function () { + return __awaiter(this, void 0, void 0, function () { + var _2; + return __generator(this, function (_a) { + switch (_a.label) { + case 0: + this.eventEmitter.removeAllListeners("message"); + this.eventEmitter.removeAllListeners("error"); + clearInterval(this.pullInterval); + clearInterval(this.pingConsumerInvterval); + _a.label = 1; + case 1: + _a.trys.push([1, 3, , 4]); + return [4 /*yield*/, (0, httpRequest_1.httpRequest)({ + method: "DELETE", + url: "http://".concat(this.connection.host, ":").concat(this.connection.managementPort, "/api/consumers/destroyConsumer"), + headers: { + Authorization: "Bearer " + this.connection.accessToken + }, + bodyParams: { + name: this.consumerName, + station_name: this.stationName + }, + })]; + case 2: + _a.sent(); + return [3 /*break*/, 4]; + case 3: + _2 = _a.sent(); + return [3 /*break*/, 4]; + case 4: return [2 /*return*/]; + } }); - } catch (ex) { - return; // ignoring unsuccessful destroy calls - } - } -} - -class Message { - constructor(message) { + }); + }; + return Consumer; +}()); +var Message = /** @class */ (function () { + function Message(message) { this.message = message; } - /** - * Ack a message is done processing. + * Ack a message is done processing. */ - ack() { + Message.prototype.ack = function () { this.message.ack(); - } - - getData() { + }; + Message.prototype.getData = function () { return this.message.data; - } -} - -class Factory { - constructor(connection, name) { + }; + return Message; +}()); +var Factory = /** @class */ (function () { + function Factory(connection, name) { this.connection = connection; this.name = name.toLowerCase(); } - /** - * Destroy the factory. + * Destroy the factory. */ - async destroy() { - try { - await httpRequest({ - method: "DELETE", - url: `http://${this.connection.host}:${this.connection.managementPort}/api/factories/removeFactory`, - headers: { - Authorization: "Bearer " + this.connection.accessToken - }, - bodyParams: { - factory_name: this.name - }, + Factory.prototype.destroy = function () { + return __awaiter(this, void 0, void 0, function () { + var ex_9; + return __generator(this, function (_a) { + switch (_a.label) { + case 0: + _a.trys.push([0, 2, , 3]); + return [4 /*yield*/, (0, httpRequest_1.httpRequest)({ + method: "DELETE", + url: "http://".concat(this.connection.host, ":").concat(this.connection.managementPort, "/api/factories/removeFactory"), + headers: { + Authorization: "Bearer " + this.connection.accessToken + }, + bodyParams: { + factory_name: this.name + }, + })]; + case 1: + _a.sent(); + return [3 /*break*/, 3]; + case 2: + ex_9 = _a.sent(); + throw ex_9; + case 3: return [2 /*return*/]; + } }); - } catch (ex) { - throw ex; - } - } -} - -class Station { - constructor(connection, name) { + }); + }; + return Factory; +}()); +var Station = /** @class */ (function () { + function Station(connection, name) { this.connection = connection; this.name = name.toLowerCase(); } - /** - * Destroy the station. + * Destroy the station. */ - async destroy() { - try { - await httpRequest({ - method: "DELETE", - url: `http://${this.connection.host}:${this.connection.managementPort}/api/stations/removeStation`, - headers: { - Authorization: "Bearer " + this.connection.accessToken - }, - bodyParams: { - station_name: this.name - }, + Station.prototype.destroy = function () { + return __awaiter(this, void 0, void 0, function () { + var ex_10; + return __generator(this, function (_a) { + switch (_a.label) { + case 0: + _a.trys.push([0, 2, , 3]); + return [4 /*yield*/, (0, httpRequest_1.httpRequest)({ + method: "DELETE", + url: "http://".concat(this.connection.host, ":").concat(this.connection.managementPort, "/api/stations/removeStation"), + headers: { + Authorization: "Bearer " + this.connection.accessToken + }, + bodyParams: { + station_name: this.name + }, + })]; + case 1: + _a.sent(); + return [3 /*break*/, 3]; + case 2: + ex_10 = _a.sent(); + throw ex_10; + case 3: return [2 /*return*/]; + } }); - } catch (ex) { - throw ex; - } - } -} - + }); + }; + return Station; +}()); module.exports = new Memphis(); module.exports.retentionTypes = retentionTypes; module.exports.storageTypes = storageTypes; diff --git a/package.json b/package.json index 6e651b3..5fbff6a 100644 --- a/package.json +++ b/package.json @@ -1,10 +1,11 @@ { "name": "memphis-dev", - "version": "0.3.1", + "version": "0.3.2", "description": "A dev-first event processing platform", "main": "index.js", "scripts": { - "test": "echo \"Error: no test specified\" && exit 1" + "test": "echo \"Error: no test specified\" && exit 1", + "build": "tsc" }, "author": "", "license": "ISC", @@ -17,5 +18,12 @@ "axios": "^0.26.1", "nats": "^2.6.1", "uuid": "^8.3.2" + }, + "devDependencies": { + "@tsconfig/node16": "^1.0.3", + "@types/axios": "^0.14.0", + "@types/node": "^17.0.42", + "@types/uuid": "^8.3.4", + "typescript": "^4.7.3" } } diff --git a/src/httpRequest.js b/src/httpRequest.js new file mode 100644 index 0000000..7662926 --- /dev/null +++ b/src/httpRequest.js @@ -0,0 +1,51 @@ +// Copyright 2021-2022 The Memphis Authors +// Licensed under the GNU General Public License v3.0 (the “License”); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.gnu.org/licenses/gpl-3.0.en.html +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an “AS IS” BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +const axios = require('axios'); + +const httpRequest = async ({ method, url, headers = {}, bodyParams = {}, queryParams = {}, file = null, timeout = 0 }) => { + if (method !== 'GET' && method !== 'POST' && method !== 'PUT' && method !== 'DELETE') + throw { + status: 400, + message: `Invalid HTTP method`, + data: { method, url, data } + }; + + if (file) { + bodyParams.file = file; + } + + headers['content-type'] = 'application/json'; + + try { + const response = await axios({ + method, + url, + headers, + timeout, + data: bodyParams, + params: queryParams, + maxBodyLength: 1000000000, + maxContentLength: 1000000000 + }); + const results = response.data; + return results; + } catch (ex) { + if (ex?.response?.data) + throw ex.response.data.message; + + throw ex; + } +}; + +module.exports = httpRequest; diff --git a/httpRequest.ts b/src/httpRequest.ts similarity index 89% rename from httpRequest.ts rename to src/httpRequest.ts index 6da95ad..d4ded8d 100644 --- a/httpRequest.ts +++ b/src/httpRequest.ts @@ -13,8 +13,8 @@ import axios from 'axios'; -export async function httpRequest({ method, url, headers = {}, bodyParams = {}, queryParams = {}, timeout = 0 } : - {method: string, url: string, headers?: any, bodyParams?: any, queryParams?: any, timeout?: number }): Promise { +export async function httpRequest({ method, url, headers = {}, bodyParams = {}, queryParams = {}, timeout = 0 }: + { method: string, url: string, headers?: any, bodyParams?: any, queryParams?: any, timeout?: number }): Promise { if (method !== 'GET' && method !== 'POST' && method !== 'PUT' && method !== 'DELETE') throw { status: 400, diff --git a/src/index.js b/src/index.js new file mode 100644 index 0000000..ab74641 --- /dev/null +++ b/src/index.js @@ -0,0 +1,586 @@ +// Copyright 2021-2022 The Memphis Authors +// Licensed under the GNU General Public License v3.0 (the “License”); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.gnu.org/licenses/gpl-3.0.en.html +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an “AS IS” BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +const net = require('net'); +const events = require('events'); +const broker = require('nats'); +const { headers } = require("nats"); +const { v4: uuidv4 } = require('uuid'); +const httpRequest = require('./httpRequest'); + +const retentionTypes = { + MAX_MESSAGE_AGE_SECONDS: "message_age_sec", + MESSAGES: "messages", + BYTES: "bytes" +}; + +const storageTypes = { + FILE: "file", + MEMORY: "memory" +}; + +class Memphis { + constructor() { + this.isConnectionActive = false; + this.connectionId = null; + this.accessToken = null; + this.host = null; + this.managementPort = 5555; + this.tcpPort = 6666; + this.dataPort = 7766 + this.username = null; + this.connectionToken = null; + this.accessTokenTimeout = null; + this.pingTimeout = null; + this.client = new net.Socket(); + this.reconnectAttempts = 0; + this.reconnect = true; + this.maxReconnect = 3; + this.reconnectIntervalMs = 200; + this.timeoutMs = 15000; + this.brokerConnection = null; + this.brokerManager = null; + this.brokerStats = null; + + this.client.on('error', error => { + console.error(error); + }); + + this.client.on('close', () => { + this.isConnectionActive = false; + this._close(); + }); + } + + /** + * Creates connection with Memphis. + * @param {String} host - memphis host. + * @param {Number} managementPort - management port, default is 5555. + * @param {Number} tcpPort - tcp port, default is 6666. + * @param {Number} dataPort - data port, default is 7766 . + * @param {String} username - user of type root/application. + * @param {String} connectionToken - broker token. + * @param {Boolean} reconnect - whether to do reconnect while connection is lost. + * @param {Number} maxReconnect - The reconnect attempts. + * @param {Number} reconnectIntervalMs - Interval in miliseconds between reconnect attempts. + * @param {Number} timeoutMs - connection timeout in miliseconds. + */ + connect({ host, managementPort = 5555, tcpPort = 6666, dataPort = 7766, username, connectionToken, reconnect = true, maxReconnect = 3, reconnectIntervalMs = 200, timeoutMs = 15000 }) { + return new Promise((resolve, reject) => { + this.host = this._normalizeHost(host); + this.managementPort = managementPort; + this.tcpPort = tcpPort; + this.dataPort = dataPort; + this.username = username; + this.connectionToken = connectionToken; + this.reconnect = reconnect; + this.maxReconnect = maxReconnect > 9 ? 9 : maxReconnect; + this.reconnectIntervalMs = reconnectIntervalMs; + this.timeoutMs = timeoutMs; + + this.client.connect(this.tcpPort, this.host, () => { + this.client.write(JSON.stringify({ + username: username, + broker_creds: connectionToken, + connection_id: this.connectionId + })); + let connected = false; + + this.client.on('data', async data => { + try { + data = JSON.parse(data.toString()) + } catch (ex) { + return reject(data.toString()); + } + this.connectionId = data.connection_id; + this.isConnectionActive = true; + this.reconnectAttempts = 0; + + if (data.access_token) { + this.accessToken = data.access_token; + this._keepAcessTokenFresh(data.access_token_exp); + } + + if (data.ping_interval_ms) + this._pingServer(data.ping_interval_ms); + + if (!connected) { + try { + this.brokerManager = await broker.connect({ + servers: `${this.host}:${this.dataPort}`, + reconnect: this.reconnect, + maxReconnectAttempts: this.reconnect ? this.maxReconnect : 0, + reconnectTimeWait: this.reconnectIntervalMs, + timeout: this.timeoutMs, + token: this.connectionToken + }); + + this.brokerConnection = this.brokerManager.jetstream(); + this.brokerStats = await this.brokerManager.jetstreamManager(); + connected = true; + return resolve(); + } catch (ex) { + return reject(ex); + } + } + }); + }); + + setTimeout(() => { + if (!reconnect || this.reconnectAttempts === maxReconnect || !this.isConnectionActive) + reject(new Error("Connection timeout has reached")); + }, timeoutMs); + }); + } + + _normalizeHost(host) { + if (host.startsWith("http://")) + return host.split("http://")[1]; + else if (host.startsWith("https://")) + return host.split("https://")[1]; + else + return host; + } + + _keepAcessTokenFresh(expiresIn) { + this.accessTokenTimeout = setTimeout(() => { + if (this.isConnectionActive) + this.client.write(JSON.stringify({ + resend_access_token: true + })); + }, expiresIn) + } + + _pingServer(interval) { + this.pingTimeout = setTimeout(() => { + if (this.isConnectionActive) + this.client.write(JSON.stringify({ + ping: true + })); + }, interval); + } + + /** + * Creates a factory. + * @param {String} name - factory name. + * @param {String} description - factory description (optional). + */ + async factory({ name, description = "" }) { + try { + if (!this.isConnectionActive) + throw new Error("Connection is dead"); + + const response = await httpRequest({ + method: "POST", + url: `http://${this.host}:${this.managementPort}/api/factories/createFactory`, + headers: { + Authorization: "Bearer " + this.accessToken + }, + bodyParams: { name, description }, + }); + + return new Factory(this, response.name); + } catch (ex) { + if (typeof (ex) == "string") { + return new Factory(this, name.toLowerCase()); + } + throw ex; + } + } + + /** + * Creates a station. + * @param {String} name - station name. + * @param {String} factoryName - factory name to link the station with. + * @param {Memphis.retentionTypes} retentionType - retention type, default is MAX_MESSAGE_AGE_SECONDS. + * @param {Number} retentionValue - number which represents the retention based on the retentionType, default is 604800. + * @param {Memphis.storageTypes} storageType - persistance storage for messages of the station, default is storageTypes.FILE. + * @param {Number} replicas - number of replicas for the messages of the data, default is 1. + * @param {Boolean} dedupEnabled - whether to allow dedup mecanism, dedup happens based on message ID, default is false. + * @param {Number} dedupWindowMs - time frame in which dedup track messages, default is 0. + */ + async station({ name, factoryName, retentionType = retentionTypes.MAX_MESSAGE_AGE_SECONDS, retentionValue = 604800, storageType = storageTypes.FILE, replicas = 1, dedupEnabled = false, dedupWindowMs = 0 }) { + try { + if (!this.isConnectionActive) + throw new Error("Connection is dead"); + + const response = await httpRequest({ + method: "POST", + url: `http://${this.host}:${this.managementPort}/api/stations/createStation`, + headers: { + Authorization: "Bearer " + this.accessToken + }, + bodyParams: { + name: name, + factory_name: factoryName, + retention_type: retentionType, + retention_value: retentionValue, + storage_type: storageType, + replicas: replicas, + dedup_enabled: dedupEnabled, + dedup_window_in_ms: dedupWindowMs + }, + }); + + return new Station(this, response.name); + } catch (ex) { + if (typeof (ex) == "string") { + return new Station(this, name.toLowerCase()); + } + throw ex; + } + } + + /** + * Creates a producer. + * @param {String} stationName - station name to produce messages into. + * @param {Number} producerName - name for the producer. + */ + async producer({ stationName, producerName }) { + try { + if (!this.isConnectionActive) + throw new Error("Connection is dead"); + + await httpRequest({ + method: "POST", + url: `http://${this.host}:${this.managementPort}/api/producers/createProducer`, + headers: { + Authorization: "Bearer " + this.accessToken + }, + bodyParams: { + name: producerName, + station_name: stationName, + connection_id: this.connectionId, + producer_type: "application" + }, + }); + + return new Producer(this, producerName, stationName); + } catch (ex) { + throw ex; + } + } + + /** + * Creates a consumer. + * @param {String} stationName - station name to consume messages from. + * @param {String} consumerName - name for the consumer. + * @param {String} consumerGroup - consumer group name, default is "". + * @param {Number} pullIntervalMs - interval in miliseconds between pulls, default is 1000. + * @param {Number} batchSize - pull batch size. + * @param {Number} batchMaxTimeToWaitMs - max time in miliseconds to wait between pulls, defauls is 5000. + * @param {Number} maxAckTimeMs - max time for ack a message in miliseconds, in case a message not acked in this time period the Memphis broker will resend it + */ + async consumer({ stationName, consumerName, consumerGroup = "", pullIntervalMs = 1000, batchSize = 10, batchMaxTimeToWaitMs = 5000, maxAckTimeMs = 30000 }) { + try { + if (!this.isConnectionActive) + throw new Error("Connection is dead"); + + await httpRequest({ + method: "POST", + url: `http://${this.host}:${this.managementPort}/api/consumers/createConsumer`, + headers: { + Authorization: "Bearer " + this.accessToken + }, + bodyParams: { + name: consumerName, + station_name: stationName, + connection_id: this.connectionId, + consumer_type: "application", + consumers_group: consumerGroup, + max_ack_time_ms: maxAckTimeMs + }, + }); + + return new Consumer(this, stationName, consumerName, consumerGroup, pullIntervalMs, batchSize, batchMaxTimeToWaitMs, maxAckTimeMs); + } catch (ex) { + throw ex; + } + } + + _close() { + if (this.reconnect && this.reconnectAttempts < this.maxReconnect) { + this.reconnectAttempts++; + setTimeout(async () => { + try { + await this.connect({ + host: this.host, + managementPort: this.managementPort, + tcpPort: this.tcpPort, + dataPort: this.dataPort, + username: this.username, + connectionToken: this.connectionToken, + reconnect: this.reconnect, + maxReconnect: this.maxReconnect, + reconnectIntervalMs: this.reconnectIntervalMs, + timeoutMs: this.timeoutMs + }); + console.log("Reconnect to memphis has been succeeded"); + } catch (ex) { + console.error("Failed reconnect to memphis"); + return; + } + }, this.reconnectIntervalMs); + } + else if (this.isConnectionActive) { + this.client.removeAllListeners("data"); + this.client.removeAllListeners("error"); + this.client.removeAllListeners("close"); + this.client.destroy(); + clearTimeout(this.accessTokenTimeout); + clearTimeout(this.pingTimeout); + this.accessToken = null; + this.connectionId = null; + this.isConnectionActive = false; + this.accessTokenTimeout = null; + this.pingTimeout = null; + this.reconnectAttempts = 0; + setTimeout(() => { + this.brokerManager && this.brokerManager.close(); + }, 500); + } + } + + /** + * Close Memphis connection. + */ + close() { + if (this.isConnectionActive) { + this.client.removeAllListeners("data"); + this.client.removeAllListeners("error"); + this.client.removeAllListeners("close"); + this.client.destroy(); + clearTimeout(this.accessTokenTimeout); + clearTimeout(this.pingTimeout); + this.accessToken = null; + this.connectionId = null; + this.isConnectionActive = false; + this.accessTokenTimeout = null; + this.pingTimeout = null; + this.reconnectAttempts = 0; + setTimeout(() => { + this.brokerManager && this.brokerManager.close(); + }, 500); + } + } +} + +class Producer { + constructor(connection, producerName, stationName) { + this.connection = connection; + this.producerName = producerName.toLowerCase(); + this.stationName = stationName.toLowerCase(); + } + + /** + * Produces a message into a station. + * @param {Uint8Array} message - message to send into the station. + * @param {Number} ackWaitSec - max time in seconds to wait for an ack from memphis. + */ + async produce({ message, ackWaitSec = 15 }) { + try { + const h = headers(); + h.append("producedBy", this.producerName); + await this.connection.brokerConnection.publish(`${this.stationName}.final`, message, { msgID: uuidv4(), headers: h, ackWait: ackWaitSec * 1000 * 1000000 }); + } catch (ex) { + if (ex.code === '503') { + throw new Error("Produce operation has failed, please check wheether Station/Producer are still exist"); + } + throw ex; + } + } + + /** + * Destroy the producer. + */ + async destroy() { + try { + await httpRequest({ + method: "DELETE", + url: `http://${this.connection.host}:${this.connection.managementPort}/api/producers/destroyProducer`, + headers: { + Authorization: "Bearer " + this.connection.accessToken + }, + bodyParams: { + name: this.producerName, + station_name: this.stationName + }, + }); + } catch (ex) { + return; // ignoring unsuccessful destroy calls + } + } +} + +class Consumer { + constructor(connection, stationName, consumerName, consumerGroup, pullIntervalMs, batchSize, batchMaxTimeToWaitMs, maxAckTimeMs) { + this.connection = connection; + this.stationName = stationName.toLowerCase(); + this.consumerName = consumerName.toLowerCase(); + this.consumerGroup = consumerGroup.toLowerCase(); + this.pullIntervalMs = pullIntervalMs; + this.batchSize = batchSize; + this.batchMaxTimeToWaitMs = batchMaxTimeToWaitMs; + this.maxAckTimeMs = maxAckTimeMs; + this.eventEmitter = new events.EventEmitter(); + this.pullInterval = null; + this.pingConsumerInvtervalMs = 30000; + this.pingConsumerInvterval = null; + + this.connection.brokerConnection.pullSubscribe(`${this.stationName}.final`, { + mack: true, + config: { + durable_name: this.consumerGroup ? this.consumerGroup : this.consumerName, + ack_wait: this.maxAckTimeMs, + }, + }).then(async psub => { + psub.pull({ batch: this.batchSize, expires: this.batchMaxTimeToWaitMs }); + this.pullInterval = setInterval(() => { + if (!this.connection.brokerManager.isClosed()) + psub.pull({ batch: this.batchSize, expires: this.batchMaxTimeToWaitMs }); + else + clearInterval(this.pullInterval) + }, this.pullIntervalMs); + + this.pingConsumerInvterval = setInterval(async () => { + if (!this.connection.brokerManager.isClosed()) { + this._pingConsumer() + } + else + clearInterval(this.pingConsumerInvterval) + }, this.pingConsumerInvtervalMs); + + for await (const m of psub) { + this.eventEmitter.emit("message", new Message(m)); + } + }).catch(error => this.eventEmitter.emit("error", error)); + } + + /** + * Creates an event listener. + * @param {String} event - the event to listen to. + * @param {Function} cb - a callback function. + */ + on(event, cb) { + this.eventEmitter.on(event, cb); + } + + async _pingConsumer() { + try { + const durableName = this.consumerGroup || this.consumerName; + await this.connection.brokerStats.consumers.info(this.stationName, durableName) + } catch (ex) { + this.eventEmitter.emit("error", "station/consumer were not found"); + } + } + + /** + * Destroy the consumer. + */ + async destroy() { + this.eventEmitter.removeAllListeners("message"); + this.eventEmitter.removeAllListeners("error"); + clearInterval(this.pullInterval); + clearInterval(this.pingConsumerInvterval); + try { + await httpRequest({ + method: "DELETE", + url: `http://${this.connection.host}:${this.connection.managementPort}/api/consumers/destroyConsumer`, + headers: { + Authorization: "Bearer " + this.connection.accessToken + }, + bodyParams: { + name: this.consumerName, + station_name: this.stationName + }, + }); + } catch (ex) { + return; // ignoring unsuccessful destroy calls + } + } +} + +class Message { + constructor(message) { + this.message = message; + } + + /** + * Ack a message is done processing. + */ + ack() { + this.message.ack(); + } + + getData() { + return this.message.data; + } +} + +class Factory { + constructor(connection, name) { + this.connection = connection; + this.name = name.toLowerCase(); + } + + /** + * Destroy the factory. + */ + async destroy() { + try { + await httpRequest({ + method: "DELETE", + url: `http://${this.connection.host}:${this.connection.managementPort}/api/factories/removeFactory`, + headers: { + Authorization: "Bearer " + this.connection.accessToken + }, + bodyParams: { + factory_name: this.name + }, + }); + } catch (ex) { + throw ex; + } + } +} + +class Station { + constructor(connection, name) { + this.connection = connection; + this.name = name.toLowerCase(); + } + + /** + * Destroy the station. + */ + async destroy() { + try { + await httpRequest({ + method: "DELETE", + url: `http://${this.connection.host}:${this.connection.managementPort}/api/stations/removeStation`, + headers: { + Authorization: "Bearer " + this.connection.accessToken + }, + bodyParams: { + station_name: this.name + }, + }); + } catch (ex) { + throw ex; + } + } +} + +module.exports = new Memphis(); +module.exports.retentionTypes = retentionTypes; +module.exports.storageTypes = storageTypes; diff --git a/index.ts b/src/index.ts similarity index 94% rename from index.ts rename to src/index.ts index 7ff3920..f19047c 100644 --- a/index.ts +++ b/src/index.ts @@ -13,7 +13,7 @@ import net from 'net'; import events from 'events'; -import broker from 'nats'; +import * as broker from 'nats'; import { headers } from "nats"; import { v4 as uuidv4 } from 'uuid'; import { httpRequest } from './httpRequest'; @@ -30,12 +30,17 @@ const retentionTypes: IRetentionTypes = { BYTES: "bytes" }; -const storageTypes = { +interface IStorageTypes { + FILE: string; + MEMORY: string; +} + +const storageTypes: IStorageTypes = { FILE: "file", MEMORY: "memory" }; -interface IConnectionData { +interface IMessage { connection_id: string; access_token: string; access_token_exp: number; @@ -52,30 +57,39 @@ class Memphis { private dataPort: number; private username: string; private connectionToken: string; - private accessTokenTimeout: NodeJS.Timeout; - private pingTimeout: NodeJS.Timeout; + private accessTokenTimeout: any; + private pingTimeout: any; private client: net.Socket; private reconnectAttempts: number; private reconnect: boolean; private maxReconnect: number; private reconnectIntervalMs: number; private timeoutMs: number; - public brokerConnection: broker.JetStreamClient; - public brokerManager: broker.NatsConnection; - public brokerStats: broker.JetStreamManager; + public brokerConnection: any; + public brokerManager: any; + public brokerStats: any; constructor() { this.isConnectionActive = false; + this.connectionId = ""; + this.accessToken = ""; + this.host = ""; this.managementPort = 5555; this.tcpPort = 6666; this.dataPort = 7766 this.username = ""; + this.connectionToken = ""; + this.accessTokenTimeout = null; + this.pingTimeout = null; this.client = new net.Socket(); this.reconnectAttempts = 0; this.reconnect = true; this.maxReconnect = 3; this.reconnectIntervalMs = 200; this.timeoutMs = 15000; + this.brokerConnection = null; + this.brokerManager = null; + this.brokerStats = null; this.client.on('error', error => { console.error(error); @@ -126,23 +140,23 @@ class Memphis { let connected = false; this.client.on('data', async data => { - let newData: IConnectionData; + let message: IMessage; try { - newData = JSON.parse(data.toString()) + message = JSON.parse(data.toString()) } catch (ex) { return reject(data.toString()); } - this.connectionId = newData.connection_id; + this.connectionId = message.connection_id; this.isConnectionActive = true; this.reconnectAttempts = 0; - if (newData.access_token) { - this.accessToken = newData.access_token; - this._keepAcessTokenFresh(newData.access_token_exp); + if (message.access_token) { + this.accessToken = message.access_token; + this._keepAcessTokenFresh(message.access_token_exp); } - if (newData.ping_interval_ms) - this._pingServer(newData.ping_interval_ms); + if (message.ping_interval_ms) + this._pingServer(message.ping_interval_ms); if (!connected) { try { @@ -463,9 +477,9 @@ class Consumer { private batchMaxTimeToWaitMs: number; private maxAckTimeMs: number; private eventEmitter: events.EventEmitter; - private pullInterval: NodeJS.Timeout; + private pullInterval: any; private pingConsumerInvtervalMs: number; - private pingConsumerInvterval: NodeJS.Timeout; + private pingConsumerInvterval: any; constructor(connection: Memphis, stationName: string, consumerName: string, consumerGroup: string, pullIntervalMs: number, batchSize: number, batchMaxTimeToWaitMs: number, maxAckTimeMs: number) { @@ -478,7 +492,9 @@ class Consumer { this.batchMaxTimeToWaitMs = batchMaxTimeToWaitMs; this.maxAckTimeMs = maxAckTimeMs; this.eventEmitter = new events.EventEmitter(); + this.pullInterval = null; this.pingConsumerInvtervalMs = 30000; + this.pingConsumerInvterval = null; this.connection.brokerConnection.pullSubscribe(`${this.stationName}.final`, { mack: true, @@ -486,7 +502,7 @@ class Consumer { durable_name: this.consumerGroup ? this.consumerGroup : this.consumerName, ack_wait: this.maxAckTimeMs, }, - }).then(async psub => { + }).then(async (psub: any) => { psub.pull({ batch: this.batchSize, expires: this.batchMaxTimeToWaitMs }); this.pullInterval = setInterval(() => { if (!this.connection.brokerManager.isClosed()) @@ -506,7 +522,7 @@ class Consumer { for await (const m of psub) { this.eventEmitter.emit("message", new Message(m)); } - }).catch(error => this.eventEmitter.emit("error", error)); + }).catch((error: any) => this.eventEmitter.emit("error", error)); } /** diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..aa3d89e --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,18 @@ +{ + "extends": "@tsconfig/node16/tsconfig.json", + "compilerOptions": { + "target": "es5", + "module": "commonjs", + "declaration": true, + "outDir": "", + "strict": true + }, + "include": [ + "src" + ], + "exclude": [ + "node_modules", + "**/__tests__/*", + "Jenkinsfile" + ] +} \ No newline at end of file From 1521f4b8f17399259420dc59854081232accca9a Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Tue, 14 Jun 2022 18:03:54 +0300 Subject: [PATCH 4/5] add ds store to git ignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 25c8fdb..373c5df 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ node_modules -package-lock.json \ No newline at end of file +package-lock.json +.DS_Store \ No newline at end of file From 9117bdff3e1158943a44d16884f6eab7b7c7b9d2 Mon Sep 17 00:00:00 2001 From: Idan Asulin <74712806+idanasulinmemphis@users.noreply.github.com> Date: Tue, 14 Jun 2022 18:04:14 +0300 Subject: [PATCH 5/5] Delete .DS_Store --- .DS_Store | Bin 6148 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 .DS_Store diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index 50c27e7d42157bcfb77412dd29282b4ae01178b3..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeHKI|>3p3{6x-u(7n9D|mxJ^aNhOFCr)wV!xH=@@T$%5M;FxY~%%!Hh}rb_T?ZaS;_tj{`*^* zr2If Wye2k*PDkG9K>iGvE;K6eYXu%tHx-5e