diff --git a/lib/connection.js b/lib/connection.js index 9200422..5f9eae5 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -52,6 +52,7 @@ function Connection(r, options, resolve, reject) { } this.authKey = options.authKey || r._authKey; + this.releaseFeed = options.releaseFeed || r._releaseFeed; // period in *seconds* for the connection to be opened this.timeoutConnect = options.timeout || r._timeoutConnect; // The connection will be pinged every seconds @@ -212,6 +213,9 @@ function Connection(r, options, resolve, reject) { self.connection.toJSON = function() { // We want people to be able to jsonify a cursor return '"A socket object cannot be converted to JSON due to circular references."' } + // For the pool implementation + this.node = null; + this.id = Math.random(); } util.inherits(Connection, events.EventEmitter); @@ -518,6 +522,10 @@ Connection.prototype._processResponse = function(response, token) { if (includesStates === true) { cursor.setIncludesStates(); } + if ((cursor.getType() !== 'Cursor') && (self.releaseFeed === true)) { + self.metadata[token].released = true; + self.emit('release-feed'); + } if ((self.metadata[token].options.cursor === true) || ((self.metadata[token].options.cursor === undefined) && (self.r._options.cursor === true))) { // Return a cursor if (self.metadata[token].options.profile === true) { @@ -579,7 +587,9 @@ Connection.prototype._processResponse = function(response, token) { } } else if (type === responseTypes.SUCCESS_SEQUENCE) { - self.emit('release'); + if (self.metadata[token].released === false) { + self.emit('release'); + } if (typeof self.metadata[token].resolve === 'function') { currentResolve = self.metadata[token].resolve; diff --git a/lib/index.js b/lib/index.js index 1cc3e89..a5c5efe 100644 --- a/lib/index.js +++ b/lib/index.js @@ -53,6 +53,7 @@ r.prototype._user = 'admin'; r.prototype._password = ''; r.prototype._timeoutConnect = 20; // seconds r.prototype._pingInterval = -1; // seconds +r.prototype._releaseFeed = false; r.prototype._nestingLevel = 100; r.prototype._arrayLimit = 100000; diff --git a/lib/linked_list.js b/lib/linked_list.js new file mode 100644 index 0000000..d7c5514 --- /dev/null +++ b/lib/linked_list.js @@ -0,0 +1,110 @@ +function LinkedList() { + this.root = null; + this.last = null; + this.length = 0; +} + +LinkedList.prototype.getLength = function() { + return this.length; +} + +LinkedList.prototype.push = function(connection) { + var node = new Node(this, connection, this.last, null); + connection.node = node; + if (this.root === null) { + this.root = node; + this.last = node; + } + else { + this.last.next = node; + this.last = node; + } + this.length++; + // Keep a reference to the node in the connection + return node; +} + +LinkedList.prototype.unshift = function(connection) { + var node = new Node(this, connection, null, this.root); + connection.node = node; + if (this.root) { + this.root.prev = node; + } + this.root = node; + if (this.last === null) { + this.last = node; + } + this.length++; + return node; +} + +// Pop a node and return the connection (not the node) +LinkedList.prototype.pop = function() { + if (this.last === null) { + return null; + } + + var last = this.last + if (this.last.prev === null) { + // this.last is the root + this.root = null; + this.last = null; + } + else { + this.last = this.last.prev; + this.last.next = null; + } + this.length--; + last.removed = true; + return last.connection; +} + +LinkedList.prototype.shift = function() { + if (this.root === null) { + return null; + } + + var result = this.root; + this.root = this.root.next; + this.length--; + result.removed = true; + return result.connection; +} + +function Node(list, connection, prev, next) { + this.list = list; + this.connection = connection; + this.prev = prev; + this.next = next; + this.removed = false; +} + +Node.prototype.remove = function() { + if (this.removed === true) { + return this.connection; + } + this.removed = true; + + if (this.prev === null) { + if (this.next === null) { + // The node is the root and has no children + this.root = null; + this.last = null; + } + else { + // The node is the root + this.root = this.next; + this.next.prev = null; + } + } + else { + this.prev.next = this.next; + if (this.next) { + this.next.prev = this.prev + } + } + this.list.length--; + return this.connection; +} + +module.exports = LinkedList; diff --git a/lib/metadata.js b/lib/metadata.js index 757cf73..fc6532c 100644 --- a/lib/metadata.js +++ b/lib/metadata.js @@ -5,6 +5,7 @@ function Metadata(resolve, reject, query, options) { this.query = query; // The query in case we have to build a backtrace this.options = options || {}; this.cursor = false; + this.released = false; } Metadata.prototype.setCursor = function() { diff --git a/lib/pool.js b/lib/pool.js index 85e1412..ac409d6 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -1,5 +1,6 @@ var Promise = require('bluebird'); var Dequeue = require(__dirname+'/dequeue.js'); +var LinkedList = require(__dirname+'/linked_list.js'); var helper = require(__dirname+'/helper.js'); var Err = require(__dirname+'/error.js'); var events = require('events'); @@ -30,11 +31,13 @@ function Pool(r, options) { cursor: options.cursor || false, stream: options.stream || false, ssl: options.ssl || false, - pingInterval: options.pingInterval || this._r._pingInterval + pingInterval: options.pingInterval || this._r._pingInterval, + releaseFeed: options.releaseFeed || this._r._releaseFeed } this._log = options._log; - this._pool = new Dequeue(this.options.buffer+1); + //this._pool = new Dequeue(this.options.buffer+1); + this._pool = new LinkedList(); this._draining = false; this._drainingHandlers = null; // Store the resolve/reject methods once draining is called this._localhostToDrain = 0; // number of connections to "localhost" to remove @@ -125,7 +128,7 @@ Pool.prototype._increaseNumConnections = function() { } -Pool.prototype.putConnection = function(connection) { +Pool.prototype.putConnection = function(connection, shift) { var self = this; if (connection.end === false) { // Temporary attempt to fix #192 - this should not happen. @@ -149,52 +152,43 @@ Pool.prototype.putConnection = function(connection) { self._drainingHandlers.resolve(); } } - else if (self._extraConnections > 0) { + else if ((self._extraConnections > 0) && (Object.keys(connection.metadata).length === 0)) { self._extraConnections--; connection.close().error(function(error) { self._log('Fail to properly close a connection. Error:'+JSON.stringify(error)); }); clearTimeout(connection.timeout); } - /* - // We let the pool garbage collect these connections - else if (self.getAvailableLength()+1 > self.options.buffer) { // +1 for the connection we may put back - // Note that because we have available connections here, the pool master has no pending - // queries. - connection.close().error(function(error) { - self._log('Fail to properly close a connection. Error:'+JSON.stringify(error)); - }); - clearTimeout(connection.timeout); - } - */ else { - self._pool.push(connection); + var connectionLink; + if (shift === true) { + connectionLink = self._pool.unshift(connection); + } + else { + connectionLink = self._pool.push(connection); + } self.emit('available-size', self._pool.getLength()); self.emit('available-size-diff', 1); - self.emit('new-connection', connection); clearTimeout(connection.timeout); var timeoutCb = function() { - if (self._pool.get(0) === connection) { - if (self._pool.getLength() > self.options.buffer) { - self._pool.shift().close(); - self.emit('available-size', self._pool.getLength()); - self.emit('available-size-diff', -1); - } - else { - connection.timeout = setTimeout(timeoutCb, self.options.timeoutGb); - } + if (self._pool.getLength() > self.options.buffer) { + connectionLink.remove().close(); + self.emit('available-size', self._pool.getLength()); + self.emit('available-size-diff', -1); } else { - // This should technically never happens connection.timeout = setTimeout(timeoutCb, self.options.timeoutGb); } } connection.timeout = setTimeout(timeoutCb, self.options.timeoutGb); + self.emit('new-connection', connection); } }; +var index = 0; Pool.prototype.createConnection = function() { + index++; var self = this; self._increaseNumConnections(); self._openingConnections++; @@ -203,7 +197,7 @@ Pool.prototype.createConnection = function() { if (self._draining === true) { return; // Do not create a new connection if we are draining the pool. } - + (function(index) { return self._r.connect(self.options.connection).then(function(connection) { self.emit('created-connection', self); @@ -232,6 +226,10 @@ Pool.prototype.createConnection = function() { // We are going to close connection, but we don't want another process to use it before // So we remove it from the pool now (if it's inside) self._log('Error emitted by a connection: '+JSON.stringify(error)); + connection.node.remove(); + self.emit('available-size', self._pool.getLength()); + self.emit('available-size-diff', -1); + /* for(var i=0; i