Skip to content

Commit 70bbdbd

Browse files
committed
correctly handle job delays for bullmq
1 parent 2207433 commit 70bbdbd

File tree

1 file changed

+20
-10
lines changed

1 file changed

+20
-10
lines changed

src/worker/tasks/send-transaction-worker.ts

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import assert from "node:assert";
2-
import { type Job, type Processor, Worker } from "bullmq";
2+
import { DelayedError, type Job, type Processor, Worker } from "bullmq";
33
import superjson from "superjson";
44
import {
55
type Address,
@@ -70,7 +70,10 @@ type VersionedUserOp = Awaited<ReturnType<typeof prepareUserOp>>;
7070
*
7171
* This worker also handles retried EOA transactions.
7272
*/
73-
const handler: Processor<string, void, string> = async (job: Job<string>) => {
73+
const handler: Processor<string, void, string> = async (
74+
job: Job<string>,
75+
token?: string,
76+
) => {
7477
const { queueId, resendCount } = superjson.parse<SendTransactionData>(
7578
job.data,
7679
);
@@ -89,9 +92,9 @@ const handler: Processor<string, void, string> = async (job: Job<string>) => {
8992

9093
if (transaction.status === "queued") {
9194
if (transaction.isUserOp) {
92-
resultTransaction = await _sendUserOp(job, transaction);
95+
resultTransaction = await _sendUserOp(job, transaction, token);
9396
} else {
94-
resultTransaction = await _sendTransaction(job, transaction);
97+
resultTransaction = await _sendTransaction(job, transaction, token);
9598
}
9699
} else if (transaction.status === "sent") {
97100
resultTransaction = await _resendTransaction(job, transaction, resendCount);
@@ -121,6 +124,7 @@ const handler: Processor<string, void, string> = async (job: Job<string>) => {
121124
const _sendUserOp = async (
122125
job: Job,
123126
queuedTransaction: QueuedTransaction,
127+
token?: string,
124128
): Promise<SentTransaction | ErroredTransaction | null> => {
125129
assert(queuedTransaction.isUserOp);
126130

@@ -218,7 +222,7 @@ const _sendUserOp = async (
218222
}
219223

220224
// Step 2: Get entrypoint address
221-
let entrypointAddress: string | undefined;
225+
let entrypointAddress: Address | undefined;
222226
if (userProvidedEntrypointAddress) {
223227
entrypointAddress = queuedTransaction.entrypointAddress;
224228
} else {
@@ -298,16 +302,21 @@ const _sendUserOp = async (
298302

299303
// Handle if `maxFeePerGas` is overridden.
300304
// Set it if the transaction will be sent, otherwise delay the job.
301-
if (overrides?.maxFeePerGas && unsignedUserOp.maxFeePerGas) {
305+
if (
306+
typeof overrides?.maxFeePerGas !== "undefined" &&
307+
unsignedUserOp.maxFeePerGas
308+
) {
302309
if (overrides.maxFeePerGas > unsignedUserOp.maxFeePerGas) {
303310
unsignedUserOp.maxFeePerGas = overrides.maxFeePerGas;
304311
} else {
305312
const retryAt = _minutesFromNow(5);
306313
job.log(
307314
`Override gas fee (${overrides.maxFeePerGas}) is lower than onchain fee (${unsignedUserOp.maxFeePerGas}). Delaying job until ${retryAt}.`,
308315
);
309-
await job.moveToDelayed(retryAt.getTime());
310-
return null;
316+
// token is required to acquire lock for delaying currently processing job: https://docs.bullmq.io/patterns/process-step-jobs#delaying
317+
await job.moveToDelayed(retryAt.getTime(), token);
318+
// throwing delayed error is required to notify bullmq worker not to complete or fail the job
319+
throw new DelayedError("Delaying job due to gas fee override");
311320
}
312321
}
313322

@@ -374,6 +383,7 @@ const _sendUserOp = async (
374383
const _sendTransaction = async (
375384
job: Job,
376385
queuedTransaction: QueuedTransaction,
386+
token?: string,
377387
): Promise<SentTransaction | ErroredTransaction | null> => {
378388
assert(!queuedTransaction.isUserOp);
379389

@@ -463,8 +473,8 @@ const _sendTransaction = async (
463473
job.log(
464474
`Override gas fee (${overrides.maxFeePerGas}) is lower than onchain fee (${populatedTransaction.maxFeePerGas}). Delaying job until ${retryAt}.`,
465475
);
466-
await job.moveToDelayed(retryAt.getTime());
467-
return null;
476+
await job.moveToDelayed(retryAt.getTime(), token);
477+
throw new DelayedError("Delaying job due to gas fee override");
468478
}
469479
}
470480

0 commit comments

Comments
 (0)