Skip to content

Commit 52981f0

Browse files
authored
MineTransactionWorker retries more frequently, no cap on resends (#633)
* fix: add from address to claimTo to support merkle proofs * fix: have mine worker retry more frequently * remove mine worker in retry-failed * add log line for nonce acquire
1 parent 9fd4d9c commit 52981f0

File tree

5 files changed

+46
-27
lines changed

5 files changed

+46
-27
lines changed

src/db/wallets/walletNonce.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ export const deleteAllNonces = async () => {
184184
const keys = [
185185
...(await redis.keys("nonce:*")),
186186
...(await redis.keys("nonce-recycled:*")),
187+
...(await redis.keys("sent-nonce:*")),
187188
];
188189
if (keys.length > 0) {
189190
await redis.del(keys);

src/server/routes/transaction/retry-failed.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { eth_getTransactionReceipt, getRpcClient } from "thirdweb";
55
import { TransactionDB } from "../../../db/transactions/db";
66
import { getChain } from "../../../utils/chain";
77
import { thirdwebClient } from "../../../utils/sdk";
8+
import { MineTransactionQueue } from "../../../worker/queues/mineTransactionQueue";
89
import { SendTransactionQueue } from "../../../worker/queues/sendTransactionQueue";
910
import { createCustomError } from "../../middleware/error";
1011
import { standardResponseSchema } from "../../schemas/sharedApiSchemas";
@@ -108,15 +109,23 @@ export async function retryFailedTransaction(fastify: FastifyInstance) {
108109
}
109110
}
110111

111-
const job = await SendTransactionQueue.q.getJob(
112+
const sendJob = await SendTransactionQueue.q.getJob(
112113
SendTransactionQueue.jobId({
113114
queueId: transaction.queueId,
114115
resendCount: 0,
115116
}),
116117
);
118+
if (sendJob) {
119+
await sendJob.remove();
120+
}
117121

118-
if (job) {
119-
await job.remove();
122+
const mineJob = await MineTransactionQueue.q.getJob(
123+
MineTransactionQueue.jobId({
124+
queueId: transaction.queueId,
125+
}),
126+
);
127+
if (mineJob) {
128+
await mineJob.remove();
120129
}
121130

122131
await SendTransactionQueue.add({

src/worker/queues/mineTransactionQueue.ts

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,8 @@ export type MineTransactionData = {
1010
export class MineTransactionQueue {
1111
static q = new Queue<string>("transactions-2-mine", {
1212
connection: redis,
13-
defaultJobOptions: {
14-
...defaultJobOptions,
15-
// Delay confirming the tx by 500ms.
16-
delay: 500,
17-
// Retry after 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 512s, 1024s (17 minutes)
18-
// This needs to be long enough to handle transactions stuck in mempool.
19-
// @TODO: This can be more optimized based on the chain block time.
20-
attempts: 10,
21-
backoff: { type: "exponential", delay: 2_000 },
22-
},
13+
// Backoff strategy is defined on the worker (`BackeoffStrategy`) and when adding to the queue (`attempts`).
14+
defaultJobOptions,
2315
});
2416

2517
// There must be a worker to poll the result for every transaction hash,
@@ -29,7 +21,11 @@ export class MineTransactionQueue {
2921
static add = async (data: MineTransactionData) => {
3022
const serialized = superjson.stringify(data);
3123
const jobId = this.jobId(data);
32-
await this.q.add(jobId, serialized, { jobId });
24+
await this.q.add(jobId, serialized, {
25+
jobId,
26+
attempts: 200, // > 30 minutes with the backoffStrategy defined on the worker
27+
backoff: { type: "custom" },
28+
});
3329
};
3430

3531
static length = async () => this.q.getWaitingCount();

src/worker/tasks/mineTransactionWorker.ts

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -159,19 +159,17 @@ const _mineTransaction = async (
159159

160160
// Resend the transaction (after some initial delay).
161161
const config = await getConfig();
162-
if (resendCount < config.maxRetriesPerTx) {
163-
const blockNumber = await getBlockNumberish(chainId);
164-
const ellapsedBlocks = blockNumber - sentAtBlock;
165-
if (ellapsedBlocks >= config.minEllapsedBlocksBeforeRetry) {
166-
const message = `Resending transaction after ${ellapsedBlocks} blocks. blockNumber=${blockNumber} sentAtBlock=${sentAtBlock}`;
167-
job.log(message);
168-
logger({ service: "worker", level: "info", queueId, message });
162+
const blockNumber = await getBlockNumberish(chainId);
163+
const ellapsedBlocks = blockNumber - sentAtBlock;
164+
if (ellapsedBlocks >= config.minEllapsedBlocksBeforeRetry) {
165+
const message = `Resending transaction after ${ellapsedBlocks} blocks. blockNumber=${blockNumber} sentAtBlock=${sentAtBlock}`;
166+
job.log(message);
167+
logger({ service: "worker", level: "info", queueId, message });
169168

170-
await SendTransactionQueue.add({
171-
queueId,
172-
resendCount: resendCount + 1,
173-
});
174-
}
169+
await SendTransactionQueue.add({
170+
queueId,
171+
resendCount: resendCount + 1,
172+
});
175173
}
176174

177175
return null;
@@ -229,6 +227,12 @@ export const initMineTransactionWorker = () => {
229227
const _worker = new Worker(MineTransactionQueue.q.name, handler, {
230228
concurrency: env.CONFIRM_TRANSACTION_QUEUE_CONCURRENCY,
231229
connection: redis,
230+
settings: {
231+
backoffStrategy: (attemptsMade: number) => {
232+
// Retries after: 2s, 4s, 6s, 8s, 10s, 10s, 10s, 10s, ...
233+
return Math.min(attemptsMade * 2_000, 10_000);
234+
},
235+
},
232236
});
233237

234238
// If a transaction fails to mine after all retries, set it as errored and release the nonce.

src/worker/tasks/sendTransactionWorker.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
isReplacementGasFeeTooLow,
2323
prettifyError,
2424
} from "../../utils/error";
25+
import { logger } from "../../utils/logger";
2526
import { getChecksumAddress } from "../../utils/primitiveTypes";
2627
import { redis } from "../../utils/redis/redis";
2728
import { thirdwebClient } from "../../utils/sdk";
@@ -181,7 +182,15 @@ const _sendTransaction = async (
181182

182183
// Acquire an unused nonce for this transaction.
183184
const { nonce, isRecycledNonce } = await acquireNonce(chainId, from);
184-
job.log(`Acquired nonce ${nonce}. isRecycledNonce=${isRecycledNonce}`);
185+
job.log(
186+
`Acquired nonce ${nonce} for transaction ${queuedTransaction.queueId}. isRecycledNonce=${isRecycledNonce}`,
187+
);
188+
logger({
189+
level: "info",
190+
message: `Acquired nonce ${nonce} for transaction ${queuedTransaction.queueId}. isRecycledNonce=${isRecycledNonce}`,
191+
service: "worker",
192+
});
193+
185194
populatedTransaction.nonce = nonce;
186195
job.log(`Sending transaction: ${stringify(populatedTransaction)}`);
187196

0 commit comments

Comments
 (0)