@@ -8,6 +8,7 @@ import { bulkInsertContractEventLogs } from "../../db/contractEventLogs/createCo
8
8
import { getContractSubscriptionsByChainId } from "../../db/contractSubscriptions/getContractSubscriptions" ;
9
9
import { bulkInsertContractTransactionReceipts } from "../../db/contractTransactionReceipts/createContractTransactionReceipts" ;
10
10
import { PrismaTransaction } from "../../schema/prisma" ;
11
+ import { dedupeArray } from "../../utils/array" ;
11
12
import { getContract } from "../../utils/cache/getContract" ;
12
13
import { getSdk } from "../../utils/cache/getSdk" ;
13
14
import { logger } from "../../utils/logger" ;
@@ -106,9 +107,7 @@ export const getSubscribedContractsLogs = async (
106
107
const logs = await ethGetLogs ( params ) ;
107
108
108
109
// cache the contracts and abi
109
- const uniqueContractAddresses = [
110
- ...new Set < string > ( logs . map ( ( log ) => log . address ) ) ,
111
- ] ;
110
+ const uniqueContractAddresses = dedupeArray ( logs . map ( ( log ) => log . address ) ) ;
112
111
const contracts = await Promise . all (
113
112
uniqueContractAddresses . map ( async ( address ) => {
114
113
const contract = await getContract ( {
@@ -124,9 +123,7 @@ export const getSubscribedContractsLogs = async (
124
123
} , { } as Record < string , SmartContract < ethers . BaseContract > > ) ;
125
124
126
125
// cache the blocks and their timestamps
127
- const uniqueBlockNumbers = [
128
- ...new Set < number > ( logs . map ( ( log ) => log . blockNumber ) ) ,
129
- ] ;
126
+ const uniqueBlockNumbers = dedupeArray ( logs . map ( ( log ) => log . blockNumber ) ) ;
130
127
const blockDetails = await Promise . all (
131
128
uniqueBlockNumbers . map ( async ( blockNumber ) => ( {
132
129
blockNumber,
@@ -168,7 +165,9 @@ export const getSubscribedContractsLogs = async (
168
165
}
169
166
}
170
167
171
- const block = blockCache [ log . blockNumber ] ;
168
+ // Block may be null if the RPC does not yet have block details. Fall back to current time.
169
+ const block = blockCache [ log . blockNumber ] as ethers . providers . Block | null ;
170
+ const timestamp = block ? new Date ( block . timestamp * 1000 ) : new Date ( ) ;
172
171
173
172
// format the log entry
174
173
return {
@@ -183,7 +182,7 @@ export const getSubscribedContractsLogs = async (
183
182
data : log . data ,
184
183
eventName : decodedEventName ,
185
184
decodedLog : decodedLog ,
186
- timestamp : new Date ( block . timestamp * 1000 ) , // ethers timestamp is s, Date uses ms
185
+ timestamp,
187
186
transactionIndex : log . transactionIndex ,
188
187
logIndex : log . logIndex ,
189
188
} ;
@@ -285,67 +284,63 @@ export const createChainIndexerTask = async (
285
284
}
286
285
287
286
const sdk = await getSdk ( { chainId } ) ;
288
-
289
287
const provider = sdk . getProvider ( ) ;
290
- const currentBlockNumber = await provider . getBlockNumber ( ) ;
291
288
292
- // check if up-to-date
293
- if ( lastIndexedBlock >= currentBlockNumber ) {
289
+ // Get latest block with logs. This should be the maxBlock to query up to.
290
+ const logs = await provider . getLogs ( { fromBlock : "latest" } ) ;
291
+ if ( logs . length === 0 ) {
294
292
return ;
295
293
}
294
+ const currentBlockNumber = logs [ 0 ] . blockNumber ;
296
295
297
- // limit max block numbers
298
- let toBlockNumber = currentBlockNumber ;
299
- if ( currentBlockNumber - ( lastIndexedBlock + 1 ) > maxBlocksToIndex ) {
300
- toBlockNumber = lastIndexedBlock + 1 + maxBlocksToIndex ;
296
+ // Check if up-to-date.
297
+ if ( lastIndexedBlock >= currentBlockNumber ) {
298
+ return ;
301
299
}
302
300
301
+ // Limit max block number.
302
+ const toBlockNumber = Math . min (
303
+ currentBlockNumber ,
304
+ lastIndexedBlock + maxBlocksToIndex ,
305
+ ) ;
306
+
303
307
const subscribedContracts = await getContractSubscriptionsByChainId (
304
308
chainId ,
305
309
) ;
306
- const subscribedContractAddresses = [
307
- ...new Set < string > (
308
- subscribedContracts . map (
309
- ( subscribedContract ) => subscribedContract . contractAddress ,
310
- ) ,
310
+ const subscribedContractAddresses = dedupeArray (
311
+ subscribedContracts . map (
312
+ ( subscribedContract ) => subscribedContract . contractAddress ,
311
313
) ,
312
- ] ;
314
+ ) ;
313
315
314
316
await Promise . all ( [
317
+ // Checks eth_getLogs.
315
318
indexContractEvents ( {
316
319
pgtx,
317
320
chainId,
318
- fromBlockNumber : lastIndexedBlock + 1 ,
321
+ fromBlockNumber : lastIndexedBlock ,
319
322
toBlockNumber,
320
323
subscribedContractAddresses,
321
324
} ) ,
325
+ // Checks eth_getBlockByNumber.
322
326
indexTransactionReceipts ( {
323
327
pgtx,
324
328
chainId,
325
- fromBlockNumber : lastIndexedBlock + 1 ,
329
+ fromBlockNumber : lastIndexedBlock ,
326
330
toBlockNumber,
327
331
subscribedContractAddresses,
328
332
} ) ,
329
333
] ) ;
330
334
331
- // update the block number
332
- try {
333
- await upsertChainIndexer ( {
334
- pgtx,
335
- chainId,
336
- currentBlockNumber : toBlockNumber , // last indexed block
337
- } ) ;
338
- } catch ( error ) {
339
- logger ( {
340
- service : "worker" ,
341
- level : "error" ,
342
- message : `Failed to update latest block number - Chain Indexer: ${ chainId } ` ,
343
- error : error ,
344
- } ) ;
345
- }
335
+ // Update the last processed block number.
336
+ await upsertChainIndexer ( {
337
+ pgtx,
338
+ chainId,
339
+ currentBlockNumber : toBlockNumber ,
340
+ } ) ;
346
341
} ,
347
342
{
348
- timeout : 5 * 60000 , // 3 minutes timeout
343
+ timeout : 5 * 60 * 1000 ,
349
344
} ,
350
345
) ;
351
346
} catch ( err : any ) {
0 commit comments