From 769f6f8fa68b061338cb2459ae06ae92a717bd04 Mon Sep 17 00:00:00 2001 From: Phillip Ho Date: Wed, 1 May 2024 19:12:36 -0700 Subject: [PATCH 1/2] feat: Add subscription delay envvar --- src/utils/env.ts | 6 ++++++ src/worker/indexers/chainIndexerRegistry.ts | 15 ++++++++++++++- src/worker/tasks/chainIndexer.ts | 14 +++++++++----- 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/src/utils/env.ts b/src/utils/env.ts index 600cc997b..036f9fa29 100644 --- a/src/utils/env.ts +++ b/src/utils/env.ts @@ -67,6 +67,10 @@ export const env = createEnv({ SDK_BATCH_TIME_LIMIT: z.coerce.number().default(0), SDK_BATCH_SIZE_LIMIT: z.coerce.number().default(100), ENABLE_KEYPAIR_AUTH: boolSchema("false"), + CONTRACT_SUBSCRIPTIONS_DELAY_SECONDS: z.coerce + .number() + .nonnegative() + .default(0), }, clientPrefix: "NEVER_USED", client: {}, @@ -88,6 +92,8 @@ export const env = createEnv({ SDK_BATCH_TIME_LIMIT: process.env.SDK_BATCH_TIME_LIMIT, SDK_BATCH_SIZE_LIMIT: process.env.SDK_BATCH_SIZE_LIMIT, ENABLE_KEYPAIR_AUTH: process.env.ENABLE_KEYPAIR_AUTH, + CONTRACT_SUBSCRIPTIONS_DELAY_SECONDS: + process.env.CONTRACT_SUBSCRIPTIONS_DELAY_SECONDS, }, onValidationError: (error: ZodError) => { console.error( diff --git a/src/worker/indexers/chainIndexerRegistry.ts b/src/worker/indexers/chainIndexerRegistry.ts index 22bb49a4b..334483c4f 100644 --- a/src/worker/indexers/chainIndexerRegistry.ts +++ b/src/worker/indexers/chainIndexerRegistry.ts @@ -1,5 +1,6 @@ import cron from "node-cron"; import { getConfig } from "../../utils/cache/getConfig"; +import { env } from "../../utils/env"; import { getBlockTimeSeconds } from "../../utils/indexer/getBlockTime"; import { logger } from "../../utils/logger"; import { createChainIndexerTask } from "../tasks/chainIndexer"; @@ -18,6 +19,8 @@ export const addChainIndexer = async (chainId: number) => { let processStarted = false; const config = await getConfig(); + + // Estimate block time. const blockTimeSeconds = await getBlockTimeSeconds(chainId); const blocksIn5Seconds = Math.round((1 / blockTimeSeconds) * 5); @@ -26,7 +29,17 @@ export const addChainIndexer = async (chainId: number) => { blocksIn5Seconds * 4, ); - const handler = await createChainIndexerTask(chainId, maxBlocksToIndex); + // Compute block offset based on delay. + // Example: 10s delay with a 3s block time = 4 blocks offset + const toBlockOffset = env.CONTRACT_SUBSCRIPTIONS_DELAY_SECONDS + ? Math.ceil(env.CONTRACT_SUBSCRIPTIONS_DELAY_SECONDS / blockTimeSeconds) + : 0; + + const handler = await createChainIndexerTask({ + chainId, + maxBlocksToIndex, + toBlockOffset, + }); const cronSchedule = createScheduleSeconds( Math.max(Math.round(blockTimeSeconds), 1), diff --git a/src/worker/tasks/chainIndexer.ts b/src/worker/tasks/chainIndexer.ts index 616a90e73..59d320834 100644 --- a/src/worker/tasks/chainIndexer.ts +++ b/src/worker/tasks/chainIndexer.ts @@ -268,10 +268,13 @@ const indexTransactionReceipts = async ({ } }; -export const createChainIndexerTask = async ( - chainId: number, - maxBlocksToIndex: number, -) => { +export const createChainIndexerTask = async (args: { + chainId: number; + maxBlocksToIndex: number; + toBlockOffset: number; +}) => { + const { chainId, maxBlocksToIndex, toBlockOffset } = args; + const chainIndexerTask = async () => { try { await prisma.$transaction( @@ -287,7 +290,8 @@ export const createChainIndexerTask = async ( const sdk = await getSdk({ chainId }); const provider = sdk.getProvider(); - const currentBlockNumber = await provider.getBlockNumber(); + const currentBlockNumber = + (await provider.getBlockNumber()) - toBlockOffset; // check if up-to-date if (lastIndexedBlock >= currentBlockNumber) { From cf3123e74b3a507eedf414138e326e28603cdc7a Mon Sep 17 00:00:00 2001 From: farhanW3 Date: Wed, 1 May 2024 19:30:25 -0700 Subject: [PATCH 2/2] removed +1 of blockNumber --- src/worker/tasks/chainIndexer.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/worker/tasks/chainIndexer.ts b/src/worker/tasks/chainIndexer.ts index 59d320834..850b1ceb7 100644 --- a/src/worker/tasks/chainIndexer.ts +++ b/src/worker/tasks/chainIndexer.ts @@ -300,8 +300,8 @@ export const createChainIndexerTask = async (args: { // limit max block numbers let toBlockNumber = currentBlockNumber; - if (currentBlockNumber - (lastIndexedBlock + 1) > maxBlocksToIndex) { - toBlockNumber = lastIndexedBlock + 1 + maxBlocksToIndex; + if (currentBlockNumber - lastIndexedBlock > maxBlocksToIndex) { + toBlockNumber = lastIndexedBlock + maxBlocksToIndex; } const subscribedContracts = await getContractSubscriptionsByChainId( @@ -319,14 +319,14 @@ export const createChainIndexerTask = async (args: { indexContractEvents({ pgtx, chainId, - fromBlockNumber: lastIndexedBlock + 1, + fromBlockNumber: lastIndexedBlock, toBlockNumber, subscribedContractAddresses, }), indexTransactionReceipts({ pgtx, chainId, - fromBlockNumber: lastIndexedBlock + 1, + fromBlockNumber: lastIndexedBlock, toBlockNumber, subscribedContractAddresses, }),