-
Notifications
You must be signed in to change notification settings - Fork 91
fix: Use latest eth_getLogs block as toBlock #512
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
export const dedupeArray = <T>(array: T[]): T[] => { | ||
return [...new Set(array)]; | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ import { bulkInsertContractEventLogs } from "../../db/contractEventLogs/createCo | |
import { getContractSubscriptionsByChainId } from "../../db/contractSubscriptions/getContractSubscriptions"; | ||
import { bulkInsertContractTransactionReceipts } from "../../db/contractTransactionReceipts/createContractTransactionReceipts"; | ||
import { PrismaTransaction } from "../../schema/prisma"; | ||
import { dedupeArray } from "../../utils/array"; | ||
import { getContract } from "../../utils/cache/getContract"; | ||
import { getSdk } from "../../utils/cache/getSdk"; | ||
import { logger } from "../../utils/logger"; | ||
|
@@ -106,9 +107,7 @@ export const getSubscribedContractsLogs = async ( | |
const logs = await ethGetLogs(params); | ||
|
||
// cache the contracts and abi | ||
const uniqueContractAddresses = [ | ||
...new Set<string>(logs.map((log) => log.address)), | ||
]; | ||
const uniqueContractAddresses = dedupeArray(logs.map((log) => log.address)); | ||
const contracts = await Promise.all( | ||
uniqueContractAddresses.map(async (address) => { | ||
const contract = await getContract({ | ||
|
@@ -124,9 +123,7 @@ export const getSubscribedContractsLogs = async ( | |
}, {} as Record<string, SmartContract<ethers.BaseContract>>); | ||
|
||
// cache the blocks and their timestamps | ||
const uniqueBlockNumbers = [ | ||
...new Set<number>(logs.map((log) => log.blockNumber)), | ||
]; | ||
const uniqueBlockNumbers = dedupeArray(logs.map((log) => log.blockNumber)); | ||
const blockDetails = await Promise.all( | ||
uniqueBlockNumbers.map(async (blockNumber) => ({ | ||
blockNumber, | ||
|
@@ -168,7 +165,9 @@ export const getSubscribedContractsLogs = async ( | |
} | ||
} | ||
|
||
const block = blockCache[log.blockNumber]; | ||
// Block may be null if the RPC does not yet have block details. Fall back to current time. | ||
const block = blockCache[log.blockNumber] as ethers.providers.Block | null; | ||
const timestamp = block ? new Date(block.timestamp * 1000) : new Date(); | ||
|
||
// format the log entry | ||
return { | ||
|
@@ -183,7 +182,7 @@ export const getSubscribedContractsLogs = async ( | |
data: log.data, | ||
eventName: decodedEventName, | ||
decodedLog: decodedLog, | ||
timestamp: new Date(block.timestamp * 1000), // ethers timestamp is s, Date uses ms | ||
timestamp, | ||
transactionIndex: log.transactionIndex, | ||
logIndex: log.logIndex, | ||
}; | ||
|
@@ -285,67 +284,63 @@ export const createChainIndexerTask = async ( | |
} | ||
|
||
const sdk = await getSdk({ chainId }); | ||
|
||
const provider = sdk.getProvider(); | ||
const currentBlockNumber = await provider.getBlockNumber(); | ||
|
||
// check if up-to-date | ||
if (lastIndexedBlock >= currentBlockNumber) { | ||
// Get latest block with logs. This should be the maxBlock to query up to. | ||
const logs = await provider.getLogs({ fromBlock: "latest" }); | ||
if (logs.length === 0) { | ||
return; | ||
} | ||
const currentBlockNumber = logs[0].blockNumber; | ||
Comment on lines
+297
to
+302
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Potential fix 2: Use latest log block instead of RPC's block height |
||
|
||
// limit max block numbers | ||
let toBlockNumber = currentBlockNumber; | ||
if (currentBlockNumber - (lastIndexedBlock + 1) > maxBlocksToIndex) { | ||
toBlockNumber = lastIndexedBlock + 1 + maxBlocksToIndex; | ||
// Check if up-to-date. | ||
if (lastIndexedBlock >= currentBlockNumber) { | ||
return; | ||
} | ||
|
||
// Limit max block number. | ||
const toBlockNumber = Math.min( | ||
currentBlockNumber, | ||
lastIndexedBlock + maxBlocksToIndex, | ||
); | ||
|
||
const subscribedContracts = await getContractSubscriptionsByChainId( | ||
chainId, | ||
); | ||
const subscribedContractAddresses = [ | ||
...new Set<string>( | ||
subscribedContracts.map( | ||
(subscribedContract) => subscribedContract.contractAddress, | ||
), | ||
const subscribedContractAddresses = dedupeArray( | ||
subscribedContracts.map( | ||
(subscribedContract) => subscribedContract.contractAddress, | ||
), | ||
]; | ||
); | ||
|
||
await Promise.all([ | ||
// Checks eth_getLogs. | ||
indexContractEvents({ | ||
pgtx, | ||
chainId, | ||
fromBlockNumber: lastIndexedBlock + 1, | ||
fromBlockNumber: lastIndexedBlock, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Potential fix 3: Overlap the "boundary" blocks just in case there's incomplete data from RPC |
||
toBlockNumber, | ||
subscribedContractAddresses, | ||
}), | ||
// Checks eth_getBlockByNumber. | ||
indexTransactionReceipts({ | ||
pgtx, | ||
chainId, | ||
fromBlockNumber: lastIndexedBlock + 1, | ||
fromBlockNumber: lastIndexedBlock, | ||
toBlockNumber, | ||
subscribedContractAddresses, | ||
}), | ||
]); | ||
|
||
// update the block number | ||
try { | ||
await upsertChainIndexer({ | ||
pgtx, | ||
chainId, | ||
currentBlockNumber: toBlockNumber, // last indexed block | ||
}); | ||
} catch (error) { | ||
logger({ | ||
service: "worker", | ||
level: "error", | ||
message: `Failed to update latest block number - Chain Indexer: ${chainId}`, | ||
error: error, | ||
}); | ||
} | ||
// Update the last processed block number. | ||
await upsertChainIndexer({ | ||
pgtx, | ||
chainId, | ||
currentBlockNumber: toBlockNumber, | ||
}); | ||
}, | ||
{ | ||
timeout: 5 * 60000, // 3 minutes timeout | ||
timeout: 5 * 60 * 1000, | ||
}, | ||
); | ||
} catch (err: any) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential fix 1: Handle if block is not found by falling back on timestamp.