From 183cdefe33fe73e6546e4a43ddf39b9b50bf18d0 Mon Sep 17 00:00:00 2001
From: kbhat1
Date: Tue, 19 Nov 2024 10:25:20 -0500
Subject: [PATCH 1/2] Sharded DB

---
 ss/pebbledb/batch.go |  67 +++---
 ss/pebbledb/db.go    | 510 ++++++++++++++++++++++++++-----------------
 2 files changed, 352 insertions(+), 225 deletions(-)

diff --git a/ss/pebbledb/batch.go b/ss/pebbledb/batch.go
index d8dc003..069545c 100644
--- a/ss/pebbledb/batch.go
+++ b/ss/pebbledb/batch.go
@@ -66,52 +66,69 @@ func (b *Batch) Write() (err error) {
 	return b.batch.Commit(defaultWriteOpts)
 }
 
-// For writing kv pairs in any order of version
+// RawBatch handles writing key-value pairs where versions and storeKeys can vary.
+// It maintains separate batches for each storage (pebble.DB instance).
 type RawBatch struct {
-	storage *pebble.DB
-	batch   *pebble.Batch
+	batches map[*pebble.DB]*pebble.Batch // Map from storage to batch
 }
 
-func NewRawBatch(storage *pebble.DB) (*RawBatch, error) {
-	batch := storage.NewBatch()
-
+func NewRawBatch() *RawBatch {
 	return &RawBatch{
-		storage: storage,
-		batch:   batch,
-	}, nil
+		batches: make(map[*pebble.DB]*pebble.Batch),
+	}
 }
 
-func (b *RawBatch) Size() int {
-	return b.batch.Len()
+func (rb *RawBatch) Size() int {
+	size := 0
+	for _, batch := range rb.batches {
+		size += batch.Len()
+	}
+	return size
 }
 
-func (b *RawBatch) Reset() {
-	b.batch.Reset()
+func (rb *RawBatch) Reset() {
+	for _, batch := range rb.batches {
+		batch.Reset()
+	}
 }
 
-func (b *RawBatch) set(storeKey string, tombstone int64, key, value []byte, version int64) error {
+func (rb *RawBatch) set(db *pebble.DB, storeKey string, tombstone int64, key, value []byte, version int64) error {
+	batch, ok := rb.batches[db]
+	if !ok {
+		batch = db.NewBatch()
+		rb.batches[db] = batch
+	}
+
 	prefixedKey := MVCCEncode(prependStoreKey(storeKey, key), version)
 	prefixedVal := MVCCEncode(value, tombstone)
 
-	if err := b.batch.Set(prefixedKey, prefixedVal, nil); err != nil {
+	if err := batch.Set(prefixedKey, prefixedVal, nil); err != nil {
 		return fmt.Errorf("failed to write PebbleDB batch: %w", err)
 	}
 
 	return nil
 }
 
-func (b *RawBatch) Set(storeKey string, key, value []byte, version int64) error {
-	return b.set(storeKey, 0, key, value, version)
+func (rb *RawBatch) Set(db *pebble.DB, storeKey string, key, value []byte, version int64) error {
+	return rb.set(db, storeKey, 0, key, value, version)
 }
 
-func (b *RawBatch) Delete(storeKey string, key []byte, version int64) error {
-	return b.set(storeKey, version, key, []byte(tombstoneVal), version)
+func (rb *RawBatch) Delete(db *pebble.DB, storeKey string, key []byte, version int64) error {
-	return rb.set(db, storeKey, version, key, []byte(tombstoneVal), version)
 }
 
-func (b *RawBatch) Write() (err error) {
-	defer func() {
-		err = errors.Join(err, b.batch.Close())
-	}()
-
-	return b.batch.Commit(defaultWriteOpts)
+func (rb *RawBatch) Write() (err error) {
+	for _, batch := range rb.batches {
+		if batch.Count() > 0 {
+			if commitErr := batch.Commit(defaultWriteOpts); commitErr != nil {
+				err = errors.Join(err, commitErr)
+			}
+		}
+		if closeErr := batch.Close(); closeErr != nil {
+			err = errors.Join(err, closeErr)
+		}
+	}
+	// Reset the batches after writing
+	rb.batches = make(map[*pebble.DB]*pebble.Batch)
+	return err
 }
diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go
index cac2296..b7eb1ec 100644
--- a/ss/pebbledb/db.go
+++ b/ss/pebbledb/db.go
@@ -47,7 +47,7 @@ var (
 )
 
 type Database struct {
-	storage      *pebble.DB
+	storages     map[string]*pebble.DB
 	asyncWriteWG sync.WaitGroup
 	config       config.StateStoreConfig
 	// Earliest version for db after pruning
@@ -69,6 +69,15 @@ type VersionedChangesets struct {
 	Changesets []*proto.NamedChangeSet
 }
 
+var moduleGroups = [][]string{
+	{"wasm", "aclaccesscontrol", "oracle"},
+	{"epoch", "mint", "acc"},
+	{"bank", "feegrant", "staking"},
+	{"distribution", "slashing", "gov"},
+	{"params", "ibc", "upgrade"},
+	{"evidence", "transfer", "tokenfactory"},
+}
+
 func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
 	cache := pebble.NewCache(1024 * 1024 * 32)
 	defer cache.Unref()
@@ -103,22 +112,33 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
 	opts.FlushSplitBytes = opts.Levels[0].TargetFileSize
 	opts = opts.EnsureDefaults()
 
-	db, err := pebble.Open(dataDir, opts)
-	if err != nil {
-		return nil, fmt.Errorf("failed to open PebbleDB: %w", err)
+	db := &Database{
+		storages:       make(map[string]*pebble.DB),
+		asyncWriteWG:   sync.WaitGroup{},
+		config:         config,
+		pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer),
+	}
+	groupIndex := 0
+	for _, group := range moduleGroups {
+		shardDataDir := fmt.Sprintf("%s/shard%d", dataDir, groupIndex)
+		pebbleDB, err := pebble.Open(shardDataDir, opts)
+		if err != nil {
+			return nil, fmt.Errorf("failed to open PebbleDB for shard %d: %w", groupIndex, err)
+		}
+
+		for _, storeKey := range group {
+			db.storages[storeKey] = pebbleDB
+		}
+
+		groupIndex++
 	}
 
 	earliestVersion, err := retrieveEarliestVersion(db)
 	if err != nil {
-		return nil, fmt.Errorf("failed to open PebbleDB: %w", err)
-	}
-	database := &Database{
-		storage:         db,
-		asyncWriteWG:    sync.WaitGroup{},
-		config:          config,
-		earliestVersion: earliestVersion,
-		pendingChanges:  make(chan VersionedChangesets, config.AsyncWriteBuffer),
+		return nil, fmt.Errorf("failed to retrieve earliest version: %w", err)
 	}
+	db.earliestVersion = earliestVersion
+
 	if config.DedicatedChangelog {
 		streamHandler, _ := changelog.NewStream(
 			logger.NewNopLogger(),
@@ -130,16 +150,18 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
 				PruneInterval: 300 * time.Second,
 			},
 		)
-		database.streamHandler = streamHandler
-		go database.writeAsyncInBackground()
+		db.streamHandler = streamHandler
+		go db.writeAsyncInBackground()
 	}
-	return database, nil
+	return db, nil
 }
 
-func NewWithDB(storage *pebble.DB) *Database {
-	return &Database{
-		storage: storage,
+func (db *Database) getStorage(storeKey string) (*pebble.DB, error) {
+	storage, ok := db.storages[storeKey]
+	if !ok {
+		return nil, fmt.Errorf("no storage for storeKey: %s", storeKey)
 	}
+	return storage, nil
 }
 
@@ -148,37 +170,54 @@ func (db *Database) Close() error {
 		db.streamHandler = nil
 		close(db.pendingChanges)
 	}
-	// Wait for the async writes to finish
+
 	db.asyncWriteWG.Wait()
 
-	err := db.storage.Close()
-	db.storage = nil
+	var err error
+	for _, storage := range db.storages {
+		closeErr := storage.Close()
+		if err == nil && closeErr != nil {
+			err = closeErr
+		}
+	}
+	db.storages = nil
 	return err
 }
 
 func (db *Database) SetLatestVersion(version int64) error {
 	var ts [VersionSize]byte
 	binary.LittleEndian.PutUint64(ts[:], uint64(version))
-	err := db.storage.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts)
-	fmt.Printf("SetLatestVersion: version=%d, err=%v, latestVersionKey=%s\n", version, err, latestVersionKey)
-	return err
-}
 
-func (db *Database) GetLatestVersion() (int64, error) {
-	bz, closer, err := db.storage.Get([]byte(latestVersionKey))
-	if err != nil {
-		if errors.Is(err, pebble.ErrNotFound) {
-			// in case of a fresh database
-			return 0, nil
+	// Store the latest version in all storages
+	for _, storage := range db.storages {
+		err := storage.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts)
+		if err != nil {
+			return err
 		}
-
-		return 0, err
 	}
+	return nil
+}
 
-	if len(bz) == 0 {
-		return 0, closer.Close()
+func (db *Database) GetLatestVersion() (int64, error) {
+	latestVersion := int64(0)
+	for _, storage := range db.storages {
+		bz, closer, err := storage.Get([]byte(latestVersionKey))
+		if err != nil {
+			if errors.Is(err, pebble.ErrNotFound) {
+				continue
+			}
+			return 0, err
+		}
+		if len(bz) == 0 {
+			closer.Close()
+			continue
+		}
+		version := int64(binary.LittleEndian.Uint64(bz))
+		if version > latestVersion {
+			latestVersion = version
+		}
+		closer.Close()
 	}
-
-	return int64(binary.LittleEndian.Uint64(bz)), closer.Close()
+	return latestVersion, nil
 }
 
@@ -187,7 +226,13 @@ func (db *Database) SetEarliestVersion(version int64) error {
 		var ts [VersionSize]byte
 		binary.LittleEndian.PutUint64(ts[:], uint64(version))
-		return db.storage.Set([]byte(earliestVersionKey), ts[:], defaultWriteOpts)
+
+		for _, storage := range db.storages {
+			err := storage.Set([]byte(earliestVersionKey), ts[:], defaultWriteOpts)
+			if err != nil {
+				return err
+			}
+		}
 	}
 	return nil
 }
@@ -197,58 +242,86 @@ func (db *Database) GetEarliestVersion() (int64, error) {
 }
 
 // Retrieves earliest version from db
-func retrieveEarliestVersion(db *pebble.DB) (int64, error) {
-	bz, closer, err := db.Get([]byte(earliestVersionKey))
-	if err != nil {
-		if errors.Is(err, pebble.ErrNotFound) {
-			// in case of a fresh database
-			return 0, nil
+func retrieveEarliestVersion(db *Database) (int64, error) {
+	earliestVersion := int64(math.MaxInt64)
+	for _, storage := range db.storages {
+		bz, closer, err := storage.Get([]byte(earliestVersionKey))
+		if err != nil {
+			if errors.Is(err, pebble.ErrNotFound) {
+				continue
+			}
+			return 0, err
 		}
-
-		return 0, err
+		if len(bz) == 0 {
+			closer.Close()
+			continue
+		}
+		version := int64(binary.LittleEndian.Uint64(bz))
+		if version < earliestVersion {
+			earliestVersion = version
+		}
+		closer.Close()
 	}
-
-	if len(bz) == 0 {
-		return 0, closer.Close()
+	if earliestVersion == int64(math.MaxInt64) {
+		return 0, nil
 	}
-
-	return int64(binary.LittleEndian.Uint64(bz)), closer.Close()
+	return earliestVersion, nil
 }
 
 // SetLatestKey sets the latest key processed during migration.
 func (db *Database) SetLatestMigratedKey(key []byte) error {
-	return db.storage.Set([]byte(latestMigratedKeyMetadata), key, defaultWriteOpts)
+	for _, storage := range db.storages {
+		err := storage.Set([]byte(latestMigratedKeyMetadata), key, defaultWriteOpts)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
 }
 
 // GetLatestKey retrieves the latest key processed during migration.
 func (db *Database) GetLatestMigratedKey() ([]byte, error) {
-	bz, closer, err := db.storage.Get([]byte(latestMigratedKeyMetadata))
-	if err != nil {
-		if errors.Is(err, pebble.ErrNotFound) {
-			return nil, nil
+	// Retrieve from the first storage
+	for _, storage := range db.storages {
+		bz, closer, err := storage.Get([]byte(latestMigratedKeyMetadata))
+		if err != nil {
+			if errors.Is(err, pebble.ErrNotFound) {
+				continue
+			}
+			return nil, err
 		}
-		return nil, err
+		defer closer.Close()
+		return bz, nil
 	}
-	defer closer.Close()
-	return bz, nil
+	return nil, nil
 }
 
 // SetLatestModule sets the latest module processed during migration.
 func (db *Database) SetLatestMigratedModule(module string) error {
-	return db.storage.Set([]byte(latestMigratedModuleMetadata), []byte(module), defaultWriteOpts)
+	for _, storage := range db.storages {
+		err := storage.Set([]byte(latestMigratedModuleMetadata), []byte(module), defaultWriteOpts)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
 }
 
 // GetLatestModule retrieves the latest module processed during migration.
 func (db *Database) GetLatestMigratedModule() (string, error) {
-	bz, closer, err := db.storage.Get([]byte(latestMigratedModuleMetadata))
-	if err != nil {
-		if errors.Is(err, pebble.ErrNotFound) {
-			return "", nil
+	// Retrieve from the first storage
+	for _, storage := range db.storages {
+		bz, closer, err := storage.Get([]byte(latestMigratedModuleMetadata))
+		if err != nil {
+			if errors.Is(err, pebble.ErrNotFound) {
+				continue
+			}
+			return "", err
 		}
-		return "", err
+		defer closer.Close()
+		return string(bz), nil
 	}
-	defer closer.Close()
-	return string(bz), nil
+	return "", nil
 }
 
 func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error) {
@@ -269,7 +342,12 @@ func (db *Database) Get(storeKey string, targetVersion int64, key []byte) ([]byt
 		return nil, nil
 	}
 
-	prefixedVal, err := getMVCCSlice(db.storage, storeKey, key, targetVersion)
+	storage, err := db.getStorage(storeKey)
+	if err != nil {
+		return nil, err
+	}
+
+	prefixedVal, err := getMVCCSlice(storage, storeKey, key, targetVersion)
 	if err != nil {
 		if errors.Is(err, errorutils.ErrRecordNotFound) {
 			return nil, nil
@@ -283,8 +361,6 @@ func (db *Database) Get(storeKey string, targetVersion int64, key []byte) ([]byt
 		return nil, fmt.Errorf("invalid PebbleDB MVCC value: %s", prefixedVal)
 	}
 
-	// A tombstone of zero or a target version that is less than the tombstone
-	// version means the key is not deleted at the target version.
 	if len(tombBz) == 0 {
 		return valBz, nil
 	}
@@ -294,25 +370,24 @@ func (db *Database) Get(storeKey string, targetVersion int64, key []byte) ([]byt
 		return nil, fmt.Errorf("failed to decode value tombstone: %w", err)
 	}
 
-	// A tombstone of zero or a target version that is less than the tombstone
-	// version means the key is not deleted at the target version.
 	if targetVersion < tombstone {
 		return valBz, nil
 	}
 
-	// the value is considered deleted
 	return nil, nil
 }
 
 func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) error {
-	// Check if version is 0 and change it to 1
-	// We do this specifically since keys written as part of genesis state come in as version 0
-	// But pebbledb treats version 0 as special, so apply the changeset at version 1 instead
 	if version == 0 {
 		version = 1
 	}
 
-	b, err := NewBatch(db.storage, version)
+	storage, err := db.getStorage(cs.Name)
+	if err != nil {
+		return err
+	}
+
+	b, err := NewBatch(storage, version)
 	if err != nil {
 		return err
 	}
@@ -329,7 +404,6 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro
 		}
 	}
 
-	// Mark the store as updated
 	db.storeKeyDirty.Store(cs.Name, version)
 
 	return b.Write()
@@ -387,106 +461,95 @@ func (db *Database) writeAsyncInBackground() {
 
 func (db *Database) Prune(version int64) error {
 	earliestVersion := version + 1 // we increment by 1 to include the provided version
 
-	itr, err := db.storage.NewIter(nil)
-	if err != nil {
-		return err
-	}
-	defer itr.Close()
-
-	batch := db.storage.NewBatch()
-	defer batch.Close()
-
-	var (
-		counter                                 int
-		prevKey, prevKeyEncoded, prevValEncoded []byte
-		prevVersionDecoded                      int64
-		prevStore                               string
-	)
-
-	for itr.First(); itr.Valid(); {
-		currKeyEncoded := slices.Clone(itr.Key())
-
-		// Ignore metadata entry for version during pruning
-		if bytes.Equal(currKeyEncoded, []byte(latestVersionKey)) || bytes.Equal(currKeyEncoded, []byte(earliestVersionKey)) {
-			itr.Next()
-			continue
-		}
-
-		// Store current key and version
-		currKey, currVersion, currOK := SplitMVCCKey(currKeyEncoded)
-		if !currOK {
-			return fmt.Errorf("invalid MVCC key")
-		}
-
-		storeKey, err := parseStoreKey(currKey)
+	for _, storage := range db.storages {
+		itr, err := storage.NewIter(nil)
 		if err != nil {
-			// XXX: This should never happen given we skip the metadata keys.
 			return err
 		}
+		defer itr.Close()
+
+		batch := storage.NewBatch()
+		defer batch.Close()
+
+		var (
+			counter                                 int
+			prevKey, prevKeyEncoded, prevValEncoded []byte
+			prevVersionDecoded                      int64
+		)
+
+		for itr.First(); itr.Valid(); {
+			currKeyEncoded := slices.Clone(itr.Key())
 
-		// For every new module visited, check to see last time it was updated
-		if storeKey != prevStore {
-			prevStore = storeKey
-			updated, ok := db.storeKeyDirty.Load(storeKey)
-			versionUpdated, typeOk := updated.(int64)
-			// Skip a store's keys if version it was last updated is less than last prune height
-			if !ok || (typeOk && versionUpdated < db.earliestVersion) {
-				itr.SeekGE(storePrefix(storeKey + "0"))
+			// Ignore metadata entry for version during pruning
+			if bytes.Equal(currKeyEncoded, []byte(latestVersionKey)) || bytes.Equal(currKeyEncoded, []byte(earliestVersionKey)) {
+				itr.Next()
 				continue
 			}
-		}
 
-		currVersionDecoded, err := decodeUint64Ascending(currVersion)
-		if err != nil {
-			return err
-		}
-
-		// Seek to next key if we are at a version which is higher than prune height
-		// Do not seek to next key if KeepLastVersion is false and we need to delete the previous key in pruning
-		if currVersionDecoded > version && (db.config.KeepLastVersion || prevVersionDecoded > version) {
-			itr.NextPrefix()
-			continue
-		}
+			// Store current key and version
+			currKey, currVersion, currOK := SplitMVCCKey(currKeyEncoded)
+			if !currOK {
+				return fmt.Errorf("invalid MVCC key")
+			}
 
-		// Delete a key if another entry for that key exists at a larger version than original but leq to the prune height
-		// Also delete a key if it has been tombstoned and its version is leq to the prune height
-		// Also delete a key if KeepLastVersion is false and version is leq to the prune height
-		if prevVersionDecoded <= version && (bytes.Equal(prevKey, currKey) || valTombstoned(prevValEncoded) || !db.config.KeepLastVersion) {
-			err = batch.Delete(prevKeyEncoded, nil)
+			currVersionDecoded, err := decodeUint64Ascending(currVersion)
 			if err != nil {
 				return err
 			}
 
-			counter++
-			if counter >= PruneCommitBatchSize {
-				err = batch.Commit(defaultWriteOpts)
+			// Seek to next key if we are at a version which is higher than prune height
+			if currVersionDecoded > version && (db.config.KeepLastVersion || prevVersionDecoded > version) {
+				itr.NextPrefix()
+				continue
+			}
+
+			// Delete logic as before
+			if prevVersionDecoded <= version && (bytes.Equal(prevKey, currKey) || valTombstoned(prevValEncoded) || !db.config.KeepLastVersion) {
+				err = batch.Delete(prevKeyEncoded, nil)
 				if err != nil {
 					return err
 				}
 
-				counter = 0
-				batch.Reset()
+				counter++
+				if counter >= PruneCommitBatchSize {
+					err = batch.Commit(defaultWriteOpts)
+					if err != nil {
+						return err
+					}
+
+					counter = 0
+					batch.Reset()
+				}
 			}
-		}
 
-		// Update prevKey and prevVersion for next iteration
-		prevKey = currKey
-		prevVersionDecoded = currVersionDecoded
-		prevKeyEncoded = currKeyEncoded
-		prevValEncoded = slices.Clone(itr.Value())
+			// Update prevKey and prevVersion for next iteration
+			prevKey = currKey
+			prevVersionDecoded = currVersionDecoded
+			prevKeyEncoded = currKeyEncoded
+			prevValEncoded = slices.Clone(itr.Value())
 
-		itr.Next()
-	}
+			itr.Next()
+		}
+
+		// Commit any leftover delete ops in batch
+		if counter > 0 {
+			err = batch.Commit(defaultWriteOpts)
+			if err != nil {
+				return err
+			}
+		}
 
-	// Commit any leftover delete ops in batch
-	if counter > 0 {
-		err = batch.Commit(defaultWriteOpts)
+		// Update earliest version in storage
+		var ts [VersionSize]byte
+		binary.LittleEndian.PutUint64(ts[:], uint64(earliestVersion))
+		err = storage.Set([]byte(earliestVersionKey), ts[:], defaultWriteOpts)
 		if err != nil {
 			return err
 		}
 	}
 
-	return db.SetEarliestVersion(earliestVersion)
+	db.earliestVersion = earliestVersion
+	return nil
 }
 
 func (db *Database) Iterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) {
@@ -498,6 +561,11 @@ func (db *Database) Iterator(storeKey string, version int64, start, end []byte)
 		return nil, errorutils.ErrStartAfterEnd
 	}
 
+	storage, err := db.getStorage(storeKey)
+	if err != nil {
+		return nil, err
+	}
+
 	lowerBound := MVCCEncode(prependStoreKey(storeKey, start), 0)
 
 	var upperBound []byte
@@ -505,7 +573,7 @@ func (db *Database) Iterator(storeKey string, version int64, start, end []byte)
 		upperBound = MVCCEncode(prependStoreKey(storeKey, end), 0)
 	}
 
-	itr, err := db.storage.NewIter(&pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound})
+	itr, err := storage.NewIter(&pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound})
 	if err != nil {
 		return nil, fmt.Errorf("failed to create PebbleDB iterator: %w", err)
 	}
@@ -522,6 +590,11 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [
 		return nil, errorutils.ErrStartAfterEnd
 	}
 
+	storage, err := db.getStorage(storeKey)
+	if err != nil {
+		return nil, err
+	}
+
 	lowerBound := MVCCEncode(prependStoreKey(storeKey, start), 0)
 
 	var upperBound []byte
@@ -529,7 +602,7 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [
 		upperBound = MVCCEncode(prependStoreKey(storeKey, end), 0)
 	}
 
-	itr, err := db.storage.NewIter(&pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound})
+	itr, err := storage.NewIter(&pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound})
 	if err != nil {
 		return nil, fmt.Errorf("failed to create PebbleDB iterator: %w", err)
 	}
@@ -541,37 +614,53 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [
 // TODO: Potentially add retries instead of panics
 func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error {
 	var wg sync.WaitGroup
+	var errCh = make(chan error, db.config.ImportNumWorkers)
 
 	worker := func() {
 		defer wg.Done()
-		batch, err := NewBatch(db.storage, version)
-		if err != nil {
-			panic(err)
-		}
+		batches := make(map[string]*Batch)
 
-		var counter int
 		for entry := range ch {
-			err := batch.Set(entry.StoreKey, entry.Key, entry.Value)
+			storage, err := db.getStorage(entry.StoreKey)
 			if err != nil {
-				panic(err)
+				errCh <- err
+				return
 			}
 
-			counter++
-			if counter%ImportCommitBatchSize == 0 {
-				if err := batch.Write(); err != nil {
-					panic(err)
+			b, ok := batches[entry.StoreKey]
+			if !ok {
+				b, err = NewBatch(storage, version)
+				if err != nil {
+					errCh <- err
+					return
 				}
+				batches[entry.StoreKey] = b
+			}
+
+			err = b.Set(entry.StoreKey, entry.Key, entry.Value)
+			if err != nil {
+				errCh <- err
+				return
+			}
 
-				batch, err = NewBatch(db.storage, version)
+			if b.Size() >= ImportCommitBatchSize {
+				err = b.Write()
 				if err != nil {
-					panic(err)
+					errCh <- err
+					return
 				}
+				delete(batches, entry.StoreKey)
 			}
 		}
 
-		if batch.Size() > 0 {
-			if err := batch.Write(); err != nil {
-				panic(err)
+		// Write remaining batches
+		for _, b := range batches {
+			if b.Size() > 0 {
+				err := b.Write()
+				if err != nil {
+					errCh <- err
+					return
+				}
 			}
 		}
 	}
 
 	wg.Wait()
+	close(errCh)
+
+	for err := range errCh {
+		if err != nil {
+			return err
+		}
+	}
 
 	return nil
 }
 
 func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error {
 	var wg sync.WaitGroup
+	var errCh = make(chan error, db.config.ImportNumWorkers)
 
 	worker := func() {
 		defer wg.Done()
-		batch, err := NewRawBatch(db.storage)
-		if err != nil {
-			panic(err)
-		}
-
+		rawBatch := NewRawBatch()
 		var counter int
-		var latestKey []byte // store the latest key from the batch
+		var latestKey []byte
 		var latestModule string
+
 		for entry := range ch {
-			err := batch.Set(entry.StoreKey, entry.Key, entry.Value, entry.Version)
+			storage, err := db.getStorage(entry.StoreKey)
 			if err != nil {
-				panic(err)
+				errCh <- err
+				return
 			}
 
-			latestKey = entry.Key // track the latest key
+			err = rawBatch.Set(storage, entry.StoreKey, entry.Key, entry.Value, entry.Version)
+			if err != nil {
+				errCh <- err
+				return
+			}
+
+			latestKey = entry.Key // Track the latest key
 			latestModule = entry.StoreKey
 
 			counter++
@@ -613,17 +714,20 @@ func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error {
 				startTime := time.Now()
 
 				// Commit the batch and record the latest key as metadata
-				if err := batch.Write(); err != nil {
-					panic(err)
+				if err := rawBatch.Write(); err != nil {
+					errCh <- err
+					return
 				}
 
 				// Persist the latest key in the metadata
 				if err := db.SetLatestMigratedKey(latestKey); err != nil {
-					panic(err)
+					errCh <- err
+					return
 				}
 
 				if err := db.SetLatestMigratedModule(latestModule); err != nil {
-					panic(err)
+					errCh <- err
+					return
 				}
 
 				if counter%1000000 == 0 {
@@ -633,26 +737,26 @@ func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error {
 					})
 				}
 
-				batch, err = NewRawBatch(db.storage)
-				if err != nil {
-					panic(err)
-				}
+				rawBatch = NewRawBatch()
 			}
 		}
 
 		// Final batch write
-		if batch.Size() > 0 {
-			if err := batch.Write(); err != nil {
-				panic(err)
+		if rawBatch.Size() > 0 {
+			if err := rawBatch.Write(); err != nil {
+				errCh <- err
+				return
 			}
 
 			// Persist the final latest key
 			if err := db.SetLatestMigratedKey(latestKey); err != nil {
-				panic(err)
+				errCh <- err
+				return
 			}
 
 			if err := db.SetLatestMigratedModule(latestModule); err != nil {
-				panic(err)
+				errCh <- err
+				return
 			}
 		}
 	}
@@ -663,16 +767,27 @@ func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error {
 	}
 
 	wg.Wait()
+	close(errCh)
+
+	for err := range errCh {
+		if err != nil {
+			return err
+		}
+	}
 
 	return nil
 }
 
 // RawIterate iterates over all keys and values for a store
 func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte, version int64) bool) (bool, error) {
-	// Iterate through all keys and values for a store
+	storage, err := db.getStorage(storeKey)
+	if err != nil {
+		return false, err
+	}
+
 	lowerBound := MVCCEncode(prependStoreKey(storeKey, nil), 0)
 
-	itr, err := db.storage.NewIter(&pebble.IterOptions{LowerBound: lowerBound})
+	itr, err := storage.NewIter(&pebble.IterOptions{LowerBound: lowerBound})
 	if err != nil {
 		return false, fmt.Errorf("failed to create PebbleDB iterator: %w", err)
 	}
@@ -692,11 +807,6 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte
 			return false, fmt.Errorf("invalid MVCC key")
 		}
 
-		// Only iterate through module
-		if storeKey != "" && !bytes.HasPrefix(currKey, storePrefix(storeKey)) {
-			break
-		}
-
 		currVersionDecoded, err := decodeUint64Ascending(currVersion)
 		if err != nil {
 			return false, err

From ac68d88f42aaf9fa31792cf72722219c7791795d Mon Sep 17 00:00:00 2001
From: kbhat1
Date: Tue, 19 Nov 2024 10:35:13 -0500
Subject: [PATCH 2/2] Add back metadata check

---
 ss/pebbledb/db.go | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go
index b7eb1ec..936100a 100644
--- a/ss/pebbledb/db.go
+++ b/ss/pebbledb/db.go
@@ -796,17 +796,22 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte
 	for itr.First(); itr.Valid(); itr.Next() {
 		currKeyEncoded := itr.Key()
 
-		// Ignore metadata entry for version
+		// Ignore metadata entries
 		if bytes.Equal(currKeyEncoded, []byte(latestVersionKey)) || bytes.Equal(currKeyEncoded, []byte(earliestVersionKey)) {
 			continue
 		}
 
-		// Store current key and version
+		// Decode the current key and version
 		currKey, currVersion, currOK := SplitMVCCKey(currKeyEncoded)
 		if !currOK {
 			return false, fmt.Errorf("invalid MVCC key")
 		}
 
+		// Only iterate through the specified module
+		if storeKey != "" && !bytes.HasPrefix(currKey, storePrefix(storeKey)) {
+			break
+		}
+
 		currVersionDecoded, err := decodeUint64Ascending(currVersion)
 		if err != nil {
 			return false, err
@@ -822,11 +827,10 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte
 			return false, fmt.Errorf("invalid PebbleDB MVCC value: %s", currKey)
 		}
 
-		// Call callback fn
+		// Call the callback function
 		if fn(currKey, valBz, currVersionDecoded) {
 			return true, nil
 		}
-
 	}
 
 	return false, nil
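
Reviewer note (not part of the patches above): the sharding scheme in PATCH 1/2 assigns every store key in a module group to one PebbleDB shard directory (shard0, shard1, ...), and getStorage returns an error for unknown keys. The following is a minimal, self-contained Go sketch of that routing rule only; shardDirForStoreKey is a hypothetical helper name used here for illustration, not something the patch adds.

package main

import "fmt"

// moduleGroups mirrors the grouping in the patch: every store key in a group
// shares one PebbleDB shard (shard0, shard1, ...).
var moduleGroups = [][]string{
	{"wasm", "aclaccesscontrol", "oracle"},
	{"epoch", "mint", "acc"},
	{"bank", "feegrant", "staking"},
	{"distribution", "slashing", "gov"},
	{"params", "ibc", "upgrade"},
	{"evidence", "transfer", "tokenfactory"},
}

// shardDirForStoreKey resolves a store key to the shard directory the patch
// would open for it, analogous to New() + Database.getStorage.
func shardDirForStoreKey(dataDir, storeKey string) (string, error) {
	for i, group := range moduleGroups {
		for _, key := range group {
			if key == storeKey {
				return fmt.Sprintf("%s/shard%d", dataDir, i), nil
			}
		}
	}
	return "", fmt.Errorf("no storage for storeKey: %s", storeKey)
}

func main() {
	dir, err := shardDirForStoreKey("/data/ss", "bank") // -> /data/ss/shard2
	fmt.Println(dir, err)
}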
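
A second note on write semantics: RawBatch.Write commits one pebble.Batch per shard in a loop, so a write set that spans module groups is not atomic across shards. The rough, self-contained sketch below (again not part of the patch; it uses pebble's in-memory VFS purely for illustration) shows that per-shard commit pattern.

package main

import (
	"fmt"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	// Two in-memory PebbleDB instances stand in for two shards.
	shardA, err := pebble.Open("shardA", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		panic(err)
	}
	defer shardA.Close()

	shardB, err := pebble.Open("shardB", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		panic(err)
	}
	defer shardB.Close()

	// One batch per shard, keyed by the *pebble.DB handle, mirroring RawBatch.batches.
	batches := map[*pebble.DB]*pebble.Batch{
		shardA: shardA.NewBatch(),
		shardB: shardB.NewBatch(),
	}
	_ = batches[shardA].Set([]byte("bank/balance/1"), []byte("100"), nil)
	_ = batches[shardB].Set([]byte("wasm/code/1"), []byte("0xabc"), nil)

	// Each shard's batch is committed and closed independently, as in RawBatch.Write:
	// a failure on one shard does not roll back writes already committed on another.
	for _, b := range batches {
		if b.Count() > 0 {
			if err := b.Commit(pebble.Sync); err != nil {
				fmt.Println("commit failed:", err)
			}
		}
		_ = b.Close()
	}
	fmt.Println("done")
}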