Skip to content

chore: Allow overriding Contract Subscription cron #882

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"aws-kms-signer": "^0.5.3",
"base-64": "^1.0.0",
"bullmq": "^5.11.0",
"cron": "^4.3.0",
"cron-parser": "^4.9.0",
"crypto": "^1.0.1",
"crypto-js": "^4.2.0",
Expand All @@ -63,7 +64,6 @@
"jsonwebtoken": "^9.0.2",
"knex": "^3.1.0",
"mnemonist": "^0.39.8",
"node-cron": "^3.0.2",
"ox": "0.6.9",
"pg": "^8.11.3",
"prisma": "^5.14.0",
Expand All @@ -82,7 +82,6 @@
"@types/crypto-js": "^4.2.2",
"@types/jsonwebtoken": "^9.0.6",
"@types/node": "^18.15.4",
"@types/node-cron": "^3.0.8",
"@types/pg": "^8.6.6",
"@types/uuid": "^9.0.1",
"@types/ws": "^8.5.5",
Expand Down
9 changes: 6 additions & 3 deletions src/shared/utils/cron/clear-cache-cron.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import cron from "node-cron";
import { CronJob } from "cron";
import { clearCache } from "../cache/clear-cache";
import { getConfig } from "../cache/get-config";
import type { env } from "../env";

let task: cron.ScheduledTask;
let task: CronJob;

export const clearCacheCron = async (
service: (typeof env)["LOG_SERVICES"][0],
) => {
Expand All @@ -13,11 +14,13 @@ export const clearCacheCron = async (
return;
}

// Stop the existing task if it exists.
if (task) {
task.stop();
}

task = cron.schedule(config.clearCacheCronSchedule, async () => {
task = new CronJob(config.clearCacheCronSchedule, async () => {
await clearCache(service);
});
task.start();
};
8 changes: 4 additions & 4 deletions src/shared/utils/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ export const env = createEnv({
QUEUE_FAIL_HISTORY_COUNT: z.coerce.number().default(10_000),
// Sets the number of recent nonces to map to queue IDs.
NONCE_MAP_COUNT: z.coerce.number().default(10_000),
// Sets the estimated number of blocks to query per contract subscription job. Defaults to 1 block (real-time).
CONTRACT_SUBSCRIPTION_BLOCK_RANGE: z.coerce.number().default(1),
// Overrides the cron schedule for contract subscription jobs.
CONTRACT_SUBSCRIPTION_CRON_SCHEDULE_OVERRIDE: z.string().optional(),

ENABLE_KEYPAIR_AUTH: boolEnvSchema(false),
ENABLE_CUSTOM_HMAC_AUTH: boolEnvSchema(false),
Expand Down Expand Up @@ -138,8 +138,8 @@ export const env = createEnv({
QUEUE_COMPLETE_HISTORY_COUNT: process.env.QUEUE_COMPLETE_HISTORY_COUNT,
QUEUE_FAIL_HISTORY_COUNT: process.env.QUEUE_FAIL_HISTORY_COUNT,
NONCE_MAP_COUNT: process.env.NONCE_MAP_COUNT,
CONTRACT_SUBSCRIPTION_BLOCK_RANGE:
process.env.CONTRACT_SUBSCRIPTION_BLOCK_RANGE,
CONTRACT_SUBSCRIPTION_CRON_SCHEDULE_OVERRIDE:
process.env.CONTRACT_SUBSCRIPTION_CRON_SCHEDULE_OVERRIDE,
EXPERIMENTAL__MINE_WORKER_TIMEOUT_SECONDS:
process.env.EXPERIMENTAL__MINE_WORKER_TIMEOUT_SECONDS,
EXPERIMENTAL__MAX_GAS_PRICE_WEI:
Expand Down
55 changes: 23 additions & 32 deletions src/worker/indexers/chain-indexer-registry.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,34 @@
import cron from "node-cron";
import { getBlockTimeSeconds } from "../../shared/utils/indexer/get-block-time";
import { logger } from "../../shared/utils/logger";
import { handleContractSubscriptions } from "../tasks/chain-indexer";
import { env } from "../../shared/utils/env";
import { CronJob } from "cron";

// @TODO: Move all worker logic to Bullmq to better handle multiple hosts.
export const INDEXER_REGISTRY = {} as Record<number, cron.ScheduledTask>;
export const INDEXER_REGISTRY: Record<number, CronJob> = {};

export const addChainIndexer = async (chainId: number) => {
if (INDEXER_REGISTRY[chainId]) {
return;
}

// Estimate the block time in the last 100 blocks. Default to 2 second block times.
let blockTimeSeconds: number;
try {
blockTimeSeconds = await getBlockTimeSeconds(chainId, 100);
} catch (error) {
logger({
service: "worker",
level: "error",
message: `Could not estimate block time for chain ${chainId}`,
error,
});
blockTimeSeconds = 2;
let cronSchedule = env.CONTRACT_SUBSCRIPTION_CRON_SCHEDULE_OVERRIDE;
if (!cronSchedule) {
// Estimate the block time in the last 100 blocks. Default to 2 second block times.
let blockTimeSeconds: number;
try {
blockTimeSeconds = await getBlockTimeSeconds(chainId, 100);
} catch (error) {
logger({
service: "worker",
level: "error",
message: `Could not estimate block time for chain ${chainId}`,
error,
});
blockTimeSeconds = 2;
}
cronSchedule = createScheduleSeconds(blockTimeSeconds);
}
const cronSchedule = createScheduleSeconds({
blockTimeSeconds,
numBlocks: env.CONTRACT_SUBSCRIPTION_BLOCK_RANGE,
});
logger({
service: "worker",
level: "info",
Expand All @@ -37,7 +37,7 @@ export const addChainIndexer = async (chainId: number) => {

let inProgress = false;

const task = cron.schedule(cronSchedule, async () => {
const task = new CronJob(cronSchedule, async () => {
if (inProgress) {
return;
}
Expand All @@ -58,6 +58,7 @@ export const addChainIndexer = async (chainId: number) => {
});

INDEXER_REGISTRY[chainId] = task;
task.start();
};

export const removeChainIndexer = async (chainId: number) => {
Expand All @@ -76,17 +77,7 @@ export const removeChainIndexer = async (chainId: number) => {
delete INDEXER_REGISTRY[chainId];
};

/**
* Returns the cron schedule given the chain's block time and the number of blocks to batch per job.
* Minimum is every 2 seconds.
*/
function createScheduleSeconds({
blockTimeSeconds,
numBlocks,
}: { blockTimeSeconds: number; numBlocks: number }) {
const pollFrequencySeconds = Math.max(
Math.round(blockTimeSeconds * numBlocks),
2,
);
return `*/${pollFrequencySeconds} * * * * *`;
function createScheduleSeconds(blockTimeSeconds: number) {
const pollIntervalSeconds = Math.max(Math.round(blockTimeSeconds), 2);
return `*/${pollIntervalSeconds} * * * * *`;
}
9 changes: 6 additions & 3 deletions src/worker/listeners/chain-indexer-listener.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
import cron from "node-cron";
import { CronJob } from "cron";
import { getConfig } from "../../shared/utils/cache/get-config";
import { logger } from "../../shared/utils/logger";
import { manageChainIndexers } from "../tasks/manage-chain-indexers";

let processChainIndexerStarted = false;
let task: cron.ScheduledTask;
let task: CronJob;

export const chainIndexerListener = async (): Promise<void> => {
const config = await getConfig();
if (!config.indexerListenerCronSchedule) {
return;
}

// Stop the existing task if it exists.
if (task) {
task.stop();
}

task = cron.schedule(config.indexerListenerCronSchedule, async () => {
task = new CronJob(config.indexerListenerCronSchedule, async () => {
if (!processChainIndexerStarted) {
processChainIndexerStarted = true;
await manageChainIndexers();
Expand All @@ -28,4 +30,5 @@ export const chainIndexerListener = async (): Promise<void> => {
});
}
});
task.start();
};
35 changes: 18 additions & 17 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4131,6 +4131,11 @@
resolved "https://registry.yarnpkg.com/@types/long/-/long-4.0.2.tgz#b74129719fc8d11c01868010082d483b7545591a"
integrity sha512-MqTGEo5bj5t157U6fA/BiDynNkn0YknVdh48CMPkTSpFTVmvao5UQmm7uEF6xBEo7qIMAlY/JSleYaE6VOdpaA==

"@types/luxon@~3.6.0":
version "3.6.2"
resolved "https://registry.yarnpkg.com/@types/luxon/-/luxon-3.6.2.tgz#be6536931801f437eafcb9c0f6d6781f72308041"
integrity sha512-R/BdP7OxEMc44l2Ex5lSXHoIXTB2JLNa3y2QISIbr58U/YcsffyQrYW//hZSdrfxrjRZj3GcUoxMPGdO8gSYuw==

"@types/markdown-it@^14.1.1":
version "14.1.2"
resolved "https://registry.yarnpkg.com/@types/markdown-it/-/markdown-it-14.1.2.tgz#57f2532a0800067d9b934f3521429a2e8bfb4c61"
Expand All @@ -4154,11 +4159,6 @@
resolved "https://registry.yarnpkg.com/@types/ms/-/ms-0.7.34.tgz#10964ba0dee6ac4cd462e2795b6bebd407303433"
integrity sha512-nG96G3Wp6acyAgJqGasjODb+acrI7KltPiRxzHPXnP3NgI28bpQDRv53olbqGXbfcgF5aiiHmO3xpwEpS5Ld9g==

"@types/node-cron@^3.0.8":
version "3.0.11"
resolved "https://registry.yarnpkg.com/@types/node-cron/-/node-cron-3.0.11.tgz#70b7131f65038ae63cfe841354c8aba363632344"
integrity sha512-0ikrnug3/IyneSHqCBeslAhlK2aBfYek1fGo4bP4QnZPmiqSGRK+Oy7ZMisLWkesffJvQ1cqAcBnJC+8+nxIAg==

"@types/node@*", "@types/node@>=13.7.0":
version "22.10.2"
resolved "https://registry.yarnpkg.com/@types/node/-/node-22.10.2.tgz#a485426e6d1fdafc7b0d4c7b24e2c78182ddabb9"
Expand Down Expand Up @@ -5923,6 +5923,14 @@ cron-parser@^4.6.0, cron-parser@^4.9.0:
dependencies:
luxon "^3.2.1"

cron@^4.3.0:
version "4.3.0"
resolved "https://registry.yarnpkg.com/cron/-/cron-4.3.0.tgz#c5a62872f74f72294cf1cadef34c72ad8d8f50b5"
integrity sha512-ciiYNLfSlF9MrDqnbMdRWFiA6oizSF7kA1osPP9lRzNu0Uu+AWog1UKy7SkckiDY2irrNjeO6qLyKnXC8oxmrw==
dependencies:
"@types/luxon" "~3.6.0"
luxon "~3.6.0"

cross-fetch@^3.1.4:
version "3.1.8"
resolved "https://registry.yarnpkg.com/cross-fetch/-/cross-fetch-3.1.8.tgz#0327eba65fd68a7d119f8fb2bf9334a1a7956f82"
Expand Down Expand Up @@ -8367,6 +8375,11 @@ luxon@^3.2.1:
resolved "https://registry.yarnpkg.com/luxon/-/luxon-3.5.0.tgz#6b6f65c5cd1d61d1fd19dbf07ee87a50bf4b8e20"
integrity sha512-rh+Zjr6DNfUYR3bPwJEnuwDdqMbxZW7LOQfUN4B54+Cl+0o5zaU9RJ6bcidfDtC1cWCZXQ+nvX8bf6bAji37QQ==

luxon@~3.6.0:
version "3.6.1"
resolved "https://registry.yarnpkg.com/luxon/-/luxon-3.6.1.tgz#d283ffc4c0076cb0db7885ec6da1c49ba97e47b0"
integrity sha512-tJLxrKJhO2ukZ5z0gyjY1zPh3Rh88Ej9P7jNrZiHMUXHae1yvI2imgOZtL1TO8TW6biMMKfTtAOoEJANgtWBMQ==

magic-sdk@^13.6.2:
version "13.6.2"
resolved "https://registry.yarnpkg.com/magic-sdk/-/magic-sdk-13.6.2.tgz#68766fd9d1805332d2a00e5da1bd30fce251a6ac"
Expand Down Expand Up @@ -8728,13 +8741,6 @@ node-addon-api@^7.0.0:
resolved "https://registry.yarnpkg.com/node-addon-api/-/node-addon-api-7.1.1.tgz#1aba6693b0f255258a049d621329329322aad558"
integrity sha512-5m3bsyrjFWE1xf7nz7YXdN4udnVtXK6/Yfgn5qnahL6bCkf2yKt4k3nuTKAtT4r3IG8JNR2ncsIMdZuAzJjHQQ==

node-cron@^3.0.2:
version "3.0.3"
resolved "https://registry.yarnpkg.com/node-cron/-/node-cron-3.0.3.tgz#c4bc7173dd96d96c50bdb51122c64415458caff2"
integrity sha512-dOal67//nohNgYWb+nWmg5dkFdIwDm8EpeGYMekPMrngV3637lqnX0lbUcCtgibHTz6SEz7DAIjKvKDFYCnO1A==
dependencies:
uuid "8.3.2"

node-fetch-native@^1.6.4:
version "1.6.4"
resolved "https://registry.yarnpkg.com/node-fetch-native/-/node-fetch-native-1.6.4.tgz#679fc8fd8111266d47d7e72c379f1bed9acff06e"
Expand Down Expand Up @@ -10890,11 +10896,6 @@ uuid@8.0.0:
resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.0.0.tgz#bc6ccf91b5ff0ac07bbcdbf1c7c4e150db4dbb6c"
integrity sha512-jOXGuXZAWdsTH7eZLtyXMqUb9EcWMGZNbL9YcGBJl4MH4nrxHmZJhEHvyLFrkxo+28uLb/NYRcStH48fnD0Vzw==

uuid@8.3.2:
version "8.3.2"
resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.3.2.tgz#80d5b5ced271bb9af6c445f21a1a04c606cefbe2"
integrity sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==

uuid@9.0.0:
version "9.0.0"
resolved "https://registry.yarnpkg.com/uuid/-/uuid-9.0.0.tgz#592f550650024a38ceb0c562f2f6aa435761efb5"
Expand Down
Loading