Skip to content

Commit 4bca7d2

Browse files
committed
[Refactor]: worker as a separate entry point for cluster primary
1 parent 0d11ef2 commit 4bca7d2

File tree

4 files changed

+146
-44
lines changed

4 files changed

+146
-44
lines changed

packages/fastboot-app-server/src/fastboot-app-server.js

+88-43
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
const assert = require('assert');
44
const cluster = require('cluster');
55
const os = require('os');
6-
7-
const Worker = require('./worker');
6+
const path = require('path');
7+
const serialize = require('./utils/serialization').serialize;
88

99
class FastBootAppServer {
1010
constructor(options) {
@@ -33,37 +33,17 @@ class FastBootAppServer {
3333

3434
this.propagateUI();
3535

36-
if (cluster.isWorker) {
37-
this.worker = new Worker({
38-
ui: this.ui,
39-
distPath: this.distPath || process.env.FASTBOOT_DIST_PATH,
40-
cache: this.cache,
41-
gzip: this.gzip,
42-
host: this.host,
43-
port: this.port,
44-
username: this.username,
45-
password: this.password,
46-
httpServer: this.httpServer,
47-
beforeMiddleware: this.beforeMiddleware,
48-
afterMiddleware: this.afterMiddleware,
49-
buildSandboxGlobals: this.buildSandboxGlobals,
50-
chunkedResponse: this.chunkedResponse,
51-
});
36+
this.workerCount = options.workerCount ||
37+
(process.env.NODE_ENV === 'test' ? 1 : null) ||
38+
os.cpus().length;
5239

53-
this.worker.start();
54-
} else {
55-
this.workerCount = options.workerCount ||
56-
(process.env.NODE_ENV === 'test' ? 1 : null) ||
57-
os.cpus().length;
40+
this._clusterInitialized = false;
5841

59-
assert(this.distPath || this.downloader, "FastBootAppServer must be provided with either a distPath or a downloader option.");
60-
assert(!(this.distPath && this.downloader), "FastBootAppServer must be provided with either a distPath or a downloader option, but not both.");
61-
}
42+
assert(this.distPath || this.downloader, "FastBootAppServer must be provided with either a distPath or a downloader option.");
43+
assert(!(this.distPath && this.downloader), "FastBootAppServer must be provided with either a distPath or a downloader option, but not both.");
6244
}
6345

6446
start() {
65-
if (cluster.isWorker) { return; }
66-
6747
return this.initializeApp()
6848
.then(() => this.subscribeToNotifier())
6949
.then(() => this.forkWorkers())
@@ -74,6 +54,9 @@ class FastBootAppServer {
7454
})
7555
.catch(err => {
7656
this.ui.writeLine(err.stack);
57+
})
58+
.finally(() => {
59+
this._clusterInitialized = true;
7760
});
7861
}
7962

@@ -137,6 +120,12 @@ class FastBootAppServer {
137120
}
138121
}
139122

123+
/**
124+
* send message to worker
125+
*
126+
* @method broadcast
127+
* @param {Object} message
128+
*/
140129
broadcast(message) {
141130
let workers = cluster.workers;
142131

@@ -152,6 +141,10 @@ class FastBootAppServer {
152141
forkWorkers() {
153142
let promises = [];
154143

144+
// https://nodejs.org/api/cluster.html#cluster_cluster_setupprimary_settings
145+
// Note: cluster.setupPrimary in v16.0.0
146+
cluster.setupMaster(this.clusterSetupPrimary());
147+
155148
for (let i = 0; i < this.workerCount; i++) {
156149
promises.push(this.forkWorker());
157150
}
@@ -160,31 +153,53 @@ class FastBootAppServer {
160153
}
161154

162155
forkWorker() {
163-
let env = this.buildWorkerEnv();
164-
let worker = cluster.fork(env);
156+
let worker = cluster.fork(this.buildWorkerEnv());
165157

166-
this.ui.writeLine(`forked worker ${worker.process.pid}`);
158+
this.ui.writeLine(`Worker ${worker.process.pid} forked`);
159+
160+
let firstBootResolve;
161+
let firstBootReject;
162+
const firstBootPromise = new Promise((resolve, reject) => {
163+
firstBootResolve = resolve;
164+
firstBootReject = reject;
165+
});
166+
167+
if (this._clusterInitialized) {
168+
firstBootResolve();
169+
}
170+
171+
worker.on('online', () => {
172+
this.ui.writeLine(`Worker ${worker.process.pid} online.`);
173+
});
174+
175+
worker.on('message', (message) => {
176+
if (message.event === 'http-online') {
177+
this.ui.writeLine(`Worker ${worker.process.pid} healthy.`);
178+
firstBootResolve();
179+
}
180+
});
167181

168182
worker.on('exit', (code, signal) => {
183+
let error;
169184
if (signal) {
170-
this.ui.writeLine(`worker was killed by signal: ${signal}`);
185+
error = new Error(`Worker ${worker.process.pid} killed by signal: ${signal}`);
171186
} else if (code !== 0) {
172-
this.ui.writeLine(`worker exited with error code: ${code}`);
187+
error = new Error(`Worker ${worker.process.pid} exited with error code: ${code}`);
173188
} else {
174-
this.ui.writeLine(`worker exited`);
189+
error = new Error(`Worker ${worker.process.pid} exited gracefully. It should only exit when told to do so.`);
175190
}
176191

177-
this.forkWorker();
192+
if (!this._clusterInitialized) {
193+
// Do not respawn for a failed first launch.
194+
firstBootReject(error);
195+
} else {
196+
// Do respawn if you've ever successfully been initialized.
197+
this.ui.writeLine(error);
198+
this.forkWorker();
199+
}
178200
});
179201

180-
return new Promise(resolve => {
181-
this.ui.writeLine('worker online');
182-
worker.on('message', message => {
183-
if (message.event === 'http-online') {
184-
resolve();
185-
}
186-
});
187-
});
202+
return firstBootPromise;
188203
}
189204

190205
buildWorkerEnv() {
@@ -197,6 +212,36 @@ class FastBootAppServer {
197212
return env;
198213
}
199214

215+
/**
216+
* Extension point to allow configuring the default fork configuration.
217+
*
218+
* @method clusterSetupPrimary
219+
* @returns {Object}
220+
* @public
221+
*/
222+
clusterSetupPrimary() {
223+
const workerOptions = {
224+
ui: this.ui,
225+
distPath: this.distPath || process.env.FASTBOOT_DIST_PATH,
226+
cache: this.cache,
227+
gzip: this.gzip,
228+
host: this.host,
229+
port: this.port,
230+
username: this.username,
231+
password: this.password,
232+
httpServer: this.httpServer,
233+
beforeMiddleware: this.beforeMiddleware,
234+
afterMiddleware: this.afterMiddleware,
235+
buildSandboxGlobals: this.buildSandboxGlobals,
236+
chunkedResponse: this.chunkedResponse,
237+
};
238+
239+
const workerPath = this.workerPath || path.join(__dirname, './worker-start.js');
240+
return {
241+
exec: workerPath,
242+
args: [serialize(workerOptions)]
243+
};
244+
}
200245
}
201246

202247
module.exports = FastBootAppServer;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
'use strict';
2+
3+
/**
4+
* The purpose of this module is to provide a serialization layer for passing arguments to a new Worker instance
5+
* This allows us to completely separate the cluster worker from the cluster primary
6+
*/
7+
8+
function circularReplacer() {
9+
const seen = new WeakSet();
10+
11+
return (key, value) => {
12+
if (typeof value === 'object' && value !== null) {
13+
if (seen.has(value)) {
14+
return;
15+
}
16+
17+
seen.add(value);
18+
}
19+
20+
return value;
21+
}
22+
}
23+
24+
function serialize(object) {
25+
let data = encodeURIComponent(JSON.stringify(object, circularReplacer()));
26+
let buff = new Buffer.from(data);
27+
return buff.toString('base64');
28+
}
29+
30+
function deserialize(string) {
31+
let buff = new Buffer.from(string, 'base64');
32+
return JSON.parse(decodeURIComponent(buff.toString('ascii')));
33+
}
34+
35+
module.exports = {
36+
serialize,
37+
deserialize
38+
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
'use strict';
2+
3+
const ClusterWorker = require('./worker');
4+
const worker = new ClusterWorker();
5+
6+
worker.start();

packages/fastboot-app-server/src/worker.js

+14-1
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,17 @@
33
const FastBoot = require('fastboot');
44
const fastbootMiddleware = require('fastboot-express-middleware');
55
const ExpressHTTPServer = require('./express-http-server');
6+
const deserialize = require('./utils/serialization').deserialize;
7+
const UI = require('./ui');
68

79
class Worker {
8-
constructor(options) {
10+
constructor(argOptions) {
11+
this.forkOptions = deserialize(process.argv[2])
12+
// Define the enumerated options set.
13+
// Combination of any launch options and any directly passed options.
14+
const options = Object.assign({}, this.forkOptions, argOptions);
15+
16+
this.ui = new UI();
917
this.distPath = options.distPath;
1018
this.httpServer = options.httpServer;
1119
this.ui = options.ui;
@@ -56,6 +64,11 @@ class Worker {
5664
process.on('message', message => this.handleMessage(message));
5765
}
5866

67+
/**
68+
* received messages from primary
69+
* @method handleMessage
70+
* @param {Object} message
71+
*/
5972
handleMessage(message) {
6073
switch (message.event) {
6174
case 'reload':

0 commit comments

Comments
 (0)