Skip to content

Commit

Permalink
Merge pull request #625 from ripienaar/check_request
Browse files Browse the repository at this point in the history
Adds a check that can check a request-reply service
  • Loading branch information
ripienaar authored Feb 4, 2025
2 parents 4d953a4 + 2efcc55 commit a685220
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 7 deletions.
100 changes: 100 additions & 0 deletions monitor/request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package monitor

import (
"regexp"
"time"

"github.com/nats-io/nats.go"
)

type CheckRequestOptions struct {
// Subject is the subject to send the request to
Subject string `json:"subject" yaml:"subject"`
// Payload is the payload to send to the service
Payload string `json:"payload" yaml:"payload"`
// Header is headers to send with the request
Header map[string]string `json:"header" yaml:"header"`
//HeaderMatch to send in the payload
HeaderMatch map[string]string `json:"headers" yaml:"headers"`
// ResponseMatch applies regular expression match against the payload
ResponseMatch string `json:"response_match" yaml:"response_match"`
// ResponseTimeWarn warns when the response takes longer than a certain time
ResponseTimeWarn time.Duration `json:"response_time_warn" yaml:"response_time_warn"`
// ResponseTimeCritical logs critical when the response takes longer than a certain time
ResponseTimeCritical time.Duration `json:"response_time_crit" yaml:"response_time_crit"`
}

func CheckRequest(server string, nopts []nats.Option, check *Result, timeout time.Duration, opts CheckRequestOptions) error {
nc, err := nats.Connect(server, nopts...)
if check.CriticalIfErr(err, "could not load info: %v", err) {
return nil
}

if opts.Subject == "" {
check.Critical("no subject specified")
return nil
}

msg := nats.NewMsg(opts.Subject)
msg.Data = []byte(opts.Payload)
for k, v := range opts.Header {
msg.Header.Add(k, v)
}

start := time.Now()
resp, err := nc.RequestMsg(msg, timeout)
since := time.Since(start)

check.Pd(&PerfDataItem{
Help: "How long the request took",
Name: "time",
Value: float64(since.Round(time.Millisecond).Seconds()),
Warn: opts.ResponseTimeWarn.Seconds(),
Crit: opts.ResponseTimeCritical.Seconds(),
Unit: "s",
})
if check.CriticalIfErr(err, "could not send request: %v", err) {
return nil
}

if opts.ResponseMatch != "" {
re, err := regexp.Compile(opts.ResponseMatch)
if check.CriticalIfErr(err, "content regex compile failed: %v", err) {
return nil
}

if !re.Match(resp.Data) {
check.Critical("response does not match regexp")
}
}

for k, v := range opts.HeaderMatch {
rv := resp.Header.Get(k)
if rv != v {
check.Critical("invalid header %q = %q", k, rv)
}
}

if opts.ResponseTimeCritical > 0 && since > opts.ResponseTimeCritical {
check.Critical("response took %v", since.Round(time.Millisecond))
} else if opts.ResponseTimeWarn > 0 && since > opts.ResponseTimeWarn {
check.Warn("response took %v", since.Round(time.Millisecond))
}

check.OkIfNoWarningsOrCriticals("Valid response")

return nil
}
138 changes: 138 additions & 0 deletions monitor/request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package monitor_test

import (
"regexp"
"testing"
"time"

"github.com/nats-io/jsm.go/monitor"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
)

func TestCheckRequest(t *testing.T) {
t.Run("Body match", func(t *testing.T) {
withJetStream(t, func(srv *server.Server, nc *nats.Conn) {
check := &monitor.Result{}

_, err := nc.Subscribe("test", func(msg *nats.Msg) {
msg.Respond([]byte("test payload"))
})
assertNoError(t, err)

assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{
Subject: "test",
ResponseMatch: "no match",
}))
assertListIsEmpty(t, check.OKs)
assertListIsEmpty(t, check.Warnings)
assertListEquals(t, check.Criticals, "response does not match regexp")

check = &monitor.Result{}
assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{
Subject: "test",
ResponseMatch: ".+payload",
}))
assertListIsEmpty(t, check.Criticals)
assertListIsEmpty(t, check.Warnings)
assertListEquals(t, check.OKs, "Valid response")
})
})

t.Run("Headers", func(t *testing.T) {
withJetStream(t, func(srv *server.Server, nc *nats.Conn) {
check := &monitor.Result{}

_, err := nc.Subscribe("test", func(msg *nats.Msg) {
rmsg := nats.NewMsg(msg.Reply)
rmsg.Header.Add("test", "test header")
msg.RespondMsg(rmsg)
})
assertNoError(t, err)

assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{
Subject: "test",
HeaderMatch: map[string]string{"test": "no match", "other": "header"},
}))
assertListIsEmpty(t, check.OKs)
assertListIsEmpty(t, check.Warnings)
assertListEquals(t, check.Criticals, `invalid header "other" = ""`, `invalid header "test" = "test header"`)

check = &monitor.Result{}
assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{
Subject: "test",
HeaderMatch: map[string]string{"test": "test header"},
}))
assertListIsEmpty(t, check.Criticals)
assertListIsEmpty(t, check.Warnings)
assertListEquals(t, check.OKs, "Valid response")
})
})

t.Run("Response Time", func(t *testing.T) {
withJetStream(t, func(srv *server.Server, nc *nats.Conn) {
check := &monitor.Result{}
_, err := nc.Subscribe("test", func(msg *nats.Msg) {
time.Sleep(500 * time.Millisecond)
msg.Respond([]byte("test payload"))
})
assertNoError(t, err)

assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{
Subject: "test",
ResponseTimeWarn: 20 * time.Millisecond,
ResponseTimeCritical: time.Second,
}))
assertListIsEmpty(t, check.Criticals)
assertListIsEmpty(t, check.OKs)
if len(check.Warnings) != 1 {
t.Fatalf("expected 1 warning, got %d", len(check.Warnings))
}
m, err := regexp.MatchString("^response took \\d+ms", check.Warnings[0])
assertNoError(t, err)
if !m {
t.Fatalf("warning not match %s", check.Warnings[0])
}

check = &monitor.Result{}
assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{
Subject: "test",
ResponseTimeWarn: 20 * time.Millisecond,
ResponseTimeCritical: 400 * time.Millisecond,
}))
assertListIsEmpty(t, check.Warnings)
assertListIsEmpty(t, check.OKs)
if len(check.Criticals) != 1 {
t.Fatalf("expected 1 warning, got %d", len(check.Criticals))
}
m, err = regexp.MatchString("^response took \\d+ms", check.Criticals[0])
assertNoError(t, err)
if !m {
t.Fatalf("warning not match %s", check.Criticals[0])
}

check = &monitor.Result{}
assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{
Subject: "test",
ResponseTimeWarn: 800 * time.Millisecond,
ResponseTimeCritical: 1000 * time.Millisecond,
}))
assertListIsEmpty(t, check.Warnings)
assertListIsEmpty(t, check.Criticals)
assertListEquals(t, check.OKs, "Valid response")
})
})
}
6 changes: 6 additions & 0 deletions monitor/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ func (r *Result) Ok(format string, a ...any) {
r.OKs = append(r.OKs, fmt.Sprintf(format, a...))
}

func (r *Result) OkIfNoWarningsOrCriticals(format string, a ...any) {
if len(r.Warnings) == 0 && len(r.Criticals) == 0 {
r.Ok(format, a...)
}
}

func (r *Result) CriticalExitIfErr(err error, format string, a ...any) bool {
if err == nil {
return false
Expand Down
12 changes: 5 additions & 7 deletions monitor/stream_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,10 @@ func CheckStreamMessage(server string, nopts []nats.Option, check *Result, opts

since := time.Since(ts)

if opts.AgeWarning > 0 || opts.AgeCritical > 0 {
if opts.AgeCritical > 0 && since > secondsToDuration(opts.AgeCritical) {
check.Critical("%v old", since.Round(time.Millisecond))
} else if opts.AgeWarning > 0 && since > secondsToDuration(opts.AgeWarning) {
check.Warn("%v old", time.Since(ts).Round(time.Millisecond))
}
if opts.AgeCritical > 0 && since > secondsToDuration(opts.AgeCritical) {
check.Critical("%v old", since.Round(time.Millisecond))
} else if opts.AgeWarning > 0 && since > secondsToDuration(opts.AgeWarning) {
check.Warn("%v old", time.Since(ts).Round(time.Millisecond))
}

if opts.Content != "" {
Expand All @@ -104,7 +102,7 @@ func CheckStreamMessage(server string, nopts []nats.Option, check *Result, opts
}
}

check.Ok("Valid message on %s > %s", opts.StreamName, opts.Subject)
check.OkIfNoWarningsOrCriticals("Valid message on %s > %s", opts.StreamName, opts.Subject)

return nil
}

0 comments on commit a685220

Please sign in to comment.