-
Notifications
You must be signed in to change notification settings - Fork 11
Refactor #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Refactor #2
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,10 +16,10 @@ Leverage.scripts = Leverage.scripts.concat( | |
* Add defaults to the supplied options. The following options are available: | ||
* | ||
* - redis: The Redis instance we should use to store data | ||
* - namespace: The namespace prefix to prevent collision's. | ||
* - interval: Expire interval to keep the server alive in Redis | ||
* - timeout: Timeout for sparks who are alive. | ||
* - latency: Time it takes for our Redis commands to execute. | ||
* - namespace: The namespace prefix to prevent collisions (default: 'metroplex'). | ||
* - interval: Expire interval to keep the server alive in Redis (default: 5 minutes) | ||
* - latency: Time it takes for our Redis commands to execute. (default: 500 ms) | ||
* - sparkTimeout: Timeout for sparks who are alive. This should be greater than the client ping interval. (default: 1 minute) | ||
* | ||
* @param {Primus} primus The Primus instance that received the plugin. | ||
* @param {Object} options Configuration. | ||
|
@@ -39,8 +39,8 @@ function Metroplex(primus, options) { | |
this.redis = options.redis || require('redis').createClient(); | ||
this.namespace = (options.namespace || 'metroplex') +':'; | ||
this.interval = options.interval || 5 * 60 * 1000; | ||
this.timeout = options.timeout || 30 * 60; | ||
this.latency = options.latency || 2000; | ||
this.sparkTimeout = options.sparkTimeout || 60 * 1000; | ||
this.latency = options.latency || 500; | ||
this.leverage = new Leverage(this.redis, { | ||
namespace: this.namespace | ||
}); | ||
|
@@ -92,32 +92,16 @@ Metroplex.readable('parse', function parse(server) { | |
Metroplex.readable('register', function register(address, fn) { | ||
var metroplex = this; | ||
|
||
metroplex.address = this.parse(address || metroplex.address); | ||
metroplex.address = metroplex.parse(address || metroplex.address); | ||
if (!metroplex.address) { | ||
if (fn) fn(); | ||
return this; | ||
} | ||
|
||
metroplex.leverage.annihilate(metroplex.address, function annihilate(err) { | ||
if (err) { | ||
if (fn) return fn(err); | ||
return metroplex.emit('error', err); | ||
} | ||
|
||
metroplex.redis.multi() | ||
.setex(metroplex.namespace + metroplex.address, metroplex.interval, Date.now()) | ||
.sadd(metroplex.namespace +'servers', metroplex.address) | ||
.exec(function register(err) { | ||
if (err) { | ||
if (fn) return fn(err); | ||
return metroplex.emit('error', err); | ||
} | ||
|
||
metroplex.emit('register', metroplex.address); | ||
metroplex.setInterval(); | ||
|
||
if (fn) fn(err, metroplex.address); | ||
}); | ||
metroplex.redis.psetex(metroplex.namespace + 'server:' + metroplex.address, metroplex.interval, Date.now(), function(err, result) { | ||
metroplex.emit('register', metroplex.address); | ||
metroplex.setInterval(); | ||
if (fn) fn(err, metroplex.address); | ||
}); | ||
}); | ||
|
||
|
@@ -132,23 +116,17 @@ Metroplex.readable('register', function register(address, fn) { | |
Metroplex.readable('unregister', function unregister(address, fn) { | ||
var metroplex = this; | ||
|
||
address = this.parse(address || metroplex.address); | ||
address = metroplex.parse(address || metroplex.address); | ||
if (!metroplex.address) { | ||
if (fn) fn(); | ||
return this; | ||
} | ||
|
||
metroplex.leverage.annihilate(address, function annihilate(err) { | ||
if (err) { | ||
if (fn) return fn(err); | ||
return metroplex.emit('error', err); | ||
} | ||
metroplex.redis.del(metroplex.namespace + 'server:' + metroplex.address); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would add a callback, handle the error and move the following stuff inside the callback body. |
||
metroplex.emit('unregister', address); | ||
|
||
metroplex.emit('unregister', address); | ||
|
||
if (metroplex.timer) clearInterval(metroplex.timer); | ||
if (fn) fn(err, address); | ||
}); | ||
if (metroplex.timer) clearInterval(metroplex.timer); | ||
if (fn) fn(null, address); | ||
|
||
return this; | ||
}); | ||
|
@@ -161,11 +139,7 @@ Metroplex.readable('unregister', function unregister(address, fn) { | |
* @api public | ||
*/ | ||
Metroplex.readable('connect', function connect(spark) { | ||
this.redis.multi() | ||
.hset(this.namespace +'sparks', spark.id, this.address) | ||
.sadd(this.namespace + this.address +':sparks', spark.id) | ||
.exec(); | ||
|
||
this.redis.psetex(this.namespace + 'spark:' + spark.id, this.sparkTimeout, this.address); | ||
return this; | ||
}); | ||
|
||
|
@@ -177,11 +151,7 @@ Metroplex.readable('connect', function connect(spark) { | |
* @api public | ||
*/ | ||
Metroplex.readable('disconnect', function disconnect(spark) { | ||
this.redis.multi() | ||
.hdel(this.namespace +'sparks', spark.id) | ||
.srem(this.namespace + this.address +':sparks', spark.id) | ||
.exec(); | ||
|
||
this.redis.del(this.namespace + 'spark:' + spark.id); | ||
return this; | ||
}); | ||
|
||
|
@@ -192,25 +162,32 @@ Metroplex.readable('disconnect', function disconnect(spark) { | |
* @returns {Metroplex} | ||
* @api public | ||
*/ | ||
Metroplex.readable('servers', function servers(self, fn) { | ||
Metroplex.readable('servers', function servers(fn) { | ||
var metroplex = this; | ||
|
||
if ('boolean' !== typeof self) { | ||
fn = self; | ||
self = 0; | ||
} | ||
|
||
this.redis.smembers(this.namespace +'servers', function smembers(err, members) { | ||
if (self) return fn(err, members); | ||
|
||
fn(err, (members || []).filter(function filter(address) { | ||
metroplex.redis.keys(metroplex.namespace + 'server:*', function keyList(err, list) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Returning ALL the servers can be useful, for example for debugging, so I would add back the |
||
fn(err, (list || []).map(function(key) { | ||
return key.replace(metroplex.namespace + 'server:', ''); | ||
}).filter(function filter(address) { | ||
return address !== metroplex.address; | ||
})); | ||
}); | ||
|
||
return this; | ||
}); | ||
|
||
/** | ||
* Reset the time to live for a registered spark. | ||
* | ||
* @param {Spark} spark The connection/spark from Primus. | ||
* @returns {Metroplex} | ||
* @api private | ||
**/ | ||
Metroplex.readable('heartbeat', function heartbeat(spark) { | ||
this.redis.psetex(this.namespace + 'spark:' + spark.id, this.sparkTimeout, this.address); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We only need to update the timeout of the key, so we can use |
||
return this; | ||
}); | ||
|
||
/** | ||
* Get the server address for a given spark id. | ||
* | ||
|
@@ -220,23 +197,23 @@ Metroplex.readable('servers', function servers(self, fn) { | |
* @api public | ||
*/ | ||
Metroplex.readable('spark', function spark(id, fn) { | ||
this.redis.hget(this.namespace +'sparks', id, fn); | ||
this.redis.get(this.namespace + 'spark:' + id, fn); | ||
return this; | ||
}); | ||
|
||
/** | ||
* Get all server addresses for the given spark ids. | ||
* | ||
* @param {Array} args The spark id's we need to look up | ||
* @param {Array} ids The spark id's we need to look up | ||
* @param {Function} fn Callback. | ||
* @returns {Metroplex} | ||
* @api public | ||
*/ | ||
Metroplex.readable('sparks', function sparks(args, fn) { | ||
args.push(fn); | ||
args.shift(this.namespace +'sparks'); | ||
this.redis.hmget.apply(this.redis, args); | ||
|
||
Metroplex.readable('sparks', function sparks(ids, fn) { | ||
var metroplex = this; | ||
metroplex.leverage.multiget('spark:', ids, function(err, result) { | ||
fn(err, JSON.parse(result)); | ||
}); | ||
return this; | ||
}); | ||
|
||
|
@@ -252,31 +229,12 @@ Metroplex.readable('sparks', function sparks(args, fn) { | |
Metroplex.readable('setInterval', function setIntervals() { | ||
if (this.timer) clearInterval(this.timer); | ||
|
||
var alive = this.namespace + this.address +':alive' | ||
, redis = this.redis | ||
var redis = this.redis | ||
, metroplex = this; | ||
|
||
this.timer = setInterval(function interval() { | ||
// | ||
// Redis expects the expire value in seconds instead of milliseconds so we | ||
// need to correct our interval. | ||
// | ||
redis.setex(alive, metroplex.interval / 1000, Date.now()); | ||
|
||
metroplex.servers(function servers(err, list) { | ||
if (err) return metroplex.emit('error', err); | ||
|
||
list.forEach(function expired(address) { | ||
redis.get(metroplex.namespace + address, function get(err, stamp) { | ||
if (err || Date.now() - +stamp < metroplex.interval) return; | ||
|
||
metroplex.leverage.annihilate(address, function murdered(err) { | ||
if (err) return metroplex.emit('error', err); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}, this.interval - this.latency); | ||
metroplex.timer = setInterval(function interval() { | ||
redis.psetex(metroplex.namespace + 'server:' + metroplex.address, metroplex.interval, Date.now()); | ||
}, metroplex.interval - metroplex.latency); | ||
}); | ||
|
||
// | ||
|
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
local namespace = '{leverage::namespace}' | ||
local prefix = KEYS[1] | ||
local keys = KEYS[2] | ||
local result = { } | ||
|
||
for key in string.gmatch(keys, '([^,]+)') do | ||
result[key] = redis.call('GET', namespace .. prefix .. key) | ||
end | ||
|
||
return cjson.encode(result) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know that redis commands can fail only if called with a wrong syntax or against keys holding the wrong data type, but I would handle the error anyway, in the same way it was handled before.