From 84cc7b28de65c3ebc089ecae37eaddec5485a83d Mon Sep 17 00:00:00 2001 From: Luke Melia Date: Mon, 20 Jan 2025 18:40:34 -0500 Subject: [PATCH 1/2] Convert worker-manager TCP server to an HTTP server that exposes queue depth --- README.md | 10 +- packages/boxel-ui/test-app/.ember-cli.js | 2 +- packages/realm-server/main.ts | 54 ++----- .../scripts/start-worker-production.sh | 1 + .../scripts/start-worker-staging.sh | 1 + packages/realm-server/worker-manager.ts | 142 +++++++++++------- 6 files changed, 109 insertions(+), 101 deletions(-) diff --git a/README.md b/README.md index 2f45eb8e22..d0e2dea33a 100644 --- a/README.md +++ b/README.md @@ -82,10 +82,10 @@ Instead of running `pnpm start:base`, you can alternatively use `pnpm start:all` | :4201 | `/seed` seed realm | ✅ | 🚫 | | :4202 | `/test` host test realm, `/node-test` node test realm | ✅ | 🚫 | | :4205 | `/test` realm for matrix client tests (playwright controlled) | 🚫 | 🚫 | -| :4210 | Development Worker Manager (spins up 1 worker by default) | ✅ | 🚫 | -| :4211 | Test Worker Manager (spins up 1 worker by default) | ✅ | 🚫 | -| :4212 | Test Worker Manager for matrix client tests (playwright controlled - 1 worker) | ✅ | 🚫 | -| :4213 | Test Worker Manager for matrix client tests - base realm server (playwright controlled - 1 worker) | ✅ | 🚫 | +| :4210 | Worker Manager (spins up 1 worker by default in development) | ✅ | 🚫 | +| :4211 | Worker Manager (spins up 1 worker by default) | ✅ | 🚫 | +| :4212 | Worker Manager for matrix client tests (playwright controlled - 1 worker) | ✅ | 🚫 | +| :4213 | Worker Manager for matrix client tests - base realm server (playwright controlled - 1 worker) | ✅ | 🚫 | | :5001 | Mail user interface for viewing emails sent to local SMTP | ✅ | 🚫 | | :5435 | Postgres DB | ✅ | 🚫 | | :8008 | Matrix synapse server | ✅ | 🚫 | @@ -223,7 +223,7 @@ There is a ember-freestyle component explorer available to assist with developme 1. `cd packages/boxel-ui/test-app` 2. `pnpm start` -3. 
Visit http://localhost:4210/ in your browser +3. Visit http://localhost:4220/ in your browser ## Boxel Motion Demo App diff --git a/packages/boxel-ui/test-app/.ember-cli.js b/packages/boxel-ui/test-app/.ember-cli.js index d81fe1db49..9490e5747d 100644 --- a/packages/boxel-ui/test-app/.ember-cli.js +++ b/packages/boxel-ui/test-app/.ember-cli.js @@ -14,7 +14,7 @@ module.exports = { Setting `disableAnalytics` to true will prevent any data from being sent. */ - port: 4210, + port: 4220, testPort: 7356, disableAnalytics: false, }; diff --git a/packages/realm-server/main.ts b/packages/realm-server/main.ts index f253e7cacb..684c663618 100644 --- a/packages/realm-server/main.ts +++ b/packages/realm-server/main.ts @@ -11,7 +11,6 @@ import { NodeAdapter } from './node-realm'; import yargs from 'yargs'; import { RealmServer } from './server'; import { resolve } from 'path'; -import { createConnection, type Socket } from 'net'; import { makeFastBootIndexRunner } from './fastboot'; import { shimExternals } from './lib/externals'; import * as Sentry from '@sentry/node'; @@ -331,49 +330,20 @@ let autoMigrate = migrateDB || undefined; process.exit(-3); }); -let workerReadyDeferred: Deferred | undefined; async function waitForWorkerManager(port: number) { - const workerManager = await new Promise((r) => { - let socket = createConnection({ port }, () => { - log.info(`Connected to worker manager on port ${port}`); - r(socket); - }); - }); - - workerManager.on('data', (data) => { - let res = data.toString(); - if (!workerReadyDeferred) { - throw new Error( - `received unsolicited message from worker manager on port ${port}`, - ); + let isReady = false; + let timeout = Date.now() + 30_000; + do { + let response = await fetch(`http://localhost:${port}/`); + if (response.ok) { + let json = await response.json(); + isReady = json.ready; } - switch (res) { - case 'ready': - case 'not-ready': - workerReadyDeferred.fulfill(res === 'ready' ? 
true : false); - break; - default: - workerReadyDeferred.reject( - `unexpected response from worker manager: ${res}`, - ); - } - }); - - try { - let isReady = false; - let timeout = Date.now() + 30_000; - do { - workerReadyDeferred = new Deferred(); - workerManager.write('ready?'); - isReady = await workerReadyDeferred.promise; - } while (!isReady && Date.now() < timeout); - if (!isReady) { - throw new Error( - `timed out trying to connect to worker manager on port ${port}`, - ); - } - } finally { - workerManager.end(); + } while (!isReady && Date.now() < timeout); + if (!isReady) { + throw new Error( + `timed out trying to waiting for worker manager to be ready on port ${port}`, + ); } log.info('workers are ready'); } diff --git a/packages/realm-server/scripts/start-worker-production.sh b/packages/realm-server/scripts/start-worker-production.sh index 9fb91fae80..f9edef02fe 100755 --- a/packages/realm-server/scripts/start-worker-production.sh +++ b/packages/realm-server/scripts/start-worker-production.sh @@ -3,6 +3,7 @@ NODE_NO_WARNINGS=1 \ ts-node \ --transpileOnly worker-manager \ + --port=4210 \ --allPriorityCount="${WORKER_ALL_PRIORITY_COUNT:-1}" \ --highPriorityCount="${WORKER_HIGH_PRIORITY_COUNT:-0}" \ --matrixURL='https://matrix.boxel.ai' \ diff --git a/packages/realm-server/scripts/start-worker-staging.sh b/packages/realm-server/scripts/start-worker-staging.sh index 6e1380fe97..4ce61b9efe 100755 --- a/packages/realm-server/scripts/start-worker-staging.sh +++ b/packages/realm-server/scripts/start-worker-staging.sh @@ -3,6 +3,7 @@ NODE_NO_WARNINGS=1 \ ts-node \ --transpileOnly worker-manager \ + --port=4210 \ --allPriorityCount="${WORKER_ALL_PRIORITY_COUNT:-1}" \ --highPriorityCount="${WORKER_HIGH_PRIORITY_COUNT:-0}" \ --matrixURL='https://matrix-staging.stack.cards' \ diff --git a/packages/realm-server/worker-manager.ts b/packages/realm-server/worker-manager.ts index af2c3a1c7b..119923cfab 100644 --- a/packages/realm-server/worker-manager.ts +++ 
b/packages/realm-server/worker-manager.ts @@ -4,15 +4,19 @@ import { logger, userInitiatedPriority, systemInitiatedPriority, + query, } from '@cardstack/runtime-common'; import yargs from 'yargs'; import * as Sentry from '@sentry/node'; -import { createServer } from 'net'; import flattenDeep from 'lodash/flattenDeep'; import { spawn } from 'child_process'; import pluralize from 'pluralize'; +import Koa from 'koa'; +import Router from '@koa/router'; +import { ecsMetadata, fullRequestURL, livenessCheck } from './middleware'; +import { PgAdapter } from '@cardstack/postgres'; -let log = logger('worker'); +let log = logger('worker-manager'); const REALM_SECRET_SEED = process.env.REALM_SECRET_SEED; if (!REALM_SECRET_SEED) { @@ -34,8 +38,10 @@ let { .usage('Start worker manager') .options({ port: { - description: 'TCP port for worker to communicate readiness (for tests)', + description: + 'HTTP port for worker manager to communicate readiness and status', type: 'number', + demandOption: true, }, highPriorityCount: { description: @@ -75,63 +81,93 @@ let isExiting = false; process.on('SIGINT', () => (isExiting = true)); process.on('SIGTERM', () => (isExiting = true)); -if (port != null) { - // in tests we start a simple TCP server to communicate to the realm when - // the worker is ready to start processing jobs - let server = createServer((socket) => { - log.info(`realm connected to worker manager`); - socket.on('data', (data) => { - if (data.toString() === 'ready?') { - socket.write(isReady ? 'ready' : 'not-ready'); - } - }); - socket.on('close', (hadError) => { - log.info(`realm has disconnected${hadError ? 
' due to an error' : ''}.`); - }); - socket.on('error', (err: any) => { - console.error(`realm disconnected from worker manager: ${err.message}`); - }); - }); - server.unref(); +let dbAdapter = new PgAdapter({}); - server.listen(port, () => { - log.info(`worker manager listening for realm on port ${port}`); - }); +let webServer = new Koa(); +let router = new Router(); +router.head('/', livenessCheck); +router.get('/', async (ctxt: Koa.Context, _next: Koa.Next) => { + let result = { + ready: isReady, + } as Record; + if (isReady) { + let [{ queue_depth }] = (await query(dbAdapter, [ + `SELECT COUNT(*) as queue_depth FROM jobs WHERE status='unfulfilled'`, + ])) as { + queue_depth: string; + }[]; + result = { + ...result, + highPriorityWorkers: highPriorityCount, + allPriorityWorkers: allPriorityCount, + queueDepth: parseInt(queue_depth, 10), + }; + } + ctxt.set('Content-Type', 'application/json'); + ctxt.body = JSON.stringify(result); + ctxt.status = isReady ? 200 : 503; +}); + +webServer + .use(router.routes()) + .use((ctxt: Koa.Context, next: Koa.Next) => { + log.info( + `<-- ${ctxt.method} ${ctxt.req.headers.accept} ${ + fullRequestURL(ctxt).href + }`, + ); - const shutdown = () => { - log.info(`Shutting down server for worker manager...`); - server.close((err) => { - if (err) { - log.error(`Error while closing the server for worker manager:`, err); - process.exit(1); - } - log.info(`Server closed for worker manager.`); - process.exit(0); + ctxt.res.on('finish', () => { + log.info( + `--> ${ctxt.method} ${ctxt.req.headers.accept} ${ + fullRequestURL(ctxt).href + }: ${ctxt.status}`, + ); + log.debug(JSON.stringify(ctxt.req.headers)); }); - }; + return next(); + }) + .use(ecsMetadata); - process.on('SIGINT', shutdown); - process.on('SIGTERM', shutdown); - process.on('uncaughtException', (err) => { - log.error(`Uncaught exception in worker manager:`, err); - shutdown(); - }); +webServer.on('error', (err: any) => { + console.error(`worker manager HTTP server error: 
${err.message}`); +}); - process.on('message', (message) => { - if (message === 'stop') { - console.log(`stopping realm server on port ${port}...`); - server.close(() => { - console.log(`worker manager on port ${port} has stopped`); - if (process.send) { - process.send('stopped'); - } - }); - } else if (message === 'kill') { - console.log(`Ending worker manager process for ${port}...`); - process.exit(0); +let webServerInstance = webServer.listen(port); +log.info(`worker manager HTTP listening on port ${port}`); + +const shutdown = (onShutdown?: () => void) => { + log.info(`Shutting down server for worker manager...`); + webServerInstance.closeAllConnections(); + webServerInstance.close((err?: Error) => { + if (err) { + log.error(`Error while closing the server for worker manager HTTP:`, err); + process.exit(1); } + dbAdapter.close(); // warning this is async + log.info(`worker manager HTTP on port ${port} has stopped.`); + onShutdown?.(); + process.exit(0); }); -} +}; + +process.on('SIGINT', shutdown); +process.on('SIGTERM', shutdown); +process.on('uncaughtException', (err) => { + log.error(`Uncaught exception in worker manager:`, err); + shutdown(); +}); + +process.on('message', (message) => { + if (message === 'stop') { + shutdown(() => { + process.send?.('stopped'); + }); + } else if (message === 'kill') { + console.log(`Ending worker manager process for ${port}...`); + process.exit(0); + } +}); (async () => { log.info( From 227d2201e8e066bec3cdc3c9e130195f29232ba5 Mon Sep 17 00:00:00 2001 From: Luke Melia Date: Tue, 21 Jan 2025 11:20:05 -0500 Subject: [PATCH 2/2] Update to retain the TCP -> HTTP conversion but drop the queue depth. - In a separate PR, we will add an endpoint to realm server to report on queue stats starting with queue depth. 
--- README.md | 6 +- .../scripts/start-worker-production.sh | 1 - .../scripts/start-worker-staging.sh | 1 - packages/realm-server/worker-manager.ts | 108 +++++++++--------- 4 files changed, 57 insertions(+), 59 deletions(-) diff --git a/README.md b/README.md index d0e2dea33a..1bddceb8d6 100644 --- a/README.md +++ b/README.md @@ -82,8 +82,8 @@ Instead of running `pnpm start:base`, you can alternatively use `pnpm start:all` | :4201 | `/seed` seed realm | ✅ | 🚫 | | :4202 | `/test` host test realm, `/node-test` node test realm | ✅ | 🚫 | | :4205 | `/test` realm for matrix client tests (playwright controlled) | 🚫 | 🚫 | -| :4210 | Worker Manager (spins up 1 worker by default in development) | ✅ | 🚫 | -| :4211 | Worker Manager (spins up 1 worker by default) | ✅ | 🚫 | +| :4210 | Development Worker Manager (spins up 1 worker by default) | ✅ | 🚫 | +| :4211 | Test Worker Manager (spins up 1 worker by default) | ✅ | 🚫 | | :4212 | Worker Manager for matrix client tests (playwright controlled - 1 worker) | ✅ | 🚫 | | :4213 | Worker Manager for matrix client tests - base realm server (playwright controlled - 1 worker) | ✅ | 🚫 | | :5001 | Mail user interface for viewing emails sent to local SMTP | ✅ | 🚫 | @@ -290,7 +290,7 @@ To run the `packages/realm-server/` workspace tests start: ### Boxel UI 1. `cd packages/boxel-ui/test-app` -2. `pnpm test` (or `pnpm start` and visit http://localhost:4210/tests to run tests in the browser) +2. 
`pnpm test` (or `pnpm start` and visit http://localhost:4220/tests to run tests in the browser) ### Boxel Motion diff --git a/packages/realm-server/scripts/start-worker-production.sh b/packages/realm-server/scripts/start-worker-production.sh index f9edef02fe..9fb91fae80 100755 --- a/packages/realm-server/scripts/start-worker-production.sh +++ b/packages/realm-server/scripts/start-worker-production.sh @@ -3,7 +3,6 @@ NODE_NO_WARNINGS=1 \ ts-node \ --transpileOnly worker-manager \ - --port=4210 \ --allPriorityCount="${WORKER_ALL_PRIORITY_COUNT:-1}" \ --highPriorityCount="${WORKER_HIGH_PRIORITY_COUNT:-0}" \ --matrixURL='https://matrix.boxel.ai' \ diff --git a/packages/realm-server/scripts/start-worker-staging.sh b/packages/realm-server/scripts/start-worker-staging.sh index 4ce61b9efe..6e1380fe97 100755 --- a/packages/realm-server/scripts/start-worker-staging.sh +++ b/packages/realm-server/scripts/start-worker-staging.sh @@ -3,7 +3,6 @@ NODE_NO_WARNINGS=1 \ ts-node \ --transpileOnly worker-manager \ - --port=4210 \ --allPriorityCount="${WORKER_ALL_PRIORITY_COUNT:-1}" \ --highPriorityCount="${WORKER_HIGH_PRIORITY_COUNT:-0}" \ --matrixURL='https://matrix-staging.stack.cards' \ diff --git a/packages/realm-server/worker-manager.ts b/packages/realm-server/worker-manager.ts index 119923cfab..522db06fac 100644 --- a/packages/realm-server/worker-manager.ts +++ b/packages/realm-server/worker-manager.ts @@ -4,7 +4,6 @@ import { logger, userInitiatedPriority, systemInitiatedPriority, - query, } from '@cardstack/runtime-common'; import yargs from 'yargs'; import * as Sentry from '@sentry/node'; @@ -14,7 +13,14 @@ import pluralize from 'pluralize'; import Koa from 'koa'; import Router from '@koa/router'; import { ecsMetadata, fullRequestURL, livenessCheck } from './middleware'; -import { PgAdapter } from '@cardstack/postgres'; +import { Server } from 'http'; + +/* About the Worker Manager + * + * This process runs on each queue worker container and is responsible for starting and
monitoring the worker processes. It does this via IPC (inter-process communication). + * In test and development environments, the worker manager is also responsible for providing a readiness check HTTP endpoint so that tests can wait until the worker + * manager is ready before proceeding. + */ let log = logger('worker-manager'); @@ -41,7 +47,6 @@ let { description: 'HTTP port for worker manager to communicate readiness and status', type: 'number', - demandOption: true, }, highPriorityCount: { description: @@ -81,70 +86,65 @@ let isExiting = false; process.on('SIGINT', () => (isExiting = true)); process.on('SIGTERM', () => (isExiting = true)); -let dbAdapter = new PgAdapter({}); - -let webServer = new Koa(); -let router = new Router(); -router.head('/', livenessCheck); -router.get('/', async (ctxt: Koa.Context, _next: Koa.Next) => { - let result = { - ready: isReady, - } as Record; - if (isReady) { - let [{ queue_depth }] = (await query(dbAdapter, [ - `SELECT COUNT(*) as queue_depth FROM jobs WHERE status='unfulfilled'`, - ])) as { - queue_depth: string; - }[]; - result = { - ...result, - highPriorityWorkers: highPriorityCount, - allPriorityWorkers: allPriorityCount, - queueDepth: parseInt(queue_depth, 10), - }; - } - ctxt.set('Content-Type', 'application/json'); - ctxt.body = JSON.stringify(result); - ctxt.status = isReady ? 
200 : 503; -}); +let webServerInstance: Server | undefined; -webServer - .use(router.routes()) - .use((ctxt: Koa.Context, next: Koa.Next) => { - log.info( - `<-- ${ctxt.method} ${ctxt.req.headers.accept} ${ - fullRequestURL(ctxt).href - }`, - ); +if (port) { + let webServer = new Koa(); + let router = new Router(); + router.head('/', livenessCheck); + router.get('/', async (ctxt: Koa.Context, _next: Koa.Next) => { + let result = { + ready: isReady, + } as Record; + if (isReady) { + result = { + ...result, + highPriorityWorkers: highPriorityCount, + allPriorityWorkers: allPriorityCount, + }; + } + ctxt.set('Content-Type', 'application/json'); + ctxt.body = JSON.stringify(result); + ctxt.status = isReady ? 200 : 503; + }); - ctxt.res.on('finish', () => { + webServer + .use(router.routes()) + .use((ctxt: Koa.Context, next: Koa.Next) => { log.info( - `--> ${ctxt.method} ${ctxt.req.headers.accept} ${ + `<-- ${ctxt.method} ${ctxt.req.headers.accept} ${ fullRequestURL(ctxt).href - }: ${ctxt.status}`, + }`, ); - log.debug(JSON.stringify(ctxt.req.headers)); - }); - return next(); - }) - .use(ecsMetadata); -webServer.on('error', (err: any) => { - console.error(`worker manager HTTP server error: ${err.message}`); -}); + ctxt.res.on('finish', () => { + log.info( + `--> ${ctxt.method} ${ctxt.req.headers.accept} ${ + fullRequestURL(ctxt).href + }: ${ctxt.status}`, + ); + log.debug(JSON.stringify(ctxt.req.headers)); + }); + return next(); + }) + .use(ecsMetadata); -let webServerInstance = webServer.listen(port); -log.info(`worker manager HTTP listening on port ${port}`); + webServer.on('error', (err: any) => { + log.error(`worker manager HTTP server error: ${err.message}`); + }); + + webServerInstance = webServer.listen(port); + log.info(`worker manager HTTP listening on port ${port}`); +} const shutdown = (onShutdown?: () => void) => { log.info(`Shutting down server for worker manager...`); - webServerInstance.closeAllConnections(); - webServerInstance.close((err?: Error) => { + 
webServerInstance?.closeAllConnections(); + webServerInstance?.close((err?: Error) => { if (err) { log.error(`Error while closing the server for worker manager HTTP:`, err); process.exit(1); } - dbAdapter.close(); // warning this is async log.info(`worker manager HTTP on port ${port} has stopped.`); onShutdown?.(); process.exit(0); @@ -164,7 +164,7 @@ process.on('message', (message) => { process.send?.('stopped'); }); } else if (message === 'kill') { - console.log(`Ending worker manager process for ${port}...`); + log.info(`Ending worker manager process for ${port}...`); process.exit(0); } });