Skip to content

Commit

Permalink
Extract method to load from Redis
Browse files Browse the repository at this point in the history
  • Loading branch information
vgrichina committed Feb 28, 2024
1 parent 913ee49 commit 1586f18
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 39 deletions.
43 changes: 4 additions & 39 deletions scripts/load-from-redis-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,9 @@ const storage = require("../storage");
const { DATA_SCOPE, ACCOUNT_SCOPE, compositeKey, ACCESS_KEY_SCOPE } = require('../storage-keys');
const { Account, BORSH_SCHEMA, AccessKey, PublicKey, FunctionCallPermission, AccessKeyPermission, FullAccessPermission } = require('../data-model');

const { withTimeCounter, getCounters, resetCounters} = require('../utils/counters');

const { createClient } = require('redis');
const { promisify } = require('util');

async function* stream({ startBlockHeight, redisUrl, streamKey, blocksPreloadPoolSize }) {
let redisClient = createClient(redisUrl, {
detect_buffers: true,
no_ready_check: true
});
// TODO: Does it need to crash as fatal error?
redisClient.on('error', (err) => console.error('Redis Client Error', err));

redisClient = {
xread: promisify(redisClient.xread).bind(redisClient),
xrange: promisify(redisClient.xrange).bind(redisClient),
// TODO: Should use quit at some point? Pass AbortController?
};

// TODO: - suffix for block range?
const { redisBlockStream } = require('../utils/redis-block-stream');

if (startBlockHeight) {
let blockHeight = startBlockHeight;
do {
const result = await redisClient.xread('COUNT', blocksPreloadPoolSize, 'BLOCK', '100', 'STREAMS', streamKey, blockHeight);
if (!result) {
continue;
}

const items = result[0][1];
for (let [id, [, block]] of items) {
yield JSON.parse(block);
blockHeight = parseInt(id.split('-')[0]) + 1;
}
} while (true);
}

}
const { withTimeCounter, getCounters, resetCounters} = require('../utils/counters');


let totalMessages = 0;
Expand Down Expand Up @@ -249,11 +214,11 @@ if (require.main === module) {

let blocksProcessed = 0;

for await (let streamerMessage of stream({
for await (let streamerMessage of redisBlockStream({
startBlockHeight: startBlockHeight || await storage.getLatestBlockHeight() || 0,
redisUrl,
streamKey,
blocksPreloadPoolSize: batchSize
batchSize
})) {
await withTimeCounter('handleStreamerMessage', async () => {
await handleStreamerMessage(streamerMessage, {
Expand Down
51 changes: 51 additions & 0 deletions utils/redis-block-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
const { createClient } = require('redis');
const { promisify } = require('util');

async function* redisBlockStream({ startBlockHeight, endBlockHeight, redisUrl, streamKey, batchSize, abortController }) {
let redisClient = createClient(redisUrl, {
detect_buffers: true,
no_ready_check: true
});
// TODO: Does it need to crash as fatal error?
redisClient.on('error', (err) => console.error('Redis Client Error', err));

redisClient = {
xread: promisify(redisClient.xread).bind(redisClient),
xrange: promisify(redisClient.xrange).bind(redisClient),
quit: promisify(redisClient.quit).bind(redisClient)
};

if (!startBlockHeight) {
throw new Error('startBlockHeight is required');
}

try {
let blockHeight = startBlockHeight;
do {
if (abortController && abortController.signal.aborted) {
break;
}

const result = await redisClient.xread('COUNT', batchSize, 'BLOCK', '100', 'STREAMS', streamKey, blockHeight);
if (!result) {
continue;
}

const items = result[0][1];
for (let [id, [, block]] of items) {
yield JSON.parse(block);
blockHeight = parseInt(id.split('-')[0]) + 1;

if (endBlockHeight && blockHeight >= endBlockHeight) {
return;
}
}
} while (!endBlockHeight || blockHeight < endBlockHeight);
} finally {
await redisClient.quit();
}
}

module.exports = {
redisBlockStream
};

0 comments on commit 1586f18

Please sign in to comment.