Destroy snapshots concurrently
dsh2dsh committed Dec 31, 2024
1 parent ca9af4c commit 748eb6e
Showing 16 changed files with 386 additions and 578 deletions.
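
The diff below threads a per-pruner concurrency setting through the jobs and factories; the destroy loop that actually runs in parallel is not part of this excerpt. As a rough illustration of the technique named in the commit title, here is a minimal, hypothetical sketch of bounded-concurrency snapshot destruction using golang.org/x/sync/errgroup — every name in it is illustrative, not zrepl's:

    package main

    import (
        "context"
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    // destroySnapshots removes the given snapshots with at most `concurrency`
    // deletions in flight at once and returns the first error encountered.
    func destroySnapshots(ctx context.Context, snaps []string, concurrency int,
        destroy func(context.Context, string) error,
    ) error {
        g, ctx := errgroup.WithContext(ctx)
        g.SetLimit(concurrency) // bound the number of concurrent destroys
        for _, s := range snaps {
            s := s // capture loop variable (needed before Go 1.22)
            g.Go(func() error { return destroy(ctx, s) })
        }
        return g.Wait() // waits for all goroutines, returns the first error
    }

    func main() {
        snaps := []string{"tank/data@a", "tank/data@b", "tank/data@c"}
        err := destroySnapshots(context.Background(), snaps, 2,
            func(ctx context.Context, snap string) error {
                fmt.Println("destroy", snap) // stand-in for a zfs destroy call
                return nil
            })
        if err != nil {
            fmt.Println("error:", err)
        }
    }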
16 changes: 8 additions & 8 deletions internal/daemon/job/active.go
@@ -756,10 +756,6 @@ func (j *ActiveSide) replicate(ctx context.Context) error {
 }
 
 func (j *ActiveSide) pruneSender(ctx context.Context) error {
-	log := GetLogger(ctx)
-	log.With(slog.Int("concurrency", j.prunerFactory.Concurrency())).
-		Info("start pruning sender")
-
 	sender, _ := j.mode.SenderReceiver()
 	senderOnce := NewSenderOnce(ctx, sender)
 	tasks := j.updateTasks(func(tasks *activeSideTasks) {
@@ -768,6 +764,10 @@ func (j *ActiveSide) pruneSender(ctx context.Context) error {
 			ctx, senderOnce, senderOnce)
 	})
 
+	log := GetLogger(ctx)
+	log.With(slog.Int("concurrency", tasks.prunerSender.Concurrency())).
+		Info("start pruning sender")
+
 	begin := time.Now()
 	tasks.prunerSender.Prune()
 	log.With(slog.Duration("duration", time.Since(begin))).
@@ -776,17 +776,17 @@ func (j *ActiveSide) pruneSender(ctx context.Context) error {
 }
 
 func (j *ActiveSide) pruneReceiver(ctx context.Context) error {
-	log := GetLogger(ctx)
-	log.With(slog.Int("concurrency", j.prunerFactory.Concurrency())).
-		Info("start pruning receiver")
-
 	sender, receiver := j.mode.SenderReceiver()
 	tasks := j.updateTasks(func(tasks *activeSideTasks) {
 		tasks.prunerReceiver = j.prunerFactory.BuildReceiverPruner(
 			ctx, receiver, sender)
 		tasks.state = ActiveSidePruneReceiver
 	})
 
+	log := GetLogger(ctx)
+	log.With(slog.Int("concurrency", tasks.prunerReceiver.Concurrency())).
+		Info("start pruning receiver")
+
 	begin := time.Now()
 	tasks.prunerReceiver.Prune()
 	log.With(slog.Duration("duration", time.Since(begin))).
2 changes: 1 addition & 1 deletion internal/daemon/job/snapjob.go
@@ -224,7 +224,7 @@ func (j *SnapJob) prune(ctx context.Context) {
 	j.prunerMtx.Unlock()
 
 	log := GetLogger(ctx)
-	log.With(slog.Int("concurrency", j.prunerFactory.Concurrency())).
+	log.With(slog.Int("concurrency", j.pruner.Concurrency())).
 		Info("start pruning")
 	j.pruner.Prune()
 	log.Info("finished pruning")
95 changes: 40 additions & 55 deletions internal/daemon/pruner/factory.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"runtime"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -14,22 +13,6 @@ import (
 	"github.com/dsh2dsh/zrepl/internal/util/envconst"
 )
 
-type PrunerFactory struct {
-	concurrency                    int
-	senderRules                    []pruning.KeepRule
-	receiverRules                  []pruning.KeepRule
-	retryWait                      time.Duration
-	considerSnapAtCursorReplicated bool
-	promPruneSecs                  *prometheus.HistogramVec
-}
-
-type LocalPrunerFactory struct {
-	concurrency   int
-	keepRules     []pruning.KeepRule
-	retryWait     time.Duration
-	promPruneSecs *prometheus.HistogramVec
-}
-
 func NewLocalPrunerFactory(in config.PruningLocal,
 	promPruneSecs *prometheus.HistogramVec,
 ) (*LocalPrunerFactory, error) {
@@ -47,13 +30,8 @@ func NewLocalPrunerFactory(in config.PruningLocal,
 		}
 	}
 
-	concurrency := int(in.Concurrency)
-	if concurrency == 0 {
-		concurrency = runtime.GOMAXPROCS(0)
-	}
-
 	f := &LocalPrunerFactory{
-		concurrency:   concurrency,
+		concurrency:   int(in.Concurrency),
 		keepRules:     rules,
 		promPruneSecs: promPruneSecs,
@@ -63,6 +41,35 @@ func NewLocalPrunerFactory(in config.PruningLocal,
 	return f, nil
 }
 
+type LocalPrunerFactory struct {
+	concurrency   int
+	keepRules     []pruning.KeepRule
+	retryWait     time.Duration
+	promPruneSecs *prometheus.HistogramVec
+}
+
+func (f *LocalPrunerFactory) BuildLocalPruner(ctx context.Context,
+	target Target, history Sender,
+) *Pruner {
+	return &Pruner{
+		args: args{
+			concurrency: f.concurrency,
+			ctx:         context.WithValue(ctx, contextKeyPruneSide, "local"),
+			target:      target,
+			sender:      history,
+			rules:       f.keepRules,
+			retryWait:   f.retryWait,
+
+			// considerSnapAtCursorReplicated is not relevant for local pruning
+			considerSnapAtCursorReplicated: false,
+
+			promPruneSecs: f.promPruneSecs.WithLabelValues("local"),
+		},
+		state:     Plan,
+		startedAt: time.Now(),
+	}
+}
+
 func NewPrunerFactory(in config.PruningSenderReceiver,
 	promPruneSecs *prometheus.HistogramVec,
 ) (*PrunerFactory, error) {
@@ -84,13 +91,8 @@ func NewPrunerFactory(in config.PruningSenderReceiver,
 		}
 	}
 
-	concurrency := int(in.Concurrency)
-	if concurrency == 0 {
-		concurrency = runtime.GOMAXPROCS(0)
-	}
-
 	f := &PrunerFactory{
-		concurrency:   concurrency,
+		concurrency:   int(in.Concurrency),
 		senderRules:   keepRulesSender,
 		receiverRules: keepRulesReceiver,
 		promPruneSecs: promPruneSecs,
@@ -103,6 +105,15 @@ func NewPrunerFactory(in config.PruningSenderReceiver,
 	return f, nil
 }
 
+type PrunerFactory struct {
+	concurrency                    int
+	senderRules                    []pruning.KeepRule
+	receiverRules                  []pruning.KeepRule
+	retryWait                      time.Duration
+	considerSnapAtCursorReplicated bool
+	promPruneSecs                  *prometheus.HistogramVec
+}
+
 func (f *PrunerFactory) BuildSenderPruner(ctx context.Context, target Target,
 	sender Sender,
 ) *Pruner {
@@ -144,29 +155,3 @@ func (f *PrunerFactory) BuildReceiverPruner(ctx context.Context, target Target,
 		startedAt: time.Now(),
 	}
 }
-
-func (f *PrunerFactory) Concurrency() int { return f.concurrency }
-
-func (f *LocalPrunerFactory) BuildLocalPruner(ctx context.Context,
-	target Target, history Sender,
-) *Pruner {
-	return &Pruner{
-		args: args{
-			concurrency: f.concurrency,
-			ctx:         context.WithValue(ctx, contextKeyPruneSide, "local"),
-			target:      target,
-			sender:      history,
-			rules:       f.keepRules,
-			retryWait:   f.retryWait,
-
-			// considerSnapAtCursorReplicated is not relevant for local pruning
-			considerSnapAtCursorReplicated: false,
-
-			promPruneSecs: f.promPruneSecs.WithLabelValues("local"),
-		},
-		state:     Plan,
-		startedAt: time.Now(),
-	}
-}
-
-func (f *LocalPrunerFactory) Concurrency() int { return f.concurrency }
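
The factories above no longer default a zero concurrency to runtime.GOMAXPROCS(0), and their Concurrency() accessors are gone; the call sites in active.go and snapjob.go now ask the built pruner instead. The Pruner side is not shown in this commit excerpt — the following is only a sketch of what its accessor might look like, assuming the GOMAXPROCS fallback moved there:

    package pruner

    import "runtime"

    type args struct{ concurrency int }

    type Pruner struct{ args args }

    // Concurrency returns the effective worker count. Hypothetical sketch:
    // the actual Pruner is not part of this diff; this assumes the zero-value
    // fallback to GOMAXPROCS that the factories dropped now lives here.
    func (p *Pruner) Concurrency() int {
        if p.args.concurrency < 1 {
            return runtime.GOMAXPROCS(0)
        }
        return p.args.concurrency
    }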
13 changes: 7 additions & 6 deletions internal/daemon/pruner/fs.go
@@ -81,7 +81,7 @@ func (f *fs) Report() FSReport {
 
 func (self *fs) Build(a *args, tfs *pdu.Filesystem, target Target,
 	sender Sender, needsReplicated bool,
-) {
+) error {
 	ctx := a.ctx
 	l := GetLogger(ctx).With(slog.String("fs", tfs.Path))
 	l.Debug("plan filesystem")
@@ -98,7 +98,7 @@ func (self *fs) Build(a *args, tfs *pdu.Filesystem, target Target,
 	tfsvsres, err := target.ListFilesystemVersions(ctx, &req)
 	if err != nil {
 		pfsPlanErrAndLog(err, "cannot list filesystem versions")
-		return
+		return nil
 	}
 	tfsvs := tfsvsres.GetVersions()
 	// no progress here since we could run in a live-lock (must have used target
@@ -118,12 +118,12 @@ func (self *fs) Build(a *args, tfs *pdu.Filesystem, target Target,
 	resp, err := sender.ReplicationCursor(ctx, &req)
 	if err != nil {
 		pfsPlanErrAndLog(err, "cannot get replication cursor bookmark")
-		return
+		return nil
 	} else if resp.GetNotexist() {
 		err := errors.New(
 			"replication cursor bookmark does not exist (one successful replication is required before pruning works)")
 		pfsPlanErrAndLog(err, "")
-		return
+		return nil
 	}
 	cursorGuid = resp.GetGuid()
 	beforeCursor = containsGuid(tfsvs, cursorGuid)
@@ -137,7 +137,7 @@ func (self *fs) Build(a *args, tfs *pdu.Filesystem, target Target,
 		if err != nil {
 			err := fmt.Errorf("%s: %w", tfsv.RelName(), err)
 			pfsPlanErrAndLog(err, "fs version with invalid creation date")
-			return
+			return nil
 		}
 		s := &snapshot{date: creation, fsv: tfsv}
 		// note that we cannot use CreateTXG because target and receiver could be
@@ -154,9 +154,10 @@ func (self *fs) Build(a *args, tfs *pdu.Filesystem, target Target,
 	if needsReplicated && beforeCursor {
 		err := errors.New("prune target has no snapshot that corresponds to sender replication cursor bookmark")
 		pfsPlanErrAndLog(err, "")
-		return
+		return nil
 	}
 
 	// Apply prune rules
 	self.destroyList = pruning.PruneSnapshots(ctx, self.snaps, a.rules)
+	return nil
 }