Skip to content

Refactor #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ Metroplex.server = function server(primus, options) {

primus.on('connection', function connection(spark) {
metroplex.connect(spark);
spark.on('incoming::ping', function heartbeat() {
metroplex.heartbeat(spark);
});
}).on('disconnection', function disconnection(spark) {
metroplex.disconnect(spark);
}).on('close', function close(options, next) {
Expand Down
134 changes: 46 additions & 88 deletions metroplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ Leverage.scripts = Leverage.scripts.concat(
* Add defaults to the supplied options. The following options are available:
*
* - redis: The Redis instance we should use to store data
* - namespace: The namespace prefix to prevent collision's.
* - interval: Expire interval to keep the server alive in Redis
* - timeout: Timeout for sparks who are alive.
* - latency: Time it takes for our Redis commands to execute.
* - namespace: The namespace prefix to prevent collisions (default: 'metroplex').
* - interval: Expire interval to keep the server alive in Redis (default: 5 minutes)
* - latency: Time it takes for our Redis commands to execute. (default: 500 ms)
* - sparkTimeout: Timeout for sparks who are alive. This should be greater than the client ping interval. (default: 1 minute)
*
* @param {Primus} primus The Primus instance that received the plugin.
* @param {Object} options Configuration.
Expand All @@ -39,8 +39,8 @@ function Metroplex(primus, options) {
this.redis = options.redis || require('redis').createClient();
this.namespace = (options.namespace || 'metroplex') +':';
this.interval = options.interval || 5 * 60 * 1000;
this.timeout = options.timeout || 30 * 60;
this.latency = options.latency || 2000;
this.sparkTimeout = options.sparkTimeout || 60 * 1000;
this.latency = options.latency || 500;
this.leverage = new Leverage(this.redis, {
namespace: this.namespace
});
Expand Down Expand Up @@ -92,32 +92,16 @@ Metroplex.readable('parse', function parse(server) {
Metroplex.readable('register', function register(address, fn) {
var metroplex = this;

metroplex.address = this.parse(address || metroplex.address);
metroplex.address = metroplex.parse(address || metroplex.address);
if (!metroplex.address) {
if (fn) fn();
return this;
}

metroplex.leverage.annihilate(metroplex.address, function annihilate(err) {
if (err) {
if (fn) return fn(err);
return metroplex.emit('error', err);
}

metroplex.redis.multi()
.setex(metroplex.namespace + metroplex.address, metroplex.interval, Date.now())
.sadd(metroplex.namespace +'servers', metroplex.address)
.exec(function register(err) {
if (err) {
if (fn) return fn(err);
return metroplex.emit('error', err);
}

metroplex.emit('register', metroplex.address);
metroplex.setInterval();

if (fn) fn(err, metroplex.address);
});
metroplex.redis.psetex(metroplex.namespace + 'server:' + metroplex.address, metroplex.interval, Date.now(), function(err, result) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that redis commands can fail only if called with a wrong syntax or against keys holding the wrong data type, but I would handle the error anyway, in the same way it was handled before.

metroplex.emit('register', metroplex.address);
metroplex.setInterval();
if (fn) fn(err, metroplex.address);
});
});

Expand All @@ -132,23 +116,17 @@ Metroplex.readable('register', function register(address, fn) {
Metroplex.readable('unregister', function unregister(address, fn) {
var metroplex = this;

address = this.parse(address || metroplex.address);
address = metroplex.parse(address || metroplex.address);
if (!metroplex.address) {
if (fn) fn();
return this;
}

metroplex.leverage.annihilate(address, function annihilate(err) {
if (err) {
if (fn) return fn(err);
return metroplex.emit('error', err);
}
metroplex.redis.del(metroplex.namespace + 'server:' + metroplex.address);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add a callback, handle the error and move the following stuff inside the callback body.

metroplex.emit('unregister', address);

metroplex.emit('unregister', address);

if (metroplex.timer) clearInterval(metroplex.timer);
if (fn) fn(err, address);
});
if (metroplex.timer) clearInterval(metroplex.timer);
if (fn) fn(null, address);

return this;
});
Expand All @@ -161,11 +139,7 @@ Metroplex.readable('unregister', function unregister(address, fn) {
* @api public
*/
Metroplex.readable('connect', function connect(spark) {
this.redis.multi()
.hset(this.namespace +'sparks', spark.id, this.address)
.sadd(this.namespace + this.address +':sparks', spark.id)
.exec();

this.redis.psetex(this.namespace + 'spark:' + spark.id, this.sparkTimeout, this.address);
return this;
});

Expand All @@ -177,11 +151,7 @@ Metroplex.readable('connect', function connect(spark) {
* @api public
*/
Metroplex.readable('disconnect', function disconnect(spark) {
this.redis.multi()
.hdel(this.namespace +'sparks', spark.id)
.srem(this.namespace + this.address +':sparks', spark.id)
.exec();

this.redis.del(this.namespace + 'spark:' + spark.id);
return this;
});

Expand All @@ -192,25 +162,32 @@ Metroplex.readable('disconnect', function disconnect(spark) {
* @returns {Metroplex}
* @api public
*/
Metroplex.readable('servers', function servers(self, fn) {
Metroplex.readable('servers', function servers(fn) {
var metroplex = this;

if ('boolean' !== typeof self) {
fn = self;
self = 0;
}

this.redis.smembers(this.namespace +'servers', function smembers(err, members) {
if (self) return fn(err, members);

fn(err, (members || []).filter(function filter(address) {
metroplex.redis.keys(metroplex.namespace + 'server:*', function keyList(err, list) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning ALL the servers can be useful, for example for debugging, so I would add back the self argument.

fn(err, (list || []).map(function(key) {
return key.replace(metroplex.namespace + 'server:', '');
}).filter(function filter(address) {
return address !== metroplex.address;
}));
});

return this;
});

/**
* Reset the time to live for a registered spark.
*
* @param {Spark} spark The connection/spark from Primus.
* @returns {Metroplex}
* @api private
**/
Metroplex.readable('heartbeat', function heartbeat(spark) {
this.redis.psetex(this.namespace + 'spark:' + spark.id, this.sparkTimeout, this.address);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only need to update the timeout of the key, so we can use PEXPIRE, without setting the value again.

return this;
});

/**
* Get the server address for a given spark id.
*
Expand All @@ -220,23 +197,23 @@ Metroplex.readable('servers', function servers(self, fn) {
* @api public
*/
Metroplex.readable('spark', function spark(id, fn) {
this.redis.hget(this.namespace +'sparks', id, fn);
this.redis.get(this.namespace + 'spark:' + id, fn);
return this;
});

/**
* Get all server addresses for the given spark ids.
*
* @param {Array} args The spark id's we need to look up
* @param {Array} ids The spark id's we need to look up
* @param {Function} fn Callback.
* @returns {Metroplex}
* @api public
*/
Metroplex.readable('sparks', function sparks(args, fn) {
args.push(fn);
args.shift(this.namespace +'sparks');
this.redis.hmget.apply(this.redis, args);

Metroplex.readable('sparks', function sparks(ids, fn) {
var metroplex = this;
metroplex.leverage.multiget('spark:', ids, function(err, result) {
fn(err, JSON.parse(result));
});
return this;
});

Expand All @@ -252,31 +229,12 @@ Metroplex.readable('sparks', function sparks(args, fn) {
Metroplex.readable('setInterval', function setIntervals() {
if (this.timer) clearInterval(this.timer);

var alive = this.namespace + this.address +':alive'
, redis = this.redis
var redis = this.redis
, metroplex = this;

this.timer = setInterval(function interval() {
//
// Redis expects the expire value in seconds instead of milliseconds so we
// need to correct our interval.
//
redis.setex(alive, metroplex.interval / 1000, Date.now());

metroplex.servers(function servers(err, list) {
if (err) return metroplex.emit('error', err);

list.forEach(function expired(address) {
redis.get(metroplex.namespace + address, function get(err, stamp) {
if (err || Date.now() - +stamp < metroplex.interval) return;

metroplex.leverage.annihilate(address, function murdered(err) {
if (err) return metroplex.emit('error', err);
});
});
});
});
}, this.interval - this.latency);
metroplex.timer = setInterval(function interval() {
redis.psetex(metroplex.namespace + 'server:' + metroplex.address, metroplex.interval, Date.now());
}, metroplex.interval - metroplex.latency);
});

//
Expand Down
32 changes: 0 additions & 32 deletions redis/annihilate.lua

This file was deleted.

10 changes: 10 additions & 0 deletions redis/multiget.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
local namespace = '{leverage::namespace}'
local prefix = KEYS[1]
local keys = KEYS[2]
local result = { }

for key in string.gmatch(keys, '([^,]+)') do
result[key] = redis.call('GET', namespace .. prefix .. key)
end

return cjson.encode(result)
Loading