diff --git a/src/server/index.ts b/src/server/index.ts index 2628edbae..80ec10994 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -8,7 +8,6 @@ import { env } from "../shared/utils/env"; import { logger } from "../shared/utils/logger"; import { metricsServer } from "../shared/utils/prometheus"; import { withServerUsageReporting } from "../shared/utils/usage"; -import { updateTxListener } from "./listeners/update-tx-listener"; import { withAdminRoutes } from "./middleware/admin-routes"; import { withAuth } from "./middleware/auth"; import { withCors } from "./middleware/cors"; @@ -132,6 +131,5 @@ export const initServer = async () => { }); writeOpenApiToFile(server); - await updateTxListener(); - await clearCacheCron("server"); + await clearCacheCron(); }; diff --git a/src/server/listeners/update-tx-listener.ts b/src/server/listeners/update-tx-listener.ts deleted file mode 100644 index e7302fd0e..000000000 --- a/src/server/listeners/update-tx-listener.ts +++ /dev/null @@ -1,88 +0,0 @@ -import { knex } from "../../shared/db/client"; -import { TransactionDB } from "../../shared/db/transactions/db"; -import { logger } from "../../shared/utils/logger"; -import { toTransactionSchema } from "../schemas/transaction"; -import { subscriptionsData } from "../schemas/websocket"; -import { - formatSocketMessage, - getStatusMessageAndConnectionStatus, -} from "../utils/websocket"; - -export const updateTxListener = async (): Promise => { - logger({ - service: "server", - level: "info", - message: "Listening for updated transactions", - }); - - const connection = await knex.client.acquireConnection(); - connection.query("LISTEN updated_transaction_data"); - - connection.on( - "notification", - async (msg: { channel: string; payload: string }) => { - const parsedPayload = JSON.parse(msg.payload); - - // Send websocket message - const index = subscriptionsData.findIndex( - (sub) => sub.requestId === parsedPayload.id, - ); - - if (index === -1) { - return; - } - - const userSubscription = subscriptionsData[index]; - const transaction = await TransactionDB.get(parsedPayload.id); - const returnData = transaction ? toTransactionSchema(transaction) : null; - - logger({ - service: "server", - level: "info", - message: `[updateTxListener] Sending websocket update for queueId: ${parsedPayload.id}, status ${returnData?.status}.`, - }); - - const { message, closeConnection } = - await getStatusMessageAndConnectionStatus(returnData); - userSubscription.socket.send( - await formatSocketMessage(returnData, message), - ); - closeConnection ? userSubscription.socket.close() : null; - }, - ); - - connection.on("end", async () => { - logger({ - service: "server", - level: "info", - message: "[updateTxListener] Connection database ended", - }); - - knex.client.releaseConnection(connection); - await knex.destroy(); - - logger({ - service: "server", - level: "info", - message: "[updateTxListener] Released database connection on end", - }); - }); - - connection.on("error", async (err: unknown) => { - logger({ - service: "server", - level: "error", - message: "[updateTxListener] Database connection error", - error: err, - }); - - knex.client.releaseConnection(connection); - await knex.destroy(); - - logger({ - service: "worker", - level: "info", - message: "[updateTxListener] Released database connection on error", - }); - }); -}; diff --git a/src/server/routes/configuration/cache/update.ts b/src/server/routes/configuration/cache/update.ts index 5ea2c035e..c1f88211d 100644 --- a/src/server/routes/configuration/cache/update.ts +++ b/src/server/routes/configuration/cache/update.ts @@ -47,7 +47,7 @@ export async function updateCacheConfiguration(fastify: FastifyInstance) { await updateConfiguration({ ...req.body }); const config = await getConfig(false); // restarting cache cron with updated cron schedule - await clearCacheCron("server"); + await clearCacheCron(); res.status(StatusCodes.OK).send({ result: { clearCacheCronSchedule: config.clearCacheCronSchedule, diff --git a/src/shared/utils/cache/clear-cache.ts b/src/shared/utils/cache/clear-cache.ts index e70fa38be..10da2d757 100644 --- a/src/shared/utils/cache/clear-cache.ts +++ b/src/shared/utils/cache/clear-cache.ts @@ -1,4 +1,3 @@ -import type { env } from "../env"; import { accessTokenCache } from "./access-token"; import { invalidateConfig } from "./get-config"; import { sdkCache } from "./get-sdk"; @@ -6,9 +5,7 @@ import { walletsCache } from "./get-wallet"; import { webhookCache } from "./get-webhook"; import { keypairCache } from "./keypair"; -export const clearCache = async ( - _service: (typeof env)["LOG_SERVICES"][0], -): Promise => { +export const clearCache = async (): Promise => { invalidateConfig(); webhookCache.clear(); sdkCache.clear(); diff --git a/src/shared/utils/cron/clear-cache-cron.ts b/src/shared/utils/cron/clear-cache-cron.ts index 61c3d3bf3..6cb52a545 100644 --- a/src/shared/utils/cron/clear-cache-cron.ts +++ b/src/shared/utils/cron/clear-cache-cron.ts @@ -1,13 +1,10 @@ import { CronJob } from "cron"; import { clearCache } from "../cache/clear-cache"; import { getConfig } from "../cache/get-config"; -import type { env } from "../env"; let task: CronJob; -export const clearCacheCron = async ( - service: (typeof env)["LOG_SERVICES"][0], -) => { +export const clearCacheCron = async () => { const config = await getConfig(); if (!config.clearCacheCronSchedule) { @@ -20,7 +17,7 @@ export const clearCacheCron = async ( } task = new CronJob(config.clearCacheCronSchedule, async () => { - await clearCache(service); + await clearCache(); }); task.start(); }; diff --git a/src/worker/index.ts b/src/worker/index.ts index 41b3825d3..251c427ca 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -1,12 +1,4 @@ import { chainIndexerListener } from "./listeners/chain-indexer-listener"; -import { - newConfigurationListener, - updatedConfigurationListener, -} from "./listeners/config-listener"; -import { - newWebhooksListener, - updatedWebhooksListener, -} from "./listeners/webhook-listener"; import { initCancelRecycledNoncesWorker } from "./tasks/cancel-recycled-nonces-worker"; import { initMineTransactionWorker } from "./tasks/mine-transaction-worker"; import { initNonceHealthCheckWorker } from "./tasks/nonce-health-check-worker"; @@ -31,14 +23,6 @@ export const initWorker = async () => { await initNonceResyncWorker(); await initWalletSubscriptionWorker(); - // Listen for new & updated configuration data. - await newConfigurationListener(); - await updatedConfigurationListener(); - - // Listen for new & updated webhooks data. - await newWebhooksListener(); - await updatedWebhooksListener(); - // Contract subscriptions. await chainIndexerListener(); }; diff --git a/src/worker/listeners/config-listener.ts b/src/worker/listeners/config-listener.ts deleted file mode 100644 index bda2e9c9f..000000000 --- a/src/worker/listeners/config-listener.ts +++ /dev/null @@ -1,114 +0,0 @@ -import { knex } from "../../shared/db/client"; -import { getConfig } from "../../shared/utils/cache/get-config"; -import { clearCacheCron } from "../../shared/utils/cron/clear-cache-cron"; -import { logger } from "../../shared/utils/logger"; -import { chainIndexerListener } from "./chain-indexer-listener"; - -export const newConfigurationListener = async (): Promise => { - logger({ - service: "worker", - level: "info", - message: "Listening for new configuration data", - }); - - // TODO: This doesn't even need to be a listener - const connection = await knex.client.acquireConnection(); - connection.query("LISTEN new_configuration_data"); - - // Whenever we receive a new transaction, process it - connection.on( - "notification", - async (_msg: { channel: string; payload: string }) => { - // Update Configs Data - await getConfig(false); - }, - ); - - connection.on("end", async () => { - await knex.destroy(); - knex.client.releaseConnection(connection); - - logger({ - service: "worker", - level: "info", - message: "Released database connection on end", - }); - }); - - connection.on("error", async (err: unknown) => { - logger({ - service: "worker", - level: "error", - message: "Database connection error", - error: err, - }); - - await knex.destroy(); - knex.client.releaseConnection(connection); - - logger({ - service: "worker", - level: "info", - message: "Released database connection on error", - error: err, - }); - }); -}; - -export const updatedConfigurationListener = async (): Promise => { - logger({ - service: "worker", - level: "info", - message: "Listening for updated configuration data", - }); - - // TODO: This doesn't even need to be a listener - const connection = await knex.client.acquireConnection(); - connection.query("LISTEN updated_configuration_data"); - - // Whenever we receive a new transaction, process it - connection.on( - "notification", - async (_msg: { channel: string; payload: string }) => { - // Update Configs Data - logger({ - service: "worker", - level: "info", - message: "Updated configuration data", - }); - await getConfig(false); - await clearCacheCron("worker"); - await chainIndexerListener(); - }, - ); - - connection.on("end", async () => { - await knex.destroy(); - knex.client.releaseConnection(connection); - - logger({ - service: "worker", - level: "info", - message: "Released database connection on end", - }); - }); - - connection.on("error", async (err: unknown) => { - logger({ - service: "worker", - level: "error", - message: "Database connection error", - error: err, - }); - - await knex.destroy(); - knex.client.releaseConnection(connection); - - logger({ - service: "worker", - level: "info", - message: "Released database connection on error", - error: err, - }); - }); -}; diff --git a/src/worker/listeners/webhook-listener.ts b/src/worker/listeners/webhook-listener.ts deleted file mode 100644 index 759182170..000000000 --- a/src/worker/listeners/webhook-listener.ts +++ /dev/null @@ -1,115 +0,0 @@ -import { knex } from "../../shared/db/client"; -import { webhookCache } from "../../shared/utils/cache/get-webhook"; -import { logger } from "../../shared/utils/logger"; - -export const newWebhooksListener = async (): Promise => { - logger({ - service: "worker", - level: "info", - message: "Listening for new webhooks data", - }); - - // TODO: This doesn't even need to be a listener - const connection = await knex.client.acquireConnection(); - connection.query("LISTEN new_webhook_data"); - - // Whenever we receive a new transaction, process it - connection.on( - "notification", - async (_msg: { channel: string; payload: string }) => { - logger({ - service: "worker", - level: "info", - message: "Received new webhooks data", - }); - // Update Webhooks Data - webhookCache.clear(); - }, - ); - - connection.on("end", async () => { - await knex.destroy(); - knex.client.releaseConnection(connection); - - logger({ - service: "worker", - level: "info", - message: "Released database connection on end", - }); - }); - - connection.on("error", async (err: unknown) => { - logger({ - service: "worker", - level: "error", - message: "Database connection error", - error: err, - }); - - await knex.destroy(); - knex.client.releaseConnection(connection); - - logger({ - service: "worker", - level: "info", - message: "Released database connection on error", - error: err, - }); - }); -}; - -export const updatedWebhooksListener = async (): Promise => { - logger({ - service: "worker", - level: "info", - message: "Listening for updated webhooks data", - }); - - // TODO: This doesn't even need to be a listener - const connection = await knex.client.acquireConnection(); - connection.query("LISTEN updated_webhook_data"); - - // Whenever we receive a new transaction, process it - connection.on( - "notification", - async (_msg: { channel: string; payload: string }) => { - // Update Configs Data - logger({ - service: "worker", - level: "info", - message: "Received updated webhooks data", - }); - webhookCache.clear(); - }, - ); - - connection.on("end", async () => { - await knex.destroy(); - knex.client.releaseConnection(connection); - - logger({ - service: "worker", - level: "info", - message: "Released database connection on end", - }); - }); - - connection.on("error", async (err: unknown) => { - logger({ - service: "worker", - level: "error", - message: "Database connection error", - error: err, - }); - - await knex.destroy(); - knex.client.releaseConnection(connection); - - logger({ - service: "worker", - level: "info", - message: "Released database connection on error", - error: err, - }); - }); -};