Skip to content

Commit 8bca842

Browse files
committed
make Redis backward compatible
1 parent f1c6a7a commit 8bca842

File tree

12 files changed

+168
-138
lines changed

12 files changed

+168
-138
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- DropForeignKey
2+
ALTER TABLE "contract_subscriptions" DROP CONSTRAINT "contract_subscriptions_webhookId_fkey";
3+
4+
-- AddForeignKey
5+
ALTER TABLE "contract_subscriptions" ADD CONSTRAINT "contract_subscriptions_webhookId_fkey" FOREIGN KEY ("webhookId") REFERENCES "webhooks"("id") ON DELETE SET NULL ON UPDATE CASCADE;

src/prisma/schema.prisma

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ model ContractSubscriptions {
196196
updatedAt DateTime @updatedAt
197197
deletedAt DateTime?
198198
199-
webhook Webhooks? @relation(fields: [webhookId], references: [id], onDelete: Cascade)
199+
webhook Webhooks? @relation(fields: [webhookId], references: [id], onDelete: SetNull)
200200
201201
// optimize distinct lookups
202202
@@index([chainId])

src/server/routes/contract/subscriptions/addContractSubscription.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ export async function addContractSubscription(fastify: FastifyInstance) {
9494

9595
const webhook = await insertWebhook({
9696
eventType: WebhooksEventTypes.CONTRACT_SUBSCRIPTION,
97-
name: "Auto-generated for contract subscription",
97+
name: "(Auto-generated)",
9898
url: webhookUrl,
9999
});
100100
webhookId = webhook.id;

src/server/routes/system/health.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,19 @@ import { FastifyInstance } from "fastify";
33
import { StatusCodes } from "http-status-codes";
44
import { isDatabaseHealthy } from "../../../db/client";
55
import { env } from "../../../utils/env";
6+
import { redis } from "../../../utils/redis/redis";
67

7-
type EngineFeature = "KEYPAIR_AUTH";
8+
type EngineFeature = "KEYPAIR_AUTH" | "CONTRACT_SUBSCRIPTIONS";
89

910
const ReplySchemaOk = Type.Object({
1011
status: Type.String(),
1112
engineVersion: Type.Optional(Type.String()),
12-
features: Type.Array(Type.Union([Type.Literal("KEYPAIR_AUTH")])),
13+
features: Type.Array(
14+
Type.Union([
15+
Type.Literal("KEYPAIR_AUTH"),
16+
Type.Literal("CONTRACT_SUBSCRIPTIONS"),
17+
]),
18+
),
1319
});
1420

1521
const ReplySchemaError = Type.Object({
@@ -54,6 +60,10 @@ export async function healthCheck(fastify: FastifyInstance) {
5460

5561
const getFeatures = (): EngineFeature[] => {
5662
const features: EngineFeature[] = [];
63+
5764
if (env.ENABLE_KEYPAIR_AUTH) features.push("KEYPAIR_AUTH");
65+
// Contract subscriptions requires Redis.
66+
if (redis) features.push("CONTRACT_SUBSCRIPTIONS");
67+
5868
return features;
5969
};

src/utils/env.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ export const env = createEnv({
7171
.number()
7272
.nonnegative()
7373
.default(0),
74-
REDIS_URL: z.string().default("redis://localhost:6379"),
74+
REDIS_URL: z.string().optional(),
7575
},
7676
clientPrefix: "NEVER_USED",
7777
client: {},

src/utils/redis/redis.ts

Lines changed: 43 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,47 @@ import Redis from "ioredis";
22
import { env } from "../env";
33
import { logger } from "../logger";
44

5-
export const redis = new Redis(env.REDIS_URL, {
6-
maxRetriesPerRequest: null,
7-
});
5+
export let redis: Redis | null;
86

9-
redis.on("error", (err) => () => {
10-
logger({
11-
level: "error",
12-
message: `Redis error: ${err}`,
13-
service: "worker",
14-
});
15-
});
16-
redis.on("connect", () =>
17-
logger({
18-
level: "info",
19-
message: "Redis connected",
20-
service: "worker",
21-
}),
22-
);
23-
redis.on("reconnecting", () =>
24-
logger({
25-
level: "info",
26-
message: "Redis reconnecting",
27-
service: "worker",
28-
}),
29-
);
30-
redis.on("ready", () => {
31-
logger({
32-
level: "info",
33-
message: "Redis ready",
34-
service: "worker",
35-
});
36-
});
7+
if (env.REDIS_URL) {
8+
try {
9+
redis = new Redis(env.REDIS_URL, {
10+
maxRetriesPerRequest: null,
11+
});
12+
13+
redis.on("error", (err) => () => {
14+
logger({
15+
level: "error",
16+
message: `Redis error: ${err}`,
17+
service: "worker",
18+
});
19+
});
20+
redis.on("connect", () =>
21+
logger({
22+
level: "info",
23+
message: "Redis connected",
24+
service: "worker",
25+
}),
26+
);
27+
redis.on("reconnecting", () =>
28+
logger({
29+
level: "info",
30+
message: "Redis reconnecting",
31+
service: "worker",
32+
}),
33+
);
34+
redis.on("ready", () => {
35+
logger({
36+
level: "info",
37+
message: "Redis ready",
38+
service: "worker",
39+
});
40+
});
41+
} catch (e) {
42+
logger({
43+
level: "error",
44+
message: `Error initializing Redis: ${e}`,
45+
service: "worker",
46+
});
47+
}
48+
}

src/utils/webhook.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ export const sendWebhooks = async (webhooks: WebhookData[]) => {
129129
level: "debug",
130130
message: "No webhook set or active, skipping webhook send",
131131
});
132-
133132
return;
134133
}
135134

src/worker/queues/queues.ts

Lines changed: 29 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,8 @@
1-
import {
2-
ContractEventLogs,
3-
ContractTransactionReceipts,
4-
Webhooks,
5-
} from "@prisma/client";
6-
import { JobsOptions, Queue } from "bullmq";
7-
import superjson from "superjson";
8-
import { WebhooksEventTypes } from "../../schema/webhooks";
1+
import { Job, JobsOptions, Worker } from "bullmq";
2+
import { env } from "../../utils/env";
93
import { logger } from "../../utils/logger";
10-
import { redis } from "../../utils/redis/redis";
114

12-
const defaultJobOptions: JobsOptions = {
5+
export const defaultJobOptions: JobsOptions = {
136
attempts: 3,
147
// Retries after 5s, 10s, 20s.
158
backoff: { type: "exponential", delay: 5_000 },
@@ -20,54 +13,30 @@ const defaultJobOptions: JobsOptions = {
2013
},
2114
};
2215

23-
/**
24-
* Sends webhooks to configured webhook URLs.
25-
*/
26-
export type EnqueueContractSubscriptionWebhookData = {
27-
type: WebhooksEventTypes.CONTRACT_SUBSCRIPTION;
28-
webhook: Webhooks;
29-
eventLog?: ContractEventLogs;
30-
transactionReceipt?: ContractTransactionReceipts;
31-
};
32-
// TODO: Add other webhook event types here.
33-
export type EnqueueWebhookData = EnqueueContractSubscriptionWebhookData;
34-
35-
export interface WebhookJob {
36-
data: EnqueueWebhookData;
37-
webhook: Webhooks;
38-
}
39-
40-
export class WebhookQueue {
41-
private static q = new Queue<string>("webhook", {
42-
connection: redis,
43-
defaultJobOptions,
16+
export const logWorkerEvents = (worker: Worker) => {
17+
worker.on("active", (job: Job) => {
18+
logger({
19+
level: "debug",
20+
message: `[${worker.name}] Processing: ${job.id}`,
21+
service: "worker",
22+
});
4423
});
45-
46-
static add = async (data: EnqueueWebhookData) => {
47-
switch (data.type) {
48-
case WebhooksEventTypes.CONTRACT_SUBSCRIPTION:
49-
return this.enqueueContractSubscriptionWebhook(data);
50-
default:
51-
logger({
52-
service: "worker",
53-
level: "warn",
54-
message: `Unexpected webhook type: ${data.type}`,
55-
});
56-
}
57-
};
58-
59-
private static enqueueContractSubscriptionWebhook = async (
60-
data: EnqueueContractSubscriptionWebhookData,
61-
) => {
62-
const { type, webhook, eventLog, transactionReceipt } = data;
63-
if (!eventLog && !transactionReceipt) {
64-
throw 'Must provide "eventLog" or "transactionReceipt".';
65-
}
66-
67-
if (!webhook.revokedAt && type === webhook.eventType) {
68-
const job: WebhookJob = { data, webhook };
69-
const serialized = superjson.stringify(job);
70-
await this.q.add(`${data.type}:${webhook.id}`, serialized);
71-
}
72-
};
73-
}
24+
worker.on("completed", (job: Job) => {
25+
logger({
26+
level: "debug",
27+
message: `[${worker.name}] Completed: ${job.id}`,
28+
service: "worker",
29+
});
30+
});
31+
worker.on("failed", (job: Job | undefined, err: Error) => {
32+
logger({
33+
level: "error",
34+
message: `[${worker.name}] Failed: ${
35+
job?.id ?? "<no job ID>"
36+
} with error: ${err.message} ${
37+
env.NODE_ENV === "development" ? err.stack : ""
38+
}`,
39+
service: "worker",
40+
});
41+
});
42+
};

src/worker/queues/webhookQueue.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import {
2+
ContractEventLogs,
3+
ContractTransactionReceipts,
4+
Webhooks,
5+
} from "@prisma/client";
6+
import { Queue } from "bullmq";
7+
import SuperJSON from "superjson";
8+
import { WebhooksEventTypes } from "../../schema/webhooks";
9+
import { logger } from "../../utils/logger";
10+
import { redis } from "../../utils/redis/redis";
11+
import { defaultJobOptions } from "./queues";
12+
13+
// Queue
14+
const _queue = redis
15+
? new Queue<string>("webhook", {
16+
connection: redis,
17+
defaultJobOptions,
18+
})
19+
: null;
20+
21+
export type EnqueueContractSubscriptionWebhookData = {
22+
type: WebhooksEventTypes.CONTRACT_SUBSCRIPTION;
23+
webhook: Webhooks;
24+
eventLog?: ContractEventLogs;
25+
transactionReceipt?: ContractTransactionReceipts;
26+
};
27+
// TODO: Add other webhook event types here.
28+
type EnqueueWebhookData = EnqueueContractSubscriptionWebhookData;
29+
30+
export interface WebhookJob {
31+
data: EnqueueWebhookData;
32+
webhook: Webhooks;
33+
}
34+
35+
export const enqueueWebhook = async (data: EnqueueWebhookData) => {
36+
switch (data.type) {
37+
case WebhooksEventTypes.CONTRACT_SUBSCRIPTION:
38+
return enqueueContractSubscriptionWebhook(data);
39+
default:
40+
logger({
41+
service: "worker",
42+
level: "warn",
43+
message: `Unexpected webhook type: ${data.type}`,
44+
});
45+
}
46+
};
47+
48+
const enqueueContractSubscriptionWebhook = async (
49+
data: EnqueueContractSubscriptionWebhookData,
50+
) => {
51+
if (!_queue) return;
52+
53+
const { type, webhook, eventLog, transactionReceipt } = data;
54+
if (!eventLog && !transactionReceipt) {
55+
throw 'Must provide "eventLog" or "transactionReceipt".';
56+
}
57+
58+
if (!webhook.revokedAt && type === webhook.eventType) {
59+
const job: WebhookJob = { data, webhook };
60+
const serialized = SuperJSON.stringify(job);
61+
await _queue.add(`${data.type}:${webhook.id}`, serialized);
62+
}
63+
};

src/worker/queues/worker.ts

Lines changed: 0 additions & 31 deletions
This file was deleted.

src/worker/tasks/chainIndexer.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import { WebhooksEventTypes } from "../../schema/webhooks";
1818
import { getContract } from "../../utils/cache/getContract";
1919
import { getSdk } from "../../utils/cache/getSdk";
2020
import { logger } from "../../utils/logger";
21-
import { WebhookQueue } from "../queues/queues";
21+
import { enqueueWebhook } from "../queues/webhookQueue";
2222
import { getContractId } from "../utils/contractId";
2323

2424
export interface GetSubscribedContractsLogsParams {
@@ -383,7 +383,7 @@ export const createChainIndexerTask = async (args: {
383383
const webhooks =
384384
webhooksByContractAddress[eventLog.contractAddress] ?? [];
385385
for (const webhook of webhooks) {
386-
await WebhookQueue.add({
386+
await enqueueWebhook({
387387
type: WebhooksEventTypes.CONTRACT_SUBSCRIPTION,
388388
webhook,
389389
eventLog,
@@ -395,7 +395,7 @@ export const createChainIndexerTask = async (args: {
395395
webhooksByContractAddress[transactionReceipt.contractAddress] ??
396396
[];
397397
for (const webhook of webhooks) {
398-
await WebhookQueue.add({
398+
await enqueueWebhook({
399399
type: WebhooksEventTypes.CONTRACT_SUBSCRIPTION,
400400
webhook,
401401
transactionReceipt,

0 commit comments

Comments
 (0)