Skip to content

Commit 3a5ea41

Browse files
authored
Merge pull request #1095 from cardstack/cs-6459-review-home-grown-job-queue-implementation
Add Queue interface and BrowserQueue implementation
2 parents 4eed5de + cb04012 commit 3a5ea41

File tree

6 files changed

+5917
-13367
lines changed

6 files changed

+5917
-13367
lines changed
+112
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import debounce from 'lodash/debounce';
2+
3+
import { v4 as uuidv4 } from 'uuid';
4+
5+
import {
6+
type Queue,
7+
type PgPrimitive,
8+
Job,
9+
Deferred,
10+
} from '@cardstack/runtime-common';
11+
12+
export class BrowserQueue implements Queue {
13+
#isDestroyed = false;
14+
#hasStarted = false;
15+
#flush: Promise<void> | undefined;
16+
17+
// no need for "onAfterJob--that's just the Job.done promise
18+
constructor(private onBeforeJob?: (jobId: string) => void) {}
19+
20+
private jobs: {
21+
jobId: string;
22+
category: string;
23+
arg: PgPrimitive;
24+
notifier: Deferred<any>;
25+
}[] = [];
26+
private categories: Map<string, (arg: any) => Promise<any>> = new Map();
27+
28+
get isDestroyed() {
29+
return this.#isDestroyed;
30+
}
31+
32+
get hasStarted() {
33+
return this.#hasStarted;
34+
}
35+
36+
async flush() {
37+
await this.#flush;
38+
}
39+
40+
start() {
41+
this.#hasStarted = true;
42+
}
43+
44+
register<A, T>(category: string, handler: (arg: A) => Promise<T>) {
45+
if (!this.#hasStarted) {
46+
throw new Error(`Cannot register category on unstarted Queue`);
47+
}
48+
if (this.isDestroyed) {
49+
throw new Error(`Cannot register category on a destroyed Queue`);
50+
}
51+
this.categories.set(category, handler);
52+
this.debouncedDrainJobs();
53+
}
54+
55+
async publish<T>(category: string, arg: PgPrimitive): Promise<Job<T>> {
56+
if (!this.#hasStarted) {
57+
throw new Error(`Cannot publish job on unstarted Queue`);
58+
}
59+
if (this.isDestroyed) {
60+
throw new Error(`Cannot publish job on a destroyed Queue`);
61+
}
62+
let jobId = uuidv4();
63+
let notifier = new Deferred<T>();
64+
let job = new Job(jobId, notifier);
65+
this.jobs.push({
66+
jobId,
67+
notifier,
68+
category,
69+
arg,
70+
});
71+
this.debouncedDrainJobs();
72+
return job;
73+
}
74+
75+
private debouncedDrainJobs = debounce(() => {
76+
this.drainJobs();
77+
}, 250);
78+
79+
private async drainJobs() {
80+
await this.flush();
81+
82+
let jobsDrained: () => void;
83+
this.#flush = new Promise((res) => (jobsDrained = res));
84+
let jobs = [...this.jobs];
85+
this.jobs = [];
86+
for (let workItem of jobs) {
87+
let { jobId, category, notifier, arg } = workItem;
88+
let handler = this.categories.get(category);
89+
if (!handler) {
90+
// no handler for this job, add it back to the queue
91+
this.jobs.push(workItem);
92+
continue;
93+
}
94+
95+
if (this.onBeforeJob) {
96+
this.onBeforeJob(jobId);
97+
}
98+
try {
99+
notifier.fulfill(await handler(arg));
100+
} catch (e: any) {
101+
notifier.reject(e);
102+
}
103+
}
104+
105+
jobsDrained!();
106+
}
107+
108+
async destroy() {
109+
this.#isDestroyed = true;
110+
await this.flush();
111+
}
112+
}
+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import { module, test } from 'qunit';
2+
3+
import { type Queue } from '@cardstack/runtime-common';
4+
5+
import { BrowserQueue } from '@cardstack/host/lib/browser-queue';
6+
7+
module('Unit | queue | browser implementation', function (hooks) {
8+
let queue: Queue;
9+
10+
hooks.beforeEach(async function () {
11+
queue = new BrowserQueue();
12+
queue.start();
13+
});
14+
15+
hooks.afterEach(async function () {
16+
await queue.destroy();
17+
});
18+
19+
test('it can run a job', async function (assert) {
20+
let job = await queue.publish<number>('increment', 17, {
21+
queueName: 'first-ephemeral-realm-incrementing',
22+
});
23+
queue.register('increment', async (a: number) => a + 1);
24+
let result = await job.done;
25+
assert.strictEqual(result, 18);
26+
});
27+
28+
test(`a job can throw an exception`, async function (assert) {
29+
queue.register('increment', async (a: number) => a + 1);
30+
queue.register('boom', async () => {
31+
throw new Error('boom!');
32+
});
33+
let [errorJob, nonErrorJob] = await Promise.all([
34+
queue.publish<number>('boom', null),
35+
queue.publish<number>('increment', 17),
36+
]);
37+
38+
// assert that the error that was thrown does not prevent subsequent jobs
39+
// from running
40+
let [errorResults, nonErrorResults] = await Promise.allSettled([
41+
errorJob.done,
42+
nonErrorJob.done,
43+
]);
44+
if (errorResults.status === 'rejected') {
45+
assert.strictEqual(errorResults.reason.message, 'boom!');
46+
} else {
47+
assert.ok(false, `expected 'errorJob' to be rejected`);
48+
}
49+
if (nonErrorResults.status === 'fulfilled') {
50+
assert.strictEqual(nonErrorResults.value, 18);
51+
} else {
52+
assert.ok(false, `expected 'nonErrorJob' to be fulfilled`);
53+
}
54+
});
55+
});

packages/runtime-common/expression.ts

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import * as JSON from 'json-typescript';
2+
3+
export type Expression = (string | Param)[];
4+
5+
export type PgPrimitive =
6+
| number
7+
| string
8+
| boolean
9+
| JSON.Object
10+
| JSON.Arr
11+
| null;
12+
13+
export interface Param {
14+
param: PgPrimitive;
15+
kind: 'param';
16+
}

packages/runtime-common/index.ts

+2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ import { RealmPaths, type LocalPath } from './paths';
3737
import { Query } from './query';
3838
import { Loader } from './loader';
3939
export * from './constants';
40+
export * from './queue';
41+
export * from './expression';
4042
export { makeLogDefinitions, logger } from './log';
4143
export { RealmPaths, Loader, type LocalPath, type Query };
4244
export { NotLoaded, isNotLoadedError } from './not-loaded';

packages/runtime-common/queue.ts

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { PgPrimitive } from './expression';
2+
import { Deferred } from './deferred';
3+
4+
export interface QueueOpts {
5+
queueName?: string;
6+
}
7+
8+
export interface Queue {
9+
isDestroyed: boolean;
10+
hasStarted: boolean;
11+
// postgres needs time to initialize, so we only start our queue after
12+
// postgres is running
13+
start: () => void;
14+
destroy: () => Promise<void>;
15+
register: <A, T>(category: string, handler: (arg: A) => Promise<T>) => void;
16+
publish: <T>(
17+
category: string,
18+
arg: PgPrimitive,
19+
opts?: QueueOpts,
20+
) => Promise<Job<T>>;
21+
}
22+
23+
export interface JobNotifier {
24+
resolve: Function;
25+
reject: Function;
26+
}
27+
28+
export class Job<T> {
29+
constructor(
30+
public uuid: string,
31+
private notifier: Deferred<T>,
32+
) {}
33+
get done(): Promise<T> {
34+
return this.notifier.promise;
35+
}
36+
}

0 commit comments

Comments
 (0)