diff --git a/src/server/routes/configuration/cors/add.ts b/src/server/routes/configuration/cors/add.ts index b004e5b57..94607dbd0 100644 --- a/src/server/routes/configuration/cors/add.ts +++ b/src/server/routes/configuration/cors/add.ts @@ -2,6 +2,7 @@ import { Static, Type } from "@sinclair/typebox"; import { FastifyInstance } from "fastify"; import { StatusCodes } from "http-status-codes"; import { updateConfiguration } from "../../../../db/configuration/updateConfiguration"; +import { dedupeArray } from "../../../../utils/array"; import { getConfig } from "../../../../utils/cache/getConfig"; import { standardResponseSchema } from "../../../schemas/sharedApiSchemas"; import { mandatoryAllowedCorsUrls } from "../../../utils/cors-urls"; @@ -54,12 +55,10 @@ export async function addUrlToCorsConfiguration(fastify: FastifyInstance) { }); await updateConfiguration({ - accessControlAllowOrigin: [ - ...new Set([ - ...urlsToAdd, - ...oldConfig.accessControlAllowOrigin.split(","), - ]), - ].join(","), + accessControlAllowOrigin: dedupeArray([ + ...urlsToAdd, + ...oldConfig.accessControlAllowOrigin.split(","), + ]).join(","), }); // Fetch and return the updated configuration diff --git a/src/server/routes/configuration/cors/remove.ts b/src/server/routes/configuration/cors/remove.ts index 662c439eb..7c0b56866 100644 --- a/src/server/routes/configuration/cors/remove.ts +++ b/src/server/routes/configuration/cors/remove.ts @@ -2,6 +2,7 @@ import { Static, Type } from "@sinclair/typebox"; import { FastifyInstance } from "fastify"; import { StatusCodes } from "http-status-codes"; import { updateConfiguration } from "../../../../db/configuration/updateConfiguration"; +import { dedupeArray } from "../../../../utils/array"; import { getConfig } from "../../../../utils/cache/getConfig"; import { standardResponseSchema } from "../../../schemas/sharedApiSchemas"; import { mandatoryAllowedCorsUrls } from "../../../utils/cors-urls"; @@ -61,9 +62,7 @@ export async function removeUrlToCorsConfiguration(fastify: FastifyInstance) { .filter((url) => !urlsToRemove.includes(url)); await updateConfiguration({ - accessControlAllowOrigin: [...new Set([...newAllowOriginsList])].join( - ",", - ), + accessControlAllowOrigin: dedupeArray(newAllowOriginsList).join(","), }); // Fetch and return the updated configuration diff --git a/src/server/routes/configuration/cors/set.ts b/src/server/routes/configuration/cors/set.ts index a119e4ce8..1b952b05d 100644 --- a/src/server/routes/configuration/cors/set.ts +++ b/src/server/routes/configuration/cors/set.ts @@ -2,6 +2,7 @@ import { Static, Type } from "@sinclair/typebox"; import { FastifyInstance } from "fastify"; import { StatusCodes } from "http-status-codes"; import { updateConfiguration } from "../../../../db/configuration/updateConfiguration"; +import { dedupeArray } from "../../../../utils/array"; import { getConfig } from "../../../../utils/cache/getConfig"; import { standardResponseSchema } from "../../../schemas/sharedApiSchemas"; import { mandatoryAllowedCorsUrls } from "../../../utils/cors-urls"; @@ -44,13 +45,11 @@ export async function setUrlsToCorsConfiguration(fastify: FastifyInstance) { handler: async (req, res) => { const urls = req.body.urls.map((url) => url.trim()); - // Add required domains and dedupe. - const dedupe = Array.from( - new Set([...urls, ...mandatoryAllowedCorsUrls]), - ); - await updateConfiguration({ - accessControlAllowOrigin: dedupe.join(","), + accessControlAllowOrigin: dedupeArray([ + ...urls, + ...mandatoryAllowedCorsUrls, + ]).join(","), }); // Fetch and return the updated configuration diff --git a/src/utils/array.ts b/src/utils/array.ts new file mode 100644 index 000000000..a1aaa4804 --- /dev/null +++ b/src/utils/array.ts @@ -0,0 +1,3 @@ +export const dedupeArray = (array: T[]): T[] => { + return [...new Set(array)]; +}; diff --git a/src/worker/tasks/chainIndexer.ts b/src/worker/tasks/chainIndexer.ts index 616a90e73..9d6e1b831 100644 --- a/src/worker/tasks/chainIndexer.ts +++ b/src/worker/tasks/chainIndexer.ts @@ -8,11 +8,14 @@ import { bulkInsertContractEventLogs } from "../../db/contractEventLogs/createCo import { getContractSubscriptionsByChainId } from "../../db/contractSubscriptions/getContractSubscriptions"; import { bulkInsertContractTransactionReceipts } from "../../db/contractTransactionReceipts/createContractTransactionReceipts"; import { PrismaTransaction } from "../../schema/prisma"; +import { dedupeArray } from "../../utils/array"; import { getContract } from "../../utils/cache/getContract"; import { getSdk } from "../../utils/cache/getSdk"; import { logger } from "../../utils/logger"; import { getContractId } from "../utils/contractId"; +const NUM_BLOCKS_TO_RECHECK = 2; + export interface GetSubscribedContractsLogsParams { chainId: number; contractAddresses: string[]; @@ -106,9 +109,7 @@ export const getSubscribedContractsLogs = async ( const logs = await ethGetLogs(params); // cache the contracts and abi - const uniqueContractAddresses = [ - ...new Set(logs.map((log) => log.address)), - ]; + const uniqueContractAddresses = dedupeArray(logs.map((log) => log.address)); const contracts = await Promise.all( uniqueContractAddresses.map(async (address) => { const contract = await getContract({ @@ -124,9 +125,7 @@ export const getSubscribedContractsLogs = async ( }, {} as Record>); // cache the blocks and their timestamps - const uniqueBlockNumbers = [ - ...new Set(logs.map((log) => log.blockNumber)), - ]; + const uniqueBlockNumbers = dedupeArray(logs.map((log) => log.blockNumber)); const blockDetails = await Promise.all( uniqueBlockNumbers.map(async (blockNumber) => ({ blockNumber, @@ -168,7 +167,9 @@ export const getSubscribedContractsLogs = async ( } } - const block = blockCache[log.blockNumber]; + // Block may be null if the RPC does not yet have block details. Fall back to current time. + const block = blockCache[log.blockNumber] as ethers.providers.Block | null; + const timestamp = block ? new Date(block.timestamp * 1000) : new Date(); // format the log entry return { @@ -183,7 +184,7 @@ export const getSubscribedContractsLogs = async ( data: log.data, eventName: decodedEventName, decodedLog: decodedLog, - timestamp: new Date(block.timestamp * 1000), // ethers timestamp is s, Date uses ms + timestamp, transactionIndex: log.transactionIndex, logIndex: log.logIndex, }; @@ -279,73 +280,75 @@ export const createChainIndexerTask = async ( let lastIndexedBlock; try { lastIndexedBlock = await getBlockForIndexing({ chainId, pgtx }); + + // RPC may provide incomplete data for new blocks from the previous scan. + // Re-check the last few blocks to capture any missing onchain data. + if (lastIndexedBlock > 0) { + lastIndexedBlock -= NUM_BLOCKS_TO_RECHECK; + } } catch (error) { // row is locked, return return; } const sdk = await getSdk({ chainId }); - const provider = sdk.getProvider(); - const currentBlockNumber = await provider.getBlockNumber(); - // check if up-to-date - if (lastIndexedBlock >= currentBlockNumber) { + // Get latest block with logs. This should be the maxBlock to query up to. + const logs = await provider.getLogs({ fromBlock: "latest" }); + if (logs.length === 0) { return; } + const currentBlockNumber = logs[0].blockNumber; - // limit max block numbers - let toBlockNumber = currentBlockNumber; - if (currentBlockNumber - (lastIndexedBlock + 1) > maxBlocksToIndex) { - toBlockNumber = lastIndexedBlock + 1 + maxBlocksToIndex; + // Check if up-to-date. + if (lastIndexedBlock >= currentBlockNumber) { + return; } + // Limit max block number. + const toBlockNumber = Math.min( + currentBlockNumber, + lastIndexedBlock + maxBlocksToIndex, + ); + const subscribedContracts = await getContractSubscriptionsByChainId( chainId, ); - const subscribedContractAddresses = [ - ...new Set( - subscribedContracts.map( - (subscribedContract) => subscribedContract.contractAddress, - ), + const subscribedContractAddresses = dedupeArray( + subscribedContracts.map( + (subscribedContract) => subscribedContract.contractAddress, ), - ]; + ); await Promise.all([ + // Checks eth_getLogs. indexContractEvents({ pgtx, chainId, - fromBlockNumber: lastIndexedBlock + 1, + fromBlockNumber: lastIndexedBlock, toBlockNumber, subscribedContractAddresses, }), + // Checks eth_getBlockByNumber. indexTransactionReceipts({ pgtx, chainId, - fromBlockNumber: lastIndexedBlock + 1, + fromBlockNumber: lastIndexedBlock, toBlockNumber, subscribedContractAddresses, }), ]); - // update the block number - try { - await upsertChainIndexer({ - pgtx, - chainId, - currentBlockNumber: toBlockNumber, // last indexed block - }); - } catch (error) { - logger({ - service: "worker", - level: "error", - message: `Failed to update latest block number - Chain Indexer: ${chainId}`, - error: error, - }); - } + // Update the last processed block number. + await upsertChainIndexer({ + pgtx, + chainId, + currentBlockNumber: toBlockNumber, + }); }, { - timeout: 5 * 60000, // 3 minutes timeout + timeout: 5 * 60 * 1000, }, ); } catch (err: any) {