
[GCP] Garbage collection #668


Open · wants to merge 3 commits into main

33 changes: 26 additions & 7 deletions append_lifecycle.go
@@ -47,6 +47,8 @@ const (
DefaultCheckpointInterval = 10 * time.Second
// DefaultPushbackMaxOutstanding is used by storage implementations if no WithPushback option is provided when instantiating it.
DefaultPushbackMaxOutstanding = 4096
// DefaultGarbageCollectionInterval is the default value used if no WithGarbageCollectionInterval option is provided.
DefaultGarbageCollectionInterval = time.Minute
)

var (
@@ -480,13 +482,14 @@ func (o *AppendOptions) WithAntispam(inMemEntries uint, as Antispam) *AppendOpti

func NewAppendOptions() *AppendOptions {
return &AppendOptions{
batchMaxSize: DefaultBatchMaxSize,
batchMaxAge: DefaultBatchMaxAge,
entriesPath: layout.EntriesPath,
bundleIDHasher: defaultIDHasher,
checkpointInterval: DefaultCheckpointInterval,
addDecorators: make([]func(AddFn) AddFn, 0),
pushbackMaxOutstanding: DefaultPushbackMaxOutstanding,
batchMaxSize: DefaultBatchMaxSize,
batchMaxAge: DefaultBatchMaxAge,
entriesPath: layout.EntriesPath,
bundleIDHasher: defaultIDHasher,
checkpointInterval: DefaultCheckpointInterval,
addDecorators: make([]func(AddFn) AddFn, 0),
pushbackMaxOutstanding: DefaultPushbackMaxOutstanding,
garbageCollectionInterval: DefaultGarbageCollectionInterval,
}
}

@@ -511,6 +514,9 @@ type AppendOptions struct {

addDecorators []func(AddFn) AddFn
followers []stream.Follower

// A garbageCollectionInterval of zero should be interpreted as a request to disable garbage collection.
garbageCollectionInterval time.Duration
}

// valid returns an error if an invalid combination of options has been set, or nil otherwise.
@@ -572,6 +578,10 @@ func (o AppendOptions) CheckpointInterval() time.Duration {
return o.checkpointInterval
}

// GarbageCollectionInterval returns the interval between garbage collection scans.
// A zero interval means garbage collection is disabled.
func (o AppendOptions) GarbageCollectionInterval() time.Duration {
return o.garbageCollectionInterval
}

// WithCheckpointSigner is an option for setting the note signer and verifier to use when creating and parsing checkpoints.
// This option is mandatory for creating logs where the checkpoint is signed locally, e.g. in
// the Appender mode. This does not need to be provided where the storage will be used to mirror
@@ -696,3 +706,12 @@ type WitnessOptions struct {
// and will be disabled and/or removed in the future.
FailOpen bool
}

// WithGarbageCollectionInterval sets the interval between scans to remove obsolete partial
// tiles and entry bundles.
//
// Setting to zero disables garbage collection.
func (o *AppendOptions) WithGarbageCollectionInterval(interval time.Duration) *AppendOptions {
o.garbageCollectionInterval = interval
return o
}
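For illustration, a minimal sketch of how a personality binary might configure this option. The builder and option names come from this PR; the package wrapper and the 30-minute value are assumptions for the sketch:

package main

import (
	"time"

	"github.com/transparency-dev/tessera"
)

func main() {
	// Scan for obsolete partial tiles/entry bundles every 30 minutes instead
	// of the DefaultGarbageCollectionInterval of one minute.
	opts := tessera.NewAppendOptions().
		WithGarbageCollectionInterval(30 * time.Minute)

	// An interval of zero disables garbage collection entirely.
	disabled := tessera.NewAppendOptions().
		WithGarbageCollectionInterval(0)

	_, _ = opts, disabled
}
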
4 changes: 4 additions & 0 deletions storage/gcp/README.md
@@ -38,6 +38,10 @@ This table is used to coordinate integration of sequenced batches in the `Seq` t
This table is used to coordinate publication of new checkpoints, ensuring that checkpoints are not published
more frequently than configured.

### `GCCoord`
This table is used to coordinate garbage collection of partial tiles and entry bundles which have been
made obsolete by the continued growth of the log.

## Life of a leaf

1. Leaves are submitted by the binary built using Tessera via a call to the storage's `Add` func.
185 changes: 172 additions & 13 deletions storage/gcp/gcp.go
@@ -55,6 +55,7 @@ import (
"github.com/transparency-dev/tessera/api/layout"
"github.com/transparency-dev/tessera/internal/migrate"
"github.com/transparency-dev/tessera/internal/otel"
"github.com/transparency-dev/tessera/internal/parse"
"github.com/transparency-dev/tessera/internal/stream"
storage "github.com/transparency-dev/tessera/storage/internal"
"golang.org/x/sync/errgroup"
@@ -112,6 +113,8 @@ type sequencer interface {
nextIndex(ctx context.Context) (uint64, error)
// publishTree coordinates the publication of new checkpoints based on the current integrated tree.
publishTree(ctx context.Context, minAge time.Duration, f func(ctx context.Context, size uint64, root []byte) error) error
// garbageCollect coordinates the removal of unneeded partial tiles/entry bundles for the provided tree size, up to a maximum number of deletes per invocation.
garbageCollect(ctx context.Context, treeSize uint64, maxDeletes uint, removePrefix func(ctx context.Context, prefix string) error) error
}

// consumeFunc is the signature of a function which can consume entries from the sequencer and integrate
@@ -196,29 +199,41 @@ func (lr *LogReader) StreamEntries(ctx context.Context, startEntry, N uint64) it
return stream.EntryBundles(ctx, numWorkers, lr.integratedSize, lr.lrs.getEntryBundle, startEntry, N)
}

// Appender creates a new tessera.Appender lifecycle object.
func (s *Storage) Appender(ctx context.Context, opts *tessera.AppendOptions) (*tessera.Appender, tessera.LogReader, error) {

if opts.CheckpointInterval() < minCheckpointInterval {
return nil, nil, fmt.Errorf("requested CheckpointInterval (%v) is less than minimum permitted %v", opts.CheckpointInterval(), minCheckpointInterval)
}

c, err := gcs.NewClient(ctx, gcs.WithJSONReads())
if err != nil {
return nil, nil, fmt.Errorf("failed to create GCS client: %v", err)
}
gs := &gcsStorage{
gcsClient: c,
bucket: s.cfg.Bucket,
bucketPrefix: s.cfg.BucketPrefix,
}

seq, err := newSpannerCoordinator(ctx, s.cfg.Spanner, uint64(opts.PushbackMaxOutstanding()))
if err != nil {
return nil, nil, fmt.Errorf("failed to create Spanner coordinator: %v", err)
}

a, lr, err := s.newAppender(ctx, gs, seq, opts)
if err != nil {
return nil, nil, err
}
return &tessera.Appender{
Add: a.Add,
}, lr, nil
}

// newAppender creates and initialises a tessera.Appender struct with the provided underlying storage implementations.
func (s *Storage) newAppender(ctx context.Context, o objStore, seq *spannerCoordinator, opts *tessera.AppendOptions) (*Appender, tessera.LogReader, error) {
if opts.CheckpointInterval() < minCheckpointInterval {
return nil, nil, fmt.Errorf("requested CheckpointInterval (%v) is less than minimum permitted %v", opts.CheckpointInterval(), minCheckpointInterval)
}

a := &Appender{
logStore: &logResourceStore{
objStore: &gcsStorage{
gcsClient: c,
bucket: s.cfg.Bucket,
bucketPrefix: s.cfg.BucketPrefix,
},
objStore: o,
entriesPath: opts.EntriesPath(),
},
sequencer: seq,
@@ -244,10 +259,11 @@ func (s *Storage) Appender(ctx context.Context, opts *tessera.AppendOptions) (*t

go a.sequencerJob(ctx)
go a.publisherJob(ctx, opts.CheckpointInterval())
if i := opts.GarbageCollectionInterval(); i > 0 {
go a.garbageCollectorJob(ctx, i)
}

return &tessera.Appender{
Add: a.Add,
}, reader, nil
return a, reader, nil
}

// Appender is an implementation of the Tessera appender lifecycle contract.
@@ -324,6 +340,46 @@ func (a *Appender) publisherJob(ctx context.Context, i time.Duration) {
}
}

// garbageCollectorJob is a long-running function which handles the removal of obsolete partial tiles
// and entry bundles.
// Blocks until ctx is done.
func (a *Appender) garbageCollectorJob(ctx context.Context, i time.Duration) {
t := time.NewTicker(i)
defer t.Stop()

// Entirely arbitrary number.
maxBundlesPerRun := uint(100)

for {
select {
case <-ctx.Done():
return
case <-t.C:
}
func() {
ctx, span := tracer.Start(ctx, "tessera.storage.gcp.garbageCollectTask")
defer span.End()

// Figure out the size of the latest published checkpoint - we can't be removing partial tiles implied by
// that checkpoint just because we've done an integration and know about a larger (but as yet unpublished)
// checkpoint!
cp, err := a.logStore.getCheckpoint(ctx)
if err != nil {
klog.Warningf("Failed to get published checkpoint: %v", err)
return
}
_, pubSize, _, err := parse.CheckpointUnsafe(cp)
if err != nil {
klog.Warningf("Failed to parse published checkpoint: %v", err)
return
}

if err := a.sequencer.garbageCollect(ctx, pubSize, maxBundlesPerRun, a.logStore.objStore.deleteObjectsWithPrefix); err != nil {
klog.Warningf("GarbageCollect failed: %v", err)
}
}()
}
}

// init ensures that the storage represents a log in a valid state.
func (a *Appender) init(ctx context.Context) error {
if _, err := a.logStore.getCheckpoint(ctx); err != nil {
@@ -372,6 +428,7 @@ func (a *Appender) publishCheckpoint(ctx context.Context, size uint64, root []by
type objStore interface {
getObject(ctx context.Context, obj string) ([]byte, int64, error)
setObject(ctx context.Context, obj string, data []byte, cond *gcs.Conditions, contType string, cacheCtl string) error
deleteObjectsWithPrefix(ctx context.Context, prefix string) error
Contributor: It's an internal interface, so I'm not overly worried, but this looks like a strange cousin next to these other 2 get/set methods.

Collaborator (Author): Open to alternative suggestions, but the thinking was that this enables infra to do this specific task in the best way it can (e.g. POSIX could potentially do an rmdir()) rather than force all to conform to a lowest-common-denominator API like findObjectsWithPrefix and deleteObject.
}
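Following up on the exchange above: a hypothetical POSIX-backed objStore illustrates the author's point, since it can satisfy the method with a single recursive remove. The posixStorage type and its root field are assumptions for the sketch, not code from this PR:

// Hypothetical sketch only: assumes imports "context", "os", and "path/filepath".
// posixStorage is an imagined objStore backed by a local filesystem.
type posixStorage struct {
	root string // filesystem directory corresponding to the bucket root
}

// deleteObjectsWithPrefix removes everything under prefix. The GC only ever
// passes directory-like ".p/" prefixes, so a recursive remove (the rmdir()
// analogue mentioned above) covers all matching objects in one call,
// with no list-then-delete loop.
func (s *posixStorage) deleteObjectsWithPrefix(_ context.Context, prefix string) error {
	return os.RemoveAll(filepath.Join(s.root, prefix))
}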

// logResourceStore knows how to read and write entries which represent a tiles log inside an objStore.
@@ -665,12 +722,14 @@ func (s *spannerCoordinator) initDB(ctx context.Context, spannerDB string) error
"CREATE TABLE IF NOT EXISTS Seq (id INT64 NOT NULL, seq INT64 NOT NULL, v BYTES(MAX),) PRIMARY KEY (id, seq)",
"CREATE TABLE IF NOT EXISTS IntCoord (id INT64 NOT NULL, seq INT64 NOT NULL, rootHash BYTES(32)) PRIMARY KEY (id)",
"CREATE TABLE IF NOT EXISTS PubCoord (id INT64 NOT NULL, publishedAt TIMESTAMP NOT NULL) PRIMARY KEY (id)",
"CREATE TABLE IF NOT EXISTS GCCoord (id INT64 NOT NULL, fromSize INT64 NOT NULL) PRIMARY KEY (id)",
},
[][]*spanner.Mutation{
{spanner.Insert("Tessera", []string{"id", "compatibilityVersion"}, []any{0, SchemaCompatibilityVersion})},
{spanner.Insert("SeqCoord", []string{"id", "next"}, []any{0, 0})},
{spanner.Insert("IntCoord", []string{"id", "seq", "rootHash"}, []any{0, 0, rfc6962.DefaultHasher.EmptyRoot()})},
{spanner.Insert("PubCoord", []string{"id", "publishedAt"}, []any{0, time.Unix(0, 0)})},
{spanner.Insert("GCCoord", []string{"id", "fromSize"}, []any{0, 0})},
},
)
}
@@ -949,6 +1008,75 @@ func (s *spannerCoordinator) publishTree(ctx context.Context, minAge time.Durati
return nil
}

// garbageCollect will identify up to maxBundles unneeded partial entry bundles (and any unneeded partial tiles which sit above them in the tree) and
// call the provided function to remove them.
//
// Uses the `GCCoord` table to ensure that only one binary is actively garbage collecting at any given time, and to track progress so that we don't
// needlessly attempt to GC over regions which have already been cleaned.
func (s *spannerCoordinator) garbageCollect(ctx context.Context, treeSize uint64, maxBundles uint, deleteWithPrefix func(ctx context.Context, prefix string) error) error {
_, err := s.dbPool.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
row, err := txn.ReadRowWithOptions(ctx, "GCCoord", spanner.Key{0}, []string{"fromSize"}, &spanner.ReadOptions{LockHint: spannerpb.ReadRequest_LOCK_HINT_EXCLUSIVE})
if err != nil {
return fmt.Errorf("failed to read GCCoord: %w", err)
}
var fs int64
if err := row.Columns(&fs); err != nil {
return fmt.Errorf("failed to parse row contents: %v", err)
}
fromSize := uint64(fs)

if fromSize == treeSize {
return nil
}

d := uint(0)
eg := errgroup.Group{}
// GC the tree in "vertical" chunks defined by entry bundles.
for ri := range layout.Range(fromSize, treeSize-fromSize, treeSize) {
// Only known-full bundles are in-scope for GC, so exit if the current bundle is partial or
// we've reached our limit of chunks.
if ri.Partial > 0 || d >= maxBundles {
break
}

// GC any partial versions of the entry bundle itself and the tile which sits immediately above it.
eg.Go(func() error { return deleteWithPrefix(ctx, layout.EntriesPath(ri.Index, 0)+".p/") })
eg.Go(func() error { return deleteWithPrefix(ctx, layout.TilePath(0, ri.Index, 0)+".p/") })
fromSize += uint64(ri.N)
d++

// Now consider (only) the part of the tree which sits above the bundle.
// We'll walk up the parent tiles for as long as we're tracing the right-hand
// edge of a perfect subtree.
// This gives the property we'll only visit each parent tile once, rather than up to 256 times.
pL, pIdx := uint64(0), ri.Index
for isLastLeafInParent(pIdx) {
// Move our coordinates up to the parent
pL, pIdx = pL+1, pIdx>>layout.TileHeight
// GC any partial versions of the parent tile.
eg.Go(func() error { return deleteWithPrefix(ctx, layout.TilePath(pL, pIdx, 0)+".p/") })
}
}
if err := eg.Wait(); err != nil {
return fmt.Errorf("failed to delete one or more objects: %v", err)
}

if err := txn.BufferWrite([]*spanner.Mutation{spanner.Update("GCCoord", []string{"id", "fromSize"}, []any{0, int64(fromSize)})}); err != nil {
return err
}

return nil
})
return err
}

// isLastLeafInParent returns true if a tile with the provided index is the final child node of a
// (hypothetical) full parent tile.
func isLastLeafInParent(i uint64) bool {
return i%layout.TileWidth == layout.TileWidth-1
}
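To make the right-edge walk concrete, here is a small self-contained sketch, assuming the standard tlog-tiles geometry (layout.TileHeight of 8, so layout.TileWidth of 256, hard-coded below), of which parent tiles have their partial versions removed once a given entry bundle is full:

// parentsToGC reproduces the walk above with the tile geometry hard-coded:
// it returns the indices of the parent tiles (one per level, ascending)
// whose partial versions become obsolete once bundle bundleIndex is full.
const (
	tileHeight = 8
	tileWidth  = 1 << tileHeight // 256
)

func parentsToGC(bundleIndex uint64) []uint64 {
	var parents []uint64
	idx := bundleIndex
	// Keep climbing while idx is the last child of a (hypothetical) full
	// parent, i.e. while we trace the right edge of a perfect subtree.
	for idx%tileWidth == tileWidth-1 {
		idx >>= tileHeight
		parents = append(parents, idx)
	}
	return parents
}

// parentsToGC(254) == []        : bundle 254 doesn't complete any parent tile.
// parentsToGC(255) == [0]       : bundle 255 completes level-1 tile 0.
// parentsToGC(65535) == [255, 0]: bundle 65535 completes level-1 tile 255
//                                 and level-2 tile 0.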

// gcsStorage knows how to store and retrieve objects from GCS.
type gcsStorage struct {
bucket string
@@ -1038,6 +1166,37 @@ func (s *gcsStorage) setObject(ctx context.Context, objName string, data []byte,
return nil
}

// deleteObjectsWithPrefix removes any objects with the provided prefix from GCS.
func (s *gcsStorage) deleteObjectsWithPrefix(ctx context.Context, objPrefix string) error {
ctx, span := tracer.Start(ctx, "tessera.storage.gcp.deleteObjectsWithPrefix")
defer span.End()

if s.bucketPrefix != "" {
objPrefix = filepath.Join(s.bucketPrefix, objPrefix)
}
span.SetAttributes(objectPathKey.String(objPrefix))

bkt := s.gcsClient.Bucket(s.bucket)

errs := []error(nil)
it := bkt.Objects(ctx, &gcs.Query{Prefix: objPrefix})
for {
attr, err := it.Next()
if err != nil {
if err == iterator.Done {
break
}
return err
}
klog.V(2).Infof("Deleting object %s", attr.Name)
if err := bkt.Object(attr.Name).Delete(ctx); err != nil {
errs = append(errs, err)
}
}

return errors.Join(errs...)
}

// MigrationWriter creates a new GCP storage for the MigrationTarget lifecycle mode.
func (s *Storage) MigrationWriter(ctx context.Context, opts *tessera.MigrationOptions) (migrate.MigrationWriter, tessera.LogReader, error) {
c, err := gcs.NewClient(ctx, gcs.WithJSONReads())