diff --git a/config/config.go b/config/config.go
index f11b0ab..cfdaf9c 100644
--- a/config/config.go
+++ b/config/config.go
@@ -87,6 +87,9 @@ type StateStoreConfig struct {
 	// Whether to keep last version of a key during pruning or delete
 	// defaults to true
 	KeepLastVersion bool `mapstructure:"keep-last-version"`
+
+	// VersionShardSize is the number of consecutive versions stored in each PebbleDB shard.
+	VersionShardSize int64 `mapstructure:"version-shard-size"`
 }
 
 func DefaultStateCommitConfig() StateCommitConfig {
diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go
index cac2296..6a8a904 100644
--- a/ss/pebbledb/db.go
+++ b/ss/pebbledb/db.go
@@ -6,6 +6,7 @@ import (
 	"errors"
 	"fmt"
 	"math"
+	"path/filepath"
 	"strings"
 	"sync"
 	"time"
@@ -46,8 +47,14 @@ var (
 	defaultWriteOpts = pebble.NoSync
 )
 
+type VersionedDB struct {
+	startVersion int64 // inclusive
+	endVersion   int64 // exclusive
+	db           *pebble.DB
+}
+
 type Database struct {
-	storage      *pebble.DB
+	dbs          []*VersionedDB
 	asyncWriteWG sync.WaitGroup
 	config       config.StateStoreConfig
 	// Earliest version for db after pruning
@@ -70,8 +77,51 @@ type VersionedChangesets struct {
 }
 
 func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
+	database := &Database{
+		dbs:            []*VersionedDB{},
+		config:         config,
+		pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer),
+	}
+
+	// Open the first shard, covering versions [0, config.VersionShardSize).
+	// pebble.Open takes its own reference on the block cache, so the
+	// reference created by getPebbleOptions is released when New returns.
+	opts := database.getPebbleOptions()
+	defer opts.Cache.Unref()
+	versionedDB, err := openVersionedDB(dataDir, 0, config.VersionShardSize, opts)
+	if err != nil {
+		return nil, err
+	}
+	database.dbs = append(database.dbs, versionedDB)
+
+	earliestVersion, err := retrieveEarliestVersion(versionedDB.db)
+	if err != nil {
+		return nil, fmt.Errorf("failed to retrieve earliest version: %w", err)
+	}
+	database.earliestVersion = earliestVersion
+
+	if config.DedicatedChangelog {
+		streamHandler, _ := changelog.NewStream(
+			logger.NewNopLogger(),
+			utils.GetChangelogPath(dataDir),
+			changelog.Config{
+				DisableFsync:  true,
+				ZeroCopy:      true,
+				KeepRecent:    uint64(config.KeepRecent),
+				PruneInterval: 300 * time.Second,
+			},
+		)
+		database.streamHandler = streamHandler
+		go database.writeAsyncInBackground()
+	}
+
+	return database, nil
+}
+
+func (db *Database) getPebbleOptions() *pebble.Options {
 	cache := pebble.NewCache(1024 * 1024 * 32)
-	defer cache.Unref()
+	// The caller owns the initial reference on this cache and must Unref it
+	// after opening the DB, which holds its own reference until it is closed.
 	opts := &pebble.Options{
 		Cache:    cache,
 		Comparer: MVCCComparer,
@@ -91,7 +141,6 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
 		l.IndexBlockSize = 256 << 10 // 256 KB
 		l.FilterPolicy = bloom.FilterPolicy(10)
 		l.FilterType = pebble.TableFilter
-		// TODO: Consider compression only for specific layers like bottommost
 		l.Compression = pebble.ZstdCompression
 		if i > 0 {
 			l.TargetFileSize = opts.Levels[i-1].TargetFileSize * 2
@@ -103,43 +152,23 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
 	opts.FlushSplitBytes = opts.Levels[0].TargetFileSize
 	opts = opts.EnsureDefaults()
 
-	db, err := pebble.Open(dataDir, opts)
-	if err != nil {
-		return nil, fmt.Errorf("failed to open PebbleDB: %w", err)
-	}
+	return opts
+}
 
-	earliestVersion, err := retrieveEarliestVersion(db)
+func openVersionedDB(dataDir string, startVersion, endVersion int64, opts *pebble.Options) (*VersionedDB, error) {
+	dbDir := filepath.Join(dataDir, fmt.Sprintf("%d_%d", startVersion, endVersion))
+	db, err := pebble.Open(dbDir, opts)
 	if err != nil {
 		return nil, fmt.Errorf("failed to open PebbleDB: %w", err)
 	}
-	database := &Database{
-		storage:         db,
-		asyncWriteWG:    sync.WaitGroup{},
-		config:          config,
-		earliestVersion: earliestVersion,
-		pendingChanges:  make(chan VersionedChangesets, config.AsyncWriteBuffer),
-	}
-	if config.DedicatedChangelog {
-		streamHandler, _ := changelog.NewStream(
-			logger.NewNopLogger(),
-			utils.GetChangelogPath(dataDir),
-			changelog.Config{
-				DisableFsync:  true,
-				ZeroCopy:      true,
-				KeepRecent:    uint64(config.KeepRecent),
-				PruneInterval: 300 * time.Second,
-			},
-		)
-		database.streamHandler = streamHandler
-		go database.writeAsyncInBackground()
-	}
-	return database, nil
-}
 
-func NewWithDB(storage *pebble.DB) *Database {
-	return &Database{
-		storage: storage,
+	versionedDB := &VersionedDB{
+		startVersion: startVersion,
+		endVersion:   endVersion,
+		db:           db,
 	}
+
+	return versionedDB, nil
 }
 
 func (db *Database) Close() error {
@@ -148,46 +177,58 @@ func (db *Database) Close() error {
 		db.streamHandler = nil
 		close(db.pendingChanges)
 	}
-	// Wait for the async writes to finish
 	db.asyncWriteWG.Wait()
-	err := db.storage.Close()
-	db.storage = nil
-	return err
+	// Close every shard even if one of them fails; report the first error.
+	var firstErr error
+	for _, vdb := range db.dbs {
+		if err := vdb.db.Close(); err != nil && firstErr == nil {
+			firstErr = err
+		}
+	}
+	db.dbs = nil
+	return firstErr
 }
 
 func (db *Database) SetLatestVersion(version int64) error {
+	currentDB, err := db.getOrCreateDBForVersion(version)
+	if err != nil {
+		return err
+	}
 	var ts [VersionSize]byte
 	binary.LittleEndian.PutUint64(ts[:], uint64(version))
-	err := db.storage.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts)
+	// The marker is written to the shard that owns this version.
+	err = currentDB.db.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts)
 	return err
 }
 
 func (db *Database) GetLatestVersion() (int64, error) {
-	bz, closer, err := db.storage.Get([]byte(latestVersionKey))
+	if len(db.dbs) == 0 {
+		return 0, nil
+	}
+	currentDB := db.dbs[len(db.dbs)-1]
+	bz, closer, err := currentDB.db.Get([]byte(latestVersionKey))
 	if err != nil {
 		if errors.Is(err, pebble.ErrNotFound) {
-			// in case of a fresh database
 			return 0, nil
 		}
-
 		return 0, err
 	}
+	defer closer.Close()
 
 	if len(bz) == 0 {
-		return 0, closer.Close()
+		return 0, nil
 	}
 
-	return int64(binary.LittleEndian.Uint64(bz)), closer.Close()
+	return int64(binary.LittleEndian.Uint64(bz)), nil
 }
 
 func (db *Database) SetEarliestVersion(version int64) error {
 	if version > db.earliestVersion {
 		db.earliestVersion = version
-
+		currentDB := db.dbs[0]
 		var ts [VersionSize]byte
 		binary.LittleEndian.PutUint64(ts[:], uint64(version))
-		return db.storage.Set([]byte(earliestVersionKey), ts[:], defaultWriteOpts)
+		return currentDB.db.Set([]byte(earliestVersionKey), ts[:], defaultWriteOpts)
 	}
 	return nil
 }
@@ -215,14 +256,16 @@ func retrieveEarliestVersion(db *pebble.DB) (int64, error) {
 	return int64(binary.LittleEndian.Uint64(bz)), closer.Close()
 }
 
-// SetLatestKey sets the latest key processed during migration.
+// SetLatestMigratedKey sets the latest key processed during migration.
 func (db *Database) SetLatestMigratedKey(key []byte) error {
-	return db.storage.Set([]byte(latestMigratedKeyMetadata), key, defaultWriteOpts)
+	currentDB := db.dbs[len(db.dbs)-1]
+	return currentDB.db.Set([]byte(latestMigratedKeyMetadata), key, defaultWriteOpts)
 }
 
-// GetLatestKey retrieves the latest key processed during migration.
+// GetLatestMigratedKey retrieves the latest key processed during migration.
 func (db *Database) GetLatestMigratedKey() ([]byte, error) {
-	bz, closer, err := db.storage.Get([]byte(latestMigratedKeyMetadata))
+	currentDB := db.dbs[len(db.dbs)-1]
+	bz, closer, err := currentDB.db.Get([]byte(latestMigratedKeyMetadata))
 	if err != nil {
 		if errors.Is(err, pebble.ErrNotFound) {
 			return nil, nil
@@ -230,17 +273,19 @@ func (db *Database) GetLatestMigratedKey() ([]byte, error) {
 		return nil, err
 	}
 	defer closer.Close()
-	return bz, nil
+	return slices.Clone(bz), nil
 }
 
-// SetLatestModule sets the latest module processed during migration.
+// SetLatestMigratedModule sets the latest module processed during migration.
 func (db *Database) SetLatestMigratedModule(module string) error {
-	return db.storage.Set([]byte(latestMigratedModuleMetadata), []byte(module), defaultWriteOpts)
+	currentDB := db.dbs[len(db.dbs)-1]
+	return currentDB.db.Set([]byte(latestMigratedModuleMetadata), []byte(module), defaultWriteOpts)
 }
 
-// GetLatestModule retrieves the latest module processed during migration.
+// GetLatestMigratedModule retrieves the latest module processed during migration.
 func (db *Database) GetLatestMigratedModule() (string, error) {
-	bz, closer, err := db.storage.Get([]byte(latestMigratedModuleMetadata))
+	currentDB := db.dbs[len(db.dbs)-1]
+	bz, closer, err := currentDB.db.Get([]byte(latestMigratedModuleMetadata))
 	if err != nil {
 		if errors.Is(err, pebble.ErrNotFound) {
 			return "", nil
@@ -269,38 +314,48 @@ func (db *Database) Get(storeKey string, targetVersion int64, key []byte) ([]byt
 		return nil, nil
 	}
 
-	prefixedVal, err := getMVCCSlice(db.storage, storeKey, key, targetVersion)
-	if err != nil {
-		if errors.Is(err, errorutils.ErrRecordNotFound) {
-			return nil, nil
+	// Walk the shards newest-first and read from the one whose range contains targetVersion.
+	for i := len(db.dbs) - 1; i >= 0; i-- {
+		vdb := db.dbs[i]
+		if targetVersion < vdb.startVersion {
+			continue
 		}
 
-		return nil, fmt.Errorf("failed to perform PebbleDB read: %w", err)
-	}
+		if targetVersion >= vdb.endVersion {
+			// NOTE(review): older shards are skipped too, so a key last written in an earlier shard is not found — confirm intended.
+			continue
+		}
 
-	valBz, tombBz, ok := SplitMVCCKey(prefixedVal)
-	if !ok {
-		return nil, fmt.Errorf("invalid PebbleDB MVCC value: %s", prefixedVal)
-	}
+		prefixedVal, err := getMVCCSlice(vdb.db, storeKey, key, targetVersion)
+		if err != nil {
+			if errors.Is(err, errorutils.ErrRecordNotFound) {
+				continue
+			}
 
-	// A tombstone of zero or a target version that is less than the tombstone
-	// version means the key is not deleted at the target version.
-	if len(tombBz) == 0 {
-		return valBz, nil
-	}
+			return nil, fmt.Errorf("failed to perform PebbleDB read: %w", err)
+		}
 
-	tombstone, err := decodeUint64Ascending(tombBz)
-	if err != nil {
-		return nil, fmt.Errorf("failed to decode value tombstone: %w", err)
-	}
+		valBz, tombBz, ok := SplitMVCCKey(prefixedVal)
+		if !ok {
+			return nil, fmt.Errorf("invalid PebbleDB MVCC value: %s", prefixedVal)
+		}
+
+		if len(tombBz) == 0 {
+			return valBz, nil
+		}
 
-	// A tombstone of zero or a target version that is less than the tombstone
-	// version means the key is not deleted at the target version.
-	if targetVersion < tombstone {
-		return valBz, nil
+		tombstone, err := decodeUint64Ascending(tombBz)
+		if err != nil {
+			return nil, fmt.Errorf("failed to decode value tombstone: %w", err)
+		}
+
+		if targetVersion < tombstone {
+			return valBz, nil
+		}
+
+		return nil, nil
 	}
 
-	// the value is considered deleted
 	return nil, nil
 }
 
@@ -312,7 +367,12 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro
 		version = 1
 	}
 
-	b, err := NewBatch(db.storage, version)
+	currentDB, err := db.getOrCreateDBForVersion(version)
+	if err != nil {
+		return err
+	}
+
+	b, err := NewBatch(currentDB.db, version)
 	if err != nil {
 		return err
 	}
@@ -385,15 +445,21 @@ func (db *Database) writeAsyncInBackground() {
 // it has been updated. This occurs when that module's keys are updated in between pruning runs, the node after is restarted.
 // This is not a large issue given the next time that module is updated, it will be properly pruned thereafter.
 func (db *Database) Prune(version int64) error {
+	if len(db.dbs) != 1 {
+		return fmt.Errorf("Pruning not enabled when sharding by version")
+	}
+
+	// Exactly one shard exists past the guard above, so it is the one to prune.
+	database := db.dbs[0].db
 	earliestVersion := version + 1 // we increment by 1 to include the provided version
 
-	itr, err := db.storage.NewIter(nil)
+	itr, err := database.NewIter(nil)
 	if err != nil {
 		return err
 	}
 	defer itr.Close()
 
-	batch := db.storage.NewBatch()
+	batch := database.NewBatch()
 	defer batch.Close()
 
 	var (
@@ -498,6 +564,11 @@ func (db *Database) Iterator(storeKey string, version int64, start, end []byte)
 		return nil, errorutils.ErrStartAfterEnd
 	}
 
+	vdb, err := db.getDBForVersion(version)
+	if err != nil {
+		return nil, err
+	}
+
 	lowerBound := MVCCEncode(prependStoreKey(storeKey, start), 0)
 
 	var upperBound []byte
@@ -505,7 +576,7 @@ func (db *Database) Iterator(storeKey string, version int64, start, end []byte)
 		upperBound = MVCCEncode(prependStoreKey(storeKey, end), 0)
 	}
 
-	itr, err := db.storage.NewIter(&pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound})
+	itr, err := vdb.db.NewIter(&pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound})
 	if err != nil {
 		return nil, fmt.Errorf("failed to create PebbleDB iterator: %w", err)
 	}
@@ -522,6 +593,11 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [
 		return nil, errorutils.ErrStartAfterEnd
 	}
 
+	vdb, err := db.getDBForVersion(version)
+	if err != nil {
+		return nil, err
+	}
+
 	lowerBound := MVCCEncode(prependStoreKey(storeKey, start), 0)
 
 	var upperBound []byte
@@ -529,7 +605,7 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [
 		upperBound = MVCCEncode(prependStoreKey(storeKey, end), 0)
 	}
 
-	itr, err := db.storage.NewIter(&pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound})
+	itr, err := vdb.db.NewIter(&pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound})
 	if err != nil {
 		return nil, fmt.Errorf("failed to create PebbleDB iterator: %w", err)
 	}
@@ -544,7 +620,12 @@ func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error {
 	worker := func() {
 		defer wg.Done()
 
-		batch, err := NewBatch(db.storage, version)
+		currentDB, err := db.getOrCreateDBForVersion(version)
+		if err != nil {
+			panic(err)
+		}
+		// NOTE(review): getOrCreateDBForVersion appends to db.dbs without locking; if several workers run at once this races — consider resolving the shard before starting them.
+		batch, err := NewBatch(currentDB.db, version)
 		if err != nil {
 			panic(err)
 		}
@@ -562,7 +643,7 @@ func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error {
 				panic(err)
 			}
 
-			batch, err = NewBatch(db.storage, version)
+			batch, err = NewBatch(currentDB.db, version)
 			if err != nil {
 				panic(err)
 			}
@@ -586,6 +667,53 @@ func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error {
 	return nil
 }
 
+// getDBForVersion returns the open shard whose [startVersion, endVersion)
+// range contains version. It never opens a new shard.
+func (db *Database) getDBForVersion(version int64) (*VersionedDB, error) {
+	if len(db.dbs) == 0 {
+		return nil, fmt.Errorf("no database shards available")
+	}
+	// Match on each shard's own range instead of computing an index from
+	// VersionShardSize: that stays correct when shards are not contiguous
+	// and never divides by an unset (zero) shard size.
+	for i := len(db.dbs) - 1; i >= 0; i-- {
+		if vdb := db.dbs[i]; version >= vdb.startVersion && version < vdb.endVersion {
+			return vdb, nil
+		}
+	}
+	return nil, fmt.Errorf("no database shard found for version %d", version)
+}
+
+// getOrCreateDBForVersion returns the shard covering version, opening a new
+// shard and appending it to db.dbs when no open shard covers it.
+func (db *Database) getOrCreateDBForVersion(version int64) (*VersionedDB, error) {
+	if vdb, err := db.getDBForVersion(version); err == nil {
+		return vdb, nil
+	}
+	shardSize := db.config.VersionShardSize
+	if shardSize <= 0 {
+		// The division below would panic on a zero shard size.
+		return nil, fmt.Errorf("invalid version-shard-size %d: must be positive", shardSize)
+	}
+	// Align the new shard on a multiple of the shard size.
+	startVersion := (version / shardSize) * shardSize
+	endVersion := startVersion + shardSize
+	opts := db.getPebbleOptions()
+	// Release the creator's cache reference; the opened DB keeps its own.
+	defer opts.Cache.Unref()
+	// NOTE(review): New opens the first shard under its dataDir argument while
+	// this uses config.DBDirectory — confirm both name the same directory.
+	newDB, err := openVersionedDB(db.config.DBDirectory, startVersion, endVersion, opts)
+	if err != nil {
+		return nil, err
+	}
+	db.dbs = append(db.dbs, newDB)
+	return newDB, nil
+}
+
+// TODO: Raw import update with separate db per version
+// Can't create a batch with multiple versions
+// Create wrapper around
 func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error {
 	var wg sync.WaitGroup