Skip to content

chore: Cancel userOps after 1 hour #507

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/utils/sdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,20 @@ export const thirdwebClientId = sha256HexSync(
export const thirdwebClient = createThirdwebClient({
clientId: thirdwebClientId,
});

/**
* Helper functions to handle v4 -> v5 SDK migration.
*/

export const toTransactionStatus = (status: "success" | "reverted"): number =>
status === "success" ? 1 : 0;

export const toTransactionType = (
type: "legacy" | "eip1559" | "eip2930" | "eip4844",
): number => {
if (type === "legacy") return 0;
if (type === "eip1559") return 1;
if (type === "eip2930") return 2;
if (type === "eip4844") return 0; // Unknown
throw new Error(`Unexpected transaction type ${type}`);
};
8 changes: 3 additions & 5 deletions src/worker/tasks/updateMinedTx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
} from "../../utils/usage";
import { WebhookData, sendWebhooks } from "../../utils/webhook";

const MEMPOOL_DURATION_TIMEOUT_MS = 1000 * 60 * 60;
const CANCEL_DEADLINE_MS = 1000 * 60 * 60; // 1 hour

export const updateMinedTx = async () => {
try {
Expand Down Expand Up @@ -44,9 +44,7 @@ export const updateMinedTx = async () => {

// Cancel transactions submitted over 1 hour ago.
// @TODO: move duration to config
const sentAt = new Date(tx.sentAt!);
const ageInMilliseconds = Date.now() - sentAt.getTime();
if (ageInMilliseconds > MEMPOOL_DURATION_TIMEOUT_MS) {
if (msSince(tx.sentAt!) > CANCEL_DEADLINE_MS) {
try {
await cancelTransactionAndUpdate({
queueId: tx.id,
Expand Down Expand Up @@ -93,7 +91,7 @@ export const updateMinedTx = async () => {
chainId: tx.chainId || undefined,
transactionHash: tx.transactionHash || undefined,
provider: provider.connection.url || undefined,
msSinceSend: Date.now() - tx.sentAt!.getTime(),
msSinceSend: msSince(tx.sentAt!),
},
action: UsageEventTxActionEnum.ErrorTx,
});
Expand Down
224 changes: 124 additions & 100 deletions src/worker/tasks/updateMinedUserOps.ts
Original file line number Diff line number Diff line change
@@ -1,137 +1,162 @@
import { getBlock } from "@thirdweb-dev/sdk";
import { ERC4337EthersSigner } from "@thirdweb-dev/wallets/dist/declarations/src/evm/connectors/smart-wallet/lib/erc4337-signer";
import {
defineChain,
eth_getBlockByNumber,
eth_getTransactionByHash,
eth_getTransactionReceipt,
getRpcClient,
} from "thirdweb";
import { prisma } from "../../db/client";
import { getSentUserOps } from "../../db/transactions/getSentUserOps";
import { updateTx } from "../../db/transactions/updateTx";
import { TransactionStatus } from "../../server/schemas/transaction";
import { getSdk } from "../../utils/cache/getSdk";
import { msSince } from "../../utils/date";
import { logger } from "../../utils/logger";
import {
thirdwebClient,
toTransactionStatus,
toTransactionType,
} from "../../utils/sdk";
import {
ReportUsageParams,
UsageEventTxActionEnum,
reportUsage,
} from "../../utils/usage";
import { WebhookData, sendWebhooks } from "../../utils/webhook";

const CANCEL_DEADLINE_MS = 1000 * 60 * 60; // 1 hour

export const updateMinedUserOps = async () => {
try {
const sendWebhookForQueueIds: WebhookData[] = [];
const reportUsageForQueueIds: ReportUsageParams[] = [];
await prisma.$transaction(
async (pgtx) => {
const userOps = await getSentUserOps({ pgtx });

if (userOps.length === 0) {
return;
}

// TODO: Improve spaghetti code...
const updatedUserOps = (
await Promise.all(
userOps.map(async (userOp) => {
const sdk = await getSdk({
chainId: parseInt(userOp.chainId!),
walletAddress: userOp.signerAddress!,
accountAddress: userOp.accountAddress!,
});
const signer = sdk.getSigner() as ERC4337EthersSigner;
const promises = userOps.map(async (userOp) => {
if (
!userOp.sentAt ||
!userOp.signerAddress ||
!userOp.accountAddress ||
!userOp.userOpHash
) {
return;
}

const userOpReceipt =
await signer.smartAccountAPI.getUserOpReceipt(
signer.httpRpcClient,
userOp.userOpHash!,
3000,
);
const sdk = await getSdk({
chainId: parseInt(userOp.chainId),
walletAddress: userOp.signerAddress,
accountAddress: userOp.accountAddress,
});
const signer = sdk.getSigner() as ERC4337EthersSigner;

if (!userOpReceipt) {
// If no receipt was received, return undefined to filter out tx
return undefined;
}
const _sdk = await getSdk({
chainId: parseInt(userOp.chainId!),
// Get userOp receipt.
// If no receipt, try again later (or cancel userOps after 1 hour).
// Else the transaction call was submitted to mempool.
const userOpReceipt = await signer.smartAccountAPI.getUserOpReceipt(
signer.httpRpcClient,
userOp.userOpHash,
3_000, // 3 seconds
);
if (!userOpReceipt) {
if (msSince(userOp.sentAt) > CANCEL_DEADLINE_MS) {
await updateTx({
pgtx,
queueId: userOp.id,
data: {
status: TransactionStatus.Errored,
errorMessage: "Transaction timed out.",
},
});
}
return;
}

const tx = await signer.provider!.getTransaction(
userOpReceipt.transactionHash,
);
const txReceipt = await _sdk
.getProvider()
.getTransactionReceipt(tx.hash);
const minedAt = new Date(
(
await getBlock({
block: tx.blockNumber!,
network: sdk.getProvider(),
})
).timestamp * 1000,
);
const chain = defineChain(parseInt(userOp.chainId));
const rpcRequest = getRpcClient({
client: thirdwebClient,
chain,
});

return {
...userOp,
blockNumber: tx.blockNumber!,
minedAt,
onChainTxStatus: txReceipt.status,
transactionHash: txReceipt.transactionHash,
transactionType: tx.type,
gasLimit: tx.gasLimit.toString(),
maxFeePerGas: tx.maxFeePerGas?.toString(),
maxPriorityFeePerGas: tx.maxPriorityFeePerGas?.toString(),
provider: signer.httpRpcClient.bundlerUrl,
};
}),
)
).filter((userOp) => !!userOp);
// Get the transaction receipt.
// If no receipt, try again later.
// Else the transaction call was confirmed onchain.
const transaction = await eth_getTransactionByHash(rpcRequest, {
hash: userOpReceipt.transactionHash as `0x${string}`,
});
const transactionReceipt = await eth_getTransactionReceipt(
rpcRequest,
{ hash: transaction.hash },
);
if (!transactionReceipt) {
// If no receipt, try again later.
return;
}

await Promise.all(
updatedUserOps.map(async (userOp) => {
await updateTx({
pgtx,
queueId: userOp!.id,
data: {
status: TransactionStatus.Mined,
minedAt: userOp!.minedAt,
blockNumber: userOp!.blockNumber,
onChainTxStatus: userOp!.onChainTxStatus,
transactionHash: userOp!.transactionHash,
transactionType: userOp!.transactionType || undefined,
gasLimit: userOp!.gasLimit || undefined,
maxFeePerGas: userOp!.maxFeePerGas || undefined,
maxPriorityFeePerGas: userOp!.maxPriorityFeePerGas || undefined,
gasPrice: userOp!.gasPrice || undefined,
},
let minedAt = new Date();
try {
const block = await eth_getBlockByNumber(rpcRequest, {
blockNumber: transactionReceipt.blockNumber,
includeTransactions: false,
});
minedAt = new Date(Number(block.timestamp) * 1000);
} catch (e) {}

logger({
service: "worker",
level: "info",
queueId: userOp!.id,
message: `Updated with receipt`,
});
sendWebhookForQueueIds.push({
queueId: userOp!.id,
// Update the userOp transaction as mined.
await updateTx({
pgtx,
queueId: userOp.id,
data: {
status: TransactionStatus.Mined,
});
reportUsageForQueueIds.push({
input: {
fromAddress: userOp!.fromAddress || undefined,
toAddress: userOp!.toAddress || undefined,
value: userOp!.value || undefined,
chainId: userOp!.chainId || undefined,
userOpHash: userOp!.userOpHash || undefined,
onChainTxStatus: userOp!.onChainTxStatus,
functionName: userOp!.functionName || undefined,
extension: userOp!.extension || undefined,
provider: userOp!.provider || undefined,
msSinceSend:
userOp!.minedAt.getTime() - userOp!.sentAt!.getTime(),
},
action: UsageEventTxActionEnum.MineTx,
});
}),
);
minedAt,
blockNumber: Number(transactionReceipt.blockNumber),
onChainTxStatus: toTransactionStatus(transactionReceipt.status),
transactionHash: transactionReceipt.transactionHash,
transactionType: toTransactionType(transaction.type),
gasLimit: userOp.gasLimit ?? undefined,
maxFeePerGas: transaction.maxFeePerGas?.toString(),
maxPriorityFeePerGas:
transaction.maxPriorityFeePerGas?.toString(),
gasPrice: transaction.gasPrice?.toString(),
},
});

logger({
service: "worker",
level: "info",
queueId: userOp.id,
message: "Updated with receipt",
});
sendWebhookForQueueIds.push({
queueId: userOp.id,
status: TransactionStatus.Mined,
});
reportUsageForQueueIds.push({
input: {
fromAddress: userOp.fromAddress ?? undefined,
toAddress: userOp.toAddress ?? undefined,
value: userOp.value ?? undefined,
chainId: userOp.chainId,
userOpHash: userOp.userOpHash ?? undefined,
onChainTxStatus: toTransactionStatus(transactionReceipt.status),
functionName: userOp.functionName ?? undefined,
extension: userOp.extension ?? undefined,
provider: signer.httpRpcClient.bundlerUrl,
msSinceSend: msSince(userOp.sentAt!),
},
action: UsageEventTxActionEnum.MineTx,
});
});

await Promise.all(promises);
},
{
timeout: 5 * 60000,
timeout: 5 * 60 * 1000, // 5 minutes
},
);

Expand All @@ -141,9 +166,8 @@ export const updateMinedUserOps = async () => {
logger({
service: "worker",
level: "error",
message: `Failed to update receipts`,
message: "Failed to update receipts",
error: err,
});
return;
}
};
Loading