diff --git a/append_lifecycle.go b/append_lifecycle.go
index ef7073d8..a47263e6 100644
--- a/append_lifecycle.go
+++ b/append_lifecycle.go
@@ -47,6 +47,8 @@ const (
 	DefaultCheckpointInterval = 10 * time.Second
 	// DefaultPushbackMaxOutstanding is used by storage implementations if no WithPushback option is provided when instantiating it.
 	DefaultPushbackMaxOutstanding = 4096
+	// DefaultGarbageCollectionInterval is the default value used if no WithGarbageCollectionInterval option is provided.
+	DefaultGarbageCollectionInterval = time.Minute
 )
 
 var (
@@ -486,13 +488,14 @@ func (o *AppendOptions) WithAntispam(inMemEntries uint, as Antispam) *AppendOpti
 
 func NewAppendOptions() *AppendOptions {
 	return &AppendOptions{
-		batchMaxSize:           DefaultBatchMaxSize,
-		batchMaxAge:            DefaultBatchMaxAge,
-		entriesPath:            layout.EntriesPath,
-		bundleIDHasher:         defaultIDHasher,
-		checkpointInterval:     DefaultCheckpointInterval,
-		addDecorators:          make([]func(AddFn) AddFn, 0),
-		pushbackMaxOutstanding: DefaultPushbackMaxOutstanding,
+		batchMaxSize:              DefaultBatchMaxSize,
+		batchMaxAge:               DefaultBatchMaxAge,
+		entriesPath:               layout.EntriesPath,
+		bundleIDHasher:            defaultIDHasher,
+		checkpointInterval:        DefaultCheckpointInterval,
+		addDecorators:             make([]func(AddFn) AddFn, 0),
+		pushbackMaxOutstanding:    DefaultPushbackMaxOutstanding,
+		garbageCollectionInterval: DefaultGarbageCollectionInterval,
 	}
 }
 
@@ -517,6 +520,9 @@ type AppendOptions struct {
 	addDecorators []func(AddFn) AddFn
 
 	followers []stream.Follower
+
+	// garbageCollectionInterval of zero should be interpreted as requesting garbage collection to be disabled.
+	garbageCollectionInterval time.Duration
 }
 
 // valid returns an error if an invalid combination of options has been set, or nil otherwise.
@@ -578,6 +584,10 @@ func (o AppendOptions) CheckpointInterval() time.Duration {
 	return o.checkpointInterval
 }
 
+func (o AppendOptions) GarbageCollectionInterval() time.Duration {
+	return o.garbageCollectionInterval
+}
+
 // WithCheckpointSigner is an option for setting the note signer and verifier to use when creating and parsing checkpoints.
 // This option is mandatory for creating logs where the checkpoint is signed locally, e.g. in
 // the Appender mode. This does not need to be provided where the storage will be used to mirror
@@ -702,3 +712,12 @@ type WitnessOptions struct {
 	// and will be disabled and/or removed in the future.
 	FailOpen bool
 }
+
+// WithGarbageCollectionInterval sets the interval between scans which remove obsolete partial
+// tiles and entry bundles.
+//
+// Setting this to zero disables garbage collection.
+func (o *AppendOptions) WithGarbageCollectionInterval(interval time.Duration) *AppendOptions {
+	o.garbageCollectionInterval = interval
+	return o
+}
diff --git a/storage/gcp/README.md b/storage/gcp/README.md
index 7e972264..ce97391f 100644
--- a/storage/gcp/README.md
+++ b/storage/gcp/README.md
@@ -38,6 +38,10 @@ This table is used to coordinate integration of sequenced batches in the `Seq` t
 This table is used to coordinate publication of new checkpoints, ensuring that checkpoints are
 not published more frequently than configured.
 
+### `GCCoord`
+This table is used to coordinate garbage collection of partial tiles and entry bundles which have been
+made obsolete by the continued growth of the log.
+
 ## Life of a leaf
 
 1. Leaves are submitted by the binary built using Tessera via a call the storage's `Add` func.
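For context, the new option slots into the existing `AppendOptions` builder chain. Below is a minimal sketch of how a personality binary might use it; the package, helper name, and the five-minute value are purely illustrative, and the signer is assumed to have been created elsewhere:

```go
package personality

import (
	"time"

	"github.com/transparency-dev/tessera"
	"golang.org/x/mod/sumdb/note"
)

// buildOptions is a hypothetical helper showing the new knob alongside an existing option.
func buildOptions(signer note.Signer) *tessera.AppendOptions {
	return tessera.NewAppendOptions().
		WithCheckpointSigner(signer).
		// Scan for obsolete partial tiles/entry bundles every 5 minutes rather than
		// relying on DefaultGarbageCollectionInterval; passing 0 disables GC entirely.
		WithGarbageCollectionInterval(5 * time.Minute)
}
```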
diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index adf71f47..70a660bd 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -55,6 +55,7 @@ import ( "github.com/transparency-dev/tessera/api/layout" "github.com/transparency-dev/tessera/internal/migrate" "github.com/transparency-dev/tessera/internal/otel" + "github.com/transparency-dev/tessera/internal/parse" "github.com/transparency-dev/tessera/internal/stream" storage "github.com/transparency-dev/tessera/storage/internal" "golang.org/x/sync/errgroup" @@ -112,6 +113,8 @@ type sequencer interface { nextIndex(ctx context.Context) (uint64, error) // publishCheckpoint coordinates the publication of new checkpoints based on the current integrated tree. publishCheckpoint(ctx context.Context, minAge time.Duration, f func(ctx context.Context, size uint64, root []byte) error) error + // garbageCollect coordinates the removal of unneeded partial tiles/entry bundles for the provided tree size, up to a maximum number of deletes per invocation. + garbageCollect(ctx context.Context, treeSize uint64, maxDeletes uint, removePrefix func(ctx context.Context, prefix string) error) error } // consumeFunc is the signature of a function which can consume entries from the sequencer and integrate @@ -196,29 +199,41 @@ func (lr *LogReader) StreamEntries(ctx context.Context, startEntry, N uint64) it return stream.EntryBundles(ctx, numWorkers, lr.integratedSize, lr.lrs.getEntryBundle, startEntry, N) } +// Appender creates a new tessera.Appender lifecycle object. func (s *Storage) Appender(ctx context.Context, opts *tessera.AppendOptions) (*tessera.Appender, tessera.LogReader, error) { - - if opts.CheckpointInterval() < minCheckpointInterval { - return nil, nil, fmt.Errorf("requested CheckpointInterval (%v) is less than minimum permitted %v", opts.CheckpointInterval(), minCheckpointInterval) - } - c, err := gcs.NewClient(ctx, gcs.WithJSONReads()) if err != nil { return nil, nil, fmt.Errorf("failed to create GCS client: %v", err) } + gs := &gcsStorage{ + gcsClient: c, + bucket: s.cfg.Bucket, + bucketPrefix: s.cfg.BucketPrefix, + } seq, err := newSpannerCoordinator(ctx, s.cfg.Spanner, uint64(opts.PushbackMaxOutstanding())) if err != nil { return nil, nil, fmt.Errorf("failed to create Spanner coordinator: %v", err) } + a, lr, err := s.newAppender(ctx, gs, seq, opts) + if err != nil { + return nil, nil, err + } + return &tessera.Appender{ + Add: a.Add, + }, lr, nil +} + +// newAppender creates and initialises a tessera.Appender struct with the provided underlying storage implementations. 
+func (s *Storage) newAppender(ctx context.Context, o objStore, seq *spannerCoordinator, opts *tessera.AppendOptions) (*Appender, tessera.LogReader, error) {
+	if opts.CheckpointInterval() < minCheckpointInterval {
+		return nil, nil, fmt.Errorf("requested CheckpointInterval (%v) is less than minimum permitted %v", opts.CheckpointInterval(), minCheckpointInterval)
+	}
+
 	a := &Appender{
 		logStore: &logResourceStore{
-			objStore: &gcsStorage{
-				gcsClient:    c,
-				bucket:       s.cfg.Bucket,
-				bucketPrefix: s.cfg.BucketPrefix,
-			},
+			objStore:    o,
 			entriesPath: opts.EntriesPath(),
 		},
 		sequencer: seq,
@@ -244,10 +259,11 @@ func (s *Storage) Appender(ctx context.Context, opts *tessera.AppendOptions) (*t
 
 	go a.integrateEntriesJob(ctx)
 	go a.publishCheckpointJob(ctx, opts.CheckpointInterval())
+	if i := opts.GarbageCollectionInterval(); i > 0 {
+		go a.garbageCollectorJob(ctx, i)
+	}
 
-	return &tessera.Appender{
-		Add: a.Add,
-	}, reader, nil
+	return a, reader, nil
 }
 
 // Appender is an implementation of the Tessera appender lifecycle contract.
@@ -327,6 +343,46 @@ func (a *Appender) publishCheckpointJob(ctx context.Context, i time.Duration) {
 	}
 }
 
+// garbageCollectorJob is a long-running function which handles the removal of obsolete partial tiles
+// and entry bundles.
+// Blocks until ctx is done.
+func (a *Appender) garbageCollectorJob(ctx context.Context, i time.Duration) {
+	t := time.NewTicker(i)
+	defer t.Stop()
+
+	// Entirely arbitrary number.
+	maxBundlesPerRun := uint(100)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-t.C:
+		}
+		func() {
+			ctx, span := tracer.Start(ctx, "tessera.storage.gcp.garbageCollectTask")
+			defer span.End()
+
+			// Figure out the size of the latest published checkpoint - we can't be removing partial tiles implied by
+			// that checkpoint just because we've done an integration and know about a larger (but as yet unpublished)
+			// checkpoint!
+			cp, err := a.logStore.getCheckpoint(ctx)
+			if err != nil {
+				klog.Warningf("Failed to get published checkpoint: %v", err)
+				return
+			}
+			_, pubSize, _, err := parse.CheckpointUnsafe(cp)
+			if err != nil {
+				klog.Warningf("Failed to parse published checkpoint: %v", err)
+				return
+			}
+
+			if err := a.sequencer.garbageCollect(ctx, pubSize, maxBundlesPerRun, a.logStore.objStore.deleteObjectsWithPrefix); err != nil {
+				klog.Warningf("GarbageCollect failed: %v", err)
+			}
+		}()
+	}
+}
+
 // init ensures that the storage represents a log in a valid state.
 func (a *Appender) init(ctx context.Context) error {
 	if _, err := a.logStore.getCheckpoint(ctx); err != nil {
@@ -375,6 +431,7 @@ func (a *Appender) publishCheckpoint(ctx context.Context, size uint64, root []by
 type objStore interface {
 	getObject(ctx context.Context, obj string) ([]byte, int64, error)
 	setObject(ctx context.Context, obj string, data []byte, cond *gcs.Conditions, contType string, cacheCtl string) error
+	deleteObjectsWithPrefix(ctx context.Context, prefix string) error
 }
 
 // logResourceStore knows how to read and write entries which represent a tiles log inside an objStore.
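Note that the only change to the `objStore` contract is the new `deleteObjectsWithPrefix` method. As a rough illustration of what an alternative backend would need to provide (this type is hypothetical and not part of the change), a filesystem-backed store might satisfy it along these lines, collecting per-object failures rather than aborting on the first one, just as the GCS implementation further down does:

```go
package fsstore

import (
	"context"
	"errors"
	"io/fs"
	"os"
	"path/filepath"
	"strings"
)

// fsStore is a hypothetical filesystem-backed object store rooted at root.
type fsStore struct {
	root string
}

// deleteObjectsWithPrefix removes every stored object whose name starts with prefix.
func (s *fsStore) deleteObjectsWithPrefix(_ context.Context, prefix string) error {
	full := filepath.Join(s.root, prefix)
	var errs []error
	walkErr := filepath.WalkDir(s.root, func(path string, d fs.DirEntry, err error) error {
		if err != nil || d.IsDir() {
			return err
		}
		if strings.HasPrefix(path, full) {
			if err := os.Remove(path); err != nil {
				errs = append(errs, err)
			}
		}
		return nil
	})
	if walkErr != nil {
		errs = append(errs, walkErr)
	}
	return errors.Join(errs...)
}
```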
@@ -670,12 +727,14 @@ func (s *spannerCoordinator) initDB(ctx context.Context, spannerDB string) error
 			"CREATE TABLE IF NOT EXISTS Seq (id INT64 NOT NULL, seq INT64 NOT NULL, v BYTES(MAX),) PRIMARY KEY (id, seq)",
 			"CREATE TABLE IF NOT EXISTS IntCoord (id INT64 NOT NULL, seq INT64 NOT NULL, rootHash BYTES(32)) PRIMARY KEY (id)",
 			"CREATE TABLE IF NOT EXISTS PubCoord (id INT64 NOT NULL, publishedAt TIMESTAMP NOT NULL) PRIMARY KEY (id)",
+			"CREATE TABLE IF NOT EXISTS GCCoord (id INT64 NOT NULL, fromSize INT64 NOT NULL) PRIMARY KEY (id)",
 		},
 		[][]*spanner.Mutation{
 			{spanner.Insert("Tessera", []string{"id", "compatibilityVersion"}, []any{0, SchemaCompatibilityVersion})},
 			{spanner.Insert("SeqCoord", []string{"id", "next"}, []any{0, 0})},
 			{spanner.Insert("IntCoord", []string{"id", "seq", "rootHash"}, []any{0, 0, rfc6962.DefaultHasher.EmptyRoot()})},
 			{spanner.Insert("PubCoord", []string{"id", "publishedAt"}, []any{0, time.Unix(0, 0)})},
+			{spanner.Insert("GCCoord", []string{"id", "fromSize"}, []any{0, 0})},
 		},
 	)
 }
@@ -971,6 +1030,75 @@ func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minAge time.
 	return nil
 }
 
+// garbageCollect will identify up to maxBundles unneeded partial entry bundles (and any unneeded partial tiles which sit above them in the tree) and
+// call the provided function to remove them.
+//
+// Uses the `GCCoord` table to ensure that only one binary is actively garbage collecting at any given time, and to track progress so that we don't
+// needlessly attempt to GC over regions which have already been cleaned.
+func (s *spannerCoordinator) garbageCollect(ctx context.Context, treeSize uint64, maxBundles uint, deleteWithPrefix func(ctx context.Context, prefix string) error) error {
+	_, err := s.dbPool.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
+		row, err := txn.ReadRowWithOptions(ctx, "GCCoord", spanner.Key{0}, []string{"fromSize"}, &spanner.ReadOptions{LockHint: spannerpb.ReadRequest_LOCK_HINT_EXCLUSIVE})
+		if err != nil {
+			return fmt.Errorf("failed to read GCCoord: %w", err)
+		}
+		var fs int64
+		if err := row.Columns(&fs); err != nil {
+			return fmt.Errorf("failed to parse row contents: %v", err)
+		}
+		fromSize := uint64(fs)
+
+		if fromSize == treeSize {
+			return nil
+		}
+
+		d := uint(0)
+		eg := errgroup.Group{}
+		// GC the tree in "vertical" chunks defined by entry bundles.
+		for ri := range layout.Range(fromSize, treeSize-fromSize, treeSize) {
+			// Only known-full bundles are in-scope for GC, so exit if the current bundle is partial or
+			// we've reached our limit of chunks.
+			if ri.Partial > 0 || d > maxBundles {
+				break
+			}
+
+			// GC any partial versions of the entry bundle itself and the tile which sits immediately above it.
+			eg.Go(func() error { return deleteWithPrefix(ctx, layout.EntriesPath(ri.Index, 0)+".p/") })
+			eg.Go(func() error { return deleteWithPrefix(ctx, layout.TilePath(0, ri.Index, 0)+".p/") })
+			fromSize += uint64(ri.N)
+			d++
+
+			// Now consider (only) the part of the tree which sits above the bundle.
+			// We'll walk up the parent tiles for as long as we're tracing the right-hand
+			// edge of a perfect subtree.
+			// This gives the property that we'll only visit each parent tile once, rather than up to 256 times.
+			pL, pIdx := uint64(0), ri.Index
+			for isLastLeafInParent(pIdx) {
+				// Move our coordinates up to the parent
+				pL, pIdx = pL+1, pIdx>>layout.TileHeight
+				// GC any partial versions of the parent tile.
+ eg.Go(func() error { return deleteWithPrefix(ctx, layout.TilePath(pL, pIdx, 0)+".p/") }) + + } + } + if err := eg.Wait(); err != nil { + return fmt.Errorf("failed to delete one or more objects: %v", err) + } + + if err := txn.BufferWrite([]*spanner.Mutation{spanner.Update("GCCoord", []string{"id", "fromSize"}, []any{0, int64(fromSize)})}); err != nil { + return err + } + + return nil + }) + return err +} + +// isLastLeafInParent returns true if a tile with the provided index is the final child node of a +// (hypothetical) full parent tile. +func isLastLeafInParent(i uint64) bool { + return i%layout.TileWidth == layout.TileWidth-1 +} + // gcsStorage knows how to store and retrieve objects from GCS. type gcsStorage struct { bucket string @@ -1060,6 +1188,37 @@ func (s *gcsStorage) setObject(ctx context.Context, objName string, data []byte, return nil } +// deleteObjectsWithPrefix removes any objects with the provided prefix from GCS. +func (s *gcsStorage) deleteObjectsWithPrefix(ctx context.Context, objPrefix string) error { + ctx, span := tracer.Start(ctx, "tessera.storage.gcp.deleteObject") + defer span.End() + + if s.bucketPrefix != "" { + objPrefix = filepath.Join(s.bucketPrefix, objPrefix) + } + span.SetAttributes(objectPathKey.String(objPrefix)) + + bkt := s.gcsClient.Bucket(s.bucket) + + errs := []error(nil) + it := bkt.Objects(ctx, &gcs.Query{Prefix: objPrefix}) + for { + attr, err := it.Next() + if err != nil { + if err == iterator.Done { + break + } + return err + } + klog.V(2).Infof("Deleting object %s", attr.Name) + if err := bkt.Object(attr.Name).Delete(ctx); err != nil { + errs = append(errs, err) + } + } + + return errors.Join(errs...) +} + // MigrationWriter creates a new GCP storage for the MigrationTarget lifecycle mode. func (s *Storage) MigrationWriter(ctx context.Context, opts *tessera.MigrationOptions) (migrate.MigrationWriter, tessera.LogReader, error) { c, err := gcs.NewClient(ctx, gcs.WithJSONReads()) diff --git a/storage/gcp/gcp_test.go b/storage/gcp/gcp_test.go index df13d2ba..5603772d 100644 --- a/storage/gcp/gcp_test.go +++ b/storage/gcp/gcp_test.go @@ -20,8 +20,10 @@ import ( "crypto/sha256" "errors" "fmt" + "log" "os" "reflect" + "strings" "sync" "sync/atomic" "testing" @@ -31,10 +33,13 @@ import ( "cloud.google.com/go/spanner/spannertest" gcs "cloud.google.com/go/storage" "github.com/google/go-cmp/cmp" + "github.com/transparency-dev/merkle/rfc6962" "github.com/transparency-dev/tessera" "github.com/transparency-dev/tessera/api" "github.com/transparency-dev/tessera/api/layout" + "github.com/transparency-dev/tessera/internal/fsck" storage "github.com/transparency-dev/tessera/storage/internal" + "golang.org/x/mod/sumdb/note" ) func newSpannerDB(t *testing.T) func() { @@ -494,6 +499,109 @@ func TestPublishTree(t *testing.T) { } } +func TestGarbageCollect(t *testing.T) { + ctx := t.Context() + batchSize := uint64(60000) + integrateEvery := uint64(31234) + + closeDB := newSpannerDB(t) + defer closeDB() + + s, err := newSpannerCoordinator(ctx, "projects/p/instances/i/databases/d", batchSize) + if err != nil { + t.Fatalf("newSpannerCoordinator: %v", err) + } + defer s.dbPool.Close() + + sk, vk := mustGenerateKeys(t) + + m := newMemObjStore() + storage := &Storage{} + + opts := tessera.NewAppendOptions(). + WithCheckpointInterval(1200*time.Millisecond). + WithBatching(uint(batchSize), 100*time.Millisecond). + // Disable GC so we can manually invoke below. + WithGarbageCollectionInterval(time.Duration(0)). 
+		WithCheckpointSigner(sk)
+	appender, lr, err := storage.newAppender(ctx, m, s, opts)
+	if err != nil {
+		t.Fatalf("newAppender: %v", err)
+	}
+	if err := appender.publishCheckpoint(ctx, 0, []byte("")); err != nil {
+		t.Fatalf("publishCheckpoint: %v", err)
+	}
+
+	// Build a reasonably-sized tree with a bunch of partial resources present, and wait for
+	// it to be published.
+	treeSize := uint64(256 * 384)
+
+	a := tessera.NewPublicationAwaiter(ctx, lr.ReadCheckpoint, 100*time.Millisecond)
+
+	// Grow and garbage collect the tree several times to check continued correct operation over the lifetime of the log.
+	for size := uint64(0); size < treeSize; {
+		t.Logf("Adding entries from %d", size)
+		for range batchSize {
+			f := appender.Add(ctx, tessera.NewEntry(fmt.Appendf(nil, "entry %d", size)))
+			if size%integrateEvery == 0 {
+				t.Logf("Awaiting entry %d", size)
+				if _, _, err := a.Await(ctx, f); err != nil {
+					t.Fatalf("Await: %v", err)
+				}
+			}
+			size++
+		}
+		t.Logf("Awaiting tree at size %d", size)
+		if _, _, err := a.Await(ctx, func() (tessera.Index, error) { return tessera.Index{Index: size - 1}, nil }); err != nil {
+			t.Fatalf("Await final tree: %v", err)
+		}
+
+		t.Logf("Running GC at size %d", size)
+		if err := s.garbageCollect(ctx, size, 1000, m.deleteObjectsWithPrefix); err != nil {
+			t.Fatalf("garbageCollect: %v", err)
+		}
+
+		// Compare any remaining partial resources to the list of places
+		// we'd expect them to be, given the tree size.
+		wantPartialPrefixes := make(map[string]struct{})
+		for _, p := range expectedPartialPrefixes(size) {
+			wantPartialPrefixes[p] = struct{}{}
+		}
+		for k := range m.mem {
+			if strings.Contains(k, ".p/") {
+				p := strings.SplitAfter(k, ".p/")[0]
+				if _, ok := wantPartialPrefixes[p]; !ok {
+					t.Errorf("Found unwanted partial: %s", k)
+				}
+			}
+		}
+	}
+
+	// And finally, for good measure, assert that all the resources implied by the log's checkpoint
+	// are present.
+	if err := fsck.Check(ctx, vk.Name(), vk, lr, 1, defaultMerkleLeafHasher); err != nil {
+		t.Fatalf("FSCK failed: %v", err)
+	}
+}
+
+// expectedPartialPrefixes returns a slice containing resource prefixes where it's acceptable for a
+// tree of the provided size to have partial resources.
+//
+// These are really just the right-hand tiles/entry bundle in the tree.
+func expectedPartialPrefixes(size uint64) []string {
+	r := []string{}
+	for l, c := uint64(0), size; c > 0; l, c = l+1, c>>8 {
+		idx, p := c/256, c%256
+		if p != 0 {
+			if l == 0 {
+				r = append(r, layout.EntriesPath(idx, 0)+".p/")
+			}
+			r = append(r, layout.TilePath(l, idx, 0)+".p/")
+		}
+	}
+	return r
+}
+
 type memObjStore struct {
 	sync.RWMutex
 	mem map[string][]byte
@@ -533,3 +641,46 @@ func (m *memObjStore) setObject(_ context.Context, obj string, data []byte, cond
 	m.mem[obj] = data
 	return nil
 }
+
+func (m *memObjStore) deleteObjectsWithPrefix(_ context.Context, prefix string) error {
+	m.Lock()
+	defer m.Unlock()
+
+	for k := range m.mem {
+		if strings.HasPrefix(k, prefix) {
+			log.Printf("DELETE: %s", k)
+			delete(m.mem, k)
+		}
+	}
+	return nil
+}
+
+func mustGenerateKeys(t *testing.T) (note.Signer, note.Verifier) {
+	sk, vk, err := note.GenerateKey(nil, "testlog")
+	if err != nil {
+		t.Fatalf("GenerateKey: %v", err)
+	}
+	s, err := note.NewSigner(sk)
+	if err != nil {
+		t.Fatalf("NewSigner: %v", err)
+	}
+	v, err := note.NewVerifier(vk)
+	if err != nil {
+		t.Fatalf("NewVerifier: %v", err)
+	}
+	return s, v
+}
+
+// defaultMerkleLeafHasher parses a C2SP tlog-tile bundle and returns the Merkle leaf hashes of each entry it contains.
+func defaultMerkleLeafHasher(bundle []byte) ([][]byte, error) { + eb := &api.EntryBundle{} + if err := eb.UnmarshalText(bundle); err != nil { + return nil, fmt.Errorf("unmarshal: %v", err) + } + r := make([][]byte, 0, len(eb.Entries)) + for _, e := range eb.Entries { + h := rfc6962.DefaultHasher.HashLeaf(e) + r = append(r, h[:]) + } + return r, nil +}
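To make the parent-tile walk used by `garbageCollect` (and exercised by `expectedPartialPrefixes` above) concrete, here is a small standalone sketch, assuming the tlog-tiles tile height of 8 (width 256) used throughout this change; it prints which parent tile levels would have their partial versions removed for a few full entry-bundle indices:

```go
package main

import "fmt"

const tileHeight = 8              // as in layout.TileHeight for tlog-tiles
const tileWidth = 1 << tileHeight // 256 children per tile

// isLastLeafInParent mirrors the helper above: a tile index is the final child
// of its (hypothetical) full parent iff its low 8 bits are all set.
func isLastLeafInParent(i uint64) bool {
	return i%tileWidth == tileWidth-1
}

func main() {
	// Bundle 255 is the last child of its level-1 parent, so level 1 is visited;
	// bundle 256 starts a new parent, so no levels are visited; bundle 65535 sits
	// on the right-hand edge of a perfect subtree two levels deep, so levels 1 and 2 are visited.
	for _, idx := range []uint64{255, 256, 65535} {
		visited := []uint64{}
		level, i := uint64(0), idx
		for isLastLeafInParent(i) {
			level, i = level+1, i>>tileHeight
			visited = append(visited, level)
		}
		fmt.Printf("full bundle %d: partial parent tiles GC'd at levels %v\n", idx, visited)
	}
}
```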