forked from sorribas/fully-connected-topology
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
138 lines (103 loc) · 3.51 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import {connect as _connect, createServer} from 'net'
import {EventEmitter} from 'events'
import {read, write} from 'length-prefixed-message'
import networkAddress from 'network-address'
/**
 * Maintains TCP connections between this node and a set of peers, each
 * identified by a `host:port` string.
 *
 * Events emitted:
 *  - `connection` (socket, peerId): a handshake with a peer completed.
 *  - `reconnect` (peerId, retries): a peer's socket closed and a reconnect
 *    attempt has been scheduled.
 */
export default class Topology extends EventEmitter {
  /** This node's own id (`host:port`), or '' when no address was given. */
  me = ''
  /**
   * Peer records keyed by id. Ids read from incoming sockets are used as keys,
   * so this is a null-prototype dictionary: a remote id such as `__proto__`
   * must not resolve to (and then mutate) Object.prototype.
   */
  peers = Object.create(null)
  /** The listening net.Server, or null when this node does not listen. */
  server = null

  /**
   * @param {string|number} [me] - Own address as `host:port`, or just a port
   *   (digits only), in which case the host comes from `networkAddress()`.
   *   When omitted/empty, no server is started.
   * @param {string|string[]} [peers] - One peer address or a list of them.
   */
  constructor (me, peers) {
    super()
    // Expand a port-only value BEFORE storing it, so `this.me` is always the
    // full `host:port` id used for self-detection and in the handshake.
    if (/^\d+$/.test(me)) me = networkAddress() + ':' + me
    this.me = me || ''
    if (this.me) this.listen(Number(this.me.split(':')[1]))
    // [].concat accepts both a single address string and an array of them.
    if (peers) for (const addr of [].concat(peers)) this.add(addr)
  }

  /**
   * @param {string} addr - Peer id (`host:port`).
   * @returns {object|null} The peer's established socket, or null if there is
   *   no such peer or it has no established socket.
   */
  peer (addr) {
    return (this.peers[addr] && this.peers[addr].socket) || null
  }

  /**
   * Starts a TCP server on `port`; every incoming socket goes through
   * `onconnection`.
   * @param {number} port
   */
  listen (port) {
    this.server = createServer(socket => { this.onconnection(socket) })
    this.server.listen(port)
  }

  /**
   * Registers a peer (or refreshes an existing record) and starts connecting
   * to it. Adding this node's own address is a no-op.
   * @param {string} addr - Peer id (`host:port`).
   */
  add (addr) {
    if (addr === this.me) return

    const [host, port] = addr.split(':')
    const peer = this.peers[addr] = this.peers[addr] || {id: addr}

    peer.host = host
    peer.port = Number(port)
    peer.retries = 0
    peer.reconnectTimeout = peer.reconnectTimeout || null
    peer.pendingSocket = peer.pendingSocket || null
    peer.socket = peer.socket || null

    this.connect(peer)
  }

  /**
   * Forgets a peer: destroys its sockets and cancels any scheduled reconnect.
   * @param {string} addr - Peer id (`host:port`).
   */
  remove (addr) {
    if (addr === this.me) return

    const peer = this.peers[addr]
    if (!peer) return

    delete this.peers[addr]
    peer.host = null // will stop reconnects
    if (peer.socket) peer.socket.destroy()
    if (peer.pendingSocket) peer.pendingSocket.destroy()
    clearTimeout(peer.reconnectTimeout)
  }

  /** Closes the server (if any) and removes every known peer. */
  destroy () {
    if (this.server) this.server.close()
    for (const addr of Object.keys(this.peers)) this.remove(addr)
  }

  /** @returns {object[]} The established sockets of all known peers. */
  get connections () {
    return Object.values(this.peers)
      .map(peer => peer.socket)
      .filter(Boolean)
  }

  /**
   * Handshake for a socket on which the remote id is read first.
   * If the remote id sorts above ours (string comparison) the socket is handed
   * to `connect`; otherwise we answer with our id and the peer becomes ready.
   * @param {object} socket
   */
  onconnection (socket) {
    this.errorHandle(socket)
    read(socket, from => {
      from = from.toString()

      const peer = this.peers[from] = this.peers[from] || {id: from}
      if (from > this.me) return this.connect(peer, socket)

      write(socket, this.me)
      this.attachCleanup(peer, socket)
      this.onready(peer, socket)
    })
  }

  /**
   * On socket close: clears the peer's references to that socket and, unless
   * the peer still has another established socket, either drops the peer (no
   * host, so nothing to reconnect to) or schedules a reconnect with a delay
   * that doubles on every retry.
   * @param {object} peer
   * @param {object} socket
   */
  attachCleanup (peer, socket) {
    // Arrow function: the handler needs the Topology as `this`, not the socket.
    socket.on('close', () => {
      if (peer.socket === socket) peer.socket = null
      if (peer.pendingSocket === socket) peer.pendingSocket = null
      if (peer.socket) return

      if (!peer.host) {
        // Only drop the entry if it is still this record; the address may
        // have been removed and re-added while this socket was closing.
        if (this.peers[peer.id] === peer) delete this.peers[peer.id]
        return
      }

      peer.retries++
      peer.reconnectTimeout = setTimeout(
        () => { this.connect(peer) },
        (1 << peer.retries) * 250
      )
      this.emit('reconnect', peer.id, peer.retries)
    })
  }

  /**
   * Destroys the socket on error or when the handshake timeout expires.
   * @param {object} socket
   */
  errorHandle (socket) {
    socket.on('error', () => { socket.destroy() })
    // 15s to do the handshake
    socket.setTimeout(15000, () => { socket.destroy() })
  }

  /**
   * Promotes `socket` to the peer's established socket, destroying the
   * previous one if present, and emits `connection`.
   * @param {object} peer
   * @param {object} socket
   */
  onready (peer, socket) {
    socket.setTimeout(0) // reset timeout
    const oldSocket = peer.socket
    peer.retries = 0
    peer.socket = socket
    peer.pendingSocket = null
    if (oldSocket) oldSocket.destroy()
    this.emit('connection', peer.socket, peer.id)
  }

  /**
   * Starts (or continues, when `socket` is given) a handshake with `peer`.
   * Does nothing but destroy the supplied socket when the peer already has an
   * established or pending socket.
   * @param {object} peer
   * @param {object} [socket] - Existing socket to use instead of dialing.
   */
  connect (peer, socket) {
    if (peer.socket || peer.pendingSocket) return socket && socket.destroy()
    if (peer.reconnectTimeout) clearTimeout(peer.reconnectTimeout)

    if (!socket) socket = _connect(peer.port, peer.host)
    write(socket, this.me)
    peer.pendingSocket = socket

    // When our id sorts above the peer's, the rest of the handshake is the
    // same as for an incoming socket.
    if (this.me > peer.id) return this.onconnection(socket)

    this.errorHandle(socket)
    this.attachCleanup(peer, socket)

    read(socket, () => { this.onready(peer, socket) })
  }
}