4
4
logger ,
5
5
userInitiatedPriority ,
6
6
systemInitiatedPriority ,
7
- query ,
8
7
} from '@cardstack/runtime-common' ;
9
8
import yargs from 'yargs' ;
10
9
import * as Sentry from '@sentry/node' ;
@@ -14,7 +13,14 @@ import pluralize from 'pluralize';
14
13
import Koa from 'koa' ;
15
14
import Router from '@koa/router' ;
16
15
import { ecsMetadata , fullRequestURL , livenessCheck } from './middleware' ;
17
- import { PgAdapter } from '@cardstack/postgres' ;
16
+ import { Server } from 'http' ;
17
+
18
+ /* About the Worker Manager
19
+ *
20
+ * This process runs on each queue worker container and is responsible starting and monitoring the worker processes. It does this via IPC (inter-process communication).
21
+ * In test and development environments, the worker manager is also responsible for providing a readiness check HTTP endpoint so that tests can wait until the worker
22
+ * manager is ready before proceeding.
23
+ */
18
24
19
25
let log = logger ( 'worker-manager' ) ;
20
26
41
47
description :
42
48
'HTTP port for worker manager to communicate readiness and status' ,
43
49
type : 'number' ,
44
- demandOption : true ,
45
50
} ,
46
51
highPriorityCount : {
47
52
description :
@@ -81,70 +86,65 @@ let isExiting = false;
81
86
process . on ( 'SIGINT' , ( ) => ( isExiting = true ) ) ;
82
87
process . on ( 'SIGTERM' , ( ) => ( isExiting = true ) ) ;
83
88
84
- let dbAdapter = new PgAdapter ( { } ) ;
85
-
86
- let webServer = new Koa < Koa . DefaultState , Koa . Context > ( ) ;
87
- let router = new Router ( ) ;
88
- router . head ( '/' , livenessCheck ) ;
89
- router . get ( '/' , async ( ctxt : Koa . Context , _next : Koa . Next ) => {
90
- let result = {
91
- ready : isReady ,
92
- } as Record < string , boolean | number > ;
93
- if ( isReady ) {
94
- let [ { queue_depth } ] = ( await query ( dbAdapter , [
95
- `SELECT COUNT(*) as queue_depth FROM jobs WHERE status='unfulfilled'` ,
96
- ] ) ) as {
97
- queue_depth : string ;
98
- } [ ] ;
99
- result = {
100
- ...result ,
101
- highPriorityWorkers : highPriorityCount ,
102
- allPriorityWorkers : allPriorityCount ,
103
- queueDepth : parseInt ( queue_depth , 10 ) ,
104
- } ;
105
- }
106
- ctxt . set ( 'Content-Type' , 'application/json' ) ;
107
- ctxt . body = JSON . stringify ( result ) ;
108
- ctxt . status = isReady ? 200 : 503 ;
109
- } ) ;
89
+ let webServerInstance : Server | undefined ;
110
90
111
- webServer
112
- . use ( router . routes ( ) )
113
- . use ( ( ctxt : Koa . Context , next : Koa . Next ) => {
114
- log . info (
115
- `<-- ${ ctxt . method } ${ ctxt . req . headers . accept } ${
116
- fullRequestURL ( ctxt ) . href
117
- } `,
118
- ) ;
91
+ if ( port ) {
92
+ let webServer = new Koa < Koa . DefaultState , Koa . Context > ( ) ;
93
+ let router = new Router ( ) ;
94
+ router . head ( '/' , livenessCheck ) ;
95
+ router . get ( '/' , async ( ctxt : Koa . Context , _next : Koa . Next ) => {
96
+ let result = {
97
+ ready : isReady ,
98
+ } as Record < string , boolean | number > ;
99
+ if ( isReady ) {
100
+ result = {
101
+ ...result ,
102
+ highPriorityWorkers : highPriorityCount ,
103
+ allPriorityWorkers : allPriorityCount ,
104
+ } ;
105
+ }
106
+ ctxt . set ( 'Content-Type' , 'application/json' ) ;
107
+ ctxt . body = JSON . stringify ( result ) ;
108
+ ctxt . status = isReady ? 200 : 503 ;
109
+ } ) ;
119
110
120
- ctxt . res . on ( 'finish' , ( ) => {
111
+ webServer
112
+ . use ( router . routes ( ) )
113
+ . use ( ( ctxt : Koa . Context , next : Koa . Next ) => {
121
114
log . info (
122
- `--> ${ ctxt . method } ${ ctxt . req . headers . accept } ${
115
+ `<-- ${ ctxt . method } ${ ctxt . req . headers . accept } ${
123
116
fullRequestURL ( ctxt ) . href
124
- } : ${ ctxt . status } `,
117
+ } `,
125
118
) ;
126
- log . debug ( JSON . stringify ( ctxt . req . headers ) ) ;
127
- } ) ;
128
- return next ( ) ;
129
- } )
130
- . use ( ecsMetadata ) ;
131
119
132
- webServer . on ( 'error' , ( err : any ) => {
133
- console . error ( `worker manager HTTP server error: ${ err . message } ` ) ;
134
- } ) ;
120
+ ctxt . res . on ( 'finish' , ( ) => {
121
+ log . info (
122
+ `--> ${ ctxt . method } ${ ctxt . req . headers . accept } ${
123
+ fullRequestURL ( ctxt ) . href
124
+ } : ${ ctxt . status } `,
125
+ ) ;
126
+ log . debug ( JSON . stringify ( ctxt . req . headers ) ) ;
127
+ } ) ;
128
+ return next ( ) ;
129
+ } )
130
+ . use ( ecsMetadata ) ;
135
131
136
- let webServerInstance = webServer . listen ( port ) ;
137
- log . info ( `worker manager HTTP listening on port ${ port } ` ) ;
132
+ webServer . on ( 'error' , ( err : any ) => {
133
+ log . error ( `worker manager HTTP server error: ${ err . message } ` ) ;
134
+ } ) ;
135
+
136
+ webServerInstance = webServer . listen ( port ) ;
137
+ log . info ( `worker manager HTTP listening on port ${ port } ` ) ;
138
+ }
138
139
139
140
const shutdown = ( onShutdown ?: ( ) => void ) => {
140
141
log . info ( `Shutting down server for worker manager...` ) ;
141
- webServerInstance . closeAllConnections ( ) ;
142
- webServerInstance . close ( ( err ?: Error ) => {
142
+ webServerInstance ? .closeAllConnections ( ) ;
143
+ webServerInstance ? .close ( ( err ?: Error ) => {
143
144
if ( err ) {
144
145
log . error ( `Error while closing the server for worker manager HTTP:` , err ) ;
145
146
process . exit ( 1 ) ;
146
147
}
147
- dbAdapter . close ( ) ; // warning this is async
148
148
log . info ( `worker manager HTTP on port ${ port } has stopped.` ) ;
149
149
onShutdown ?.( ) ;
150
150
process . exit ( 0 ) ;
@@ -164,7 +164,7 @@ process.on('message', (message) => {
164
164
process . send ?.( 'stopped' ) ;
165
165
} ) ;
166
166
} else if ( message === 'kill' ) {
167
- console . log ( `Ending worker manager process for ${ port } ...` ) ;
167
+ log . info ( `Ending worker manager process for ${ port } ...` ) ;
168
168
process . exit ( 0 ) ;
169
169
}
170
170
} ) ;
0 commit comments