diff --git a/README.md b/README.md index d2189ba..1a5ecc6 100644 --- a/README.md +++ b/README.md @@ -590,6 +590,19 @@ pkg install zrepl-dsh2dsh See also zrepl/zrepl#403 + * Snapshots are now created concurrently. + + By default the number of CPUs is used as the concurrency limit; it can be + changed in the config: + + ```yaml + jobs: + - name: "zroot-to-zdisk" + snapshotting: + type: "periodic" + concurrency: 1 + ``` + ## Upstream user documentation **User Documentation** can be found at diff --git a/internal/config/config.go b/internal/config/config.go index 73121b1..0f7b7b7 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -342,6 +342,7 @@ type SnapshottingPeriodic struct { Hooks []HookCommand `yaml:"hooks" validate:"dive"` TimestampFormat string `yaml:"timestamp_format" default:"dense" validate:"required"` TimestampLocal bool `yaml:"timestamp_local" default:"true"` + Concurrency uint `yaml:"concurrency"` } func (self *SnapshottingPeriodic) CronSpec() string { diff --git a/internal/daemon/hooks/hook_config.go b/internal/daemon/hooks/hook_config.go index a09b1b4..2147561 100644 --- a/internal/daemon/hooks/hook_config.go +++ b/internal/daemon/hooks/hook_config.go @@ -7,8 +7,6 @@ import ( "github.com/dsh2dsh/zrepl/internal/zfs" ) -type List []*CommandHook - func ListFromConfig(in []config.HookCommand) (List, error) { hl := make(List, len(in)) for i, h := range in { @@ -21,6 +19,10 @@ func ListFromConfig(in []config.HookCommand) (List, error) { return hl, nil } +type List []*CommandHook + +func (self List) Slice() []*CommandHook { return []*CommandHook(self) } + func (self List) WithCombinedOutput() List { for _, h := range self { h.WithCombinedOutput() diff --git a/internal/daemon/snapper/impl.go b/internal/daemon/snapper/impl.go index 43ae3b7..4588b64 100644 --- a/internal/daemon/snapper/impl.go +++ b/internal/daemon/snapper/impl.go @@ -4,15 +4,16 @@ import ( "context" "fmt" "log/slog" - "sort" + "slices" "strconv" "strings" "time" +
"golang.org/x/sync/errgroup" + "github.com/dsh2dsh/zrepl/internal/daemon/hooks" "github.com/dsh2dsh/zrepl/internal/daemon/logging" "github.com/dsh2dsh/zrepl/internal/logger" - "github.com/dsh2dsh/zrepl/internal/util/chainlock" "github.com/dsh2dsh/zrepl/internal/zfs" ) @@ -21,20 +22,23 @@ type planArgs struct { timestampFormat string timestampLocal bool hooks hooks.List + concurrency int } type plan struct { - mtx chainlock.L args planArgs - snaps map[*zfs.DatasetPath]*snapProgress + snaps map[*zfs.DatasetPath]*progress + + hookMatchCount map[hooks.Hook]int } func makePlan(args planArgs, fss []*zfs.DatasetPath) *plan { - snaps := make(map[*zfs.DatasetPath]*snapProgress, len(fss)) + snaps := make(map[*zfs.DatasetPath]*progress, len(fss)) for _, fs := range fss { - snaps[fs] = &snapProgress{state: SnapPending} + snaps[fs] = NewProgress() } - return &plan{snaps: snaps, args: args} + p := &plan{snaps: snaps, args: args} + return p.init() } type SnapState uint @@ -60,27 +64,25 @@ func (self SnapState) String() string { return "SnapState(" + strconv.FormatInt(int64(self), 10) + ")" } -// All fields protected by Snapper.mtx -type snapProgress struct { - state SnapState - - // SnapStarted, SnapDone, SnapError - name string - startAt time.Time - hookPlan *hooks.Plan - - // SnapDone - doneAt time.Time +func (self *plan) init() *plan { + self.hookMatchCount = make(map[hooks.Hook]int, len(self.args.hooks)) + for _, h := range self.args.hooks { + self.hookMatchCount[h] = 0 + } + return self +} - // SnapErr TODO disambiguate state - runResults hooks.PlanReport +func (self *plan) snapName() string { + return fmt.Sprintf("%s%s", self.args.prefix, + self.formatNow(self.args.timestampFormat, self.args.timestampLocal)) } -func (plan *plan) formatNow(format string, localTime bool) string { +func (self *plan) formatNow(format string, localTime bool) string { now := time.Now() if !localTime { now = now.UTC() } + switch strings.ToLower(format) { case "dense": format = 
"20060102_150405_MST" @@ -94,203 +96,103 @@ func (plan *plan) formatNow(format string, localTime bool) string { return now.Format(format) } -func (plan *plan) execute(ctx context.Context, dryRun bool) (ok bool) { - hookMatchCount := make(map[hooks.Hook]int, len(plan.args.hooks)) - for _, h := range plan.args.hooks { - hookMatchCount[h] = 0 - } +func (self *plan) execute(ctx context.Context, dryRun bool) bool { + var anyFsHadErr bool + var g errgroup.Group + g.SetLimit(self.args.concurrency) - anyFsHadErr := false // TODO channel programs -> allow a little jitter? - for fs, progress := range plan.snaps { - suffix := plan.formatNow(plan.args.timestampFormat, - plan.args.timestampLocal) - snapname := fmt.Sprintf("%s%s", plan.args.prefix, suffix) - + for fs, progress := range self.snaps { + snapName := self.snapName() ctx := logging.With(ctx, slog.String("fs", fs.ToString()), - slog.String("snap", snapname)) + slog.String("snap", snapName)) - hookEnvExtra := map[string]string{ - hooks.EnvFS: fs.ToString(), - hooks.EnvSnapshot: snapname, + hookPlan := self.hookPlan(ctx, fs, snapName) + if hookPlan == nil { + anyFsHadErr = true + progress.StateError() + continue } - jobCallback := hooks.NewCallbackHookForFilesystem("snapshot", fs, func(ctx context.Context) (err error) { - l := getLogger(ctx) - l.Debug("create snapshot") - err = zfs.ZFSSnapshot(ctx, fs, snapname, false) // TODO propagate context to ZFSSnapshot - if err != nil { - logger.WithError(l, err, "cannot create snapshot") - } - return + g.Go(func() error { + return progress.CreateSnapshot(ctx, dryRun, snapName, hookPlan) }) + } - fsHadErr := false - var hookPlanReport hooks.PlanReport - var hookPlan *hooks.Plan - { - filteredHooks, err := plan.args.hooks.CopyFilteredForFilesystem(fs) - if err != nil { - logger.WithError(getLogger(ctx), err, "unexpected filter error") - fsHadErr = true - goto updateFSState - } - // account for running hooks - for _, h := range filteredHooks { - hookMatchCount[h]++ - } + if err := 
g.Wait(); err != nil { + return false + } + self.logUnmatchedHooks(ctx) + return !anyFsHadErr +} - var planErr error - hookPlan, planErr = hooks.NewPlan(filteredHooks, hooks.PhaseSnapshot, jobCallback, hookEnvExtra) - if planErr != nil { - fsHadErr = true - logger.WithError(getLogger(ctx), planErr, "cannot create job hook plan") - goto updateFSState - } - } +func (self *plan) hookPlan(ctx context.Context, fs *zfs.DatasetPath, + snapName string, +) *hooks.Plan { + filteredHooks, err := self.args.hooks.CopyFilteredForFilesystem(fs) + if err != nil { + logger.WithError(getLogger(ctx), err, "unexpected filter error") + return nil + } + // account for running hooks + self.countHooks(filteredHooks) - plan.mtx.HoldWhile(func() { - progress.name = snapname - progress.startAt = time.Now() - progress.hookPlan = hookPlan - progress.state = SnapStarted + jobCallback := hooks.NewCallbackHookForFilesystem("snapshot", fs, + func(ctx context.Context) error { + return createSnapshot(ctx, fs, snapName) }) - { - getLogger(ctx). - With(slog.String("report", hookPlan.Report().String())). - Debug("begin run job plan") - hookPlan.Run(ctx, dryRun) - hookPlanReport = hookPlan.Report() - fsHadErr = hookPlanReport.HadError() // not just fatal errors - if fsHadErr { - getLogger(ctx). - With(slog.String("report", hookPlanReport.String())). - Error("end run job plan with error") - } else { - getLogger(ctx). - With(slog.String("report", hookPlanReport.String())). 
- Info("end run job plan successful") - } - } - - updateFSState: - anyFsHadErr = anyFsHadErr || fsHadErr - plan.mtx.HoldWhile(func() { - progress.doneAt = time.Now() - progress.state = SnapDone - if fsHadErr { - progress.state = SnapError - } - progress.runResults = hookPlanReport + hookPlan, err := hooks.NewPlan(filteredHooks, hooks.PhaseSnapshot, + jobCallback, map[string]string{ + hooks.EnvFS: fs.ToString(), + hooks.EnvSnapshot: snapName, }) + if err != nil { + logger.WithError(getLogger(ctx), err, "cannot create job hook plan") + return nil } + return hookPlan +} - for h, mc := range hookMatchCount { - if mc == 0 { - hookIdx := -1 - for idx, ah := range plan.args.hooks { - if ah == h { - hookIdx = idx - break - } - } - getLogger(ctx). - With(slog.String("hook", h.String()), - slog.Int("hook_number", hookIdx+1)). - Warn("hook did not match any snapshotted filesystems") - } +func createSnapshot(ctx context.Context, fs *zfs.DatasetPath, snapName string, +) error { + l := getLogger(ctx) + l.Debug("create snapshot") + if err := zfs.ZFSSnapshot(ctx, fs, snapName, false); err != nil { + logger.WithError(l, err, "cannot create snapshot") + return err } - - return !anyFsHadErr + return nil } -type ReportFilesystem struct { - Path string - State SnapState - - // Valid in SnapStarted and later - SnapName string - StartAt time.Time - Hooks string - HooksHadError bool - - // Valid in SnapDone | SnapError - DoneAt time.Time +func (self *plan) countHooks(filteredHooks hooks.List) { + for _, h := range filteredHooks { + self.hookMatchCount[h]++ + } } -func (plan *plan) report() []*ReportFilesystem { - plan.mtx.Lock() - defer plan.mtx.Unlock() - - pReps := make([]*ReportFilesystem, 0, len(plan.snaps)) - for fs, p := range plan.snaps { - var hooksStr string - var hooksHadError bool - if p.hookPlan != nil { - hooksStr, hooksHadError = p.report() +func (self *plan) logUnmatchedHooks(ctx context.Context) { + argsHooks := self.args.hooks.Slice() + l := getLogger(ctx) + for h, cnt := 
range self.hookMatchCount { + if cnt != 0 { + continue } - pReps = append(pReps, &ReportFilesystem{ - Path: fs.ToString(), - State: p.state, - SnapName: p.name, - StartAt: p.startAt, - DoneAt: p.doneAt, - Hooks: hooksStr, - HooksHadError: hooksHadError, - }) + hookIdx := slices.IndexFunc(argsHooks, + func(h2 *hooks.CommandHook) bool { return h2 == h }) + l.With(slog.String("hook", h.String()), + slog.Int("hook_number", hookIdx+1)). + Warn("hook did not match any snapshotted filesystems") } - - sort.Slice(pReps, func(i, j int) bool { - return strings.Compare(pReps[i].Path, pReps[j].Path) == -1 - }) - - return pReps } -func (p *snapProgress) report() (hooksStr string, hooksHadError bool) { - hr := p.hookPlan.Report() - // FIXME: technically this belongs into client - // but we can't serialize hooks.Step ATM - rightPad := func(str string, length int, pad string) string { - if len(str) > length { - return str[:length] - } - return str + strings.Repeat(pad, length-len(str)) - } - hooksHadError = hr.HadError() - rows := make([][]string, len(hr)) - const numCols = 4 - lens := make([]int, numCols) - for i, e := range hr { - rows[i] = make([]string, numCols) - rows[i][0] = strconv.Itoa(i + 1) - rows[i][1] = e.Status.String() - runTime := "..." 
- if e.Status != hooks.StepPending { - runTime = e.End.Sub(e.Begin).Round(time.Millisecond).String() - } - rows[i][2] = runTime - rows[i][3] = "" - if e.Report != nil { - rows[i][3] = e.Report.String() - } - for j, col := range lens { - if len(rows[i][j]) > col { - lens[j] = len(rows[i][j]) - } - } - } - rowsFlat := make([]string, len(hr)) - for i, r := range rows { - colsPadded := make([]string, len(r)) - for j, c := range r[:len(r)-1] { - colsPadded[j] = rightPad(c, lens[j], " ") - } - colsPadded[len(r)-1] = r[len(r)-1] - rowsFlat[i] = strings.Join(colsPadded, " ") +func (self *plan) report() []*ReportFilesystem { + reports := make([]*ReportFilesystem, 0, len(self.snaps)) + for fs, p := range self.snaps { + reports = append(reports, p.Report(fs.ToString())) } - hooksStr = strings.Join(rowsFlat, "\n") - - return hooksStr, hooksHadError + slices.SortFunc(reports, func(a, b *ReportFilesystem) int { + return strings.Compare(a.Path, b.Path) + }) + return reports } diff --git a/internal/daemon/snapper/periodic.go b/internal/daemon/snapper/periodic.go index 70c2101..6e67042 100644 --- a/internal/daemon/snapper/periodic.go +++ b/internal/daemon/snapper/periodic.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "log/slog" + "runtime" "slices" "sort" "sync" @@ -46,6 +47,11 @@ func periodicFromConfig(fsf zfs.DatasetFilter, in *config.SnapshottingPeriodic, return nil, fmt.Errorf("hook config error: %w", err) } + concurrency := int(in.Concurrency) + if concurrency < 1 { + concurrency = runtime.GOMAXPROCS(0) + } + s := &Periodic{ cronSpec: cronSpec, args: periodicArgs{ @@ -56,6 +62,7 @@ func periodicFromConfig(fsf zfs.DatasetFilter, in *config.SnapshottingPeriodic, timestampFormat: in.TimestampFormat, timestampLocal: in.TimestampLocal, hooks: hookList, + concurrency: concurrency, }, // ctx and log is set in Run() }, @@ -117,7 +124,8 @@ func (self *Periodic) Run(ctx context.Context) { err, "failed start snapper") } - log.Info("start snapper") + log.With(slog.Int("concurrency", 
self.args.planArgs.concurrency)). + Info("start snapper") defer log.Info("exiting snapper") u := func(u func(*Periodic)) State { diff --git a/internal/daemon/snapper/progress.go b/internal/daemon/snapper/progress.go new file mode 100644 index 0000000..d56217a --- /dev/null +++ b/internal/daemon/snapper/progress.go @@ -0,0 +1,170 @@ +package snapper + +import ( + "context" + "errors" + "log/slog" + "strconv" + "strings" + "sync" + "time" + + "github.com/dsh2dsh/zrepl/internal/daemon/hooks" +) + +var planErr = errors.New("end run job plan with error") + +func NewProgress() *progress { + return &progress{state: SnapPending} +} + +// All fields are protected by mu. +type progress struct { + state SnapState + mu sync.Mutex + + // SnapStarted, SnapDone, SnapError + name string + startAt time.Time + hookPlan *hooks.Plan + + // SnapDone + doneAt time.Time + + // SnapErr TODO disambiguate state + runResults hooks.PlanReport +} + +func (self *progress) CreateSnapshot(ctx context.Context, dryRun bool, + snapName string, hookPlan *hooks.Plan, +) error { + self.start(snapName, hookPlan) + l := getLogger(ctx) + l.With(slog.String("report", hookPlan.Report().String())).
+ Debug("begin run job plan") + + hookPlan.Run(ctx, dryRun) + hookPlanReport := hookPlan.Report() + + l = l.With(slog.String("report", hookPlanReport.String())) + if hookPlanReport.HadError() { // not just fatal errors + l.Error(planErr.Error()) + self.StateError() + return planErr + } + + l.Info("end run job plan successful") + self.done(hookPlanReport) + return nil +} + +func (self *progress) start(name string, plan *hooks.Plan) { + self.mu.Lock() + defer self.mu.Unlock() + + self.name = name + self.startAt = time.Now() + self.hookPlan = plan + self.state = SnapStarted +} + +func (self *progress) done(report hooks.PlanReport) { + self.mu.Lock() + defer self.mu.Unlock() + + self.doneAt = time.Now() + self.state = SnapDone + self.runResults = report +} + +func (self *progress) StateError() { + self.mu.Lock() + defer self.mu.Unlock() + + self.doneAt = time.Now() + self.state = SnapError +} + +func (self *progress) Report(fs string) *ReportFilesystem { + self.mu.Lock() + defer self.mu.Unlock() + + hooksStr, hooksHadError := self.buildReport() + return &ReportFilesystem{ + Path: fs, + State: self.state, + SnapName: self.name, + StartAt: self.startAt, + DoneAt: self.doneAt, + Hooks: hooksStr, + HooksHadError: hooksHadError, + } +} + +type ReportFilesystem struct { + Path string + State SnapState + + // Valid in SnapStarted and later + SnapName string + StartAt time.Time + Hooks string + HooksHadError bool + + // Valid in SnapDone | SnapError + DoneAt time.Time +} + +func (self *progress) buildReport() (string, bool) { + if self.hookPlan == nil { + return "", false + } + + hr := self.hookPlan.Report() + // FIXME: technically this belongs into client + // but we can't serialize hooks.Step ATM + rightPad := func(str string, length int, pad string) string { + if len(str) > length { + return str[:length] + } + return str + strings.Repeat(pad, length-len(str)) + } + + hooksHadError := hr.HadError() + rows := make([][]string, len(hr)) + const numCols = 4 + lens := make([]int, 
numCols) + + for i, e := range hr { + rows[i] = make([]string, numCols) + rows[i][0] = strconv.Itoa(i + 1) + rows[i][1] = e.Status.String() + runTime := "..." + if e.Status != hooks.StepPending { + runTime = e.End.Sub(e.Begin).Round(time.Millisecond).String() + } + rows[i][2] = runTime + rows[i][3] = "" + if e.Report != nil { + rows[i][3] = e.Report.String() + } + for j, col := range lens { + if len(rows[i][j]) > col { + lens[j] = len(rows[i][j]) + } + } + } + + rowsFlat := make([]string, len(hr)) + for i, r := range rows { + colsPadded := make([]string, len(r)) + for j, c := range r[:len(r)-1] { + colsPadded[j] = rightPad(c, lens[j], " ") + } + colsPadded[len(r)-1] = r[len(r)-1] + rowsFlat[i] = strings.Join(colsPadded, " ") + } + + hooksStr := strings.Join(rowsFlat, "\n") + return hooksStr, hooksHadError +}