diff --git a/package.json b/package.json index f382862e3..af53876f8 100644 --- a/package.json +++ b/package.json @@ -48,6 +48,7 @@ "aws-kms-signer": "^0.5.3", "base-64": "^1.0.0", "bullmq": "^5.11.0", + "cron": "^4.3.0", "cron-parser": "^4.9.0", "crypto": "^1.0.1", "crypto-js": "^4.2.0", @@ -63,7 +64,6 @@ "jsonwebtoken": "^9.0.2", "knex": "^3.1.0", "mnemonist": "^0.39.8", - "node-cron": "^3.0.2", "ox": "0.6.9", "pg": "^8.11.3", "prisma": "^5.14.0", @@ -82,7 +82,6 @@ "@types/crypto-js": "^4.2.2", "@types/jsonwebtoken": "^9.0.6", "@types/node": "^18.15.4", - "@types/node-cron": "^3.0.8", "@types/pg": "^8.6.6", "@types/uuid": "^9.0.1", "@types/ws": "^8.5.5", diff --git a/src/shared/utils/cron/clear-cache-cron.ts b/src/shared/utils/cron/clear-cache-cron.ts index 6fed64673..61c3d3bf3 100644 --- a/src/shared/utils/cron/clear-cache-cron.ts +++ b/src/shared/utils/cron/clear-cache-cron.ts @@ -1,9 +1,10 @@ -import cron from "node-cron"; +import { CronJob } from "cron"; import { clearCache } from "../cache/clear-cache"; import { getConfig } from "../cache/get-config"; import type { env } from "../env"; -let task: cron.ScheduledTask; +let task: CronJob; + export const clearCacheCron = async ( service: (typeof env)["LOG_SERVICES"][0], ) => { @@ -13,11 +14,13 @@ export const clearCacheCron = async ( return; } + // Stop the existing task if it exists. if (task) { task.stop(); } - task = cron.schedule(config.clearCacheCronSchedule, async () => { + task = new CronJob(config.clearCacheCronSchedule, async () => { await clearCache(service); }); + task.start(); }; diff --git a/src/shared/utils/env.ts b/src/shared/utils/env.ts index 7582ac767..a81a7c9a5 100644 --- a/src/shared/utils/env.ts +++ b/src/shared/utils/env.ts @@ -86,8 +86,8 @@ export const env = createEnv({ QUEUE_FAIL_HISTORY_COUNT: z.coerce.number().default(10_000), // Sets the number of recent nonces to map to queue IDs. NONCE_MAP_COUNT: z.coerce.number().default(10_000), - // Sets the estimated number of blocks to query per contract subscription job. Defaults to 1 block (real-time). - CONTRACT_SUBSCRIPTION_BLOCK_RANGE: z.coerce.number().default(1), + // Overrides the cron schedule for contract subscription jobs. + CONTRACT_SUBSCRIPTION_CRON_SCHEDULE_OVERRIDE: z.string().optional(), ENABLE_KEYPAIR_AUTH: boolEnvSchema(false), ENABLE_CUSTOM_HMAC_AUTH: boolEnvSchema(false), @@ -138,8 +138,8 @@ export const env = createEnv({ QUEUE_COMPLETE_HISTORY_COUNT: process.env.QUEUE_COMPLETE_HISTORY_COUNT, QUEUE_FAIL_HISTORY_COUNT: process.env.QUEUE_FAIL_HISTORY_COUNT, NONCE_MAP_COUNT: process.env.NONCE_MAP_COUNT, - CONTRACT_SUBSCRIPTION_BLOCK_RANGE: - process.env.CONTRACT_SUBSCRIPTION_BLOCK_RANGE, + CONTRACT_SUBSCRIPTION_CRON_SCHEDULE_OVERRIDE: + process.env.CONTRACT_SUBSCRIPTION_CRON_SCHEDULE_OVERRIDE, EXPERIMENTAL__MINE_WORKER_TIMEOUT_SECONDS: process.env.EXPERIMENTAL__MINE_WORKER_TIMEOUT_SECONDS, EXPERIMENTAL__MAX_GAS_PRICE_WEI: diff --git a/src/worker/indexers/chain-indexer-registry.ts b/src/worker/indexers/chain-indexer-registry.ts index bfd596b35..dae0cdf1e 100644 --- a/src/worker/indexers/chain-indexer-registry.ts +++ b/src/worker/indexers/chain-indexer-registry.ts @@ -1,34 +1,34 @@ -import cron from "node-cron"; import { getBlockTimeSeconds } from "../../shared/utils/indexer/get-block-time"; import { logger } from "../../shared/utils/logger"; import { handleContractSubscriptions } from "../tasks/chain-indexer"; import { env } from "../../shared/utils/env"; +import { CronJob } from "cron"; // @TODO: Move all worker logic to Bullmq to better handle multiple hosts. -export const INDEXER_REGISTRY = {} as Record; +export const INDEXER_REGISTRY: Record = {}; export const addChainIndexer = async (chainId: number) => { if (INDEXER_REGISTRY[chainId]) { return; } - // Estimate the block time in the last 100 blocks. Default to 2 second block times. - let blockTimeSeconds: number; - try { - blockTimeSeconds = await getBlockTimeSeconds(chainId, 100); - } catch (error) { - logger({ - service: "worker", - level: "error", - message: `Could not estimate block time for chain ${chainId}`, - error, - }); - blockTimeSeconds = 2; + let cronSchedule = env.CONTRACT_SUBSCRIPTION_CRON_SCHEDULE_OVERRIDE; + if (!cronSchedule) { + // Estimate the block time in the last 100 blocks. Default to 2 second block times. + let blockTimeSeconds: number; + try { + blockTimeSeconds = await getBlockTimeSeconds(chainId, 100); + } catch (error) { + logger({ + service: "worker", + level: "error", + message: `Could not estimate block time for chain ${chainId}`, + error, + }); + blockTimeSeconds = 2; + } + cronSchedule = createScheduleSeconds(blockTimeSeconds); } - const cronSchedule = createScheduleSeconds({ - blockTimeSeconds, - numBlocks: env.CONTRACT_SUBSCRIPTION_BLOCK_RANGE, - }); logger({ service: "worker", level: "info", @@ -37,7 +37,7 @@ export const addChainIndexer = async (chainId: number) => { let inProgress = false; - const task = cron.schedule(cronSchedule, async () => { + const task = new CronJob(cronSchedule, async () => { if (inProgress) { return; } @@ -58,6 +58,7 @@ export const addChainIndexer = async (chainId: number) => { }); INDEXER_REGISTRY[chainId] = task; + task.start(); }; export const removeChainIndexer = async (chainId: number) => { @@ -76,17 +77,7 @@ export const removeChainIndexer = async (chainId: number) => { delete INDEXER_REGISTRY[chainId]; }; -/** - * Returns the cron schedule given the chain's block time and the number of blocks to batch per job. - * Minimum is every 2 seconds. - */ -function createScheduleSeconds({ - blockTimeSeconds, - numBlocks, -}: { blockTimeSeconds: number; numBlocks: number }) { - const pollFrequencySeconds = Math.max( - Math.round(blockTimeSeconds * numBlocks), - 2, - ); - return `*/${pollFrequencySeconds} * * * * *`; +function createScheduleSeconds(blockTimeSeconds: number) { + const pollIntervalSeconds = Math.max(Math.round(blockTimeSeconds), 2); + return `*/${pollIntervalSeconds} * * * * *`; } diff --git a/src/worker/listeners/chain-indexer-listener.ts b/src/worker/listeners/chain-indexer-listener.ts index f2c05e6e0..785b5f64a 100644 --- a/src/worker/listeners/chain-indexer-listener.ts +++ b/src/worker/listeners/chain-indexer-listener.ts @@ -1,21 +1,23 @@ -import cron from "node-cron"; +import { CronJob } from "cron"; import { getConfig } from "../../shared/utils/cache/get-config"; import { logger } from "../../shared/utils/logger"; import { manageChainIndexers } from "../tasks/manage-chain-indexers"; let processChainIndexerStarted = false; -let task: cron.ScheduledTask; +let task: CronJob; export const chainIndexerListener = async (): Promise => { const config = await getConfig(); if (!config.indexerListenerCronSchedule) { return; } + + // Stop the existing task if it exists. if (task) { task.stop(); } - task = cron.schedule(config.indexerListenerCronSchedule, async () => { + task = new CronJob(config.indexerListenerCronSchedule, async () => { if (!processChainIndexerStarted) { processChainIndexerStarted = true; await manageChainIndexers(); @@ -28,4 +30,5 @@ export const chainIndexerListener = async (): Promise => { }); } }); + task.start(); }; diff --git a/yarn.lock b/yarn.lock index f5d594ef6..70ddf56d9 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4131,6 +4131,11 @@ resolved "https://registry.yarnpkg.com/@types/long/-/long-4.0.2.tgz#b74129719fc8d11c01868010082d483b7545591a" integrity sha512-MqTGEo5bj5t157U6fA/BiDynNkn0YknVdh48CMPkTSpFTVmvao5UQmm7uEF6xBEo7qIMAlY/JSleYaE6VOdpaA== +"@types/luxon@~3.6.0": + version "3.6.2" + resolved "https://registry.yarnpkg.com/@types/luxon/-/luxon-3.6.2.tgz#be6536931801f437eafcb9c0f6d6781f72308041" + integrity sha512-R/BdP7OxEMc44l2Ex5lSXHoIXTB2JLNa3y2QISIbr58U/YcsffyQrYW//hZSdrfxrjRZj3GcUoxMPGdO8gSYuw== + "@types/markdown-it@^14.1.1": version "14.1.2" resolved "https://registry.yarnpkg.com/@types/markdown-it/-/markdown-it-14.1.2.tgz#57f2532a0800067d9b934f3521429a2e8bfb4c61" @@ -4154,11 +4159,6 @@ resolved "https://registry.yarnpkg.com/@types/ms/-/ms-0.7.34.tgz#10964ba0dee6ac4cd462e2795b6bebd407303433" integrity sha512-nG96G3Wp6acyAgJqGasjODb+acrI7KltPiRxzHPXnP3NgI28bpQDRv53olbqGXbfcgF5aiiHmO3xpwEpS5Ld9g== -"@types/node-cron@^3.0.8": - version "3.0.11" - resolved "https://registry.yarnpkg.com/@types/node-cron/-/node-cron-3.0.11.tgz#70b7131f65038ae63cfe841354c8aba363632344" - integrity sha512-0ikrnug3/IyneSHqCBeslAhlK2aBfYek1fGo4bP4QnZPmiqSGRK+Oy7ZMisLWkesffJvQ1cqAcBnJC+8+nxIAg== - "@types/node@*", "@types/node@>=13.7.0": version "22.10.2" resolved "https://registry.yarnpkg.com/@types/node/-/node-22.10.2.tgz#a485426e6d1fdafc7b0d4c7b24e2c78182ddabb9" @@ -5923,6 +5923,14 @@ cron-parser@^4.6.0, cron-parser@^4.9.0: dependencies: luxon "^3.2.1" +cron@^4.3.0: + version "4.3.0" + resolved "https://registry.yarnpkg.com/cron/-/cron-4.3.0.tgz#c5a62872f74f72294cf1cadef34c72ad8d8f50b5" + integrity sha512-ciiYNLfSlF9MrDqnbMdRWFiA6oizSF7kA1osPP9lRzNu0Uu+AWog1UKy7SkckiDY2irrNjeO6qLyKnXC8oxmrw== + dependencies: + "@types/luxon" "~3.6.0" + luxon "~3.6.0" + cross-fetch@^3.1.4: version "3.1.8" resolved "https://registry.yarnpkg.com/cross-fetch/-/cross-fetch-3.1.8.tgz#0327eba65fd68a7d119f8fb2bf9334a1a7956f82" @@ -8367,6 +8375,11 @@ luxon@^3.2.1: resolved "https://registry.yarnpkg.com/luxon/-/luxon-3.5.0.tgz#6b6f65c5cd1d61d1fd19dbf07ee87a50bf4b8e20" integrity sha512-rh+Zjr6DNfUYR3bPwJEnuwDdqMbxZW7LOQfUN4B54+Cl+0o5zaU9RJ6bcidfDtC1cWCZXQ+nvX8bf6bAji37QQ== +luxon@~3.6.0: + version "3.6.1" + resolved "https://registry.yarnpkg.com/luxon/-/luxon-3.6.1.tgz#d283ffc4c0076cb0db7885ec6da1c49ba97e47b0" + integrity sha512-tJLxrKJhO2ukZ5z0gyjY1zPh3Rh88Ej9P7jNrZiHMUXHae1yvI2imgOZtL1TO8TW6biMMKfTtAOoEJANgtWBMQ== + magic-sdk@^13.6.2: version "13.6.2" resolved "https://registry.yarnpkg.com/magic-sdk/-/magic-sdk-13.6.2.tgz#68766fd9d1805332d2a00e5da1bd30fce251a6ac" @@ -8728,13 +8741,6 @@ node-addon-api@^7.0.0: resolved "https://registry.yarnpkg.com/node-addon-api/-/node-addon-api-7.1.1.tgz#1aba6693b0f255258a049d621329329322aad558" integrity sha512-5m3bsyrjFWE1xf7nz7YXdN4udnVtXK6/Yfgn5qnahL6bCkf2yKt4k3nuTKAtT4r3IG8JNR2ncsIMdZuAzJjHQQ== -node-cron@^3.0.2: - version "3.0.3" - resolved "https://registry.yarnpkg.com/node-cron/-/node-cron-3.0.3.tgz#c4bc7173dd96d96c50bdb51122c64415458caff2" - integrity sha512-dOal67//nohNgYWb+nWmg5dkFdIwDm8EpeGYMekPMrngV3637lqnX0lbUcCtgibHTz6SEz7DAIjKvKDFYCnO1A== - dependencies: - uuid "8.3.2" - node-fetch-native@^1.6.4: version "1.6.4" resolved "https://registry.yarnpkg.com/node-fetch-native/-/node-fetch-native-1.6.4.tgz#679fc8fd8111266d47d7e72c379f1bed9acff06e" @@ -10890,11 +10896,6 @@ uuid@8.0.0: resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.0.0.tgz#bc6ccf91b5ff0ac07bbcdbf1c7c4e150db4dbb6c" integrity sha512-jOXGuXZAWdsTH7eZLtyXMqUb9EcWMGZNbL9YcGBJl4MH4nrxHmZJhEHvyLFrkxo+28uLb/NYRcStH48fnD0Vzw== -uuid@8.3.2: - version "8.3.2" - resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.3.2.tgz#80d5b5ced271bb9af6c445f21a1a04c606cefbe2" - integrity sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg== - uuid@9.0.0: version "9.0.0" resolved "https://registry.yarnpkg.com/uuid/-/uuid-9.0.0.tgz#592f550650024a38ceb0c562f2f6aa435761efb5"