Skip to content

Commit

Permalink
One more refactor of CommandHook
Browse files Browse the repository at this point in the history
  • Loading branch information
dsh2dsh committed Oct 23, 2024
1 parent b1c81f3 commit 1636f42
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 129 deletions.
9 changes: 8 additions & 1 deletion internal/daemon/hooks/hook_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/dsh2dsh/zrepl/internal/zfs"
)

type List []Hook
type List []*CommandHook

func ListFromConfig(in []config.HookCommand) (List, error) {
hl := make(List, len(in))
Expand All @@ -21,6 +21,13 @@ func ListFromConfig(in []config.HookCommand) (List, error) {
return hl, nil
}

func (self List) WithCombinedOutput() List {
for _, h := range self {
h.WithCombinedOutput()
}
return self
}

func (self List) CopyFilteredForFilesystem(fs *zfs.DatasetPath) (List, error) {
ret := make(List, 0, len(self))
for _, h := range self {
Expand Down
15 changes: 4 additions & 11 deletions internal/daemon/hooks/hook_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@ import (
"strings"
"sync"
"time"

"github.com/dsh2dsh/zrepl/internal/zfs"
)

type Hook interface {
Filesystems() zfs.DatasetFilter
String() string

// If true and the Pre edge invocation of Run fails, Post edge will not run
Expand All @@ -21,7 +18,7 @@ type Hook interface {
// Run is invoked by HookPlan for a Pre edge. If HookReport.HadError() ==
// false, the Post edge will be invoked, too.
Run(ctx context.Context, edge Edge, phase Phase, dryRun bool,
extra Env, state map[interface{}]any,
extra map[string]string,
) HookReport
}

Expand Down Expand Up @@ -75,7 +72,6 @@ type Step struct {
// FIXME cannot serialize this for client status, but contains interesting
// info (like what error happened)
Report HookReport
state map[any]any
}

func (s Step) String() (out string) {
Expand All @@ -102,27 +98,24 @@ type Plan struct {
post []*Step

phase Phase
env Env
env map[string]string
}

func NewPlan(hooks List, phase Phase, cb *CallbackHook, extra Env,
func NewPlan(hooks List, phase Phase, cb *CallbackHook, extra map[string]string,
) (*Plan, error) {
pre := make([]*Step, 0, len(hooks))
post := make([]*Step, 0, len(hooks))
// TODO sanity check unique name of hook?
for _, hook := range hooks {
state := make(map[any]any)
pre = append(pre, &Step{
Hook: hook,
Edge: Pre,
Status: StepPending,
state: state,
})
post = append(post, &Step{
Hook: hook,
Edge: Post,
Status: StepPending,
state: state,
})
}

Expand Down Expand Up @@ -203,7 +196,7 @@ func (p *Plan) Run(ctx context.Context, dryRun bool) {
runHook := func(s *Step, ctx context.Context, edge Edge) HookReport {
w(func() { s.Status = StepExec })
begin := time.Now()
r := s.Hook.Run(ctx, edge, p.phase, dryRun, p.env, s.state)
r := s.Hook.Run(ctx, edge, p.phase, dryRun, p.env)
end := time.Now()
w(func() {
s.Report = r
Expand Down
70 changes: 8 additions & 62 deletions internal/daemon/hooks/hook_logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bufio"
"bytes"
"context"
"sync"

"github.com/dsh2dsh/zrepl/internal/daemon/logging"
"github.com/dsh2dsh/zrepl/internal/logger"
Expand All @@ -14,67 +13,14 @@ func getLogger(ctx context.Context) logger.Logger {
return logging.GetLogger(ctx, logging.SubsysHooks)
}

const MAX_HOOK_LOG_SIZE_DEFAULT int = 1 << 20

type logWriter struct {
/*
Mutex prevents:
concurrent writes to buf, scanner in Write([]byte)
data race on scanner vs Write([]byte)
and concurrent write to buf (call to buf.Reset())
in Close()
(Also, Close() should generally block until any Write() call completes.)
*/
mtx *sync.Mutex
buf bytes.Buffer
scanner *bufio.Scanner
logger logger.Logger
level logger.Level
field string
}

func NewLogWriter(mtx *sync.Mutex, logger logger.Logger, level logger.Level,
field string,
) *logWriter {
w := new(logWriter)
w.mtx = mtx
w.scanner = bufio.NewScanner(&w.buf)
w.logger = logger
w.level = level
w.field = field
return w
}

func (w *logWriter) log(line string) {
w.logger.WithField(w.field, line).Log(w.level, "hook output")
}

func (w *logWriter) logUnreadBytes() {
for w.scanner.Scan() {
w.log(w.scanner.Text())
func logOutput(l logger.Logger, level logger.Level, field string,
output []byte,
) {
if len(output) == 0 {
return
}
}

func (w *logWriter) Write(in []byte) (int, error) {
w.mtx.Lock()
defer w.mtx.Unlock()

n, err := w.buf.Write(in)
if err != nil {
return n, err
scanner := bufio.NewScanner(bytes.NewReader(output))
for scanner.Scan() {
l.WithField(field, scanner.Text()).Log(level, "hook output")
}
w.logUnreadBytes()

// Always reset the scanner for the next Write
w.buf.Reset()
w.scanner = bufio.NewScanner(&w.buf)
return n, nil
}

func (w *logWriter) Close() error {
w.mtx.Lock()
defer w.mtx.Unlock()
w.logUnreadBytes()
return nil
}
2 changes: 1 addition & 1 deletion internal/daemon/hooks/hook_type_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (r *CallbackHookReport) Error() string {
}

func (h *CallbackHook) Run(ctx context.Context, edge Edge, phase Phase,
dryRun bool, extra Env, state map[any]any,
dryRun bool, extra map[string]string,
) HookReport {
return &CallbackHookReport{Name: h.displayString, Err: h.cb(ctx)}
}
102 changes: 52 additions & 50 deletions internal/daemon/hooks/hook_type_command.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package hooks

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
"os"
"os/exec"
"slices"
"strings"
"sync"
"time"

"github.com/dsh2dsh/zrepl/internal/config"
Expand All @@ -28,9 +26,11 @@ const (
)

func NewHookEnv(edge Edge, phase Phase, dryRun bool, timeout time.Duration,
extra Env,
) Env {
r := Env{EnvTimeout: fmt.Sprintf("%.f", math.Floor(timeout.Seconds()))}
extra map[string]string,
) map[string]string {
r := map[string]string{
EnvTimeout: fmt.Sprintf("%.f", math.Floor(timeout.Seconds())),
}

edgeString := edge.StringForPhase(phase)
r[EnvType] = strings.ToLower(edgeString)
Expand All @@ -49,11 +49,8 @@ func NewHookEnv(edge Edge, phase Phase, dryRun bool, timeout time.Duration,
return r
}

type Env map[string]string

func NewCommandHook(in *config.HookCommand) (*CommandHook, error) {
r := &CommandHook{
edge: Pre | Post,
errIsFatal: in.ErrIsFatal,
command: in.Path,
timeout: in.Timeout,
Expand All @@ -68,71 +65,76 @@ func NewCommandHook(in *config.HookCommand) (*CommandHook, error) {
}

type CommandHook struct {
edge Edge
filter zfs.DatasetFilter
errIsFatal bool
command string
timeout time.Duration
}

func (h *CommandHook) Filesystems() zfs.DatasetFilter { return h.filter }

func (h *CommandHook) ErrIsFatal() bool {
return h.errIsFatal
combinedOutput bool
}

func (h *CommandHook) String() string {
return h.command
func (self *CommandHook) WithCombinedOutput() *CommandHook {
self.combinedOutput = true
return self
}

func (h *CommandHook) Run(ctx context.Context, edge Edge, phase Phase,
dryRun bool, extra Env, state map[any]any,
func (self *CommandHook) Filesystems() zfs.DatasetFilter { return self.filter }

func (self *CommandHook) ErrIsFatal() bool { return self.errIsFatal }

func (self *CommandHook) String() string { return self.command }

func (self *CommandHook) Run(ctx context.Context, edge Edge, phase Phase,
dryRun bool, extra map[string]string,
) HookReport {
l := getLogger(ctx).WithField("command", h.command)
if h.timeout > 0 {
cmdCtx, cancel := context.WithTimeout(ctx, h.timeout)
if self.timeout > 0 {
cmdCtx, cancel := context.WithTimeout(ctx, self.timeout)
defer cancel()
ctx = cmdCtx
}

cmdExec := exec.CommandContext(ctx, h.command)
hookEnv := NewHookEnv(edge, phase, dryRun, h.timeout, extra)
cmdEnv := os.Environ()
for k, v := range hookEnv {
cmdEnv = append(cmdEnv, fmt.Sprintf("%s=%s", k, v))
}
cmdExec.Env = cmdEnv

var scanMutex sync.Mutex
logErrWriter := NewLogWriter(&scanMutex, l, logger.Warn, "stderr")
logOutWriter := NewLogWriter(&scanMutex, l, logger.Info, "stdout")
defer logErrWriter.Close()
defer logOutWriter.Close()

var combinedOutput bytes.Buffer
cmdExec.Stderr = io.MultiWriter(logErrWriter, &combinedOutput)
cmdExec.Stdout = io.MultiWriter(logOutWriter, &combinedOutput)
cmd := exec.CommandContext(ctx, self.command)
env, hookEnv := self.makeEnv(edge, phase, dryRun, extra)
cmd.Env = env

report := &CommandHookReport{
Command: h.command,
Command: self.command,
Env: hookEnv,
// no report.Args
}

if err := cmdExec.Start(); err != nil {
report.Err = err
return report
}
combinedOutput, err := cmd.Output()
l := getLogger(ctx).WithField("command", self.command)
logOutput(l, logger.Info, "stdout", combinedOutput)

if err := cmdExec.Wait(); err != nil {
if err != nil {
var ee *exec.ExitError
if errors.As(err, &ee) {
logOutput(l, logger.Warn, "stderr", ee.Stderr)
if self.combinedOutput {
combinedOutput = slices.Concat(combinedOutput, ee.Stderr)
}
}
if errors.Is(context.Cause(ctx), context.DeadlineExceeded) {
report.Err = fmt.Errorf("timed out after %s: %s", h.timeout, err)
} else {
report.Err = err
err = fmt.Errorf("timed out after %s: %s", self.timeout, err)
}
report.Err = err
}

report.CombinedOutput = make([]byte, combinedOutput.Len())
copy(report.CombinedOutput, combinedOutput.Bytes())
if self.combinedOutput {
report.CombinedOutput = combinedOutput
}
return report
}

func (self *CommandHook) makeEnv(edge Edge, phase Phase, dryRun bool,
extra map[string]string,
) ([]string, map[string]string) {
env := slices.Clone(os.Environ())
hookEnv := NewHookEnv(edge, phase, dryRun, self.timeout, extra)
env = slices.Grow(env, len(hookEnv))
for k, v := range hookEnv {
env = append(env, k+"="+v)
}
return env, hookEnv
}
5 changes: 3 additions & 2 deletions internal/daemon/hooks/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ jobs:
return nil
})

hookEnvExtra := hooks.Env{
hookEnvExtra := map[string]string{
hooks.EnvFS: fs.ToString(),
hooks.EnvSnapshot: testSnapshotName,
}
Expand All @@ -419,7 +419,8 @@ jobs:

filteredHooks, err := hookList.CopyFilteredForFilesystem(fs)
require.NoError(t, err)
plan, err := hooks.NewPlan(filteredHooks, hooks.PhaseTesting, cb, hookEnvExtra)
plan, err := hooks.NewPlan(filteredHooks.WithCombinedOutput(),
hooks.PhaseTesting, cb, hookEnvExtra)
require.NoError(t, err)
t.Logf("REPORT PRE EXECUTION:\n%s", plan.Report())

Expand Down
2 changes: 1 addition & 1 deletion internal/daemon/hooks/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
type CommandHookReport struct {
Command string
Args []string // currently always empty
Env Env
Env map[string]string
Err error

CombinedOutput []byte
Expand Down
2 changes: 1 addition & 1 deletion internal/daemon/snapper/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (plan *plan) execute(ctx context.Context, dryRun bool) (ok bool) {
ctx := logging.WithLogger(ctx, logging.FromContext(ctx).
WithField("fs", fs.ToString()).WithField("snap", snapname))

hookEnvExtra := hooks.Env{
hookEnvExtra := map[string]string{
hooks.EnvFS: fs.ToString(),
hooks.EnvSnapshot: snapname,
}
Expand Down

0 comments on commit 1636f42

Please sign in to comment.