Skip to content

Commit 79a119c

Browse files
committed
feat: Add webhooks to contract subscriptions Redis workers
1 parent f22d72d commit 79a119c

30 files changed

+696
-310
lines changed

package.json

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
"@fastify/type-provider-typebox": "^3.2.0",
3737
"@fastify/websocket": "^8.2.0",
3838
"@google-cloud/kms": "^4.0.0",
39-
"@prisma/client": "5.2.0",
39+
"@prisma/client": "^5.14.0-dev.65",
4040
"@sinclair/typebox": "^0.31.28",
4141
"@t3-oss/env-core": "^0.6.0",
4242
"@thirdweb-dev/auth": "^4.1.55",
@@ -47,6 +47,7 @@
4747
"@types/base-64": "^1.0.2",
4848
"base-64": "^1.0.0",
4949
"body-parser": "^1.20.2",
50+
"bullmq": "^5.7.8",
5051
"cookie": "^0.5.0",
5152
"cookie-parser": "^1.4.6",
5253
"copyfiles": "^2.4.1",
@@ -59,14 +60,16 @@
5960
"fastify": "^4.15.0",
6061
"fastify-plugin": "^4.5.0",
6162
"http-status-codes": "^2.2.0",
63+
"ioredis": "^5.4.1",
6264
"jsonwebtoken": "^9.0.2",
6365
"knex": "^3.1.0",
6466
"mnemonist": "^0.39.8",
6567
"node-cron": "^3.0.2",
6668
"pg": "^8.11.3",
6769
"pino": "^8.15.1",
6870
"pino-pretty": "^10.0.0",
69-
"prisma": "^5.2.0",
71+
"prisma": "5.14.0-dev.61",
72+
"superjson": "^2.2.1",
7073
"thirdweb": "^5.1.0",
7174
"uuid": "^9.0.1",
7275
"viem": "^1.14.0",

src/db/contractEventLogs/createContractEventLogs.ts

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,18 @@
1+
import { ContractEventLogs, Prisma } from "@prisma/client";
12
import { PrismaTransaction } from "../../schema/prisma";
23
import { getPrismaWithPostgresTx } from "../client";
34

4-
interface ContractEventLogEntry {
5-
chainId: number;
6-
blockNumber: number;
7-
contractAddress: string;
8-
transactionHash: string;
9-
topic0?: string;
10-
topic1?: string;
11-
topic2?: string;
12-
topic3?: string;
13-
data: string;
14-
decodedLog?: any; // Assuming JSON object for decodedLog
15-
eventName?: string;
16-
timestamp: Date;
17-
transactionIndex: number;
18-
logIndex: number;
19-
}
20-
215
export interface BulkInsertContractLogsParams {
226
pgtx?: PrismaTransaction;
23-
logs: ContractEventLogEntry[];
7+
logs: Prisma.ContractEventLogsCreateInput[];
248
}
259

2610
export const bulkInsertContractEventLogs = async ({
2711
pgtx,
2812
logs,
29-
}: BulkInsertContractLogsParams) => {
13+
}: BulkInsertContractLogsParams): Promise<ContractEventLogs[]> => {
3014
const prisma = getPrismaWithPostgresTx(pgtx);
31-
return await prisma.contractEventLogs.createMany({
15+
return await prisma.contractEventLogs.createManyAndReturn({
3216
data: logs,
3317
skipDuplicates: true,
3418
});

src/db/contractSubscriptions/getContractSubscriptions.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,18 @@ export const isContractSubscribed = async ({
2121
else return true;
2222
};
2323

24-
export const getContractSubscriptionsByChainId = async (chainId: number) => {
24+
export const getContractSubscriptionsByChainId = async (
25+
chainId: number,
26+
includeWebhook = false,
27+
) => {
2528
return await prisma.contractSubscriptions.findMany({
2629
where: {
2730
chainId,
2831
deletedAt: null,
2932
},
33+
include: {
34+
webhook: includeWebhook,
35+
},
3036
});
3137
};
3238

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,19 @@
1+
import { ContractTransactionReceipts, Prisma } from "@prisma/client";
12
import { PrismaTransaction } from "../../schema/prisma";
23
import { getPrismaWithPostgresTx } from "../client";
34

4-
interface ContractTransactionReceiptEntry {
5-
chainId: number;
6-
blockNumber: number;
7-
contractId: string;
8-
contractAddress: string;
9-
transactionHash: string;
10-
blockHash: string;
11-
timestamp: Date;
12-
to: string;
13-
from: string;
14-
transactionIndex: number;
15-
value: string;
16-
data: string;
17-
gasUsed: string;
18-
effectiveGasPrice: string;
19-
status: number;
20-
}
21-
225
export interface BulkInsertContractLogsParams {
236
pgtx?: PrismaTransaction;
24-
txReceipts: ContractTransactionReceiptEntry[];
7+
receipts: Prisma.ContractTransactionReceiptsCreateInput[];
258
}
269

2710
export const bulkInsertContractTransactionReceipts = async ({
2811
pgtx,
29-
txReceipts,
30-
}: BulkInsertContractLogsParams) => {
12+
receipts,
13+
}: BulkInsertContractLogsParams): Promise<ContractTransactionReceipts[]> => {
3114
const prisma = getPrismaWithPostgresTx(pgtx);
32-
return await prisma.contractTransactionReceipts.createMany({
33-
data: txReceipts,
15+
return await prisma.contractTransactionReceipts.createManyAndReturn({
16+
data: receipts,
3417
skipDuplicates: true,
3518
});
3619
};

src/db/webhooks/createWebhook.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { Webhooks } from "@prisma/client";
12
import { createHash, randomBytes } from "crypto";
23
import { WebhooksEventTypes } from "../../schema/webhooks";
34
import { prisma } from "../client";
@@ -12,7 +13,7 @@ export const insertWebhook = async ({
1213
url,
1314
name,
1415
eventType,
15-
}: CreateWebhooksParams) => {
16+
}: CreateWebhooksParams): Promise<Webhooks> => {
1617
// generate random bytes
1718
const bytes = randomBytes(4096);
1819
// hash the bytes to create the secret (this will not be stored by itself)

src/db/webhooks/getAllWebhooks.ts

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,10 @@
11
import { Webhooks } from "@prisma/client";
2-
import { SanitizedWebHooksSchema } from "../../schema/webhooks";
32
import { prisma } from "../client";
43

5-
export const getAllWebhooks = async (): Promise<SanitizedWebHooksSchema[]> => {
6-
let webhooks = await prisma.webhooks.findMany({
4+
export const getAllWebhooks = async (): Promise<Webhooks[]> => {
5+
return await prisma.webhooks.findMany({
76
orderBy: {
87
id: "asc",
98
},
109
});
11-
12-
return sanitizeData(webhooks);
13-
};
14-
15-
const sanitizeData = (data: Webhooks[]): SanitizedWebHooksSchema[] => {
16-
return data.map((webhook) => {
17-
return {
18-
url: webhook.url,
19-
name: webhook.name,
20-
eventType: webhook.eventType,
21-
secret: webhook.secret ? webhook.secret : undefined,
22-
createdAt: webhook.createdAt.toISOString(),
23-
active: webhook.revokedAt ? false : true,
24-
id: webhook.id,
25-
};
26-
});
2710
};
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- AlterTable
2+
ALTER TABLE "contract_subscriptions" ADD COLUMN "webhookId" INTEGER;
3+
4+
-- AddForeignKey
5+
ALTER TABLE "contract_subscriptions" ADD CONSTRAINT "contract_subscriptions_webhookId_fkey" FOREIGN KEY ("webhookId") REFERENCES "webhooks"("id") ON DELETE CASCADE ON UPDATE CASCADE;

src/prisma/schema.prisma

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -162,14 +162,15 @@ model Transactions {
162162
}
163163

164164
model Webhooks {
165-
id Int @id @default(autoincrement()) @map("id")
166-
name String? @map("name")
167-
url String @map("url")
168-
secret String @map("secret")
169-
eventType String @map("evenType")
170-
createdAt DateTime @default(now()) @map("createdAt")
171-
updatedAt DateTime @updatedAt @map("updatedAt")
172-
revokedAt DateTime? @map("revokedAt")
165+
id Int @id @default(autoincrement()) @map("id")
166+
name String? @map("name")
167+
url String @map("url")
168+
secret String @map("secret")
169+
eventType String @map("evenType")
170+
createdAt DateTime @default(now()) @map("createdAt")
171+
updatedAt DateTime @updatedAt @map("updatedAt")
172+
revokedAt DateTime? @map("revokedAt")
173+
ContractSubscriptions ContractSubscriptions[]
173174
174175
@@map("webhooks")
175176
}
@@ -189,11 +190,14 @@ model ContractSubscriptions {
189190
id String @id @default(uuid()) @map("id")
190191
chainId Int
191192
contractAddress String
193+
webhookId Int?
192194
193195
createdAt DateTime @default(now())
194196
updatedAt DateTime @updatedAt
195197
deletedAt DateTime?
196198
199+
webhook Webhooks? @relation(fields: [webhookId], references: [id], onDelete: Cascade)
200+
197201
// optimize distinct lookups
198202
@@index([chainId])
199203
@@map("contract_subscriptions")

src/schema/webhooks.ts

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,7 @@ export enum WebhooksEventTypes {
77
ALL_TX = "all_transactions",
88
BACKEND_WALLET_BALANCE = "backend_wallet_balance",
99
AUTH = "auth",
10-
}
11-
12-
export interface SanitizedWebHooksSchema {
13-
url: string;
14-
name: string | null;
15-
eventType: string;
16-
secret?: string;
17-
createdAt: string;
18-
active: boolean;
19-
id: number;
10+
CONTRACT_SUBSCRIPTION = "contract_subscription",
2011
}
2112

2213
export interface WalletBalanceWebhookSchema {

src/server/middleware/auth.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -400,17 +400,18 @@ const handleAuthWebhooks = async (
400400
const authWebhooks = await getWebhook(WebhooksEventTypes.AUTH);
401401
if (authWebhooks.length > 0) {
402402
const authResponses = await Promise.all(
403-
authWebhooks.map((webhook) =>
404-
sendWebhookRequest(webhook, {
403+
authWebhooks.map(async (webhook) => {
404+
const { ok } = await sendWebhookRequest(webhook, {
405405
url: req.url,
406406
method: req.method,
407407
headers: req.headers,
408408
params: req.params,
409409
query: req.query,
410410
cookies: req.cookies,
411411
body: req.body,
412-
}),
413-
),
412+
});
413+
return ok;
414+
}),
414415
);
415416

416417
if (authResponses.every((ok) => !!ok)) {

src/server/routes/contract/events/getEventLogsByTimestamp.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,8 @@ import { Static, Type } from "@sinclair/typebox";
22
import { FastifyInstance } from "fastify";
33
import { StatusCodes } from "http-status-codes";
44
import { getEventLogsByBlockTimestamp } from "../../../../db/contractEventLogs/getContractEventLogs";
5-
import {
6-
eventLogsSchema,
7-
standardResponseSchema,
8-
} from "../../../schemas/sharedApiSchemas";
5+
import { eventLogSchema } from "../../../schemas/eventLog";
6+
import { standardResponseSchema } from "../../../schemas/sharedApiSchemas";
97

108
const requestQuerySchema = Type.Object({
119
contractAddresses: Type.Optional(Type.Array(Type.String())),
@@ -16,7 +14,7 @@ const requestQuerySchema = Type.Object({
1614

1715
const responseSchema = Type.Object({
1816
result: Type.Object({
19-
logs: eventLogsSchema,
17+
logs: Type.Array(eventLogSchema),
2018
status: Type.String(),
2119
}),
2220
});

src/server/routes/contract/events/paginateEventLogs.ts

Lines changed: 10 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,8 @@ import { FastifyInstance } from "fastify";
33
import { StatusCodes } from "http-status-codes";
44
import { getConfiguration } from "../../../../db/configuration/getConfiguration";
55
import { getEventLogsByCursor } from "../../../../db/contractEventLogs/getContractEventLogs";
6-
import {
7-
eventLogsSchema,
8-
standardResponseSchema,
9-
} from "../../../schemas/sharedApiSchemas";
6+
import { eventLogSchema, toEventLogSchema } from "../../../schemas/eventLog";
7+
import { standardResponseSchema } from "../../../schemas/sharedApiSchemas";
108

119
const requestQuerySchema = Type.Object({
1210
cursor: Type.Optional(Type.String()),
@@ -18,7 +16,7 @@ const requestQuerySchema = Type.Object({
1816
const responseSchema = Type.Object({
1917
result: Type.Object({
2018
cursor: Type.Optional(Type.String()),
21-
logs: eventLogsSchema,
19+
logs: Type.Array(eventLogSchema),
2220
status: Type.String(),
2321
}),
2422
});
@@ -89,42 +87,18 @@ export async function pageEventLogs(fastify: FastifyInstance) {
8987
Date.now() - config.cursorDelaySeconds * 1000,
9088
);
9189

92-
const { cursor: newCursor, logs: resultLogs } =
93-
await getEventLogsByCursor({
94-
cursor,
95-
limit: pageSize,
96-
topics,
97-
contractAddresses: standardizedContractAddresses,
98-
maxCreatedAt,
99-
});
100-
101-
const logs = resultLogs.map((log) => {
102-
const topics: string[] = [];
103-
[log.topic0, log.topic1, log.topic2, log.topic3].forEach((val) => {
104-
if (val) {
105-
topics.push(val);
106-
}
107-
});
108-
109-
return {
110-
chainId: log.chainId,
111-
contractAddress: log.contractAddress,
112-
blockNumber: log.blockNumber,
113-
transactionHash: log.transactionHash,
114-
topics,
115-
data: log.data,
116-
eventName: log.eventName ?? undefined,
117-
decodedLog: log.decodedLog,
118-
timestamp: log.timestamp.getTime(),
119-
transactionIndex: log.transactionIndex,
120-
logIndex: log.logIndex,
121-
};
90+
const { cursor: newCursor, logs } = await getEventLogsByCursor({
91+
cursor,
92+
limit: pageSize,
93+
topics,
94+
contractAddresses: standardizedContractAddresses,
95+
maxCreatedAt,
12296
});
12397

12498
reply.status(StatusCodes.OK).send({
12599
result: {
126100
cursor: newCursor,
127-
logs,
101+
logs: logs.map(toEventLogSchema),
128102
status: "success",
129103
},
130104
});

src/server/routes/contract/transactions/getTransactionReceipts.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import { createCustomError } from "../../../middleware/error";
77
import {
88
contractParamSchema,
99
standardResponseSchema,
10-
transactionReceiptsSchema,
1110
} from "../../../schemas/sharedApiSchemas";
11+
import { transactionReceiptSchema } from "../../../schemas/transactionReceipt";
1212
import { getChainIdFromChain } from "../../../utils/chain";
1313

1414
const requestQuerySchema = Type.Object({
@@ -18,7 +18,7 @@ const requestQuerySchema = Type.Object({
1818

1919
const responseSchema = Type.Object({
2020
result: Type.Object({
21-
receipts: transactionReceiptsSchema,
21+
receipts: Type.Array(transactionReceiptSchema),
2222
status: Type.String(),
2323
}),
2424
});

0 commit comments

Comments
 (0)