Skip to content

Commit f991509

Browse files
committed
[major] Add ability to retrieve the sparks of a given server
1 parent 25ce3ec commit f991509

File tree

4 files changed

+115
-57
lines changed

4 files changed

+115
-57
lines changed

README.md

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Metroplex a Redis based spark/connection registry for Primus.
66

77
## Installation
88

9-
Metroplex is released in the npm registry and can therefor be installed using:
9+
Metroplex is released in the npm registry and can therefore be installed using:
1010

1111
```
1212
npm install --save metroplex
@@ -43,15 +43,15 @@ this plugin:
4343
node process is still alive. The interval determines the interval of these
4444
updates. When the interval is reached we update the key in the database with
4545
the current EPOCH as well as start a scan for possible dead servers and
46-
removing them. The default interval `300000` ms
46+
removing them. The default interval `300000` ms.
4747
- *latency*: The maximum time it would take to update the `alive` key in Redis.
4848
This time is subtracted from the set `interval` so we update the key BEFORE
4949
it expires. Defaults to `2000` ms.
5050
- *address* The address or public URL on which this SPECIFIC server is
5151
reachable. Should be without path name. When nothing is supplied we try to be
52-
somewhat smart and read the address and port and server type from the server
53-
that Primus is attached to and compose an URL like: `http://0.0.0.0:8080` from
54-
it.
52+
somewhat smart and read the address, the port and the type of the server from
53+
the server that Primus is attached to and compose an URL like:
54+
`http://0.0.0.0:8080` from it.
5555

5656
These options should be provided in the options object of the Primus server:
5757

@@ -82,41 +82,64 @@ The following **public** methods are available.
8282
#### metroplex.servers
8383

8484
```js
85-
metroplex.servers(fn)
85+
metroplex.servers(self, sparks, fn)
8686
```
8787

88-
List all the servers in our current registry.
88+
This method returns all the servers in the registry or the servers for the
89+
given spark ids. It takes the following arguments:
90+
91+
##### self
92+
93+
An optional boolean flag to specify if the result should include the current
94+
server or not. It defaults to `false` and has no effect if the `sparks`
95+
argument is provided.
8996

9097
```js
91-
metroplex.servers(function (err, servers) {
98+
metroplex.servers(true, function (err, servers) {
99+
// `servers` is an array with all the servers in the registry.
92100
console.log(servers);
93101
});
94102
```
95103

96-
#### metroplex.spark
104+
##### sparks
105+
106+
A spark id or an array of spark ids. If this argument is provided the method
107+
returns only the server for the given spark ids. We don't check if the spark
108+
id/s is/are hosted on the current server. It's up to the developer to prevent
109+
useless database calls.
97110

98111
```js
99-
metroplex.spark(id, fn)
112+
metroplex.servers(['ad8a-280z-18', 'y97x-42480-13'], function (err, servers) {
113+
// `servers` is an array with the servers of the two sparks.
114+
console.log(servers);
115+
});
100116
```
101117

102-
Get the server for the given spark id. It does not check if the spark is hosted
103-
on the current server. That's up to the developer to implement.
104-
105118
```js
106-
metroplex.spark(id, function (err, server) {
107-
console.log(server);
119+
metroplex.servers('ad8a-280z-18', function (err, address) {
120+
// `address` is the server of the given spark.
121+
console.log(address);
108122
});
109123
```
110124

125+
##### fn
126+
127+
A callback function that follows the usual error first pattern.
128+
111129
#### metroplex.sparks
112130

113131
```js
114-
metroplex.sparks(sparks, fn)
132+
metroplex.sparks(address, fn)
115133
```
116134

117-
Get the servers for each id in the given `sparks` array. It will return an
118-
object and just like `metroplex.spark` it does not check if the spark is hosted
119-
on the current server.
135+
This method returns all the spark ids for the given server address.
136+
137+
```js
138+
metroplex.sparks('http://192.168.0.10:3000', function (err, ids) {
139+
// `ids` is an array of spark ids.
140+
console.log(ids);
141+
});
142+
```
120143

121144
### Omega Supreme integration
122145

metroplex.js

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -186,56 +186,58 @@ Metroplex.readable('disconnect', function disconnect(spark) {
186186
});
187187

188188
/**
189-
* Get all current registered servers except our selfs.
189+
* Get all the servers in the registry or the servers for the given spark id(s).
190190
*
191-
* @param {Function} fn Callback
191+
* @param {Boolean} [self] Whether to include ourselves in the result.
192+
* @param {String|Array<String>} [sparks] A spark id or an array of spark ids.
193+
* @param {Function} fn Callback.
192194
* @returns {Metroplex}
193195
* @api public
194196
*/
195-
Metroplex.readable('servers', function servers(self, fn) {
196-
var metroplex = this;
197+
Metroplex.readable('servers', function servers(self, sparks, fn) {
198+
var namespace = this.namespace
199+
, redis = this.redis
200+
, metroplex = this;
197201

198202
if ('boolean' !== typeof self) {
199-
fn = self;
200-
self = 0;
203+
fn = sparks;
204+
sparks = self;
205+
self = false;
201206
}
202207

203-
metroplex.redis.smembers(this.namespace +'servers', function smembers(err, members) {
204-
if (self) return fn(err, members);
208+
if ('function' === typeof sparks) {
209+
fn = sparks;
210+
sparks = null;
211+
}
205212

206-
fn(err, (members || []).filter(function filter(address) {
207-
return address !== metroplex.address;
208-
}));
209-
});
213+
if (!sparks) {
214+
redis.smembers(namespace +'servers', function smembers(err, members) {
215+
if (self) return fn(err, members);
210216

211-
return this;
212-
});
217+
fn(err, (members || []).filter(function filter(address) {
218+
return address !== metroplex.address;
219+
}));
220+
});
221+
} else if (Array.isArray(sparks)) {
222+
redis.hmget.apply(redis, [namespace +'sparks'].concat(sparks).concat(fn));
223+
} else {
224+
redis.hget(namespace +'spark', sparks, fn);
225+
}
213226

214-
/**
215-
* Get the server address for a given spark id.
216-
*
217-
* @param {String} id The spark id who's server address we want to retrieve.
218-
* @param {Function} fn Callback
219-
* @returns {Metroplex}
220-
* @api public
221-
*/
222-
Metroplex.readable('spark', function spark(id, fn) {
223-
this.redis.hget(this.namespace +'sparks', id, fn);
224227
return this;
225228
});
226229

227230
/**
228-
* Get all server addresses for the given spark ids.
231+
* Get all the spark ids for the given server address.
229232
*
230-
* @param {Array} ids The spark id's we need to look up
233+
* @param {String} address The server address.
231234
* @param {Function} fn Callback.
232235
* @returns {Metroplex}
233236
* @api public
234237
*/
235-
Metroplex.readable('sparks', function sparks(ids, fn) {
236-
var key = this.namespace +'sparks';
238+
Metroplex.readable('sparks', function sparks(address, fn) {
239+
this.redis.smembers(this.namespace + this.address +':sparks', fn);
237240

238-
this.redis.hmget.apply(this.redis, [key].concat(ids).concat(fn));
239241
return this;
240242
});
241243

omega.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ module.exports = function forwards(primus) {
5353
forward.sparks = function sparks(ids, msg, fn) {
5454
fn = fn || nope;
5555

56-
metroplex.sparks(ids, function sparks(err, servers) {
56+
metroplex.servers(ids, function sparks(err, servers) {
5757
if (err) return fn(err);
5858

5959
servers = Object.keys(servers).reduce(function fn(memo, spark) {
@@ -81,9 +81,9 @@ module.exports = function forwards(primus) {
8181
};
8282

8383
/**
84-
* Forward the message to a specific spark
84+
* Forward the message to a specific spark.
8585
*
86-
* @param {String} id Spark id
86+
* @param {String} id Spark id.
8787
* @param {Mixed} msg Message to broadcast.
8888
* @param {Function} fn Completion callback.
8989
* @returns {Forward}
@@ -92,7 +92,7 @@ module.exports = function forwards(primus) {
9292
forward.spark = function spark(id, msg, fn) {
9393
fn = fn || nope;
9494

95-
metroplex.spark(id, function spark(err, server) {
95+
metroplex.servers(id, function spark(err, server) {
9696
if (err) return fn(err);
9797

9898
forward(server, msg, id, fn);

test/integration.test.js

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -157,19 +157,21 @@ describe('plugin', function () {
157157
http.listen(portnumber);
158158
});
159159

160-
it('finds servers for a list of sparks', function (next) {
160+
it('finds the servers for a list of sparks', function (next) {
161161
server.use('metroplex', metroplex);
162162
server2.use('metroplex', metroplex);
163163

164-
var clients = []
164+
var Socket2 = server2.Socket
165+
, Socket = server.Socket
166+
, clients = []
165167
, length = 10;
166168

167169
function push(address) {
168170
return function (id) {
169171
clients.push({ id: id, address: address });
170172

171173
if (clients.length === length) {
172-
server.metroplex.sparks(clients.map(function (client) {
174+
server.metroplex.servers(clients.map(function (client) {
173175
return client.id;
174176
}), function (err, addresses) {
175177
if (err) return next(err);
@@ -186,11 +188,42 @@ describe('plugin', function () {
186188

187189
server.once('register', function (address) {
188190
var len = length / 2;
189-
while (len--) new server.Socket(address).id(push(address));
191+
while (len--) new Socket(address).id(push(address));
190192
});
191193
server2.once('register', function (address) {
192194
var len = length / 2;
193-
while (len--) new server2.Socket(address).id(push(address));
195+
while (len--) new Socket2(address).id(push(address));
196+
});
197+
});
198+
199+
it('finds the sparks in a server', function (next) {
200+
server.use('metroplex', metroplex);
201+
202+
var Socket = server.Socket
203+
, clients = []
204+
, length = 10;
205+
206+
function push(address) {
207+
return function (id) {
208+
clients.push(id);
209+
210+
if (clients.length === length) {
211+
server.metroplex.sparks(address, function (err, sparks) {
212+
if (err) return next(err);
213+
214+
clients.forEach(function (id) {
215+
assume(sparks.indexOf(id)).is.above(-1);
216+
});
217+
218+
next();
219+
});
220+
}
221+
};
222+
}
223+
224+
server.once('register', function (address) {
225+
var len = length;
226+
while (len--) new Socket(address).id(push(address));
194227
});
195228
});
196229
});

0 commit comments

Comments
 (0)