@@ -55,6 +55,7 @@ import (
	"github.com/transparency-dev/tessera/api/layout"
	"github.com/transparency-dev/tessera/internal/migrate"
	"github.com/transparency-dev/tessera/internal/otel"
+	"github.com/transparency-dev/tessera/internal/parse"
	"github.com/transparency-dev/tessera/internal/stream"
	storage "github.com/transparency-dev/tessera/storage/internal"
	"golang.org/x/sync/errgroup"
@@ -112,6 +113,8 @@ type sequencer interface {
	nextIndex(ctx context.Context) (uint64, error)
	// publishTree coordinates the publication of new checkpoints based on the current integrated tree.
	publishTree(ctx context.Context, minAge time.Duration, f func(ctx context.Context, size uint64, root []byte) error) error
+	// garbageCollect coordinates the removal of unneeded partial tiles/entry bundles for the provided tree size, up to a maximum number of deletes per invocation.
+	garbageCollect(ctx context.Context, treeSize uint64, maxDeletes uint, removePrefix func(ctx context.Context, prefix string) error) error
}

// consumeFunc is the signature of a function which can consume entries from the sequencer and integrate
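The contract above rests on a simple observation: a partial tile or entry bundle is only needed while the published tree still ends inside it, and once a later checkpoint covers the corresponding full resource its `.p/` variants become redundant. The following self-contained sketch (illustrative only, not part of this change; it assumes the tlog-tiles layout in which entry bundles and tiles span 256 leaves) shows the arithmetic for a single level:

```go
package main

import "fmt"

// supersededPartials lists the indices at one level whose partial (".p/")
// variants become obsolete when the published size grows from oldSize to
// newSize. Assumes 256-wide resources, as in the tlog-tiles layout.
func supersededPartials(oldSize, newSize uint64) []uint64 {
	const width = 256
	var obsolete []uint64
	// Indices below oldSize/width were already full; indices below
	// newSize/width are full now, so everything in between has graduated
	// from partial to full and its partial variants can be removed.
	for i := oldSize / width; i < newSize/width; i++ {
		obsolete = append(obsolete, i)
	}
	return obsolete
}

func main() {
	// At published size 300, bundle 0 is full and bundle 1 holds only 44
	// entries, so a partial variant of bundle 1 exists. Once a checkpoint
	// for size 1000 is published, bundles 1 and 2 are full and any partial
	// variants of them can be deleted; bundle 3 is the new partial tail.
	fmt.Println(supersededPartials(300, 1000)) // [1 2]
}
```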
@@ -196,29 +199,41 @@ func (lr *LogReader) StreamEntries(ctx context.Context, startEntry, N uint64) it
	return stream.EntryBundles(ctx, numWorkers, lr.integratedSize, lr.lrs.getEntryBundle, startEntry, N)
}

+// Appender creates a new tessera.Appender lifecycle object.
func (s *Storage) Appender(ctx context.Context, opts *tessera.AppendOptions) (*tessera.Appender, tessera.LogReader, error) {
-
-	if opts.CheckpointInterval() < minCheckpointInterval {
-		return nil, nil, fmt.Errorf("requested CheckpointInterval (%v) is less than minimum permitted %v", opts.CheckpointInterval(), minCheckpointInterval)
-	}
-
	c, err := gcs.NewClient(ctx, gcs.WithJSONReads())
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create GCS client: %v", err)
	}
+	gs := &gcsStorage{
+		gcsClient:    c,
+		bucket:       s.cfg.Bucket,
+		bucketPrefix: s.cfg.BucketPrefix,
+	}

	seq, err := newSpannerCoordinator(ctx, s.cfg.Spanner, uint64(opts.PushbackMaxOutstanding()))
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create Spanner coordinator: %v", err)
	}

+	a, lr, err := s.newAppender(ctx, gs, seq, opts)
+	if err != nil {
+		return nil, nil, err
+	}
+	return &tessera.Appender{
+		Add: a.Add,
+	}, lr, nil
+}
+
+// newAppender creates and initialises a tessera.Appender struct with the provided underlying storage implementations.
+func (s *Storage) newAppender(ctx context.Context, o objStore, seq *spannerCoordinator, opts *tessera.AppendOptions) (*Appender, tessera.LogReader, error) {
+	if opts.CheckpointInterval() < minCheckpointInterval {
+		return nil, nil, fmt.Errorf("requested CheckpointInterval (%v) is less than minimum permitted %v", opts.CheckpointInterval(), minCheckpointInterval)
+	}
+
	a := &Appender{
		logStore: &logResourceStore{
-			objStore: &gcsStorage{
-				gcsClient:    c,
-				bucket:       s.cfg.Bucket,
-				bucketPrefix: s.cfg.BucketPrefix,
-			},
+			objStore:    o,
			entriesPath: opts.EntriesPath(),
		},
		sequencer: seq,
@@ -244,10 +259,11 @@ func (s *Storage) Appender(ctx context.Context, opts *tessera.AppendOptions) (*t
	go a.sequencerJob(ctx)
	go a.publisherJob(ctx, opts.CheckpointInterval())
+	if i := opts.GarbageCollectionInterval(); i > 0 {
+		go a.garbageCollectorJob(ctx, i)
+	}

-	return &tessera.Appender{
-		Add: a.Add,
-	}, reader, nil
+	return a, reader, nil
}

// Appender is an implementation of the Tessera appender lifecycle contract.
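From a personality's point of view the new job is opt-in: it only runs when the configured interval is greater than zero. The wiring might look roughly like the sketch below; the constructor and option names (`gcp.New`, `NewAppendOptions`, `WithGarbageCollectionInterval`, `NewAppender`) are assumptions inferred from the getters used in this diff rather than a verified API reference.

```go
// Hedged sketch only: constructor and option names are assumed, not verified.
package personality

import (
	"context"
	"time"

	tessera "github.com/transparency-dev/tessera"
	"github.com/transparency-dev/tessera/storage/gcp"
	"golang.org/x/mod/sumdb/note"
	"k8s.io/klog/v2"
)

func buildAppender(ctx context.Context, signer note.Signer) (*tessera.Appender, tessera.LogReader) {
	// Config fields mirror those referenced in the diff (Bucket, Spanner, BucketPrefix).
	driver, err := gcp.New(ctx, gcp.Config{
		Bucket:  "my-log-bucket",
		Spanner: "projects/p/instances/i/databases/log-db",
	})
	if err != nil {
		klog.Exitf("Failed to create GCP storage: %v", err)
	}
	opts := tessera.NewAppendOptions().
		WithCheckpointSigner(signer).
		WithCheckpointInterval(10 * time.Second).
		// A zero interval would leave the garbage collector disabled.
		WithGarbageCollectionInterval(time.Hour)
	appender, reader, err := tessera.NewAppender(ctx, driver, opts)
	if err != nil {
		klog.Exitf("Failed to create appender: %v", err)
	}
	return appender, reader
}
```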
@@ -324,6 +340,46 @@ func (a *Appender) publisherJob(ctx context.Context, i time.Duration) {
	}
}

+// garbageCollectorJob is a long-running function which handles the removal of obsolete partial tiles
+// and entry bundles.
+// Blocks until ctx is done.
+func (a *Appender) garbageCollectorJob(ctx context.Context, i time.Duration) {
+	t := time.NewTicker(i)
+	defer t.Stop()
+
+	// Entirely arbitrary number.
+	maxDeletesPerRun := uint(1024)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-t.C:
+		}
+		func() {
+			ctx, span := tracer.Start(ctx, "tessera.storage.gcp.garbageCollectTask")
+			defer span.End()
+
+			// Figure out the size of the latest published checkpoint - we can't be removing partial tiles implied by
+			// that checkpoint just because we've done an integration and know about a larger (but as yet unpublished)
+			// checkpoint!
+			cp, err := a.logStore.getCheckpoint(ctx)
+			if err != nil {
+				klog.Warningf("Failed to get published checkpoint: %v", err)
+			}
+			_, pubSize, _, err := parse.CheckpointUnsafe(cp)
+			if err != nil {
+				klog.Warningf("Failed to parse published checkpoint: %v", err)
+			}
+
+			if err := a.sequencer.garbageCollect(ctx, pubSize, maxDeletesPerRun, a.logStore.objStore.deleteObjectsWithPrefix); err != nil {
+				klog.Warningf("GarbageCollect failed: %v", err)
+			}
+		}()
+	}
+}
+
// init ensures that the storage represents a log in a valid state.
func (a *Appender) init(ctx context.Context) error {
	if _, err := a.logStore.getCheckpoint(ctx); err != nil {
@@ -372,6 +428,7 @@ func (a *Appender) publishCheckpoint(ctx context.Context, size uint64, root []by
type objStore interface {
	getObject(ctx context.Context, obj string) ([]byte, int64, error)
	setObject(ctx context.Context, obj string, data []byte, cond *gcs.Conditions, contType string, cacheCtl string) error
+	deleteObjectsWithPrefix(ctx context.Context, prefix string) error
}

// logResourceStore knows how to read and write entries which represent a tiles log inside an objStore.
@@ -665,12 +722,14 @@ func (s *spannerCoordinator) initDB(ctx context.Context, spannerDB string) error
		"CREATE TABLE IF NOT EXISTS Seq (id INT64 NOT NULL, seq INT64 NOT NULL, v BYTES(MAX),) PRIMARY KEY (id, seq)",
		"CREATE TABLE IF NOT EXISTS IntCoord (id INT64 NOT NULL, seq INT64 NOT NULL, rootHash BYTES(32)) PRIMARY KEY (id)",
		"CREATE TABLE IF NOT EXISTS PubCoord (id INT64 NOT NULL, publishedAt TIMESTAMP NOT NULL) PRIMARY KEY (id)",
+		"CREATE TABLE IF NOT EXISTS GCCoord (id INT64 NOT NULL, fromSize INT64 NOT NULL) PRIMARY KEY (id)",
	},
	[][]*spanner.Mutation{
		{spanner.Insert("Tessera", []string{"id", "compatibilityVersion"}, []any{0, SchemaCompatibilityVersion})},
		{spanner.Insert("SeqCoord", []string{"id", "next"}, []any{0, 0})},
		{spanner.Insert("IntCoord", []string{"id", "seq", "rootHash"}, []any{0, 0, rfc6962.DefaultHasher.EmptyRoot()})},
		{spanner.Insert("PubCoord", []string{"id", "publishedAt"}, []any{0, time.Unix(0, 0)})},
+		{spanner.Insert("GCCoord", []string{"id", "fromSize"}, []any{0, 0})},
	},
	)
}
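`GCCoord` follows the same single-row convention as the other coordination tables: `id` is always 0, and `fromSize` is a low-water mark recording the tree size below which partial resources have already been swept. Purely as an illustration (this helper is not part of the change), the cursor can be read back with the standard Spanner client:

```go
package gcinspect

import (
	"context"
	"fmt"

	"cloud.google.com/go/spanner"
)

// ReadGCFromSize returns the garbage-collection low-water mark from the
// GCCoord table. Illustrative helper only; the table and column names match
// the DDL added above.
func ReadGCFromSize(ctx context.Context, client *spanner.Client) (int64, error) {
	row, err := client.Single().ReadRow(ctx, "GCCoord", spanner.Key{0}, []string{"fromSize"})
	if err != nil {
		return 0, fmt.Errorf("failed to read GCCoord: %w", err)
	}
	var fromSize int64
	if err := row.Columns(&fromSize); err != nil {
		return 0, fmt.Errorf("failed to parse GCCoord row: %w", err)
	}
	return fromSize, nil
}
```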
@@ -949,6 +1008,58 @@ func (s *spannerCoordinator) publishTree(ctx context.Context, minAge time.Durati
	return nil
}

+// garbageCollect identifies unneeded partial tiles/entry bundles below the provided tree size, and calls the provided function to remove them.
+//
+// Uses the `GCCoord` table to ensure that only one binary is actively garbage collecting at any given time, and to track progress so that we don't
+// needlessly attempt to GC over regions which have already been cleaned.
+func (s *spannerCoordinator) garbageCollect(ctx context.Context, treeSize uint64, maxDeletes uint, deleteWithPrefix func(ctx context.Context, prefix string) error) error {
+	_, err := s.dbPool.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
+		row, err := txn.ReadRowWithOptions(ctx, "GCCoord", spanner.Key{0}, []string{"fromSize"}, &spanner.ReadOptions{LockHint: spannerpb.ReadRequest_LOCK_HINT_EXCLUSIVE})
+		if err != nil {
+			return fmt.Errorf("failed to read GCCoord: %w", err)
+		}
+		var fs int64
+		if err := row.Columns(&fs); err != nil {
+			return fmt.Errorf("failed to parse row contents: %v", err)
+		}
+		fromSize := uint64(fs)
+
+		if fromSize == treeSize {
+			return nil
+		}
+
+		d := uint(0)
+		eg := errgroup.Group{}
+	done:
+		for l, f, x := uint64(0), fromSize, treeSize; x > 0; l, f, x = l+1, f>>layout.TileHeight, x>>layout.TileHeight {
+			for ri := range layout.Range(f, x-f, x) {
+				if ri.Partial != 0 || d > maxDeletes {
+					break done
+				}
+				if l == 0 {
+					eg.Go(func() error { return deleteWithPrefix(ctx, layout.EntriesPath(ri.Index, 0)+".p/") })
+					d++
+					fromSize += uint64(ri.N)
+				}
+				eg.Go(func() error { return deleteWithPrefix(ctx, layout.TilePath(l, ri.Index, 0)+".p/") })
+				d++
+			}
+		}
+		if err := eg.Wait(); err != nil {
+			return fmt.Errorf("failed to delete one or more objects: %v", err)
+		}
+
+		if err := txn.BufferWrite([]*spanner.Mutation{spanner.Update("GCCoord", []string{"id", "fromSize"}, []any{0, int64(fromSize)})}); err != nil {
+			return err
+		}
+
+		return nil
+	})
+	return err
+}
+
// gcsStorage knows how to store and retrieve objects from GCS.
type gcsStorage struct {
	bucket string
@@ -1038,6 +1149,37 @@ func (s *gcsStorage) setObject(ctx context.Context, objName string, data []byte,
	return nil
}

+// deleteObjectsWithPrefix removes any objects with the provided prefix from GCS.
+func (s *gcsStorage) deleteObjectsWithPrefix(ctx context.Context, objPrefix string) error {
+	ctx, span := tracer.Start(ctx, "tessera.storage.gcp.deleteObject")
+	defer span.End()
+
+	if s.bucketPrefix != "" {
+		objPrefix = filepath.Join(s.bucketPrefix, objPrefix)
+	}
+	span.SetAttributes(objectPathKey.String(objPrefix))
+
+	bkt := s.gcsClient.Bucket(s.bucket)
+
+	errs := []error(nil)
+	it := bkt.Objects(ctx, &gcs.Query{Prefix: objPrefix})
+	for {
+		attr, err := it.Next()
+		if err != nil {
+			if err == iterator.Done {
+				break
+			}
+			return err
+		}
+		klog.V(2).Infof("Deleting object %s", attr.Name)
+		if err := bkt.Object(attr.Name).Delete(ctx); err != nil {
+			errs = append(errs, err)
+		}
+	}
+
+	return errors.Join(errs...)
+}
+
// MigrationWriter creates a new GCP storage for the MigrationTarget lifecycle mode.
func (s *Storage) MigrationWriter(ctx context.Context, opts *tessera.MigrationOptions) (migrate.MigrationWriter, tessera.LogReader, error) {
	c, err := gcs.NewClient(ctx, gcs.WithJSONReads())