Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement sharded storage #15

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion storage/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const { RedisStorage } = require('./redis');
const { LMDBStorage } = require('./lmdb-embedded');
const { LakeStorage } = require('./lake');
const { CachedStorage } = require('./cached');
const { ShardedStorage } = require('./sharded');

const storageType = process.env.FAST_NEAR_STORAGE_TYPE || 'redis';

Expand All @@ -15,7 +16,8 @@ const debugStorage = new DebugStorage(createStorage(storageType));
function createStorage(storageType) {
switch (storageType) {
case 'lmdb':
return new LMDBStorage();
// return new LMDBStorage({ path: `./lmdb-data` });
return new ShardedStorage([...Array(1)].map((_, i) => new LMDBStorage({ path: `./lmdb-data/${i}` })));
case 'redis':
return new RedisStorage();
case 'lake':
Expand Down
2 changes: 1 addition & 1 deletion storage/lake.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,4 @@ async function fileExists(file) {
}
}

module.exports = { LakeStorage };
module.exports = { LakeStorage, parseCompKey };
4 changes: 2 additions & 2 deletions storage/lmdb-embedded.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ function truncatedKey(compKey) {
}

class LMDBStorage {
constructor() {
constructor({ path = LMDB_PATH }) {
this.db = open({
path: LMDB_PATH,
path,
keyEncoder,
noSync: true, // NOTE: YOLO, as all data is recoverable from the blockchain
// compression: true, // TODO: Check if this is worth it
Expand Down
117 changes: 117 additions & 0 deletions storage/sharded.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
const sha256 = require('../utils/sha256');

const { parseCompKey } = require('./lake');

class ShardedStorage {
    // Fans storage operations out across multiple underlying storage backends
    // ("shards") that all implement the same storage interface. Account-scoped
    // data is routed by accountId, blobs by content hash; block-level metadata
    // is written to every shard and read back from shard 0.
    //
    // `accountIdToShardId` maps an accountId to a shard index in
    // [0, shards.length); defaults to sha256-based bucketing (see hashShard).
    constructor(shards, accountIdToShardId = hashShard(shards.length)) {
        this.shards = shards;
        this.accountIdToShardId = accountIdToShardId;
    }

    async getLatestBlockHeight() {
        // All shards receive the same value in setLatestBlockHeight; shard 0 is canonical.
        return await this.shards[0].getLatestBlockHeight();
    }

    async setLatestBlockHeight(blockHeight) {
        await Promise.all(this.shards.map(shard => shard.setLatestBlockHeight(blockHeight)));
    }

    async getBlockTimestamp(blockHeight) {
        // Broadcast on write (see setBlockTimestamp), so any shard can answer; use shard 0.
        return await this.shards[0].getBlockTimestamp(blockHeight);
    }

    async setBlockTimestamp(blockHeight, timestamp) {
        await Promise.all(this.shards.map(shard => shard.setBlockTimestamp(blockHeight, timestamp)));
    }

    async getLatestDataBlockHeight(compKey, blockHeight) {
        const shardId = this.compKeyToShardId(compKey);
        return await this.shards[shardId].getLatestDataBlockHeight(compKey, blockHeight);
    }

    async getData(compKey, blockHeight) {
        const shardId = this.compKeyToShardId(compKey);
        return await this.shards[shardId].getData(compKey, blockHeight);
    }

    async getLatestData(compKey, blockHeight) {
        const shardId = this.compKeyToShardId(compKey);
        return await this.shards[shardId].getLatestData(compKey, blockHeight);
    }

    // Returns true when `batch` is pinned to a shard other than `shardId`.
    // writeBatch() invokes its callback once per shard with `{ batch, shardId }`,
    // so shard-routed writes must be skipped on every other shard's pass.
    // NOTE: compare against undefined explicitly — shard id 0 is falsy, so a
    // truthiness check (`batch.shardId && ...`) would make shard 0's pass apply
    // writes that belong to every other shard.
    isOtherShard(batch, shardId) {
        return batch.shardId !== undefined && batch.shardId !== shardId;
    }

    async setData(batch, scope, accountId, storageKey, blockHeight, data) {
        const shardId = this.accountIdToShardId(accountId);
        if (this.isOtherShard(batch, shardId)) {
            return;
        }

        return await this.shards[shardId].setData(batch.batch, scope, accountId, storageKey, blockHeight, data);
    }

    async deleteData(batch, scope, accountId, storageKey, blockHeight) {
        const shardId = this.accountIdToShardId(accountId);
        if (this.isOtherShard(batch, shardId)) {
            return;
        }

        // Forward the underlying shard's batch, not the wrapper object (matches setData).
        return await this.shards[shardId].deleteData(batch.batch, scope, accountId, storageKey, blockHeight);
    }

    // Maps a blob content hash (Buffer, or hex string) to a shard index.
    // Used by BOTH getBlob and setBlob so reads always hit the shard that was
    // written. (Previously getBlob bucketed by the first hex byte while setBlob
    // used readUInt32LE(0) — those disagree for non-power-of-two shard counts.)
    blobHashToShardId(hash) {
        const hashBuffer = Buffer.isBuffer(hash) ? hash : Buffer.from(hash, 'hex');
        return hashBuffer.readUInt32LE(0) % this.shards.length;
    }

    async getBlob(hash) {
        const shardId = this.blobHashToShardId(hash);
        return await this.shards[shardId].getBlob(hash);
    }

    async setBlob(batch, data) {
        // TODO: Refactor to avoid double hashing even in debug mode.
        const hash = sha256(data);

        const shardId = this.blobHashToShardId(hash);
        if (this.isOtherShard(batch, shardId)) {
            return;
        }

        // Forward the underlying shard's batch in the batch position, matching
        // setData/deleteData. (Previously `hash` was passed here instead.)
        return await this.shards[shardId].setBlob(batch.batch, data);
    }

    async cleanOlderData(batch, key, blockHeight) {
        // NOTE(review): this forwards `batch` as-is to every shard — if called
        // inside writeBatch() it hands one shard's wrapper to all shards.
        // Confirm intended usage against callers.
        await Promise.all(this.shards.map(shard => shard.cleanOlderData(batch, key, blockHeight)));
    }

    async scanAllKeys(iterator) {
        // TODO: Iterate over all shards in sequence
        throw new Error('Not implemented');
    }

    async scanDataKeys(accountId, blockHeight, keyPattern, iterator, limit) {
        const shardId = this.accountIdToShardId(accountId);
        return await this.shards[shardId].scanDataKeys(accountId, blockHeight, keyPattern, iterator, limit);
    }

    // Runs `fn` once per shard, wrapping each shard's native batch together
    // with its shardId so per-account writes inside `fn` can be routed.
    async writeBatch(fn) {
        await Promise.all(this.shards.map((shard, shardId) => shard.writeBatch(batch => fn({ batch, shardId }))));
    }

    async clearDatabase() {
        await Promise.all(this.shards.map(shard => shard.clearDatabase()));
    }

    async closeDatabase() {
        await Promise.all(this.shards.map(shard => shard.closeDatabase()));
    }

    compKeyToShardId(compKey) {
        const { accountId } = parseCompKey(compKey);
        return this.accountIdToShardId(accountId);
    }
}

/**
 * Builds the default accountId → shardId mapping used by ShardedStorage:
 * sha256 the account id and bucket the first four little-endian bytes of the
 * digest modulo `count`.
 *
 * @param {number} count total number of shards
 * @returns {(accountId: string) => number} deterministic mapper into [0, count)
 */
function hashShard(count) {
    return (accountId) => sha256(Buffer.from(accountId)).readUInt32LE(0) % count;
}

// Public API: the sharded storage wrapper and the default account-hash router.
module.exports = { ShardedStorage, hashShard };

Loading