Skip to content

Commit

Permalink
Operate on block level when listening for new transactions submitted
Browse files Browse the repository at this point in the history
  • Loading branch information
vgrichina committed Mar 1, 2024
1 parent 7f77f5c commit 8710542
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 39 deletions.
47 changes: 27 additions & 20 deletions utils/submit-transaction.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const { redisBlockStream } = require('./redis-block-stream');
const { transactionStream } = require('./transaction-stream');
const { SignedTransaction, BORSH_SCHEMA } = require('../data-model');
const { deserialize } = require('borsh');
const LRU = require('lru-cache');
Expand Down Expand Up @@ -41,12 +40,12 @@ async function txStatus(txHash, accountId) {
return await sendJsonRpc('tx', [txHash, accountId]);
}

let txStream;
let blockStream;
const txEventEmitter = new EventEmitter();
const txCache = new LRU({ max: 10000, maxAge: 1000 * 60 * 60 });

async function submitTransaction(transactionData) {
if (!txStream) {
if (!blockStream) {
const startBlockHeight = (await (await fetch(`${NODE_URL}/status`, {
method: 'GET',
headers: {
Expand All @@ -57,21 +56,29 @@ async function submitTransaction(transactionData) {

const redisUrl = process.env.BLOCKS_REDIS_URL;
debug('redisUrl:', redisUrl);
const blocksStream = redisBlockStream({ startBlockHeight, redisUrl, batchSize: 1 });

txStream = transactionStream(blocksStream);
blockStream = redisBlockStream({ startBlockHeight, redisUrl, batchSize: 1 });
await new Promise(async (resolve, reject) => {
(async () => {
let streamStarted = false;
for await (const { transaction, outcome } of txStream) {
for await (const { block, shards } of blockStream) {
if (!streamStarted) {
streamStarted = true;
// NOTE: We are waiting for block stream to start and then it continues
resolve();
}

const txHash = transaction.hash;
txCache.set(txHash, { transaction, outcome });
txEventEmitter.emit('tx', { transaction, outcome });
for (const { chunk } of shards) {
if (!chunk) {
continue;
}

for await (const { transaction, outcome } of chunk.transactions) {
const txHash = transaction.hash;
txCache.set(txHash, { transaction, outcome });
}
}
debug('Block:', block.header.height);
txEventEmitter.emit('block');
}
})().catch(reject);
});
Expand All @@ -83,17 +90,17 @@ async function submitTransaction(transactionData) {
return txCache.get(txHash);
}

const SUBMIT_TX_STATUS_CHECK_TIMEOUT = 1000 * 10;
const SUBMIT_TX_STATUS_CHECK_TIMEOUT = 1000 * 1;
const SUBMIT_TOTAL_TIMEOUT = 1000 * 45;
const { transaction, outcome } = await new Promise((resolve, reject) => {
let txCallback;
const subscribe = () => txEventEmitter.on('tx', txCallback);
const unsubscribe = () => (txEventEmitter.off('tx', txCallback), txCallback = null);
setTimeout(() => txCallback && (unsubscribe(), reject(new Error(`Taking more than ${SUBMIT_TOTAL_TIMEOUT}ms to submit transaction`))), SUBMIT_TOTAL_TIMEOUT);
let blockCallback;
const subscribe = () => txEventEmitter.on('block', blockCallback);
const unsubscribe = () => (txEventEmitter.off('block', blockCallback), blockCallback = null);
setTimeout(() => blockCallback && (unsubscribe(), reject(new Error(`Taking more than ${SUBMIT_TOTAL_TIMEOUT}ms to submit transaction`))), SUBMIT_TOTAL_TIMEOUT);

// NOTE: This is necessary in case transaction already landed before but we don't know
setTimeout(async function checkStatus() {
if (!txCallback) return;
if (!blockCallback) return;

const { transaction: { signerId } } = deserialize(BORSH_SCHEMA, SignedTransaction, transactionData);
debug('Checking txStatus', txHash, signerId);
Expand All @@ -102,7 +109,7 @@ async function submitTransaction(transactionData) {
const { transaction, transaction_outcome } = result;
debug('txStatus result:', result);

if (txCallback) {
if (blockCallback) {
unsubscribe();
resolve({ transaction, outcome: transaction_outcome });
}
Expand All @@ -120,10 +127,10 @@ async function submitTransaction(transactionData) {
}
}, SUBMIT_TX_STATUS_CHECK_TIMEOUT);

txCallback = ({ transaction, outcome }) => {
if (transaction.hash == txHash) {
blockCallback = () => {
if (txCache.has(txHash)) {
unsubscribe();
resolve({ transaction, outcome });
resolve(txCache.get(txHash));
}
};
subscribe();
Expand Down
19 changes: 0 additions & 19 deletions utils/transaction-stream.js

This file was deleted.

0 comments on commit 8710542

Please sign in to comment.