@@ -55,6 +55,7 @@ import (
55
55
"github.com/transparency-dev/tessera/api/layout"
56
56
"github.com/transparency-dev/tessera/internal/migrate"
57
57
"github.com/transparency-dev/tessera/internal/otel"
58
+ "github.com/transparency-dev/tessera/internal/parse"
58
59
"github.com/transparency-dev/tessera/internal/stream"
59
60
storage "github.com/transparency-dev/tessera/storage/internal"
60
61
"golang.org/x/sync/errgroup"
@@ -112,6 +113,8 @@ type sequencer interface {
112
113
nextIndex (ctx context.Context ) (uint64 , error )
113
114
// publishTree coordinates the publication of new checkpoints based on the current integrated tree.
114
115
publishTree (ctx context.Context , minAge time.Duration , f func (ctx context.Context , size uint64 , root []byte ) error ) error
116
+ // garbageCollect coordinates the removal of unneeded partial tiles/entry bundles for the provided tree size, up to a maximum number of deletes per invocation.
117
+ garbageCollect (ctx context.Context , treeSize uint64 , maxDeletes uint , removePrefix func (ctx context.Context , prefix string ) error ) error
115
118
}
116
119
117
120
// consumeFunc is the signature of a function which can consume entries from the sequencer and integrate
@@ -196,29 +199,41 @@ func (lr *LogReader) StreamEntries(ctx context.Context, startEntry, N uint64) it
196
199
return stream .EntryBundles (ctx , numWorkers , lr .integratedSize , lr .lrs .getEntryBundle , startEntry , N )
197
200
}
198
201
202
+ // Appender creates a new tessera.Appender lifecycle object.
199
203
func (s * Storage ) Appender (ctx context.Context , opts * tessera.AppendOptions ) (* tessera.Appender , tessera.LogReader , error ) {
200
-
201
- if opts .CheckpointInterval () < minCheckpointInterval {
202
- return nil , nil , fmt .Errorf ("requested CheckpointInterval (%v) is less than minimum permitted %v" , opts .CheckpointInterval (), minCheckpointInterval )
203
- }
204
-
205
204
c , err := gcs .NewClient (ctx , gcs .WithJSONReads ())
206
205
if err != nil {
207
206
return nil , nil , fmt .Errorf ("failed to create GCS client: %v" , err )
208
207
}
208
+ gs := & gcsStorage {
209
+ gcsClient : c ,
210
+ bucket : s .cfg .Bucket ,
211
+ bucketPrefix : s .cfg .BucketPrefix ,
212
+ }
209
213
210
214
seq , err := newSpannerCoordinator (ctx , s .cfg .Spanner , uint64 (opts .PushbackMaxOutstanding ()))
211
215
if err != nil {
212
216
return nil , nil , fmt .Errorf ("failed to create Spanner coordinator: %v" , err )
213
217
}
214
218
219
+ a , lr , err := s .newAppender (ctx , gs , seq , opts )
220
+ if err != nil {
221
+ return nil , nil , err
222
+ }
223
+ return & tessera.Appender {
224
+ Add : a .Add ,
225
+ }, lr , nil
226
+ }
227
+
228
+ // newAppender creates and initialises an Appender struct with the provided underlying storage implementations.
229
+ func (s * Storage ) newAppender (ctx context.Context , o objStore , seq * spannerCoordinator , opts * tessera.AppendOptions ) (* Appender , tessera.LogReader , error ) {
230
+ if opts .CheckpointInterval () < minCheckpointInterval {
231
+ return nil , nil , fmt .Errorf ("requested CheckpointInterval (%v) is less than minimum permitted %v" , opts .CheckpointInterval (), minCheckpointInterval )
232
+ }
233
+
215
234
a := & Appender {
216
235
logStore : & logResourceStore {
217
- objStore : & gcsStorage {
218
- gcsClient : c ,
219
- bucket : s .cfg .Bucket ,
220
- bucketPrefix : s .cfg .BucketPrefix ,
221
- },
236
+ objStore : o ,
222
237
entriesPath : opts .EntriesPath (),
223
238
},
224
239
sequencer : seq ,
@@ -244,10 +259,11 @@ func (s *Storage) Appender(ctx context.Context, opts *tessera.AppendOptions) (*t
244
259
245
260
go a .sequencerJob (ctx )
246
261
go a .publisherJob (ctx , opts .CheckpointInterval ())
262
+ if i := opts .GarbageCollectionInterval (); i > 0 {
263
+ go a .garbageCollectorJob (ctx , i )
264
+ }
247
265
248
- return & tessera.Appender {
249
- Add : a .Add ,
250
- }, reader , nil
266
+ return a , reader , nil
251
267
}
252
268
253
269
// Appender is an implementation of the Tessera appender lifecycle contract.
@@ -324,6 +340,42 @@ func (a *Appender) publisherJob(ctx context.Context, i time.Duration) {
324
340
}
325
341
}
326
342
343
+ func (a * Appender ) garbageCollectorJob (ctx context.Context , i time.Duration ) {
344
+ t := time .NewTicker (i )
345
+ defer t .Stop ()
346
+
347
+ maxDeletesPerRun := uint (1024 )
348
+
349
+ for {
350
+ select {
351
+ case <- ctx .Done ():
352
+ return
353
+ case <- t .C :
354
+ }
355
+ func () {
356
+ ctx , span := tracer .Start (ctx , "tessera.storage.gcp.garbageCollectTask" )
357
+ defer span .End ()
358
+
359
+ // Figure out the size of the latest published checkpoint - we can't be removing partial tiles implied by
360
+ // that checkpoint just because we've done an integration and know about a larger (but as yet unpublished)
361
+ // checkpoint!
362
+ cp , err := a .logStore .getCheckpoint (ctx )
363
+ if err != nil {
364
+ klog .Warningf ("Failed to get published checkpoint: %v" , err )
365
+ }
366
+ _ , pubSize , _ , err := parse .CheckpointUnsafe (cp )
367
+ if err != nil {
368
+ klog .Warningf ("Failed to parse published checkpoint: %v" , err )
369
+ }
370
+
371
+ if err := a .sequencer .garbageCollect (ctx , pubSize , maxDeletesPerRun , a .logStore .objStore .deleteObjectsWithPrefix ); err != nil {
372
+ klog .Warningf ("GarbageCollect failed: %v" , err )
373
+ }
374
+ }()
375
+ }
376
+
377
+ }
378
+
327
379
// init ensures that the storage represents a log in a valid state.
328
380
func (a * Appender ) init (ctx context.Context ) error {
329
381
if _ , err := a .logStore .getCheckpoint (ctx ); err != nil {
@@ -372,6 +424,7 @@ func (a *Appender) publishCheckpoint(ctx context.Context, size uint64, root []by
372
424
// objStore abstracts the object storage operations used here: fetching an object, writing an
// object, and deleting all objects which share a given prefix.
type objStore interface {
373
425
getObject (ctx context.Context , obj string ) ([]byte , int64 , error )
374
426
setObject (ctx context.Context , obj string , data []byte , cond * gcs.Conditions , contType string , cacheCtl string ) error
427
+ deleteObjectsWithPrefix (ctx context.Context , prefix string ) error
375
428
}
376
429
377
430
// logResourceStore knows how to read and write entries which represent a tiles log inside an objStore.
@@ -665,12 +718,14 @@ func (s *spannerCoordinator) initDB(ctx context.Context, spannerDB string) error
665
718
"CREATE TABLE IF NOT EXISTS Seq (id INT64 NOT NULL, seq INT64 NOT NULL, v BYTES(MAX),) PRIMARY KEY (id, seq)" ,
666
719
"CREATE TABLE IF NOT EXISTS IntCoord (id INT64 NOT NULL, seq INT64 NOT NULL, rootHash BYTES(32)) PRIMARY KEY (id)" ,
667
720
"CREATE TABLE IF NOT EXISTS PubCoord (id INT64 NOT NULL, publishedAt TIMESTAMP NOT NULL) PRIMARY KEY (id)" ,
721
+ "CREATE TABLE IF NOT EXISTS GCCoord (id INT64 NOT NULL, fromSize INT64 NOT NULL) PRIMARY KEY (id)" ,
668
722
},
669
723
[][]* spanner.Mutation {
670
724
{spanner .Insert ("Tessera" , []string {"id" , "compatibilityVersion" }, []any {0 , SchemaCompatibilityVersion })},
671
725
{spanner .Insert ("SeqCoord" , []string {"id" , "next" }, []any {0 , 0 })},
672
726
{spanner .Insert ("IntCoord" , []string {"id" , "seq" , "rootHash" }, []any {0 , 0 , rfc6962 .DefaultHasher .EmptyRoot ()})},
673
727
{spanner .Insert ("PubCoord" , []string {"id" , "publishedAt" }, []any {0 , time .Unix (0 , 0 )})},
728
+ {spanner .Insert ("GCCoord" , []string {"id" , "fromSize" }, []any {0 , 0 })},
674
729
},
675
730
)
676
731
}
@@ -949,6 +1004,58 @@ func (s *spannerCoordinator) publishTree(ctx context.Context, minAge time.Durati
949
1004
return nil
950
1005
}
951
1006
1007
+ // garbageCollect is a long running function which will identify unneeded partial tiles/entry bundles, and call the provided function to remove them.
1008
+ //
1009
+ // Uses the `GCCoord` table to ensure that only one binary is actively garbage collecting at any given time, and to track progress so that we don't
1010
+ // needlessly attempt to GC over regions which have already been cleaned.
1011
+ //
1012
+ // Returns true if we've "caught up" with the current state of the tree.
1013
+ func (s * spannerCoordinator ) garbageCollect (ctx context.Context , treeSize uint64 , maxDeletes uint , deleteWithPrefix func (ctx context.Context , prefix string ) error ) error {
1014
+ _ , err := s .dbPool .ReadWriteTransaction (ctx , func (ctx context.Context , txn * spanner.ReadWriteTransaction ) error {
1015
+ row , err := txn .ReadRowWithOptions (ctx , "GCCoord" , spanner.Key {0 }, []string {"fromSize" }, & spanner.ReadOptions {LockHint : spannerpb .ReadRequest_LOCK_HINT_EXCLUSIVE })
1016
+ if err != nil {
1017
+ return fmt .Errorf ("failed to read GCCoord: %w" , err )
1018
+ }
1019
+ var fs int64
1020
+ if err := row .Columns (& fs ); err != nil {
1021
+ return fmt .Errorf ("failed to parse row contents: %v" , err )
1022
+ }
1023
+ fromSize := uint64 (fs )
1024
+
1025
+ if fromSize == treeSize {
1026
+ return nil
1027
+ }
1028
+
1029
+ d := uint (0 )
1030
+ eg := errgroup.Group {}
1031
+ done:
1032
+ for l , f , x := uint64 (0 ), fromSize , treeSize ; x > 0 ; l , f , x = l + 1 , f >> layout .TileHeight , x >> layout .TileHeight {
1033
+ for ri := range layout .Range (f , x - f , x ) {
1034
+ if ri .Partial != 0 || d > maxDeletes {
1035
+ break done
1036
+ }
1037
+ if l == 0 {
1038
+ eg .Go (func () error { return deleteWithPrefix (ctx , layout .EntriesPath (ri .Index , 0 )+ ".p/" ) })
1039
+ d ++
1040
+ fromSize += uint64 (ri .N )
1041
+ }
1042
+ eg .Go (func () error { return deleteWithPrefix (ctx , layout .TilePath (l , ri .Index , 0 )+ ".p/" ) })
1043
+ d ++
1044
+ }
1045
+ }
1046
+ if err := eg .Wait (); err != nil {
1047
+ return fmt .Errorf ("failed to delete one or more objects: %v" , err )
1048
+ }
1049
+
1050
+ if err := txn .BufferWrite ([]* spanner.Mutation {spanner .Update ("GCCoord" , []string {"id" , "fromSize" }, []any {0 , int64 (fromSize )})}); err != nil {
1051
+ return err
1052
+ }
1053
+
1054
+ return nil
1055
+ })
1056
+ return err
1057
+ }
1058
+
952
1059
// gcsStorage knows how to store and retrieve objects from GCS.
953
1060
type gcsStorage struct {
954
1061
bucket string
@@ -1038,6 +1145,37 @@ func (s *gcsStorage) setObject(ctx context.Context, objName string, data []byte,
1038
1145
return nil
1039
1146
}
1040
1147
1148
+ // deleteObjectsWithPrefix removes any objects with the provided prefix from GCS.
1149
+ func (s * gcsStorage ) deleteObjectsWithPrefix (ctx context.Context , objPrefix string ) error {
1150
+ ctx , span := tracer .Start (ctx , "tessera.storage.gcp.deleteObject" )
1151
+ defer span .End ()
1152
+
1153
+ if s .bucketPrefix != "" {
1154
+ objPrefix = filepath .Join (s .bucketPrefix , objPrefix )
1155
+ }
1156
+ span .SetAttributes (objectPathKey .String (objPrefix ))
1157
+
1158
+ bkt := s .gcsClient .Bucket (s .bucket )
1159
+
1160
+ errs := []error (nil )
1161
+ it := bkt .Objects (ctx , & gcs.Query {Prefix : objPrefix })
1162
+ for {
1163
+ attr , err := it .Next ()
1164
+ if err != nil {
1165
+ if err == iterator .Done {
1166
+ break
1167
+ }
1168
+ return err
1169
+ }
1170
+ klog .V (2 ).Infof ("Deleting object %s" , attr .Name )
1171
+ if err := bkt .Object (attr .Name ).Delete (ctx ); err != nil {
1172
+ errs = append (errs , err )
1173
+ }
1174
+ }
1175
+
1176
+ return errors .Join (errs ... )
1177
+ }
1178
+
1041
1179
// MigrationWriter creates a new GCP storage for the MigrationTarget lifecycle mode.
1042
1180
func (s * Storage ) MigrationWriter (ctx context.Context , opts * tessera.MigrationOptions ) (migrate.MigrationWriter , tessera.LogReader , error ) {
1043
1181
c , err := gcs .NewClient (ctx , gcs .WithJSONReads ())
0 commit comments