From 45e5326260dd9ffefda9b366468b3e5000792a80 Mon Sep 17 00:00:00 2001 From: Phillip Ho Date: Thu, 24 Apr 2025 23:59:50 +0800 Subject: [PATCH 1/3] fix: remove unneeded listeners that cleared caches --- src/server/index.ts | 2 +- .../routes/configuration/cache/update.ts | 2 +- src/shared/utils/cache/clear-cache.ts | 5 +- src/shared/utils/cron/clear-cache-cron.ts | 7 +- src/worker/index.ts | 12 -- src/worker/listeners/config-listener.ts | 114 ----------------- src/worker/listeners/webhook-listener.ts | 115 ------------------ 7 files changed, 5 insertions(+), 252 deletions(-) delete mode 100644 src/worker/listeners/config-listener.ts delete mode 100644 src/worker/listeners/webhook-listener.ts diff --git a/src/server/index.ts b/src/server/index.ts index 2628edbae..46eaecd04 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -133,5 +133,5 @@ export const initServer = async () => { writeOpenApiToFile(server); await updateTxListener(); - await clearCacheCron("server"); + await clearCacheCron(); }; diff --git a/src/server/routes/configuration/cache/update.ts b/src/server/routes/configuration/cache/update.ts index 5ea2c035e..c1f88211d 100644 --- a/src/server/routes/configuration/cache/update.ts +++ b/src/server/routes/configuration/cache/update.ts @@ -47,7 +47,7 @@ export async function updateCacheConfiguration(fastify: FastifyInstance) { await updateConfiguration({ ...req.body }); const config = await getConfig(false); // restarting cache cron with updated cron schedule - await clearCacheCron("server"); + await clearCacheCron(); res.status(StatusCodes.OK).send({ result: { clearCacheCronSchedule: config.clearCacheCronSchedule, diff --git a/src/shared/utils/cache/clear-cache.ts b/src/shared/utils/cache/clear-cache.ts index e70fa38be..10da2d757 100644 --- a/src/shared/utils/cache/clear-cache.ts +++ b/src/shared/utils/cache/clear-cache.ts @@ -1,4 +1,3 @@ -import type { env } from "../env"; import { accessTokenCache } from "./access-token"; import { invalidateConfig } from "./get-config"; import { sdkCache } from "./get-sdk"; @@ -6,9 +5,7 @@ import { walletsCache } from "./get-wallet"; import { webhookCache } from "./get-webhook"; import { keypairCache } from "./keypair"; -export const clearCache = async ( - _service: (typeof env)["LOG_SERVICES"][0], -): Promise => { +export const clearCache = async (): Promise => { invalidateConfig(); webhookCache.clear(); sdkCache.clear(); diff --git a/src/shared/utils/cron/clear-cache-cron.ts b/src/shared/utils/cron/clear-cache-cron.ts index 61c3d3bf3..6cb52a545 100644 --- a/src/shared/utils/cron/clear-cache-cron.ts +++ b/src/shared/utils/cron/clear-cache-cron.ts @@ -1,13 +1,10 @@ import { CronJob } from "cron"; import { clearCache } from "../cache/clear-cache"; import { getConfig } from "../cache/get-config"; -import type { env } from "../env"; let task: CronJob; -export const clearCacheCron = async ( - service: (typeof env)["LOG_SERVICES"][0], -) => { +export const clearCacheCron = async () => { const config = await getConfig(); if (!config.clearCacheCronSchedule) { @@ -20,7 +17,7 @@ export const clearCacheCron = async ( } task = new CronJob(config.clearCacheCronSchedule, async () => { - await clearCache(service); + await clearCache(); }); task.start(); }; diff --git a/src/worker/index.ts b/src/worker/index.ts index 41b3825d3..06c44e682 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -3,10 +3,6 @@ import { newConfigurationListener, updatedConfigurationListener, } from "./listeners/config-listener"; -import { - newWebhooksListener, - updatedWebhooksListener, -} from "./listeners/webhook-listener"; import { initCancelRecycledNoncesWorker } from "./tasks/cancel-recycled-nonces-worker"; import { initMineTransactionWorker } from "./tasks/mine-transaction-worker"; import { initNonceHealthCheckWorker } from "./tasks/nonce-health-check-worker"; @@ -31,14 +27,6 @@ export const initWorker = async () => { await initNonceResyncWorker(); await initWalletSubscriptionWorker(); - // Listen for new & updated configuration data. - await newConfigurationListener(); - await updatedConfigurationListener(); - - // Listen for new & updated webhooks data. - await newWebhooksListener(); - await updatedWebhooksListener(); - // Contract subscriptions. await chainIndexerListener(); }; diff --git a/src/worker/listeners/config-listener.ts b/src/worker/listeners/config-listener.ts deleted file mode 100644 index bda2e9c9f..000000000 --- a/src/worker/listeners/config-listener.ts +++ /dev/null @@ -1,114 +0,0 @@ -import { knex } from "../../shared/db/client"; -import { getConfig } from "../../shared/utils/cache/get-config"; -import { clearCacheCron } from "../../shared/utils/cron/clear-cache-cron"; -import { logger } from "../../shared/utils/logger"; -import { chainIndexerListener } from "./chain-indexer-listener"; - -export const newConfigurationListener = async (): Promise => { - logger({ - service: "worker", - level: "info", - message: "Listening for new configuration data", - }); - - // TODO: This doesn't even need to be a listener - const connection = await knex.client.acquireConnection(); - connection.query("LISTEN new_configuration_data"); - - // Whenever we receive a new transaction, process it - connection.on( - "notification", - async (_msg: { channel: string; payload: string }) => { - // Update Configs Data - await getConfig(false); - }, - ); - - connection.on("end", async () => { - await knex.destroy(); - knex.client.releaseConnection(connection); - - logger({ - service: "worker", - level: "info", - message: "Released database connection on end", - }); - }); - - connection.on("error", async (err: unknown) => { - logger({ - service: "worker", - level: "error", - message: "Database connection error", - error: err, - }); - - await knex.destroy(); - knex.client.releaseConnection(connection); - - logger({ - service: "worker", - level: "info", - message: "Released database connection on error", - error: err, - }); - }); -}; - -export const updatedConfigurationListener = async (): Promise => { - logger({ - service: "worker", - level: "info", - message: "Listening for updated configuration data", - }); - - // TODO: This doesn't even need to be a listener - const connection = await knex.client.acquireConnection(); - connection.query("LISTEN updated_configuration_data"); - - // Whenever we receive a new transaction, process it - connection.on( - "notification", - async (_msg: { channel: string; payload: string }) => { - // Update Configs Data - logger({ - service: "worker", - level: "info", - message: "Updated configuration data", - }); - await getConfig(false); - await clearCacheCron("worker"); - await chainIndexerListener(); - }, - ); - - connection.on("end", async () => { - await knex.destroy(); - knex.client.releaseConnection(connection); - - logger({ - service: "worker", - level: "info", - message: "Released database connection on end", - }); - }); - - connection.on("error", async (err: unknown) => { - logger({ - service: "worker", - level: "error", - message: "Database connection error", - error: err, - }); - - await knex.destroy(); - knex.client.releaseConnection(connection); - - logger({ - service: "worker", - level: "info", - message: "Released database connection on error", - error: err, - }); - }); -}; diff --git a/src/worker/listeners/webhook-listener.ts b/src/worker/listeners/webhook-listener.ts deleted file mode 100644 index 759182170..000000000 --- a/src/worker/listeners/webhook-listener.ts +++ /dev/null @@ -1,115 +0,0 @@ -import { knex } from "../../shared/db/client"; -import { webhookCache } from "../../shared/utils/cache/get-webhook"; -import { logger } from "../../shared/utils/logger"; - -export const newWebhooksListener = async (): Promise => { - logger({ - service: "worker", - level: "info", - message: "Listening for new webhooks data", - }); - - // TODO: This doesn't even need to be a listener - const connection = await knex.client.acquireConnection(); - connection.query("LISTEN new_webhook_data"); - - // Whenever we receive a new transaction, process it - connection.on( - "notification", - async (_msg: { channel: string; payload: string }) => { - logger({ - service: "worker", - level: "info", - message: "Received new webhooks data", - }); - // Update Webhooks Data - webhookCache.clear(); - }, - ); - - connection.on("end", async () => { - await knex.destroy(); - knex.client.releaseConnection(connection); - - logger({ - service: "worker", - level: "info", - message: "Released database connection on end", - }); - }); - - connection.on("error", async (err: unknown) => { - logger({ - service: "worker", - level: "error", - message: "Database connection error", - error: err, - }); - - await knex.destroy(); - knex.client.releaseConnection(connection); - - logger({ - service: "worker", - level: "info", - message: "Released database connection on error", - error: err, - }); - }); -}; - -export const updatedWebhooksListener = async (): Promise => { - logger({ - service: "worker", - level: "info", - message: "Listening for updated webhooks data", - }); - - // TODO: This doesn't even need to be a listener - const connection = await knex.client.acquireConnection(); - connection.query("LISTEN updated_webhook_data"); - - // Whenever we receive a new transaction, process it - connection.on( - "notification", - async (_msg: { channel: string; payload: string }) => { - // Update Configs Data - logger({ - service: "worker", - level: "info", - message: "Received updated webhooks data", - }); - webhookCache.clear(); - }, - ); - - connection.on("end", async () => { - await knex.destroy(); - knex.client.releaseConnection(connection); - - logger({ - service: "worker", - level: "info", - message: "Released database connection on end", - }); - }); - - connection.on("error", async (err: unknown) => { - logger({ - service: "worker", - level: "error", - message: "Database connection error", - error: err, - }); - - await knex.destroy(); - knex.client.releaseConnection(connection); - - logger({ - service: "worker", - level: "info", - message: "Released database connection on error", - error: err, - }); - }); -}; From e7e2d618b308c9077c3233c48b687e315e5ddba7 Mon Sep 17 00:00:00 2001 From: Phillip Ho Date: Fri, 25 Apr 2025 00:05:38 +0800 Subject: [PATCH 2/3] lint --- src/worker/index.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/worker/index.ts b/src/worker/index.ts index 06c44e682..251c427ca 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -1,8 +1,4 @@ import { chainIndexerListener } from "./listeners/chain-indexer-listener"; -import { - newConfigurationListener, - updatedConfigurationListener, -} from "./listeners/config-listener"; import { initCancelRecycledNoncesWorker } from "./tasks/cancel-recycled-nonces-worker"; import { initMineTransactionWorker } from "./tasks/mine-transaction-worker"; import { initNonceHealthCheckWorker } from "./tasks/nonce-health-check-worker"; From 5a22dc19279dc231b818263c0dfa3e6a240246b7 Mon Sep 17 00:00:00 2001 From: Phillip Ho Date: Fri, 25 Apr 2025 00:09:22 +0800 Subject: [PATCH 3/3] remove websocket listener too, not functional since v2 --- src/server/index.ts | 2 - src/server/listeners/update-tx-listener.ts | 88 ---------------------- 2 files changed, 90 deletions(-) delete mode 100644 src/server/listeners/update-tx-listener.ts diff --git a/src/server/index.ts b/src/server/index.ts index 46eaecd04..80ec10994 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -8,7 +8,6 @@ import { env } from "../shared/utils/env"; import { logger } from "../shared/utils/logger"; import { metricsServer } from "../shared/utils/prometheus"; import { withServerUsageReporting } from "../shared/utils/usage"; -import { updateTxListener } from "./listeners/update-tx-listener"; import { withAdminRoutes } from "./middleware/admin-routes"; import { withAuth } from "./middleware/auth"; import { withCors } from "./middleware/cors"; @@ -132,6 +131,5 @@ export const initServer = async () => { }); writeOpenApiToFile(server); - await updateTxListener(); await clearCacheCron(); }; diff --git a/src/server/listeners/update-tx-listener.ts b/src/server/listeners/update-tx-listener.ts deleted file mode 100644 index e7302fd0e..000000000 --- a/src/server/listeners/update-tx-listener.ts +++ /dev/null @@ -1,88 +0,0 @@ -import { knex } from "../../shared/db/client"; -import { TransactionDB } from "../../shared/db/transactions/db"; -import { logger } from "../../shared/utils/logger"; -import { toTransactionSchema } from "../schemas/transaction"; -import { subscriptionsData } from "../schemas/websocket"; -import { - formatSocketMessage, - getStatusMessageAndConnectionStatus, -} from "../utils/websocket"; - -export const updateTxListener = async (): Promise => { - logger({ - service: "server", - level: "info", - message: "Listening for updated transactions", - }); - - const connection = await knex.client.acquireConnection(); - connection.query("LISTEN updated_transaction_data"); - - connection.on( - "notification", - async (msg: { channel: string; payload: string }) => { - const parsedPayload = JSON.parse(msg.payload); - - // Send websocket message - const index = subscriptionsData.findIndex( - (sub) => sub.requestId === parsedPayload.id, - ); - - if (index === -1) { - return; - } - - const userSubscription = subscriptionsData[index]; - const transaction = await TransactionDB.get(parsedPayload.id); - const returnData = transaction ? toTransactionSchema(transaction) : null; - - logger({ - service: "server", - level: "info", - message: `[updateTxListener] Sending websocket update for queueId: ${parsedPayload.id}, status ${returnData?.status}.`, - }); - - const { message, closeConnection } = - await getStatusMessageAndConnectionStatus(returnData); - userSubscription.socket.send( - await formatSocketMessage(returnData, message), - ); - closeConnection ? userSubscription.socket.close() : null; - }, - ); - - connection.on("end", async () => { - logger({ - service: "server", - level: "info", - message: "[updateTxListener] Connection database ended", - }); - - knex.client.releaseConnection(connection); - await knex.destroy(); - - logger({ - service: "server", - level: "info", - message: "[updateTxListener] Released database connection on end", - }); - }); - - connection.on("error", async (err: unknown) => { - logger({ - service: "server", - level: "error", - message: "[updateTxListener] Database connection error", - error: err, - }); - - knex.client.releaseConnection(connection); - await knex.destroy(); - - logger({ - service: "worker", - level: "info", - message: "[updateTxListener] Released database connection on error", - }); - }); -};