Implement load-from-source to support both redis and lake
vgrichina committed Mar 9, 2024
1 parent e31321d commit 126a983
Showing 6 changed files with 82 additions and 30 deletions.
4 changes: 2 additions & 2 deletions scripts/build-raw-near-lake-index.js
@@ -3,7 +3,7 @@ const { mkdir, writeFile, access } = require('fs/promises');

const sha256 = require('../utils/sha256');
const { writeChangesFile, readChangesFile, changeKey, mergeChangesFiles } = require('../storage/lake/changes-index');
-const { readBlocks } = require('../storage/lake/archive');
+const { readShardBlocks } = require('../source/lake');

const debug = require('debug')('build-index');

@@ -34,7 +34,7 @@ async function main() {
const end = Math.min(start + BLOCKS_PER_BATCH, endBlockNumber);
console.log('Processing batch', start, end);

-const blocksStream = readBlocks(dstDir, shard, start, end);
+const blocksStream = readShardBlocks({ dataDir: dstDir, shard, startBlockHeight: start, endBlockHeight: end });
const parseBlocksStream = mapStream(blocksStream, ({ data }) => JSON.parse(data.toString('utf-8')));

let changesByAccountList = [];
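
For context, the readShardBlocks call shape used above could be exercised on its own roughly like this sketch (paths and heights are illustrative, and the script is assumed to run from the repository root):

    const { readShardBlocks } = require('./source/lake');

    (async () => {
        // iterates over one shard's files stored in the .tgz archives,
        // yielding { data, blockHeight } entries with raw JSON buffers
        for await (const { data, blockHeight } of readShardBlocks({
            dataDir: './lake-data', // assumed local archive location
            shard: '0',
            startBlockHeight: 110000000,
            endBlockHeight: 110000005,
        })) {
            console.log(blockHeight, data.length);
        }
    })().catch(console.error);
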
40 changes: 28 additions & 12 deletions scripts/load-from-redis-stream.js → scripts/load-from-source.js
@@ -5,7 +5,6 @@ const storage = require("../storage");
const { DATA_SCOPE, ACCOUNT_SCOPE, compositeKey, ACCESS_KEY_SCOPE } = require('../storage-keys');
const { Account, BORSH_SCHEMA, AccessKey, PublicKey, FunctionCallPermission, AccessKeyPermission, FullAccessPermission } = require('../data-model');

-const { redisBlockStream } = require('../utils/redis-block-stream');

const { withTimeCounter, getCounters, resetCounters} = require('../utils/counters');

@@ -30,7 +29,7 @@ async function handleStreamerMessage(streamerMessage, options = {}) {
console.log(new Date(), `Block #${blockHeight} Shards: ${streamerMessage.shards.length}`,
`Speed: ${totalMessages * 1000 / (Date.now() - timeStarted)} blocks/second`,
`Lag: ${formatDuration(Date.now() - (timestamp / 1000000))}`);

const pipeline = [
dumpChanges && dumpChangesToStorage,
].filter(Boolean);
@@ -115,7 +114,7 @@ async function handleChange({ batch, blockHeight, type, change, keepFromBlockHei
case 'access_key_update': {
const { public_key: publicKeyStr, access_key: {
nonce,
permission
} } = change;
// NOTE: nonce.toString() is a hack to make stuff work, near-lake shouldn't use number for u64 values as it results in data loss
const accessKey = new AccessKey({ nonce: nonce.toString(), permission: new AccessKeyPermission(
@@ -160,18 +159,27 @@ if (require.main === module) {

const yargs = require('yargs/yargs');
yargs(process.argv.slice(2))
-.command(['load-from-redis-stream <redis-url> [stream-key]', '$0'],
-'loads data from NEAR Lake S3 into other datastores',
+.command(['load-from-source', '$0'],
+'loads data from given source into fast-near compatible storage',
yargs => yargs
.option('start-block-height', {
describe: 'block height to start loading from. By default starts from latest known block height or genesis.',
number: true
})
-.describe('redis-url', 'URL of the Redis server to stream data from')
+.option('redis-url', {
+describe: 'URL of the Redis server to stream data from',
+// TODO: Require only when source specified
+// required: true
+})
.option('stream-key', {
describe: 'Redis stream key to stream data from',
default: 'final_blocks',
})
+.option('shards', {
+describe: 'Shards to process. Defaults to 0..3',
+default: ['0', '1', '2', '3'],
+array: true,
+})
.option('include', {
describe: 'include only accounts matching this glob pattern. Can be specified multiple times.',
array: true
@@ -197,28 +205,36 @@
.option('dump-changes', {
describe: 'Dump state changes into storage. Use FAST_NEAR_STORAGE_TYPE to specify storage type. Defaults to `redis`.',
boolean: true
})
+.option('source', {
+describe: 'Source of the data. Defaults to `redis-blocks`.',
+choices: ['redis-blocks', 'lake'],
+default: 'redis-blocks'
+}),
async argv => {

const {
startBlockHeight,
-redisUrl,
-streamKey,
batchSize,
historyLength,
limit,
include,
exclude,
dumpChanges,
+source,
+...otherOptions
} = argv;
+console.log('otherOptions', otherOptions);

let blocksProcessed = 0;

-for await (let streamerMessage of redisBlockStream({
+const { readBlocks } = require(`../source/${source}`);
+
+for await (let streamerMessage of readBlocks({
startBlockHeight: startBlockHeight || await storage.getLatestBlockHeight() || 0,
-redisUrl,
-streamKey,
-batchSize
+endBlockHeight: limit ? startBlockHeight + limit : undefined,
+batchSize,
+...otherOptions
})) {
await withTimeCounter('handleStreamerMessage', async () => {
await handleStreamerMessage(streamerMessage, {
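
For reference, hypothetical invocations of the renamed script for each source could look like the following (flag names come from the options above; --data-dir is not declared in the visible options, so it is assumed to reach the lake source via ...otherOptions, and --limit is assumed to be declared in an unshown part of the file since it is destructured from argv):

    # stream blocks from Redis (the default source)
    node scripts/load-from-source.js --source redis-blocks --redis-url redis://localhost:6379 --dump-changes

    # read blocks from a local NEAR Lake archive
    node scripts/load-from-source.js --source lake --data-dir ./lake-data --shards 0 1 2 3 --start-block-height 110000000 --limit 1000
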
52 changes: 43 additions & 9 deletions storage/lake/archive.js → source/lake.js
@@ -3,24 +3,59 @@ const zlib = require('zlib');
const tar = require('tar-stream');
const { pipeline } = require('node:stream/promises')

+const debug = require('debug')('source:lake');

const FILES_PER_ARCHIVE = 5;
+const BLOCKS_PER_LOG = 1000;

-async function *readBlocks(dataDir, shard, startBlockNumber, endBlockNumber) {
+async function *readBlocks({ dataDir, shards, startBlockHeight, endBlockHeight }) {
+if (!shards.includes('block')) {
+shards = [...shards, 'block'];
+}
+for (let baseBlockHeight = startBlockHeight; baseBlockHeight < endBlockHeight; baseBlockHeight += FILES_PER_ARCHIVE) {
+const blocks = [...Array(FILES_PER_ARCHIVE)].map(() => ({ shards: shards.slice(0, -1).map(() => ({}))}));
+for (let i = 0; i < shards.length; i++) {
+const shard = shards[i];
+const batch = await readShardBlocksBatch({ blockNumber: baseBlockHeight, dataDir, shard });
+for (const { data, blockHeight } of batch) {
+const block = blocks[blockHeight - baseBlockHeight];
+if (shard === 'block') {
+block.block = JSON.parse(data.toString('utf8'));
+} else {
+block.shards[i] = JSON.parse(data.toString('utf8'));
+}
+}
+}
+
+yield *blocks.filter(block => block.block);
+}
+}

+// TODO: Update the build index script / lake storage accordingly

+async function *readShardBlocks({ dataDir, shard, startBlockHeight: startBlockNumber, endBlockHeight: endBlockNumber }) {
startBlockNumber = startBlockNumber ? Math.floor(startBlockNumber / FILES_PER_ARCHIVE) * FILES_PER_ARCHIVE : 0;
+debug('startBlockHeight:', startBlockNumber, 'endBlockHeight:', endBlockNumber, 'dataDir:', dataDir, 'shard:', shard);

+const startTime = Date.now();
+debug('startTime:', startTime);
for (let blockNumber = startBlockNumber; blockNumber < endBlockNumber; blockNumber += FILES_PER_ARCHIVE) {
-const blockHeight = normalizeBlockHeight(blockNumber);
-const [prefix1, prefix2] = blockHeight.match(/^(.{6})(.{3})/).slice(1);
-const inFolder = `${dataDir}/${shard}/${prefix1}/${prefix2}`;
-const inFile = `${inFolder}/${blockHeight}.tgz`;

+if (blockNumber > startBlockNumber && blockNumber % BLOCKS_PER_LOG === 0) {
+const blocksPerSecond = (blockNumber - startBlockNumber) / ((Date.now() - startTime) / 1000);
+console.log(`Reading block ${blockNumber}. Speed: ${blocksPerSecond.toFixed(2)} blocks/s. ETA: ${(endBlockNumber - blockNumber) / blocksPerSecond} s`);
+}

+const batch = await readShardBlocksBatch({ blockNumber, dataDir, shard });
+yield *batch;
+}
+}

+async function readShardBlocksBatch({ blockNumber, dataDir, shard }) {
+const blockHeight = normalizeBlockHeight(blockNumber);
+const [prefix1, prefix2] = blockHeight.match(/^(.{6})(.{3})/).slice(1);
+const inFolder = `${dataDir}/${shard}/${prefix1}/${prefix2}`;
+const inFile = `${inFolder}/${blockHeight}.tgz`;

const extract = tar.extract();
const gunzip = zlib.createGunzip();
const readStream = fs.createReadStream(inFile);
@@ -44,17 +79,16 @@ async function *readBlocks(dataDir, shard, startBlockNumber, endBlockNumber) {
}
});
await pipelinePromise;
-yield *results;
+return results;
} finally {
// NOTE: After analysis with why-is-node-running looks like at least Gunzip is not properly closed
gunzip.close();
readStream.close();
}
-}
}

function normalizeBlockHeight(number) {
return number.toString().padStart(12, '0');
}

-module.exports = { readBlocks };
+module.exports = { readBlocks, readShardBlocks };
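
A minimal consumer of the new whole-block readBlocks generator might look like this sketch (the data directory and block range are illustrative; the block.header.height access assumes the standard NEAR Lake block JSON layout):

    const { readBlocks } = require('./source/lake');

    (async () => {
        for await (const { block, shards } of readBlocks({
            dataDir: './lake-data', // assumed location of the downloaded lake archives
            shards: ['0', '1', '2', '3'],
            startBlockHeight: 110000000,
            endBlockHeight: 110000100,
        })) {
            // each yielded item pairs the parsed block file with its parsed shard files
            console.log(block.header.height, 'shards:', shards.length);
        }
    })().catch(console.error);
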
8 changes: 5 additions & 3 deletions utils/redis-block-stream.js → source/redis-blocks.js
@@ -1,9 +1,11 @@
const { createClient } = require('redis');
const { promisify } = require('util');

+const debug = require('debug')('source:redis-blocks');

const RETRY_TIMEOUT = 1000;
-async function* redisBlockStream({ startBlockHeight, endBlockHeight, redisUrl, streamKey = 'final_blocks', batchSize, abortController }) {
-console.log('redisBlockStream startBlockHeight:', startBlockHeight);
+async function* readBlocks({ startBlockHeight, endBlockHeight, redisUrl, streamKey = 'final_blocks', batchSize, abortController }) {
+debug('startBlockHeight:', startBlockHeight, 'endBlockHeight:', endBlockHeight, 'redisUrl:', redisUrl, 'streamKey:', streamKey, 'batchSize:', batchSize);
let redisClient = createClient(redisUrl, {
detect_buffers: true,
no_ready_check: true
@@ -59,5 +61,5 @@ async function* redisBlockStream({ startBlockHeight, endBlockHeight, redisUrl, s
}

module.exports = {
-redisBlockStream
+readBlocks
};
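
A matching sketch for the redis-blocks source, assuming a local Redis instance whose final_blocks stream carries NEAR Lake-format streamer messages:

    const { readBlocks } = require('./source/redis-blocks');

    (async () => {
        for await (const streamerMessage of readBlocks({
            redisUrl: 'redis://localhost:6379', // assumed local Redis URL
            startBlockHeight: 110000000,
            batchSize: 10,
        })) {
            // the streamer message format puts the height on the block header
            console.log(streamerMessage.block.header.height);
        }
    })().catch(console.error);
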
4 changes: 2 additions & 2 deletions storage/lake.js
@@ -1,6 +1,6 @@
const fs = require('fs/promises');
const { readChangesFile, changeKey, changeValue } = require('./lake/changes-index');
-const { readBlocks } = require('./lake/archive');
+const { readShardBlocks } = require('../source/lake');

class LakeStorage {

@@ -46,7 +46,7 @@ class LakeStorage {
const { accountId, key } = parseCompKey(compKey);
const shard = shardForAccount(accountId);

-for await (const { data, blockHeight: currentHeight } of readBlocks(this.dataDir, shard, blockHeight, blockHeight + 1)) {
+for await (const { data, blockHeight: currentHeight } of readShardBlocks({ dataDir: this.dataDir, shard, startBlockHeight: blockHeight, endBlockHeight: blockHeight + 1 })) {
if (currentHeight !== blockHeight) {
continue;
}
4 changes: 2 additions & 2 deletions utils/submit-transaction.js
@@ -1,4 +1,4 @@
-const { redisBlockStream } = require('./redis-block-stream');
+const { readBlocks } = require('../source/redis-blocks');
const { SignedTransaction, BORSH_SCHEMA } = require('../data-model');
const { deserialize } = require('borsh');
const LRU = require('lru-cache');
@@ -57,7 +57,7 @@ async function submitTransaction(transactionData) {

const redisUrl = process.env.BLOCKS_REDIS_URL;
debug('redisUrl:', redisUrl);
-blockStream = redisBlockStream({ startBlockHeight, redisUrl, batchSize: 1 });
+blockStream = readBlocks({ startBlockHeight, redisUrl, batchSize: 1 });
await new Promise(async (resolve, reject) => {
(async () => {
let streamStarted = false;
