Prototype writing indices into custom binary format
vgrichina committed Jan 25, 2024
1 parent 89c60fe commit db8a696
Showing 1 changed file with 155 additions and 16 deletions.
171 changes: 155 additions & 16 deletions scripts/build-raw-near-lake-index.js
@@ -1,7 +1,6 @@

const compressing = require('compressing');

const fs = require('fs');
const { writeFile } = require('fs/promises');
const zlib = require('zlib');
const tar = require('tar-stream');

@@ -16,18 +15,29 @@ async function main() {
const dstDir = `./lake-data/${bucketName}`;
// TODO: Make shards dynamic, allow filtering by shard
// TODO: Should index something from 'block' as well? (e.g. block.header.timestamp)
-const folders = ['0', '1', '2', '3'];
const shards = ['0', '1', '2', '3'];

for (let shard of shards) {
console.log('Processing shard', shard);

const unmerged = [];
for await (const changesByAccount of changesByAccountStream(shard, startBlockNumber, endBlockNumber)) {
unmerged.push(changesByAccount);
}

-for (let folder of folders) {
-console.log('Processing shard', folder);
const allChangesByAccount = mergeChangesByAccount(unmerged);
// console.log('allChangesByAccount', allChangesByAccount);

await writeChanges(`${dstDir}/${shard}`, allChangesByAccount);
}

async function *changesByAccountStream(shard, startBlockNumber, endBlockNumber) {
for (let blockNumber = startBlockNumber; blockNumber < endBlockNumber; blockNumber += FILES_PER_ARCHIVE) {
console.log('blockNumber', blockNumber, 'endBlockNumber', endBlockNumber);
const blockHeight = normalizeBlockHeight(blockNumber);
const [prefix1, prefix2] = blockHeight.match(/^(.{6})(.{3})/).slice(1);
-const inFolder = `${dstDir}/${folder}/${prefix1}/${prefix2}`;
const inFolder = `${dstDir}/${shard}/${prefix1}/${prefix2}`;
const inFile = `${inFolder}/${blockHeight}.tgz`;
// const uncompressStream = new compressing.tgz.UncompressStream({ source: inFile });

console.log('inFile', inFile);

@@ -50,10 +60,11 @@ async function main() {
});

const { state_changes, chunk, ...json } = JSON.parse(data.toString('utf-8'));
if (!chunk) {
continue;
}

const blockHeight = chunk.header.height_included;
console.log('blockHeight', blockHeight);
// console.log('json', json);
// console.log('state_changes', state_changes);

for (let { type, change } of state_changes) {
const { account_id, ...changeData } = change;
@@ -71,27 +82,155 @@ async function main() {
}
}

console.log('changesByAccount', changesByAccount);
yield changesByAccount;
}
}

}
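
// Illustrative sketch (not part of this commit): normalizeBlockHeight is
// referenced above but defined elsewhere in this file. The 6+3 prefix regex
// assumes zero-padded 12-digit heights, as used for NEAR Lake object keys,
// so the helper presumably looks something like this:
function normalizeBlockHeight(blockNumber) {
    return blockNumber.toString().padStart(12, '0');
}
// e.g. block 108000000 => '000108000000', stored under .../000108/000/000108000000.tgz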

const MIN_CHANGES_PER_FILE = 1000;

async function writeChanges(outFolder, changesByAccount) {
console.log('changesByAccount', changesByAccount);
for (let accountId in changesByAccount) {
const accountChanges = changesByAccount[accountId];
const totalChanges = Object.values(accountChanges).reduce((sum, changes) => sum + changes.length, 0);
if (totalChanges < MIN_CHANGES_PER_FILE) {
continue;
}

await writeChangesFile(`${outFolder}/${accountId}.dat`, { [accountId]: accountChanges });
delete changesByAccount[accountId];
}


await writeChangesFile(`${outFolder}/changes.dat`, changesByAccount);
}
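
// Illustrative note (not part of this commit): with MIN_CHANGES_PER_FILE = 1000,
// hot accounts get a dedicated file and the long tail shares one, e.g.
// (hypothetical account name, real NEAR Lake bucket name):
//
//   lake-data/near-lake-data-mainnet/0/app.near.dat   (account with >= 1000 changes)
//   lake-data/near-lake-data-mainnet/0/changes.dat    (all remaining accounts)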

const PAGE_SIZE = 64 * 1024;

async function writeChangesFile(outPath, changesByAccount) {
console.log('writeChangesFile', outPath, Object.keys(changesByAccount).length);

const outStream = fs.createWriteStream(outPath);
const buffer = Buffer.alloc(PAGE_SIZE);
let offset = 0;
for (let accountId in changesByAccount) {
offset = buffer.writeUInt8(accountId.length, offset);
offset += buffer.write(accountId, offset);

const accountChanges = changesByAccount[accountId];

for (let key in accountChanges) {
const allChanges = accountChanges[key];

// NOTE: Changes arrays are split into chunks of at most 0xFF items, since the chunk length is stored as a single UInt8
for (let i = 0; i < allChanges.length; i += 0xFF) {
const changes = allChanges.slice(i, i + 0xFF);

// TODO: Check max key length
offset = buffer.writeUInt8(key.length, offset);
offset += buffer.write(key, offset);

offset = buffer.writeUInt8(changes.length, offset);
for (let change of changes) {
offset = buffer.writeInt32LE(change, offset);
}

// TODO: Adjust this margin as needed; worst case a single record (~1.3KB: 255-byte key + 255 × 4-byte changes) exceeds 1000 bytes
if (offset > PAGE_SIZE - 1000) {
console.log('Writing', outPath, offset);
await new Promise((resolve, reject) => {
outStream.write(buffer.slice(0, offset), e => e ? reject(e) : resolve());
});
offset = 0;
}
}
}
}

await new Promise((resolve, reject) => {
console.log('Writing', outPath, offset);
outStream.end(buffer.slice(0, offset), e => e ? reject(e) : resolve());
});
}
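
// Illustrative sketch (not part of this commit): decoding a single-account
// file produced by writeChangesFile. Record layout, per the writer above:
//   UInt8 accountId length, accountId bytes (once, at the start of the file),
//   then repeated: UInt8 key length, key bytes, UInt8 change count,
//   then `count` block heights as Int32LE.
// Note the format carries no per-account delimiter, so the multi-account
// changes.dat would need extra framing to be read back this way.
function readChangesFile(inPath) {
    const buffer = fs.readFileSync(inPath);
    let offset = 0;

    const readString = () => {
        const length = buffer.readUInt8(offset);
        offset += 1;
        const str = buffer.toString('utf-8', offset, offset + length);
        offset += length;
        return str;
    };

    const accountId = readString();
    const accountChanges = {};
    while (offset < buffer.length) {
        const key = readString();
        const count = buffer.readUInt8(offset);
        offset += 1;
        const changes = [];
        for (let i = 0; i < count; i++) {
            changes.push(buffer.readInt32LE(offset));
            offset += 4;
        }
        // Chunks of the same key are accumulated back into one array
        accountChanges[key] = (accountChanges[key] || []).concat(changes);
    }
    return { [accountId]: accountChanges };
}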

function reduceRecursive(items, fn) {
if (items.length === 0) {
throw new Error('Cannot reduce empty list');
}

if (items.length === 1) {
return items[0];
}

const mid = Math.floor(items.length / 2);
return fn(
reduceRecursive(items.slice(0, mid), fn),
reduceRecursive(items.slice(mid), fn));
}
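
// Note: the balanced (pairwise) reduction matters here because merge cost grows
// with accumulated size; each input map is touched O(log n) times instead of
// O(n) for a sequential fold. Works with any associative fn, e.g.:
//   reduceRecursive([1, 2, 3, 4], (a, b) => a + b) // => 10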

function merge(a, b, fn) {
for (const k in b) {
if (a[k]) {
a[k] = fn(a[k], b[k]);
} else {
a[k] = b[k];
}
}
return a;
}
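
// Note: merge mutates and returns its first argument. For example:
//   merge({ a: [1] }, { a: [2], b: [3] }, (x, y) => x.concat(y))
//   // => { a: [1, 2], b: [3] }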

function mergeChangesByAccount(changesByAccountList) {
return reduceRecursive(changesByAccountList, (a, b) => merge(a, b, mergeChanges));
}

function mergeChanges(a, b) {
return merge(a, b, mergeSortedArrays);
}

function mergeSortedArrays(a, b) {
const result = [];
let i = 0;
let j = 0;
while (i < a.length && j < b.length) {
if (a[i] < b[j]) {
result.push(a[i]);
i++;
} else if (a[i] > b[j]) {
result.push(b[j]);
j++;
} else {
result.push(a[i]);
i++;
j++;
}
}

for (; i < a.length; i++) {
result.push(a[i]);
}

for (; j < b.length; j++) {
result.push(b[j]);
}

return result;
}
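
// Note: equal elements are emitted once, so this both merges and dedupes the
// sorted block height lists. For example:
//   mergeSortedArrays([1, 3, 5], [1, 2, 5]) // => [1, 2, 3, 5]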

function changeKey(type, changeData) {
// TODO: Adjust this as needed
switch (type) {
case 'account_update':
case 'account_deletion':
-return type;
return 'a:';
case 'access_key_update':
case 'access_key_deletion':
-return `${type}:${changeData.public_key}`;
return `k:${changeData.public_key}`;
case 'data_update':
case 'data_deletion':
-return `${type}:${changeData.key_base64}`;
return `d:${changeData.key_base64}`;
case 'contract_code_update':
case 'contract_code_deletion':
-return `${type}`;
return 'c:';
default:
throw new Error(`Unknown type ${type}`);
}
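// Illustrative examples (values hypothetical) of the shortened key scheme above:
//   changeKey('account_update', {})                               // => 'a:'
//   changeKey('access_key_update', { public_key: 'ed25519:...' }) // => 'k:ed25519:...'
//   changeKey('data_update', { key_base64: 'aGVsbG8=' })          // => 'd:aGVsbG8='
//   changeKey('contract_code_update', {})                         // => 'c:'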
