Skip to content

Commit

Permalink
Move prometheus metrics into control job
Browse files Browse the repository at this point in the history
  • Loading branch information
dsh2dsh committed Sep 10, 2024
1 parent 26c209d commit e4ac5cf
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 44 deletions.
61 changes: 33 additions & 28 deletions daemon/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ const (
ControlJobEndpointSignal string = "/signal"
)

var promControl middleware.PromControl

type controlJob struct {
sockaddr *net.UnixAddr
sockmode os.FileMode
Expand All @@ -40,6 +38,9 @@ type controlJob struct {

log logger.Logger
pprofServer *pprofServer

requestBegin *prometheus.CounterVec
requestFinished *prometheus.HistogramVec
}

func newControlJob(sockpath string, jobs *jobs, mode uint32,
Expand All @@ -66,22 +67,22 @@ func (j *controlJob) OwnedDatasetSubtreeRoot() (p *zfs.DatasetPath, ok bool) { r
func (j *controlJob) SenderConfig() *endpoint.SenderConfig { return nil }

func (j *controlJob) RegisterMetrics(registerer prometheus.Registerer) {
promControl.RequestBegin = prometheus.NewCounterVec(prometheus.CounterOpts{
j.requestBegin = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "zrepl",
Subsystem: "control",
Name: "request_begin",
Help: "number of request we started to handle",
}, []string{"endpoint"})

promControl.RequestFinished = prometheus.NewHistogramVec(prometheus.HistogramOpts{
j.requestFinished = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "zrepl",
Subsystem: "control",
Name: "request_finished",
Help: "time it took a request to finish",
Buckets: []float64{1e-6, 10e-6, 100e-6, 500e-6, 1e-3, 10e-3, 100e-3, 200e-3, 400e-3, 800e-3, 1, 10, 20},
}, []string{"endpoint"})
registerer.MustRegister(promControl.RequestBegin)
registerer.MustRegister(promControl.RequestFinished)
registerer.MustRegister(j.requestBegin)
registerer.MustRegister(j.requestFinished)
}

func (j *controlJob) Run(ctx context.Context, cron *cron.Cron) {
Expand Down Expand Up @@ -115,29 +116,8 @@ func (j *controlJob) Run(ctx context.Context, cron *cron.Cron) {
})
}

mux := http.NewServeMux()
logRequest := middleware.RequestLogger(j.log, &promControl)

mux.Handle(ControlJobEndpointPProf, middleware.New(
logRequest,
middleware.JsonRequestResponder(j.log, j.pprof)))

mux.Handle(ControlJobEndpointVersion, middleware.New(
logRequest,
middleware.JsonResponder(j.log, func() (any, error) {
return version.NewZreplVersionInformation(), nil
})))

mux.Handle(ControlJobEndpointStatus, middleware.New(
// don't log requests to status endpoint, too spammy
middleware.JsonResponder(j.log, j.status)))

mux.Handle(ControlJobEndpointSignal, middleware.New(
logRequest,
middleware.JsonRequestResponder(j.log, j.signal)))

server := http.Server{
Handler: mux,
Handler: j.mux(),
// control socket is local, 1s timeout should be more than sufficient, even
// on a loaded system
WriteTimeout: envconst.Duration(
Expand Down Expand Up @@ -169,6 +149,31 @@ func (j *controlJob) Run(ctx context.Context, cron *cron.Cron) {
j.pprofServer.Wait()
}

func (j *controlJob) mux() *http.ServeMux {
mux := http.NewServeMux()
logRequest := middleware.RequestLogger(j.log,
middleware.WithPrometheusMetrics(j.requestBegin, j.requestFinished))

mux.Handle(ControlJobEndpointPProf, middleware.New(
logRequest,
middleware.JsonRequestResponder(j.log, j.pprof)))

mux.Handle(ControlJobEndpointVersion, middleware.New(
logRequest,
middleware.JsonResponder(j.log, func() (any, error) {
return version.NewZreplVersionInformation(), nil
})))

mux.Handle(ControlJobEndpointStatus, middleware.New(
// don't log requests to status endpoint, too spammy
middleware.JsonResponder(j.log, j.status)))

mux.Handle(ControlJobEndpointSignal, middleware.New(
logRequest,
middleware.JsonRequestResponder(j.log, j.signal)))
return mux
}

func (j *controlJob) pprof(decoder middleware.JsonDecoder) (any, error) {
var msg PprofServerControlMsg
err := decoder(&msg)
Expand Down
40 changes: 24 additions & 16 deletions daemon/middleware/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,9 @@ import (
"github.com/dsh2dsh/zrepl/logger"
)

type PromControl struct {
RequestBegin *prometheus.CounterVec
RequestFinished *prometheus.HistogramVec
}

func RequestLogger(log logger.Logger, prom *PromControl, opts ...loggerOption,
) Middleware {
func RequestLogger(log logger.Logger, opts ...loggerOption) Middleware {
l := &requestLogger{
log: log,
prom: prom,

log: log,
completedLevel: logger.Debug,
}

Expand All @@ -35,11 +27,22 @@ func WithCompletedInfo() loggerOption {
return func(self *requestLogger) { self.completedLevel = logger.Info }
}

func WithPrometheusMetrics(requestBegin *prometheus.CounterVec,
requestFinished *prometheus.HistogramVec,
) loggerOption {
return func(self *requestLogger) {
self.requestBegin = requestBegin
self.requestFinished = requestFinished
}
}

type requestLogger struct {
log logger.Logger
prom *PromControl
log logger.Logger

completedLevel logger.Level

requestBegin *prometheus.CounterVec
requestFinished *prometheus.HistogramVec
}

func (self *requestLogger) middleware(next http.Handler) http.Handler {
Expand All @@ -53,10 +56,14 @@ func (self *requestLogger) middleware(next http.Handler) http.Handler {
log.Info("\"" + methodURL + "\"")
log = log.WithField("req", methodURL)

self.prom.RequestBegin.WithLabelValues(r.URL.Path).Inc()
defer prometheus.
NewTimer(self.prom.RequestFinished.WithLabelValues(r.URL.Path)).
ObserveDuration()
if self.requestBegin != nil {
self.requestBegin.WithLabelValues(r.URL.Path).Inc()
}
if self.requestFinished != nil {
defer prometheus.
NewTimer(self.requestFinished.WithLabelValues(r.URL.Path)).
ObserveDuration()
}

if next == nil {
log.Error("no next handler configured")
Expand All @@ -68,5 +75,6 @@ func (self *requestLogger) middleware(next http.Handler) http.Handler {
log.WithField("duration", time.Since(t)).
Log(self.completedLevel, "request completed")
}

return http.HandlerFunc(fn)
}

0 comments on commit e4ac5cf

Please sign in to comment.