From 1636f4282a0b3e217b79d2c575c10ffd83cdd97d Mon Sep 17 00:00:00 2001 From: Denis Shaposhnikov <993498+dsh2dsh@users.noreply.github.com> Date: Wed, 23 Oct 2024 15:43:57 +0200 Subject: [PATCH] One more refactor of CommandHook --- internal/daemon/hooks/hook_config.go | 9 +- internal/daemon/hooks/hook_exec.go | 15 +-- internal/daemon/hooks/hook_logging.go | 70 ++------------ internal/daemon/hooks/hook_type_callback.go | 2 +- internal/daemon/hooks/hook_type_command.go | 102 ++++++++++---------- internal/daemon/hooks/hooks_test.go | 5 +- internal/daemon/hooks/report.go | 2 +- internal/daemon/snapper/impl.go | 2 +- 8 files changed, 78 insertions(+), 129 deletions(-) diff --git a/internal/daemon/hooks/hook_config.go b/internal/daemon/hooks/hook_config.go index 0495760d..db7ec043 100644 --- a/internal/daemon/hooks/hook_config.go +++ b/internal/daemon/hooks/hook_config.go @@ -7,7 +7,7 @@ import ( "github.com/dsh2dsh/zrepl/internal/zfs" ) -type List []Hook +type List []*CommandHook func ListFromConfig(in []config.HookCommand) (List, error) { hl := make(List, len(in)) @@ -21,6 +21,13 @@ func ListFromConfig(in []config.HookCommand) (List, error) { return hl, nil } +func (self List) WithCombinedOutput() List { + for _, h := range self { + h.WithCombinedOutput() + } + return self +} + func (self List) CopyFilteredForFilesystem(fs *zfs.DatasetPath) (List, error) { ret := make(List, 0, len(self)) for _, h := range self { diff --git a/internal/daemon/hooks/hook_exec.go b/internal/daemon/hooks/hook_exec.go index 516c0f7c..98aa9889 100644 --- a/internal/daemon/hooks/hook_exec.go +++ b/internal/daemon/hooks/hook_exec.go @@ -6,12 +6,9 @@ import ( "strings" "sync" "time" - - "github.com/dsh2dsh/zrepl/internal/zfs" ) type Hook interface { - Filesystems() zfs.DatasetFilter String() string // If true and the Pre edge invocation of Run fails, Post edge will not run @@ -21,7 +18,7 @@ type Hook interface { // Run is invoked by HookPlan for a Pre edge. If HookReport.HadError() == // false, the Post edge will be invoked, too. Run(ctx context.Context, edge Edge, phase Phase, dryRun bool, - extra Env, state map[interface{}]any, + extra map[string]string, ) HookReport } @@ -75,7 +72,6 @@ type Step struct { // FIXME cannot serialize this for client status, but contains interesting // info (like what error happened) Report HookReport - state map[any]any } func (s Step) String() (out string) { @@ -102,27 +98,24 @@ type Plan struct { post []*Step phase Phase - env Env + env map[string]string } -func NewPlan(hooks List, phase Phase, cb *CallbackHook, extra Env, +func NewPlan(hooks List, phase Phase, cb *CallbackHook, extra map[string]string, ) (*Plan, error) { pre := make([]*Step, 0, len(hooks)) post := make([]*Step, 0, len(hooks)) // TODO sanity check unique name of hook? for _, hook := range hooks { - state := make(map[any]any) pre = append(pre, &Step{ Hook: hook, Edge: Pre, Status: StepPending, - state: state, }) post = append(post, &Step{ Hook: hook, Edge: Post, Status: StepPending, - state: state, }) } @@ -203,7 +196,7 @@ func (p *Plan) Run(ctx context.Context, dryRun bool) { runHook := func(s *Step, ctx context.Context, edge Edge) HookReport { w(func() { s.Status = StepExec }) begin := time.Now() - r := s.Hook.Run(ctx, edge, p.phase, dryRun, p.env, s.state) + r := s.Hook.Run(ctx, edge, p.phase, dryRun, p.env) end := time.Now() w(func() { s.Report = r diff --git a/internal/daemon/hooks/hook_logging.go b/internal/daemon/hooks/hook_logging.go index 0a47e3f9..b843edab 100644 --- a/internal/daemon/hooks/hook_logging.go +++ b/internal/daemon/hooks/hook_logging.go @@ -4,7 +4,6 @@ import ( "bufio" "bytes" "context" - "sync" "github.com/dsh2dsh/zrepl/internal/daemon/logging" "github.com/dsh2dsh/zrepl/internal/logger" @@ -14,67 +13,14 @@ func getLogger(ctx context.Context) logger.Logger { return logging.GetLogger(ctx, logging.SubsysHooks) } -const MAX_HOOK_LOG_SIZE_DEFAULT int = 1 << 20 - -type logWriter struct { - /* - Mutex prevents: - concurrent writes to buf, scanner in Write([]byte) - data race on scanner vs Write([]byte) - and concurrent write to buf (call to buf.Reset()) - in Close() - - (Also, Close() should generally block until any Write() call completes.) - */ - mtx *sync.Mutex - buf bytes.Buffer - scanner *bufio.Scanner - logger logger.Logger - level logger.Level - field string -} - -func NewLogWriter(mtx *sync.Mutex, logger logger.Logger, level logger.Level, - field string, -) *logWriter { - w := new(logWriter) - w.mtx = mtx - w.scanner = bufio.NewScanner(&w.buf) - w.logger = logger - w.level = level - w.field = field - return w -} - -func (w *logWriter) log(line string) { - w.logger.WithField(w.field, line).Log(w.level, "hook output") -} - -func (w *logWriter) logUnreadBytes() { - for w.scanner.Scan() { - w.log(w.scanner.Text()) +func logOutput(l logger.Logger, level logger.Level, field string, + output []byte, +) { + if len(output) == 0 { + return } -} - -func (w *logWriter) Write(in []byte) (int, error) { - w.mtx.Lock() - defer w.mtx.Unlock() - - n, err := w.buf.Write(in) - if err != nil { - return n, err + scanner := bufio.NewScanner(bytes.NewReader(output)) + for scanner.Scan() { + l.WithField(field, scanner.Text()).Log(level, "hook output") } - w.logUnreadBytes() - - // Always reset the scanner for the next Write - w.buf.Reset() - w.scanner = bufio.NewScanner(&w.buf) - return n, nil -} - -func (w *logWriter) Close() error { - w.mtx.Lock() - defer w.mtx.Unlock() - w.logUnreadBytes() - return nil } diff --git a/internal/daemon/hooks/hook_type_callback.go b/internal/daemon/hooks/hook_type_callback.go index 1abbbb0d..8d012453 100644 --- a/internal/daemon/hooks/hook_type_callback.go +++ b/internal/daemon/hooks/hook_type_callback.go @@ -63,7 +63,7 @@ func (r *CallbackHookReport) Error() string { } func (h *CallbackHook) Run(ctx context.Context, edge Edge, phase Phase, - dryRun bool, extra Env, state map[any]any, + dryRun bool, extra map[string]string, ) HookReport { return &CallbackHookReport{Name: h.displayString, Err: h.cb(ctx)} } diff --git a/internal/daemon/hooks/hook_type_command.go b/internal/daemon/hooks/hook_type_command.go index 30ceae05..a3d1ce7e 100644 --- a/internal/daemon/hooks/hook_type_command.go +++ b/internal/daemon/hooks/hook_type_command.go @@ -1,16 +1,14 @@ package hooks import ( - "bytes" "context" "errors" "fmt" - "io" "math" "os" "os/exec" + "slices" "strings" - "sync" "time" "github.com/dsh2dsh/zrepl/internal/config" @@ -28,9 +26,11 @@ const ( ) func NewHookEnv(edge Edge, phase Phase, dryRun bool, timeout time.Duration, - extra Env, -) Env { - r := Env{EnvTimeout: fmt.Sprintf("%.f", math.Floor(timeout.Seconds()))} + extra map[string]string, +) map[string]string { + r := map[string]string{ + EnvTimeout: fmt.Sprintf("%.f", math.Floor(timeout.Seconds())), + } edgeString := edge.StringForPhase(phase) r[EnvType] = strings.ToLower(edgeString) @@ -49,11 +49,8 @@ func NewHookEnv(edge Edge, phase Phase, dryRun bool, timeout time.Duration, return r } -type Env map[string]string - func NewCommandHook(in *config.HookCommand) (*CommandHook, error) { r := &CommandHook{ - edge: Pre | Post, errIsFatal: in.ErrIsFatal, command: in.Path, timeout: in.Timeout, @@ -68,71 +65,76 @@ func NewCommandHook(in *config.HookCommand) (*CommandHook, error) { } type CommandHook struct { - edge Edge filter zfs.DatasetFilter errIsFatal bool command string timeout time.Duration -} -func (h *CommandHook) Filesystems() zfs.DatasetFilter { return h.filter } - -func (h *CommandHook) ErrIsFatal() bool { - return h.errIsFatal + combinedOutput bool } -func (h *CommandHook) String() string { - return h.command +func (self *CommandHook) WithCombinedOutput() *CommandHook { + self.combinedOutput = true + return self } -func (h *CommandHook) Run(ctx context.Context, edge Edge, phase Phase, - dryRun bool, extra Env, state map[any]any, +func (self *CommandHook) Filesystems() zfs.DatasetFilter { return self.filter } + +func (self *CommandHook) ErrIsFatal() bool { return self.errIsFatal } + +func (self *CommandHook) String() string { return self.command } + +func (self *CommandHook) Run(ctx context.Context, edge Edge, phase Phase, + dryRun bool, extra map[string]string, ) HookReport { - l := getLogger(ctx).WithField("command", h.command) - if h.timeout > 0 { - cmdCtx, cancel := context.WithTimeout(ctx, h.timeout) + if self.timeout > 0 { + cmdCtx, cancel := context.WithTimeout(ctx, self.timeout) defer cancel() ctx = cmdCtx } - cmdExec := exec.CommandContext(ctx, h.command) - hookEnv := NewHookEnv(edge, phase, dryRun, h.timeout, extra) - cmdEnv := os.Environ() - for k, v := range hookEnv { - cmdEnv = append(cmdEnv, fmt.Sprintf("%s=%s", k, v)) - } - cmdExec.Env = cmdEnv - - var scanMutex sync.Mutex - logErrWriter := NewLogWriter(&scanMutex, l, logger.Warn, "stderr") - logOutWriter := NewLogWriter(&scanMutex, l, logger.Info, "stdout") - defer logErrWriter.Close() - defer logOutWriter.Close() - - var combinedOutput bytes.Buffer - cmdExec.Stderr = io.MultiWriter(logErrWriter, &combinedOutput) - cmdExec.Stdout = io.MultiWriter(logOutWriter, &combinedOutput) + cmd := exec.CommandContext(ctx, self.command) + env, hookEnv := self.makeEnv(edge, phase, dryRun, extra) + cmd.Env = env report := &CommandHookReport{ - Command: h.command, + Command: self.command, Env: hookEnv, // no report.Args } - if err := cmdExec.Start(); err != nil { - report.Err = err - return report - } + combinedOutput, err := cmd.Output() + l := getLogger(ctx).WithField("command", self.command) + logOutput(l, logger.Info, "stdout", combinedOutput) - if err := cmdExec.Wait(); err != nil { + if err != nil { + var ee *exec.ExitError + if errors.As(err, &ee) { + logOutput(l, logger.Warn, "stderr", ee.Stderr) + if self.combinedOutput { + combinedOutput = slices.Concat(combinedOutput, ee.Stderr) + } + } if errors.Is(context.Cause(ctx), context.DeadlineExceeded) { - report.Err = fmt.Errorf("timed out after %s: %s", h.timeout, err) - } else { - report.Err = err + err = fmt.Errorf("timed out after %s: %s", self.timeout, err) } + report.Err = err } - report.CombinedOutput = make([]byte, combinedOutput.Len()) - copy(report.CombinedOutput, combinedOutput.Bytes()) + if self.combinedOutput { + report.CombinedOutput = combinedOutput + } return report } + +func (self *CommandHook) makeEnv(edge Edge, phase Phase, dryRun bool, + extra map[string]string, +) ([]string, map[string]string) { + env := slices.Clone(os.Environ()) + hookEnv := NewHookEnv(edge, phase, dryRun, self.timeout, extra) + env = slices.Grow(env, len(hookEnv)) + for k, v := range hookEnv { + env = append(env, k+"="+v) + } + return env, hookEnv +} diff --git a/internal/daemon/hooks/hooks_test.go b/internal/daemon/hooks/hooks_test.go index af44e7a0..117df525 100644 --- a/internal/daemon/hooks/hooks_test.go +++ b/internal/daemon/hooks/hooks_test.go @@ -401,7 +401,7 @@ jobs: return nil }) - hookEnvExtra := hooks.Env{ + hookEnvExtra := map[string]string{ hooks.EnvFS: fs.ToString(), hooks.EnvSnapshot: testSnapshotName, } @@ -419,7 +419,8 @@ jobs: filteredHooks, err := hookList.CopyFilteredForFilesystem(fs) require.NoError(t, err) - plan, err := hooks.NewPlan(filteredHooks, hooks.PhaseTesting, cb, hookEnvExtra) + plan, err := hooks.NewPlan(filteredHooks.WithCombinedOutput(), + hooks.PhaseTesting, cb, hookEnvExtra) require.NoError(t, err) t.Logf("REPORT PRE EXECUTION:\n%s", plan.Report()) diff --git a/internal/daemon/hooks/report.go b/internal/daemon/hooks/report.go index 6d44f253..14a64887 100644 --- a/internal/daemon/hooks/report.go +++ b/internal/daemon/hooks/report.go @@ -10,7 +10,7 @@ import ( type CommandHookReport struct { Command string Args []string // currently always empty - Env Env + Env map[string]string Err error CombinedOutput []byte diff --git a/internal/daemon/snapper/impl.go b/internal/daemon/snapper/impl.go index 3404017a..92dbecf0 100644 --- a/internal/daemon/snapper/impl.go +++ b/internal/daemon/snapper/impl.go @@ -95,7 +95,7 @@ func (plan *plan) execute(ctx context.Context, dryRun bool) (ok bool) { ctx := logging.WithLogger(ctx, logging.FromContext(ctx). WithField("fs", fs.ToString()).WithField("snap", snapname)) - hookEnvExtra := hooks.Env{ + hookEnvExtra := map[string]string{ hooks.EnvFS: fs.ToString(), hooks.EnvSnapshot: snapname, }