diff --git a/audit/jetstream_checks.go b/audit/jetstream_checks.go index f45fe0b5..ecdff7aa 100644 --- a/audit/jetstream_checks.go +++ b/audit/jetstream_checks.go @@ -349,7 +349,7 @@ func checkStreamMetadataMonitoring(_ *Check, r *archive.Reader, examples *Exampl } opts.StreamName = streamName - monitor.StreamInfoHealthCheck(&streamDetails, check, *opts, log) + monitor.CheckStreamInfoHealth(&streamDetails, check, *opts, log) for _, warning := range check.Warnings { examples.Add("WARNING: stream %s in %s: %s", streamName, accountName, warning) diff --git a/monitor/connection.go b/monitor/connection.go index 417f5076..d4830850 100644 --- a/monitor/connection.go +++ b/monitor/connection.go @@ -20,8 +20,8 @@ import ( "github.com/nats-io/nats.go" ) -// ConnectionCheckOptions configures the NATS Connection check -type ConnectionCheckOptions struct { +// CheckConnectionOptions configures the NATS Connection check +type CheckConnectionOptions struct { // ConnectTimeWarning warning threshold for time to establish the connection (seconds) ConnectTimeWarning float64 `json:"connect_time_warning" yaml:"connect_time_warning"` // ConnectTimeCritical critical threshold for time to establish the connection (seconds) @@ -36,7 +36,7 @@ type ConnectionCheckOptions struct { RequestRttCritical float64 `json:"request_rtt_critical" yaml:"request_rtt_critical"` } -func CheckConnection(server string, nopts []nats.Option, timeout time.Duration, check *Result, opts ConnectionCheckOptions) error { +func CheckConnection(server string, nopts []nats.Option, timeout time.Duration, check *Result, opts CheckConnectionOptions) error { connStart := time.Now() nc, err := nats.Connect(server, nopts...) if check.CriticalIfErr(err, "connection failed: %v", err) { diff --git a/monitor/credentials.go b/monitor/credentials.go index 5f668ab3..08f5bf72 100644 --- a/monitor/credentials.go +++ b/monitor/credentials.go @@ -21,8 +21,8 @@ import ( "github.com/nats-io/nkeys" ) -// CredentialCheckOptions configures the credentials check -type CredentialCheckOptions struct { +// CheckCredentialOptions configures the credentials check +type CheckCredentialOptions struct { // File is the file holding the credential File string `json:"file" yaml:"file"` // ValidityWarning is the warning threshold for credential validity (seconds) @@ -33,7 +33,7 @@ type CredentialCheckOptions struct { RequiresExpiry bool `json:"requires_expiry" yaml:"requires_expiry"` } -func CheckCredential(check *Result, opts CredentialCheckOptions) error { +func CheckCredential(check *Result, opts CheckCredentialOptions) error { ok, err := fileAccessible(opts.File) if err != nil { check.Critical("credential not accessible: %v", err) diff --git a/monitor/credentials_test.go b/monitor/credentials_test.go index 0248ce89..9b073028 100644 --- a/monitor/credentials_test.go +++ b/monitor/credentials_test.go @@ -60,7 +60,7 @@ SUAKYITMHPMSYUGPNQBLLPGOPFQN44XNCGXHNSHLJJVMD3IKYGBOLAI7TI } t.Run("no expiry", func(t *testing.T) { - opts := monitor.CredentialCheckOptions{ + opts := monitor.CheckCredentialOptions{ File: writeCred(t, noExpiry), RequiresExpiry: true, } @@ -80,7 +80,7 @@ SUAKYITMHPMSYUGPNQBLLPGOPFQN44XNCGXHNSHLJJVMD3IKYGBOLAI7TI t.Run("critical", func(t *testing.T) { check := &monitor.Result{} - assertNoError(t, monitor.CheckCredential(check, monitor.CredentialCheckOptions{ + assertNoError(t, monitor.CheckCredential(check, monitor.CheckCredentialOptions{ File: writeCred(t, noExpiry), ValidityCritical: 100 * 24 * 365 * 60 * 60, })) @@ -91,7 +91,7 @@ SUAKYITMHPMSYUGPNQBLLPGOPFQN44XNCGXHNSHLJJVMD3IKYGBOLAI7TI t.Run("warning", func(t *testing.T) { check := &monitor.Result{} - assertNoError(t, monitor.CheckCredential(check, monitor.CredentialCheckOptions{ + assertNoError(t, monitor.CheckCredential(check, monitor.CheckCredentialOptions{ File: writeCred(t, noExpiry), ValidityWarning: 100 * 24 * 365 * 60 * 60, })) diff --git a/monitor/js_account.go b/monitor/js_account.go index d691bf08..ba8413a9 100644 --- a/monitor/js_account.go +++ b/monitor/js_account.go @@ -21,7 +21,7 @@ import ( "github.com/nats-io/nats.go" ) -type JetStreamAccountOptions struct { +type CheckJetStreamAccountOptions struct { MemoryWarning int `json:"memory_warning" yaml:"memory_warning"` MemoryCritical int `json:"memory_critical" yaml:"memory_critical"` FileWarning int `json:"file_warning" yaml:"file_warning"` @@ -37,7 +37,7 @@ type JetStreamAccountOptions struct { Resolver func() *api.JetStreamAccountStats `json:"-" yaml:"-"` } -func CheckJetStreamAccount(server string, nopts []nats.Option, check *Result, opts JetStreamAccountOptions) error { +func CheckJetStreamAccount(server string, nopts []nats.Option, check *Result, opts CheckJetStreamAccountOptions) error { var mgr *jsm.Manager var err error @@ -82,7 +82,7 @@ func CheckJetStreamAccount(server string, nopts []nats.Option, check *Result, op return nil } -func checkStreamClusterHealth(check *Result, opts *JetStreamAccountOptions, info []*jsm.Stream) error { +func checkStreamClusterHealth(check *Result, opts *CheckJetStreamAccountOptions, info []*jsm.Stream) error { var okCnt, noLeaderCnt, notEnoughReplicasCnt, critCnt, lagCritCnt, seenCritCnt int for _, s := range info { @@ -177,7 +177,7 @@ func checkStreamClusterHealth(check *Result, opts *JetStreamAccountOptions, info return nil } -func checkJSAccountInfo(check *Result, opts *JetStreamAccountOptions, info *api.JetStreamAccountStats) error { +func checkJSAccountInfo(check *Result, opts *CheckJetStreamAccountOptions, info *api.JetStreamAccountStats) error { if info == nil { return fmt.Errorf("invalid account status") } diff --git a/monitor/js_account_test.go b/monitor/js_account_test.go index 92230515..03bc95d8 100644 --- a/monitor/js_account_test.go +++ b/monitor/js_account_test.go @@ -21,7 +21,7 @@ import ( ) func TestCheckAccountInfo(t *testing.T) { - setDefaults := func() (*monitor.JetStreamAccountOptions, *api.JetStreamAccountStats, *monitor.Result) { + setDefaults := func() (*monitor.CheckJetStreamAccountOptions, *api.JetStreamAccountStats, *monitor.Result) { info := &api.JetStreamAccountStats{ JetStreamTier: api.JetStreamTier{ Memory: 128, @@ -38,7 +38,7 @@ func TestCheckAccountInfo(t *testing.T) { } // cli defaults - cmd := &monitor.JetStreamAccountOptions{ + cmd := &monitor.CheckJetStreamAccountOptions{ ConsumersCritical: -1, ConsumersWarning: -1, StreamCritical: -1, diff --git a/monitor/kv.go b/monitor/kv.go index 837650ba..096feb67 100644 --- a/monitor/kv.go +++ b/monitor/kv.go @@ -19,8 +19,8 @@ import ( "github.com/nats-io/nats.go" ) -// KVCheckOptions configures the KV check -type KVCheckOptions struct { +// CheckKVBucketAndKeyOptions configures the KV check +type CheckKVBucketAndKeyOptions struct { // Bucket is the bucket to check Bucket string `json:"bucket" yaml:"bucket"` // Key requires a key to have a non delete/purge value set @@ -31,7 +31,7 @@ type KVCheckOptions struct { ValuesCritical int64 `json:"values_critical" yaml:"values_critical"` } -func CheckKVBucketAndKey(server string, nopts []nats.Option, check *Result, opts KVCheckOptions) error { +func CheckKVBucketAndKey(server string, nopts []nats.Option, check *Result, opts CheckKVBucketAndKeyOptions) error { nc, err := nats.Connect(server, nopts...) if check.CriticalIfErr(err, "connection failed: %v", err) { return nil diff --git a/monitor/kv_test.go b/monitor/kv_test.go index 471d3978..67037ab1 100644 --- a/monitor/kv_test.go +++ b/monitor/kv_test.go @@ -106,7 +106,7 @@ func TestCheckKVBucketAndKey(t *testing.T) { t.Run("Bucket", func(t *testing.T) { withJetStream(t, func(srv *server.Server, nc *nats.Conn) { check := &monitor.Result{} - err := monitor.CheckKVBucketAndKey(srv.ClientURL(), nil, check, monitor.KVCheckOptions{ + err := monitor.CheckKVBucketAndKey(srv.ClientURL(), nil, check, monitor.CheckKVBucketAndKeyOptions{ Bucket: "TEST", }) checkErr(t, err, "check failed: %v", err) @@ -121,7 +121,7 @@ func TestCheckKVBucketAndKey(t *testing.T) { checkErr(t, err, "kv create failed") check = &monitor.Result{} - err = monitor.CheckKVBucketAndKey(srv.ClientURL(), nil, check, monitor.KVCheckOptions{ + err = monitor.CheckKVBucketAndKey(srv.ClientURL(), nil, check, monitor.CheckKVBucketAndKeyOptions{ Bucket: "TEST", ValuesCritical: -1, ValuesWarning: -1, @@ -142,7 +142,7 @@ func TestCheckKVBucketAndKey(t *testing.T) { bucket, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"}) checkErr(t, err, "kv create failed: %v", err) - opts := monitor.KVCheckOptions{ + opts := monitor.CheckKVBucketAndKeyOptions{ Bucket: "TEST", ValuesWarning: 1, ValuesCritical: 2, @@ -227,7 +227,7 @@ func TestCheckKVBucketAndKey(t *testing.T) { bucket, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"}) checkErr(t, err, "kv create failed") - opts := monitor.KVCheckOptions{ + opts := monitor.CheckKVBucketAndKeyOptions{ Bucket: "TEST", Key: "KEY", ValuesWarning: -1, diff --git a/monitor/meta.go b/monitor/meta.go index 0148e3e6..7da48283 100644 --- a/monitor/meta.go +++ b/monitor/meta.go @@ -15,13 +15,14 @@ package monitor import ( "encoding/json" + "fmt" "time" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" ) -type CheckMetaOptions struct { +type CheckJetstreamMetaOptions struct { // ExpectServers the expected number of known servers in the meta cluster ExpectServers int `json:"expect_servers" yaml:"expect_servers"` // LagCritical the critical threshold for how many operations behind a peer may be @@ -29,15 +30,10 @@ type CheckMetaOptions struct { // SeenCritical the critical threshold for how long ago a peer was seen (seconds) SeenCritical float64 `json:"seen_critical" yaml:"seen_critical"` - Resolver func(*nats.Conn) (*JSZResponse, error) `json:"-" yaml:"-"` + Resolver func(*nats.Conn) (*server.ServerAPIJszResponse, error) `json:"-" yaml:"-"` } -type JSZResponse struct { - Data server.JSInfo `json:"data"` - Server server.ServerInfo `json:"server"` -} - -func CheckJetstreamMeta(servers string, nopts []nats.Option, check *Result, opts CheckMetaOptions) error { +func CheckJetstreamMeta(servers string, nopts []nats.Option, check *Result, opts CheckJetstreamMetaOptions) error { var nc *nats.Conn var err error @@ -47,8 +43,8 @@ func CheckJetstreamMeta(servers string, nopts []nats.Option, check *Result, opts return nil } - opts.Resolver = func(conn *nats.Conn) (*JSZResponse, error) { - jszresp := &JSZResponse{} + opts.Resolver = func(conn *nats.Conn) (*server.ServerAPIJszResponse, error) { + jszresp := &server.ServerAPIJszResponse{} jreq, err := json.Marshal(&server.JSzOptions{LeaderOnly: true}) if check.CriticalIfErr(err, "request failed: %v", err) { return nil, err @@ -60,7 +56,14 @@ func CheckJetstreamMeta(servers string, nopts []nats.Option, check *Result, opts } err = json.Unmarshal(res.Data, jszresp) - check.CriticalIfErr(err, "invalid result received: %s", err) + if check.CriticalIfErr(err, "invalid result received: %s", err) { + return nil, err + } + + if jszresp.Error == nil { + check.Critical("invalid result received: %s", jszresp.Error.Error()) + return nil, fmt.Errorf("invalid result received: %s", jszresp.Error.Error()) + } return jszresp, nil } @@ -71,6 +74,11 @@ func CheckJetstreamMeta(servers string, nopts []nats.Option, check *Result, opts return nil } + if jszresp.Data == nil { + check.Critical("no JSZ response received") + return nil + } + ci := jszresp.Data.Meta if ci == nil { check.Critical("no cluster information") diff --git a/monitor/meta_test.go b/monitor/meta_test.go index 01015e12..37228913 100644 --- a/monitor/meta_test.go +++ b/monitor/meta_test.go @@ -25,9 +25,11 @@ import ( func TestCheckJSZ(t *testing.T) { t.Run("nil meta", func(t *testing.T) { check := &monitor.Result{} - assertNoError(t, monitor.CheckJetstreamMeta("", nil, check, monitor.CheckMetaOptions{ - Resolver: func(_ *nats.Conn) (*monitor.JSZResponse, error) { - return &monitor.JSZResponse{}, nil + assertNoError(t, monitor.CheckJetstreamMeta("", nil, check, monitor.CheckJetstreamMetaOptions{ + Resolver: func(_ *nats.Conn) (*server.ServerAPIJszResponse, error) { + return &server.ServerAPIJszResponse{ + Data: &server.JSInfo{}, + }, nil }, })) assertListEquals(t, check.Criticals, "no cluster information") @@ -35,10 +37,14 @@ func TestCheckJSZ(t *testing.T) { t.Run("no meta leader", func(t *testing.T) { check := &monitor.Result{} - assertNoError(t, monitor.CheckJetstreamMeta("", nil, check, monitor.CheckMetaOptions{ - Resolver: func(_ *nats.Conn) (*monitor.JSZResponse, error) { - r := &monitor.JSZResponse{} - r.Data.Meta = &server.MetaClusterInfo{} + assertNoError(t, monitor.CheckJetstreamMeta("", nil, check, monitor.CheckJetstreamMetaOptions{ + Resolver: func(_ *nats.Conn) (*server.ServerAPIJszResponse, error) { + r := &server.ServerAPIJszResponse{ + Data: &server.JSInfo{ + Meta: &server.MetaClusterInfo{}, + }, + } + return r, nil }, })) @@ -49,13 +55,17 @@ func TestCheckJSZ(t *testing.T) { t.Run("invalid peer count", func(t *testing.T) { check := &monitor.Result{} - assertNoError(t, monitor.CheckJetstreamMeta("", nil, check, monitor.CheckMetaOptions{ + assertNoError(t, monitor.CheckJetstreamMeta("", nil, check, monitor.CheckJetstreamMetaOptions{ ExpectServers: 2, - Resolver: func(_ *nats.Conn) (*monitor.JSZResponse, error) { - r := &monitor.JSZResponse{} - r.Data.Meta = &server.MetaClusterInfo{ - Leader: "L1", + Resolver: func(_ *nats.Conn) (*server.ServerAPIJszResponse, error) { + r := &server.ServerAPIJszResponse{ + Data: &server.JSInfo{ + Meta: &server.MetaClusterInfo{ + Leader: "L1", + }, + }, } + return r, nil }, })) @@ -65,17 +75,20 @@ func TestCheckJSZ(t *testing.T) { t.Run("good peer", func(t *testing.T) { check := &monitor.Result{} - assertNoError(t, monitor.CheckJetstreamMeta("", nil, check, monitor.CheckMetaOptions{ + assertNoError(t, monitor.CheckJetstreamMeta("", nil, check, monitor.CheckJetstreamMetaOptions{ ExpectServers: 3, SeenCritical: 1, LagCritical: 10, - Resolver: func(_ *nats.Conn) (*monitor.JSZResponse, error) { - r := &monitor.JSZResponse{} - r.Data.Meta = &server.MetaClusterInfo{ - Leader: "l1", - Replicas: []*server.PeerInfo{ - {Name: "replica1", Current: true, Active: 10 * time.Millisecond, Lag: 1}, - {Name: "replica2", Current: true, Active: 10 * time.Millisecond, Lag: 1}, + Resolver: func(_ *nats.Conn) (*server.ServerAPIJszResponse, error) { + r := &server.ServerAPIJszResponse{ + Data: &server.JSInfo{ + Meta: &server.MetaClusterInfo{ + Leader: "l1", + Replicas: []*server.PeerInfo{ + {Name: "replica1", Current: true, Active: 10 * time.Millisecond, Lag: 1}, + {Name: "replica2", Current: true, Active: 10 * time.Millisecond, Lag: 1}, + }, + }, }, } @@ -88,17 +101,20 @@ func TestCheckJSZ(t *testing.T) { t.Run("not current peer", func(t *testing.T) { check := &monitor.Result{} - assertNoError(t, monitor.CheckJetstreamMeta("", nil, check, monitor.CheckMetaOptions{ + assertNoError(t, monitor.CheckJetstreamMeta("", nil, check, monitor.CheckJetstreamMetaOptions{ ExpectServers: 3, SeenCritical: 1, LagCritical: 10, - Resolver: func(_ *nats.Conn) (*monitor.JSZResponse, error) { - r := &monitor.JSZResponse{} - r.Data.Meta = &server.MetaClusterInfo{ - Leader: "l1", - Replicas: []*server.PeerInfo{ - {Name: "replica1", Active: 10 * time.Millisecond, Lag: 1}, - {Name: "replica2", Current: true, Active: 10 * time.Millisecond, Lag: 1}, + Resolver: func(_ *nats.Conn) (*server.ServerAPIJszResponse, error) { + r := &server.ServerAPIJszResponse{ + Data: &server.JSInfo{ + Meta: &server.MetaClusterInfo{ + Leader: "l1", + Replicas: []*server.PeerInfo{ + {Name: "replica1", Active: 10 * time.Millisecond, Lag: 1}, + {Name: "replica2", Current: true, Active: 10 * time.Millisecond, Lag: 1}, + }, + }, }, } @@ -113,17 +129,20 @@ func TestCheckJSZ(t *testing.T) { t.Run("offline peer", func(t *testing.T) { check := &monitor.Result{} - assertNoError(t, monitor.CheckJetstreamMeta("", nil, check, monitor.CheckMetaOptions{ + assertNoError(t, monitor.CheckJetstreamMeta("", nil, check, monitor.CheckJetstreamMetaOptions{ ExpectServers: 3, SeenCritical: 1, LagCritical: 10, - Resolver: func(_ *nats.Conn) (*monitor.JSZResponse, error) { - r := &monitor.JSZResponse{} - r.Data.Meta = &server.MetaClusterInfo{ - Leader: "l1", - Replicas: []*server.PeerInfo{ - {Name: "replica1", Current: true, Offline: true, Active: 10 * time.Millisecond, Lag: 1}, - {Name: "replica2", Current: true, Active: 10 * time.Millisecond, Lag: 1}, + Resolver: func(_ *nats.Conn) (*server.ServerAPIJszResponse, error) { + r := &server.ServerAPIJszResponse{ + Data: &server.JSInfo{ + Meta: &server.MetaClusterInfo{ + Leader: "l1", + Replicas: []*server.PeerInfo{ + {Name: "replica1", Current: true, Offline: true, Active: 10 * time.Millisecond, Lag: 1}, + {Name: "replica2", Current: true, Active: 10 * time.Millisecond, Lag: 1}, + }, + }, }, } @@ -137,17 +156,20 @@ func TestCheckJSZ(t *testing.T) { t.Run("inactive peer", func(t *testing.T) { check := &monitor.Result{} - assertNoError(t, monitor.CheckJetstreamMeta("", nil, check, monitor.CheckMetaOptions{ + assertNoError(t, monitor.CheckJetstreamMeta("", nil, check, monitor.CheckJetstreamMetaOptions{ ExpectServers: 3, SeenCritical: 1, LagCritical: 10, - Resolver: func(_ *nats.Conn) (*monitor.JSZResponse, error) { - r := &monitor.JSZResponse{} - r.Data.Meta = &server.MetaClusterInfo{ - Leader: "l1", - Replicas: []*server.PeerInfo{ - {Name: "replica1", Current: true, Active: 10 * time.Hour, Lag: 1}, - {Name: "replica2", Current: true, Active: 10 * time.Millisecond, Lag: 1}, + Resolver: func(_ *nats.Conn) (*server.ServerAPIJszResponse, error) { + r := &server.ServerAPIJszResponse{ + Data: &server.JSInfo{ + Meta: &server.MetaClusterInfo{ + Leader: "l1", + Replicas: []*server.PeerInfo{ + {Name: "replica1", Current: true, Active: 10 * time.Hour, Lag: 1}, + {Name: "replica2", Current: true, Active: 10 * time.Millisecond, Lag: 1}, + }, + }, }, } @@ -161,17 +183,20 @@ func TestCheckJSZ(t *testing.T) { t.Run("lagged peer", func(t *testing.T) { check := &monitor.Result{} - assertNoError(t, monitor.CheckJetstreamMeta("", nil, check, monitor.CheckMetaOptions{ + assertNoError(t, monitor.CheckJetstreamMeta("", nil, check, monitor.CheckJetstreamMetaOptions{ ExpectServers: 3, SeenCritical: 1, LagCritical: 10, - Resolver: func(_ *nats.Conn) (*monitor.JSZResponse, error) { - r := &monitor.JSZResponse{} - r.Data.Meta = &server.MetaClusterInfo{ - Leader: "l1", - Replicas: []*server.PeerInfo{ - {Name: "replica1", Current: true, Active: 10 * time.Millisecond, Lag: 10000}, - {Name: "replica2", Current: true, Active: 10 * time.Millisecond, Lag: 1}, + Resolver: func(_ *nats.Conn) (*server.ServerAPIJszResponse, error) { + r := &server.ServerAPIJszResponse{ + Data: &server.JSInfo{ + Meta: &server.MetaClusterInfo{ + Leader: "l1", + Replicas: []*server.PeerInfo{ + {Name: "replica1", Current: true, Active: 10 * time.Millisecond, Lag: 10000}, + {Name: "replica2", Current: true, Active: 10 * time.Millisecond, Lag: 1}, + }, + }, }, } @@ -184,19 +209,22 @@ func TestCheckJSZ(t *testing.T) { t.Run("multiple errors", func(t *testing.T) { check := &monitor.Result{} - assertNoError(t, monitor.CheckJetstreamMeta("", nil, check, monitor.CheckMetaOptions{ + assertNoError(t, monitor.CheckJetstreamMeta("", nil, check, monitor.CheckJetstreamMetaOptions{ ExpectServers: 3, SeenCritical: 1, LagCritical: 10, - Resolver: func(_ *nats.Conn) (*monitor.JSZResponse, error) { - r := &monitor.JSZResponse{} - r.Data.Meta = &server.MetaClusterInfo{ - Leader: "l1", - Replicas: []*server.PeerInfo{ - {Name: "replica1", Current: true, Active: 10 * time.Millisecond, Lag: 10000}, - {Name: "replica2", Current: true, Active: 10 * time.Hour, Lag: 1}, - {Name: "replica3", Current: true, Offline: true, Active: 10 * time.Millisecond, Lag: 1}, - {Name: "replica4", Active: 10 * time.Millisecond, Lag: 1}, + Resolver: func(_ *nats.Conn) (*server.ServerAPIJszResponse, error) { + r := &server.ServerAPIJszResponse{ + Data: &server.JSInfo{ + Meta: &server.MetaClusterInfo{ + Leader: "l1", + Replicas: []*server.PeerInfo{ + {Name: "replica1", Current: true, Active: 10 * time.Millisecond, Lag: 10000}, + {Name: "replica2", Current: true, Active: 10 * time.Hour, Lag: 1}, + {Name: "replica3", Current: true, Offline: true, Active: 10 * time.Millisecond, Lag: 1}, + {Name: "replica4", Active: 10 * time.Millisecond, Lag: 1}, + }, + }, }, } diff --git a/monitor/server.go b/monitor/server.go index 3a03bfaf..e87ba369 100644 --- a/monitor/server.go +++ b/monitor/server.go @@ -22,8 +22,8 @@ import ( "github.com/nats-io/nats.go" ) -// ServerCheckOptions configures the server check -type ServerCheckOptions struct { +// CheckServerOptions configures the server check +type CheckServerOptions struct { // Name is the server to get details for Name string `json:"name" yaml:"name"` // CPUWarning is the warning threshold for CPU usage @@ -56,7 +56,7 @@ type ServerCheckOptions struct { Resolver func(nc *nats.Conn, name string, timeout time.Duration) (*server.Varz, error) `json:"-" yaml:"-"` } -func CheckServer(server string, nopts []nats.Option, check *Result, timeout time.Duration, opts ServerCheckOptions) error { +func CheckServer(server string, nopts []nats.Option, check *Result, timeout time.Duration, opts CheckServerOptions) error { var nc *nats.Conn var err error @@ -168,8 +168,6 @@ func CheckServer(server string, nopts []nats.Option, check *Result, timeout time } func fetchVarz(nc *nats.Conn, name string, timeout time.Duration) (*server.Varz, error) { - var vz json.RawMessage - if name == "" { return nil, fmt.Errorf("server name is required") } @@ -184,28 +182,19 @@ func fetchVarz(nc *nats.Conn, name string, timeout time.Duration) (*server.Varz, return nil, err } - reqresp := map[string]json.RawMessage{} - err = json.Unmarshal(res.Data, &reqresp) + resp := &server.ServerAPIVarzResponse{} + err = json.Unmarshal(res.Data, &resp) if err != nil { return nil, err } - errresp, ok := reqresp["error"] - if ok { - return nil, fmt.Errorf("invalid response received: %#v", errresp) + if resp.Error != nil { + return nil, fmt.Errorf("invalid response received: %#v", resp.Error.Error()) } - vz = reqresp["data"] - - if len(vz) == 0 { + if resp.Data == nil { return nil, fmt.Errorf("no data received for %s", name) } - varz := &server.Varz{} - err = json.Unmarshal(vz, varz) - if err != nil { - return nil, err - } - - return varz, nil + return resp.Data, nil } diff --git a/monitor/server_test.go b/monitor/server_test.go index 55dcb011..738e535a 100644 --- a/monitor/server_test.go +++ b/monitor/server_test.go @@ -29,7 +29,7 @@ func TestCheckVarz(t *testing.T) { } check := &monitor.Result{} - err := monitor.CheckServer("", nil, check, time.Second, monitor.ServerCheckOptions{Name: "x", Resolver: vzResolver}) + err := monitor.CheckServer("", nil, check, time.Second, monitor.CheckServerOptions{Name: "x", Resolver: vzResolver}) checkErr(t, err, "check failed: %v", err) assertListIsEmpty(t, check.Warnings) assertListIsEmpty(t, check.OKs) @@ -42,7 +42,7 @@ func TestCheckVarz(t *testing.T) { } check := &monitor.Result{} - err := monitor.CheckServer("", nil, check, time.Second, monitor.ServerCheckOptions{Name: "example.net", Resolver: vzResolver}) + err := monitor.CheckServer("", nil, check, time.Second, monitor.CheckServerOptions{Name: "example.net", Resolver: vzResolver}) checkErr(t, err, "check failed: %v", err) assertListIsEmpty(t, check.Warnings) assertListIsEmpty(t, check.OKs) @@ -56,13 +56,13 @@ func TestCheckVarz(t *testing.T) { } check := &monitor.Result{} - err := monitor.CheckServer("", nil, check, time.Second, monitor.ServerCheckOptions{Name: "example.net", JetStreamRequired: true, Resolver: vzResolver}) + err := monitor.CheckServer("", nil, check, time.Second, monitor.CheckServerOptions{Name: "example.net", JetStreamRequired: true, Resolver: vzResolver}) checkErr(t, err, "check failed: %v", err) assertListEquals(t, check.Criticals, "JetStream not enabled") vz.JetStream.Config = &server.JetStreamConfig{} check = &monitor.Result{} - err = monitor.CheckServer("", nil, check, time.Second, monitor.ServerCheckOptions{Name: "example.net", JetStreamRequired: true, Resolver: vzResolver}) + err = monitor.CheckServer("", nil, check, time.Second, monitor.CheckServerOptions{Name: "example.net", JetStreamRequired: true, Resolver: vzResolver}) checkErr(t, err, "check failed: %v", err) assertListIsEmpty(t, check.Criticals) assertListEquals(t, check.OKs, "JetStream enabled") @@ -75,13 +75,13 @@ func TestCheckVarz(t *testing.T) { } check := &monitor.Result{} - err := monitor.CheckServer("", nil, check, time.Second, monitor.ServerCheckOptions{Name: "example.net", TLSRequired: true, Resolver: vzResolver}) + err := monitor.CheckServer("", nil, check, time.Second, monitor.CheckServerOptions{Name: "example.net", TLSRequired: true, Resolver: vzResolver}) checkErr(t, err, "check failed: %v", err) assertListEquals(t, check.Criticals, "TLS not required") vz.TLSRequired = true check = &monitor.Result{} - err = monitor.CheckServer("", nil, check, time.Second, monitor.ServerCheckOptions{Name: "example.net", TLSRequired: true, Resolver: vzResolver}) + err = monitor.CheckServer("", nil, check, time.Second, monitor.CheckServerOptions{Name: "example.net", TLSRequired: true, Resolver: vzResolver}) checkErr(t, err, "check failed: %v", err) assertListIsEmpty(t, check.Criticals) assertListEquals(t, check.OKs, "TLS required") @@ -94,12 +94,12 @@ func TestCheckVarz(t *testing.T) { } check := &monitor.Result{} - assertNoError(t, monitor.CheckServer("", nil, check, time.Second, monitor.ServerCheckOptions{Name: "example.net", AuthenticationRequired: true, Resolver: vzResolver})) + assertNoError(t, monitor.CheckServer("", nil, check, time.Second, monitor.CheckServerOptions{Name: "example.net", AuthenticationRequired: true, Resolver: vzResolver})) assertListEquals(t, check.Criticals, "Authentication not required") vz.AuthRequired = true check = &monitor.Result{} - assertNoError(t, monitor.CheckServer("", nil, check, time.Second, monitor.ServerCheckOptions{Name: "example.net", AuthenticationRequired: true, Resolver: vzResolver})) + assertNoError(t, monitor.CheckServer("", nil, check, time.Second, monitor.CheckServerOptions{Name: "example.net", AuthenticationRequired: true, Resolver: vzResolver})) assertListIsEmpty(t, check.Criticals) assertListEquals(t, check.OKs, "Authentication required") }) @@ -110,7 +110,7 @@ func TestCheckVarz(t *testing.T) { return vz, nil } - opts := monitor.ServerCheckOptions{ + opts := monitor.CheckServerOptions{ Name: "example.net", Resolver: vzResolver, } @@ -216,7 +216,7 @@ func TestCheckVarz(t *testing.T) { return vz, nil } - opts := monitor.ServerCheckOptions{ + opts := monitor.CheckServerOptions{ Name: "example.net", Resolver: vzResolver, } @@ -267,7 +267,7 @@ func TestCheckVarz(t *testing.T) { return vz, nil } - opts := monitor.ServerCheckOptions{ + opts := monitor.CheckServerOptions{ Name: "example.net", Resolver: vzResolver, } diff --git a/monitor/stream.go b/monitor/stream.go index 4a5173c3..8cf4646e 100644 --- a/monitor/stream.go +++ b/monitor/stream.go @@ -36,10 +36,10 @@ const ( StreamMonitorMetaSubjectsCritical = "io.nats.monitor.subjects-critical" ) -type StreamHealthCheckF func(*jsm.Stream, *Result, StreamHealthCheckOptions, api.Logger) +type StreamHealthCheckF func(*jsm.Stream, *Result, CheckStreamHealthOptions, api.Logger) -// StreamHealthCheckOptions configures the stream check -type StreamHealthCheckOptions struct { +// CheckStreamHealthOptions configures the stream check +type CheckStreamHealthOptions struct { // StreamName stream to monitor StreamName string `json:"stream_name" yaml:"stream_name"` // SourcesLagCritical critical threshold for how many operations behind sources may be @@ -74,16 +74,16 @@ type monitorMetaParser struct { fn func(string) error } -// ExtractStreamHealthCheckOptions checks stream metadata and populate StreamHealthCheckOptions based on it -func ExtractStreamHealthCheckOptions(metadata map[string]string, extraChecks ...StreamHealthCheckF) (*StreamHealthCheckOptions, error) { - opts := &StreamHealthCheckOptions{ +// ExtractStreamHealthCheckOptions checks stream metadata and populate CheckStreamHealthOptions based on it +func ExtractStreamHealthCheckOptions(metadata map[string]string, extraChecks ...StreamHealthCheckF) (*CheckStreamHealthOptions, error) { + opts := &CheckStreamHealthOptions{ HealthChecks: extraChecks, } return populateStreamHealthCheckOptions(metadata, opts) } -func populateStreamHealthCheckOptions(metadata map[string]string, opts *StreamHealthCheckOptions) (*StreamHealthCheckOptions, error) { +func populateStreamHealthCheckOptions(metadata map[string]string, opts *CheckStreamHealthOptions) (*CheckStreamHealthOptions, error) { var err error parser := []monitorMetaParser{ {MonitorMetaEnabled, func(v string) error { @@ -154,7 +154,7 @@ func populateStreamHealthCheckOptions(metadata map[string]string, opts *StreamHe return opts, nil } -func StreamInfoHealthCheck(nfo *api.StreamInfo, check *Result, opts StreamHealthCheckOptions, log api.Logger) { +func CheckStreamInfoHealth(nfo *api.StreamInfo, check *Result, opts CheckStreamHealthOptions, log api.Logger) { streamCheckCluster(nfo, check, opts, log) streamCheckMessages(nfo, check, opts, log) streamCheckSubjects(nfo, check, opts, log) @@ -162,7 +162,7 @@ func StreamInfoHealthCheck(nfo *api.StreamInfo, check *Result, opts StreamHealth streamCheckMirror(nfo, check, opts, log) } -func StreamHealthCheck(server string, nopts []nats.Option, check *Result, opts StreamHealthCheckOptions, log api.Logger) error { +func CheckStreamHealth(server string, nopts []nats.Option, check *Result, opts CheckStreamHealthOptions, log api.Logger) error { if opts.StreamName == "" { check.Critical("stream name is required") return nil @@ -194,7 +194,7 @@ func StreamHealthCheck(server string, nopts []nats.Option, check *Result, opts S return nil } - StreamInfoHealthCheck(nfo, check, opts, log) + CheckStreamInfoHealth(nfo, check, opts, log) for _, hc := range opts.HealthChecks { hc(stream, check, opts, log) @@ -203,7 +203,7 @@ func StreamHealthCheck(server string, nopts []nats.Option, check *Result, opts S return nil } -func streamCheckMirror(si *api.StreamInfo, check *Result, opts StreamHealthCheckOptions, log api.Logger) { +func streamCheckMirror(si *api.StreamInfo, check *Result, opts CheckStreamHealthOptions, log api.Logger) { // We check sources here because they are mutually exclusive with mirrors. If sources are set, mirrors // can't be so we can bail out of this check early if (opts.SourcesLagCritical <= 0 && opts.SourcesSeenCritical <= 0) || si.Config.Name == "" || len(si.Sources) > 0 { @@ -249,7 +249,7 @@ func streamCheckMirror(si *api.StreamInfo, check *Result, opts StreamHealthCheck } } -func streamCheckSources(si *api.StreamInfo, check *Result, opts StreamHealthCheckOptions, log api.Logger) { +func streamCheckSources(si *api.StreamInfo, check *Result, opts CheckStreamHealthOptions, log api.Logger) { sources := si.Sources count := len(sources) @@ -302,7 +302,7 @@ func streamCheckSources(si *api.StreamInfo, check *Result, opts StreamHealthChec } } -func streamCheckSubjects(si *api.StreamInfo, check *Result, opts StreamHealthCheckOptions, log api.Logger) { +func streamCheckSubjects(si *api.StreamInfo, check *Result, opts CheckStreamHealthOptions, log api.Logger) { if opts.SubjectsWarn <= 0 && opts.SubjectsCrit <= 0 { return } @@ -331,7 +331,7 @@ func streamCheckSubjects(si *api.StreamInfo, check *Result, opts StreamHealthChe } // TODO: support inverting logic and also in cli -func streamCheckMessages(si *api.StreamInfo, check *Result, opts StreamHealthCheckOptions, log api.Logger) { +func streamCheckMessages(si *api.StreamInfo, check *Result, opts CheckStreamHealthOptions, log api.Logger) { if opts.MessagesCrit <= 0 && opts.MessagesWarn <= 0 { return } @@ -353,7 +353,7 @@ func streamCheckMessages(si *api.StreamInfo, check *Result, opts StreamHealthChe check.Ok("%d messages", si.State.Msgs) } -func streamCheckCluster(si *api.StreamInfo, check *Result, opts StreamHealthCheckOptions, log api.Logger) { +func streamCheckCluster(si *api.StreamInfo, check *Result, opts CheckStreamHealthOptions, log api.Logger) { nfo := si.Cluster if (nfo == nil || si.Config.Replicas <= 1) && opts.ClusterExpectedPeers <= 0 { diff --git a/monitor/stream_test.go b/monitor/stream_test.go index 1df6f79e..cb65cfd2 100644 --- a/monitor/stream_test.go +++ b/monitor/stream_test.go @@ -32,7 +32,7 @@ func TestStream_checkSources(t *testing.T) { t.Run("Should handle fewer than desired", func(t *testing.T) { check, si := setup() - streamCheckSources(si, check, StreamHealthCheckOptions{ + streamCheckSources(si, check, CheckStreamHealthOptions{ MinSources: 1, MaxSources: 2, }, api.NewDiscardLogger()) @@ -40,7 +40,7 @@ func TestStream_checkSources(t *testing.T) { check, si = setup() si.Sources = append(si.Sources, &api.StreamSourceInfo{}) - streamCheckSources(si, check, StreamHealthCheckOptions{ + streamCheckSources(si, check, CheckStreamHealthOptions{ MinSources: 2, MaxSources: 3, }, api.NewDiscardLogger()) @@ -51,7 +51,7 @@ func TestStream_checkSources(t *testing.T) { check, si := setup() si.Sources = append(si.Sources, &api.StreamSourceInfo{}) si.Sources = append(si.Sources, &api.StreamSourceInfo{}) - streamCheckSources(si, check, StreamHealthCheckOptions{ + streamCheckSources(si, check, CheckStreamHealthOptions{ MaxSources: 1, }, api.NewDiscardLogger()) requireElement(t, check.Criticals, "2 sources") @@ -61,7 +61,7 @@ func TestStream_checkSources(t *testing.T) { check, si := setup() si.Sources = append(si.Sources, &api.StreamSourceInfo{}) si.Sources = append(si.Sources, &api.StreamSourceInfo{}) - streamCheckSources(si, check, StreamHealthCheckOptions{ + streamCheckSources(si, check, CheckStreamHealthOptions{ MinSources: 2, MaxSources: 3, }, api.NewDiscardLogger()) @@ -77,7 +77,7 @@ func TestStream_checkSources(t *testing.T) { si.Sources = append(si.Sources, &api.StreamSourceInfo{ Lag: 200, }) - streamCheckSources(si, check, StreamHealthCheckOptions{ + streamCheckSources(si, check, CheckStreamHealthOptions{ SourcesLagCritical: 100, }, api.NewDiscardLogger()) requireElement(t, check.Criticals, "2 sources are lagged") @@ -91,7 +91,7 @@ func TestStream_checkSources(t *testing.T) { si.Sources = append(si.Sources, &api.StreamSourceInfo{ Active: 2 * time.Second, }) - streamCheckSources(si, check, StreamHealthCheckOptions{ + streamCheckSources(si, check, CheckStreamHealthOptions{ SourcesSeenCritical: float64(1) / 1000, }, api.NewDiscardLogger()) requireElement(t, check.Criticals, "2 sources are inactive") @@ -107,7 +107,7 @@ func TestStream_checkSources(t *testing.T) { Lag: 200, Active: time.Millisecond, }) - streamCheckSources(si, check, StreamHealthCheckOptions{ + streamCheckSources(si, check, CheckStreamHealthOptions{ SourcesLagCritical: 500, SourcesSeenCritical: 1, MinSources: 2, @@ -134,7 +134,7 @@ func TestStream_checkMessages(t *testing.T) { check, si := setup() si.State.Msgs = 1000 - streamCheckMessages(si, check, StreamHealthCheckOptions{}, api.NewDiscardLogger()) + streamCheckMessages(si, check, CheckStreamHealthOptions{}, api.NewDiscardLogger()) requireEmpty(t, check.Criticals) requireEmpty(t, check.Warnings) requireEmpty(t, check.OKs) @@ -143,7 +143,7 @@ func TestStream_checkMessages(t *testing.T) { t.Run("Should handle critical situations", func(t *testing.T) { check, si := setup() si.State.Msgs = 1000 - streamCheckMessages(si, check, StreamHealthCheckOptions{ + streamCheckMessages(si, check, CheckStreamHealthOptions{ MessagesCrit: 1000, }, api.NewDiscardLogger()) @@ -152,7 +152,7 @@ func TestStream_checkMessages(t *testing.T) { check, si = setup() si.State.Msgs = 999 - streamCheckMessages(si, check, StreamHealthCheckOptions{ + streamCheckMessages(si, check, CheckStreamHealthOptions{ MessagesCrit: 1000, }, api.NewDiscardLogger()) requireElement(t, check.Criticals, "999 messages") @@ -162,7 +162,7 @@ func TestStream_checkMessages(t *testing.T) { t.Run("Should handle warning situations", func(t *testing.T) { check, si := setup() si.State.Msgs = 1000 - streamCheckMessages(si, check, StreamHealthCheckOptions{ + streamCheckMessages(si, check, CheckStreamHealthOptions{ MessagesWarn: 1000, }, api.NewDiscardLogger()) requireElement(t, check.Warnings, "1000 messages") @@ -172,7 +172,7 @@ func TestStream_checkMessages(t *testing.T) { check, si = setup() si.State.Msgs = 999 - streamCheckMessages(si, check, StreamHealthCheckOptions{ + streamCheckMessages(si, check, CheckStreamHealthOptions{ MessagesWarn: 1000, }, api.NewDiscardLogger()) requireElement(t, check.Warnings, "999 messages") @@ -183,7 +183,7 @@ func TestStream_checkMessages(t *testing.T) { t.Run("Should handle ok situations", func(t *testing.T) { check, si := setup() si.State.Msgs = 1000 - streamCheckMessages(si, check, StreamHealthCheckOptions{ + streamCheckMessages(si, check, CheckStreamHealthOptions{ MessagesWarn: 500, MessagesCrit: 200, }, api.NewDiscardLogger()) @@ -204,7 +204,7 @@ func TestStream_checkSubjects(t *testing.T) { t.Run("Should handle no thresholds", func(t *testing.T) { check, si := setup() - streamCheckSubjects(si, check, StreamHealthCheckOptions{}, api.NewDiscardLogger()) + streamCheckSubjects(si, check, CheckStreamHealthOptions{}, api.NewDiscardLogger()) requireEmpty(t, check.Criticals) requireEmpty(t, check.Warnings) requireEmpty(t, check.OKs) @@ -215,7 +215,7 @@ func TestStream_checkSubjects(t *testing.T) { check, si := setup() si.State.NumSubjects = 100 - streamCheckSubjects(si, check, StreamHealthCheckOptions{ + streamCheckSubjects(si, check, CheckStreamHealthOptions{ SubjectsWarn: 200, SubjectsCrit: 300, }, api.NewDiscardLogger()) @@ -227,7 +227,7 @@ func TestStream_checkSubjects(t *testing.T) { t.Run("Should handle more than subjects", func(t *testing.T) { check, si := setup() si.State.NumSubjects = 400 - streamCheckSubjects(si, check, StreamHealthCheckOptions{ + streamCheckSubjects(si, check, CheckStreamHealthOptions{ SubjectsWarn: 200, SubjectsCrit: 300, }, api.NewDiscardLogger()) @@ -237,7 +237,7 @@ func TestStream_checkSubjects(t *testing.T) { check, si = setup() si.State.NumSubjects = 250 - streamCheckSubjects(si, check, StreamHealthCheckOptions{ + streamCheckSubjects(si, check, CheckStreamHealthOptions{ SubjectsWarn: 200, SubjectsCrit: 300, }, api.NewDiscardLogger()) @@ -249,7 +249,7 @@ func TestStream_checkSubjects(t *testing.T) { t.Run("Should handle valid subject counts", func(t *testing.T) { check, si := setup() si.State.NumSubjects = 100 - streamCheckSubjects(si, check, StreamHealthCheckOptions{ + streamCheckSubjects(si, check, CheckStreamHealthOptions{ SubjectsWarn: 200, SubjectsCrit: 300, }, api.NewDiscardLogger()) @@ -263,7 +263,7 @@ func TestStream_checkSubjects(t *testing.T) { t.Run("Should handle fewer subjects", func(t *testing.T) { check, si := setup() si.State.NumSubjects = 100 - streamCheckSubjects(si, check, StreamHealthCheckOptions{ + streamCheckSubjects(si, check, CheckStreamHealthOptions{ SubjectsWarn: 300, SubjectsCrit: 200, }, api.NewDiscardLogger()) @@ -273,7 +273,7 @@ func TestStream_checkSubjects(t *testing.T) { check, si = setup() si.State.NumSubjects = 250 - streamCheckSubjects(si, check, StreamHealthCheckOptions{ + streamCheckSubjects(si, check, CheckStreamHealthOptions{ SubjectsWarn: 300, SubjectsCrit: 200, }, api.NewDiscardLogger()) @@ -285,7 +285,7 @@ func TestStream_checkSubjects(t *testing.T) { t.Run("Should handle valid subject counts", func(t *testing.T) { check, si := setup() si.State.NumSubjects = 400 - streamCheckSubjects(si, check, StreamHealthCheckOptions{ + streamCheckSubjects(si, check, CheckStreamHealthOptions{ SubjectsWarn: 300, SubjectsCrit: 200, }, api.NewDiscardLogger()) @@ -300,7 +300,7 @@ func TestStream_checkSubjects(t *testing.T) { func TestStream_checkMirror(t *testing.T) { t.Run("Should handle no thresholds", func(t *testing.T) { check := &Result{} - streamCheckMirror(&api.StreamInfo{}, check, StreamHealthCheckOptions{}, api.NewDiscardLogger()) + streamCheckMirror(&api.StreamInfo{}, check, CheckStreamHealthOptions{}, api.NewDiscardLogger()) requireEmpty(t, check.Criticals) requireEmpty(t, check.Warnings) requireEmpty(t, check.OKs) @@ -315,7 +315,7 @@ func TestStream_checkMirror(t *testing.T) { }, } - streamCheckMirror(si, check, StreamHealthCheckOptions{ + streamCheckMirror(si, check, CheckStreamHealthOptions{ SourcesLagCritical: 1, SourcesSeenCritical: 1, }, api.NewDiscardLogger()) @@ -334,14 +334,14 @@ func TestStream_checkMirror(t *testing.T) { }, } - streamCheckMirror(si, check, StreamHealthCheckOptions{ + streamCheckMirror(si, check, CheckStreamHealthOptions{ SourcesLagCritical: 100, }, api.NewDiscardLogger()) requireElement(t, check.Criticals, "Mirror Lag 100") check = &Result{} si.Mirror.Lag = 200 - streamCheckMirror(si, check, StreamHealthCheckOptions{ + streamCheckMirror(si, check, CheckStreamHealthOptions{ SourcesLagCritical: 100, }, api.NewDiscardLogger()) requireElement(t, check.Criticals, "Mirror Lag 200") @@ -359,14 +359,14 @@ func TestStream_checkMirror(t *testing.T) { }, } - streamCheckMirror(si, check, StreamHealthCheckOptions{ + streamCheckMirror(si, check, CheckStreamHealthOptions{ SourcesSeenCritical: float64(1) / 1000, }, api.NewDiscardLogger()) requireElement(t, check.Criticals, "Mirror Seen 1ms") check = &Result{} si.Mirror.Active = time.Second - streamCheckMirror(si, check, StreamHealthCheckOptions{ + streamCheckMirror(si, check, CheckStreamHealthOptions{ SourcesSeenCritical: float64(1) / 1000, }, api.NewDiscardLogger()) requireElement(t, check.Criticals, "Mirror Seen 1s") @@ -387,7 +387,7 @@ func TestStream_checkMirror(t *testing.T) { }, } - streamCheckMirror(si, check, StreamHealthCheckOptions{ + streamCheckMirror(si, check, CheckStreamHealthOptions{ SourcesLagCritical: 200, SourcesSeenCritical: 1, }, api.NewDiscardLogger()) @@ -401,7 +401,7 @@ func TestStream_checkMirror(t *testing.T) { func TestStream_checkCluster(t *testing.T) { t.Run("Skip without threshold", func(t *testing.T) { check := &Result{} - streamCheckCluster(&api.StreamInfo{}, check, StreamHealthCheckOptions{}, api.NewDiscardLogger()) + streamCheckCluster(&api.StreamInfo{}, check, CheckStreamHealthOptions{}, api.NewDiscardLogger()) requireEmpty(t, check.Criticals) requireEmpty(t, check.Warnings) requireEmpty(t, check.OKs) @@ -409,7 +409,7 @@ func TestStream_checkCluster(t *testing.T) { t.Run("Should be critical when the stream is not clustered and a threshold is given", func(t *testing.T) { check := &Result{} - streamCheckCluster(&api.StreamInfo{}, check, StreamHealthCheckOptions{ + streamCheckCluster(&api.StreamInfo{}, check, CheckStreamHealthOptions{ ClusterExpectedPeers: 3, }, api.NewDiscardLogger()) requireElement(t, check.Criticals, "Stream is not clustered") @@ -431,7 +431,7 @@ func TestStream_checkCluster(t *testing.T) { }, } - streamCheckCluster(si, check, StreamHealthCheckOptions{ + streamCheckCluster(si, check, CheckStreamHealthOptions{ ClusterExpectedPeers: 3, }, api.NewDiscardLogger()) requireElement(t, check.Criticals, "Expected 3 replicas got 2") @@ -452,7 +452,7 @@ func TestStream_checkCluster(t *testing.T) { }, } - streamCheckCluster(si, check, StreamHealthCheckOptions{ + streamCheckCluster(si, check, CheckStreamHealthOptions{ ClusterExpectedPeers: 3, }, api.NewDiscardLogger()) requireElement(t, check.Criticals, "No leader") @@ -466,7 +466,7 @@ func TestStream_checkCluster(t *testing.T) { }, } - streamCheckCluster(si, check, StreamHealthCheckOptions{ + streamCheckCluster(si, check, CheckStreamHealthOptions{ ClusterExpectedPeers: 3, }, api.NewDiscardLogger()) requireEmpty(t, check.Criticals) @@ -488,7 +488,7 @@ func TestStream_checkCluster(t *testing.T) { }, } - streamCheckCluster(si, check, StreamHealthCheckOptions{ + streamCheckCluster(si, check, CheckStreamHealthOptions{ ClusterExpectedPeers: 3, ClusterLagCritical: 100, }, api.NewDiscardLogger()) @@ -510,7 +510,7 @@ func TestStream_checkCluster(t *testing.T) { }, } - streamCheckCluster(si, check, StreamHealthCheckOptions{ + streamCheckCluster(si, check, CheckStreamHealthOptions{ ClusterExpectedPeers: 3, ClusterSeenCritical: 60, }, api.NewDiscardLogger()) @@ -533,7 +533,7 @@ func TestStream_checkCluster(t *testing.T) { }, } - streamCheckCluster(si, check, StreamHealthCheckOptions{ + streamCheckCluster(si, check, CheckStreamHealthOptions{ ClusterExpectedPeers: 3, }, api.NewDiscardLogger()) @@ -554,7 +554,7 @@ func TestStream_checkCluster(t *testing.T) { Replicas: 3, }, } - streamCheckCluster(si, check, StreamHealthCheckOptions{ + streamCheckCluster(si, check, CheckStreamHealthOptions{ ClusterExpectedPeers: 3, ClusterLagCritical: 20, ClusterSeenCritical: 60,