@@ -19,6 +19,7 @@ import (
19
19
"github.com/sei-protocol/sei-db/config"
20
20
"github.com/sei-protocol/sei-db/proto"
21
21
"github.com/sei-protocol/sei-db/ss/types"
22
+ "github.com/sei-protocol/sei-db/ss/util"
22
23
"github.com/sei-protocol/sei-db/stream/changelog"
23
24
"golang.org/x/exp/slices"
24
25
)
@@ -34,11 +35,16 @@ const (
34
35
earliestVersionKey = "s/_earliest"
35
36
latestMigratedKeyMetadata = "s/_latestMigratedKey"
36
37
latestMigratedModuleMetadata = "s/_latestMigratedModule"
38
+ lastRangeHashKey = "s/_hash:latestRange"
37
39
tombstoneVal = "TOMBSTONE"
38
40
39
41
// TODO: Make configurable
40
42
ImportCommitBatchSize = 10000
41
43
PruneCommitBatchSize = 50
44
+ DeleteCommitBatchSize = 50
45
+
46
+ // Number of workers to use for hash computation
47
+ HashComputationWorkers = 10
42
48
)
43
49
44
50
var (
@@ -63,6 +69,11 @@ type Database struct {
63
69
64
70
// Pending changes to be written to the DB
65
71
pendingChanges chan VersionedChangesets
72
+
73
+ // Cache for last range hashed
74
+ lastRangeHashedCache int64
75
+ lastRangeHashedMu sync.RWMutex
76
+ hashComputationMu sync.Mutex
66
77
}
67
78
68
79
type VersionedChangesets struct {
@@ -120,6 +131,14 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
120
131
earliestVersion : earliestVersion ,
121
132
pendingChanges : make (chan VersionedChangesets , config .AsyncWriteBuffer ),
122
133
}
134
+
135
+ // Initialize the lastRangeHashed cache
136
+ lastHashed , err := retrieveLastRangeHashed (db )
137
+ if err != nil {
138
+ return nil , fmt .Errorf ("failed to retrieve last range hashed: %w" , err )
139
+ }
140
+ database .lastRangeHashedCache = lastHashed
141
+
123
142
if config .DedicatedChangelog {
124
143
streamHandler , _ := changelog .NewStream (
125
144
logger .NewNopLogger (),
@@ -196,6 +215,28 @@ func (db *Database) GetEarliestVersion() (int64, error) {
196
215
return db .earliestVersion , nil
197
216
}
198
217
218
+ func (db * Database ) SetLastRangeHashed (latestHashed int64 ) error {
219
+ var ts [VersionSize ]byte
220
+ binary .LittleEndian .PutUint64 (ts [:], uint64 (latestHashed ))
221
+
222
+ // Update the cache first
223
+ db .lastRangeHashedMu .Lock ()
224
+ db .lastRangeHashedCache = latestHashed
225
+ db .lastRangeHashedMu .Unlock ()
226
+
227
+ return db .storage .Set ([]byte (lastRangeHashKey ), ts [:], defaultWriteOpts )
228
+ }
229
+
230
+ // GetLastRangeHashed returns the highest block that has been fully hashed in ranges.
231
+ func (db * Database ) GetLastRangeHashed () (int64 , error ) {
232
+ // Return the cached value
233
+ db .lastRangeHashedMu .RLock ()
234
+ cachedValue := db .lastRangeHashedCache
235
+ db .lastRangeHashedMu .RUnlock ()
236
+
237
+ return cachedValue , nil
238
+ }
239
+
199
240
// Retrieves earliest version from db
200
241
func retrieveEarliestVersion (db * pebble.DB ) (int64 , error ) {
201
242
bz , closer , err := db .Get ([]byte (earliestVersionKey ))
@@ -353,6 +394,114 @@ func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.Named
353
394
Version : version ,
354
395
Changesets : changesets ,
355
396
}
397
+
398
+ if db .config .HashRange > 0 {
399
+ go func (ver int64 ) {
400
+ // Try to acquire lock, return immediately if already locked
401
+ if ! db .hashComputationMu .TryLock () {
402
+ return
403
+ }
404
+ defer db .hashComputationMu .Unlock ()
405
+
406
+ if err := db .computeMissingRanges (ver ); err != nil {
407
+ fmt .Printf ("maybeComputeMissingRanges error: %v\n " , err )
408
+ }
409
+ }(version )
410
+ }
411
+
412
+ return nil
413
+ }
414
+
415
+ func (db * Database ) computeMissingRanges (latestVersion int64 ) error {
416
+ lastHashed , err := db .GetLastRangeHashed ()
417
+ if err != nil {
418
+ return fmt .Errorf ("failed to get last hashed range: %w" , err )
419
+ }
420
+
421
+ // Keep filling full chunks until we can't
422
+ for {
423
+ nextTarget := lastHashed + db .config .HashRange
424
+
425
+ // If we haven't reached the next full chunk boundary yet, stop.
426
+ // We do NOT do partial chunks.
427
+ if nextTarget > latestVersion {
428
+ break
429
+ }
430
+
431
+ // We have a full chunk from (lastHashed+1) .. nextTarget
432
+ begin := lastHashed + 1
433
+ end := nextTarget
434
+ if err := db .computeHashForRange (begin , end ); err != nil {
435
+ return err
436
+ }
437
+
438
+ // Mark that we've completed that chunk
439
+ lastHashed = end
440
+ if err := db .SetLastRangeHashed (lastHashed ); err != nil {
441
+ return err
442
+ }
443
+ }
444
+
445
+ return nil
446
+ }
447
+
448
+ func (db * Database ) computeHashForRange (beginBlock , endBlock int64 ) error {
449
+ chunkSize := endBlock - beginBlock + 1
450
+ if chunkSize <= 0 {
451
+ // Nothing to do
452
+ return nil
453
+ }
454
+
455
+ // Use constant number of workers
456
+ numOfWorkers := HashComputationWorkers
457
+
458
+ // Calculate blocks per worker
459
+ blocksPerWorker := chunkSize / int64 (numOfWorkers )
460
+ if blocksPerWorker < 1 {
461
+ blocksPerWorker = 1
462
+ }
463
+
464
+ for _ , moduleName := range util .Modules {
465
+ dataCh := make (chan types.RawSnapshotNode , 10_000 )
466
+
467
+ hashCalculator := util .NewXorHashCalculator (blocksPerWorker , numOfWorkers , dataCh )
468
+
469
+ go func (mod string ) {
470
+ defer close (dataCh )
471
+
472
+ _ , err := db .RawIterate (mod , func (key , value []byte , ver int64 ) bool {
473
+ // Only feed data whose version is in [beginBlock..endBlock]
474
+ if ver >= beginBlock && ver <= endBlock {
475
+ dataCh <- types.RawSnapshotNode {
476
+ StoreKey : mod ,
477
+ Key : key ,
478
+ Value : value ,
479
+ Version : ver - beginBlock ,
480
+ }
481
+ }
482
+ return false
483
+ })
484
+ if err != nil {
485
+ panic (fmt .Errorf ("error scanning module %s: %w" , mod , err ))
486
+ }
487
+ }(moduleName )
488
+
489
+ allHashes := hashCalculator .ComputeHashes ()
490
+ if len (allHashes ) == 0 {
491
+ continue
492
+ }
493
+
494
+ finalHash := allHashes [len (allHashes )- 1 ]
495
+
496
+ if err := db .WriteBlockRangeHash (moduleName , beginBlock , endBlock , finalHash ); err != nil {
497
+ return fmt .Errorf (
498
+ "failed to write block-range hash for module %q in [%d..%d]: %w" ,
499
+ moduleName , beginBlock , endBlock , err ,
500
+ )
501
+ }
502
+
503
+ }
504
+
356
505
return nil
357
506
}
358
507
@@ -671,6 +820,7 @@ func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error {
671
820
func (db * Database ) RawIterate (storeKey string , fn func (key []byte , value []byte , version int64 ) bool ) (bool , error ) {
672
821
// Iterate through all keys and values for a store
673
822
lowerBound := MVCCEncode (prependStoreKey (storeKey , nil ), 0 )
823
+ prefix := storePrefix (storeKey )
674
824
675
825
itr , err := db .storage .NewIter (& pebble.IterOptions {LowerBound : lowerBound })
676
826
if err != nil {
@@ -693,10 +843,13 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte
693
843
}
694
844
695
845
// Only iterate through module
696
- if storeKey != "" && ! bytes .HasPrefix (currKey , storePrefix ( storeKey ) ) {
846
+ if storeKey != "" && ! bytes .HasPrefix (currKey , prefix ) {
697
847
break
698
848
}
699
849
850
+ // Parse prefix out of the key
851
+ parsedKey := currKey [len (prefix ):]
852
+
700
853
currVersionDecoded , err := decodeUint64Ascending (currVersion )
701
854
if err != nil {
702
855
return false , err
@@ -713,7 +866,7 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte
713
866
}
714
867
715
868
// Call callback fn
716
- if fn (currKey , valBz , currVersionDecoded ) {
869
+ if fn (parsedKey , valBz , currVersionDecoded ) {
717
870
return true , nil
718
871
}
719
872
@@ -722,6 +875,50 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte
722
875
return false , nil
723
876
}
724
877
878
+ func (db * Database ) DeleteKeysAtVersion (module string , version int64 ) error {
879
+
880
+ batch , err := NewBatch (db .storage , version )
881
+ if err != nil {
882
+ return fmt .Errorf ("failed to create deletion batch for module %q: %w" , module , err )
883
+ }
884
+
885
+ deleteCounter := 0
886
+
887
+ _ , err = db .RawIterate (module , func (key , value []byte , ver int64 ) bool {
888
+ if ver == version {
889
+ if err := batch .HardDelete (module , key ); err != nil {
890
+ fmt .Printf ("Error physically deleting key %q in module %q: %v\n " , key , module , err )
891
+ return true // stop iteration on error
892
+ }
893
+ deleteCounter ++
894
+ if deleteCounter >= DeleteCommitBatchSize {
895
+ if err := batch .Write (); err != nil {
896
+ fmt .Printf ("Error writing deletion batch for module %q: %v\n " , module , err )
897
+ return true
898
+ }
899
+ deleteCounter = 0
900
+ batch , err = NewBatch (db .storage , version )
901
+ if err != nil {
902
+ fmt .Printf ("Error creating a new deletion batch for module %q: %v\n " , module , err )
903
+ return true
904
+ }
905
+ }
906
+ }
907
+ return false
908
+ })
909
+ if err != nil {
910
+ return fmt .Errorf ("error iterating module %q for deletion: %w" , module , err )
911
+ }
912
+
913
+ // Commit any remaining deletions.
914
+ if batch .Size () > 0 {
915
+ if err := batch .Write (); err != nil {
916
+ return fmt .Errorf ("error writing final deletion batch for module %q: %w" , module , err )
917
+ }
918
+ }
919
+ return nil
920
+ }
921
+
725
922
func storePrefix (storeKey string ) []byte {
726
923
return []byte (fmt .Sprintf (StorePrefixTpl , storeKey ))
727
924
}
@@ -818,3 +1015,20 @@ func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlo
818
1015
}
819
1016
return nil
820
1017
}
1018
+
1019
+ func retrieveLastRangeHashed (db * pebble.DB ) (int64 , error ) {
1020
+ bz , closer , err := db .Get ([]byte (lastRangeHashKey ))
1021
+ if err != nil {
1022
+ if errors .Is (err , pebble .ErrNotFound ) {
1023
+ // means we haven't hashed anything yet
1024
+ return 0 , nil
1025
+ }
1026
+ return 0 , err
1027
+ }
1028
+ defer closer .Close ()
1029
+
1030
+ if len (bz ) == 0 {
1031
+ return 0 , nil
1032
+ }
1033
+ return int64 (binary .LittleEndian .Uint64 (bz )), nil
1034
+ }
0 commit comments