Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Raft Node Support for Health Checks and Metrics Collection #643

Merged
merged 7 commits into from
Dec 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/gatewayd-io/gatewayd/network"
"github.com/gatewayd-io/gatewayd/plugin"
"github.com/gatewayd-io/gatewayd/pool"
"github.com/gatewayd-io/gatewayd/raft"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel"
"google.golang.org/grpc/codes"
Expand All @@ -27,6 +28,7 @@ type Options struct {
GRPCAddress string
HTTPAddress string
Servers map[string]*network.Server
RaftNode *raft.Node
}

type API struct {
Expand Down
6 changes: 4 additions & 2 deletions api/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/gatewayd-io/gatewayd/network"
"github.com/gatewayd-io/gatewayd/raft"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
Expand All @@ -12,14 +13,15 @@ import (
type HealthChecker struct {
grpc_health_v1.UnimplementedHealthServer

Servers map[string]*network.Server
Servers map[string]*network.Server
raftNode *raft.Node
}

func (h *HealthChecker) Check(
context.Context, *grpc_health_v1.HealthCheckRequest,
) (*grpc_health_v1.HealthCheckResponse, error) {
// Check if all servers are running
if liveness(h.Servers) {
if liveness(h.Servers, h.raftNode) {
return &grpc_health_v1.HealthCheckResponse{
Status: grpc_health_v1.HealthCheckResponse_SERVING,
}, nil
Expand Down
27 changes: 27 additions & 0 deletions api/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"context"
"testing"
"time"

"github.com/gatewayd-io/gatewayd/act"
"github.com/gatewayd-io/gatewayd/config"
Expand Down Expand Up @@ -63,6 +64,16 @@ func Test_Healthchecker(t *testing.T) {
},
)

raftHelper, err := testhelpers.NewTestRaftNode(t)
if err != nil {
t.Fatalf("Failed to create test raft node: %v", err)
}
defer func() {
if err := raftHelper.Cleanup(); err != nil {
t.Errorf("Failed to cleanup raft: %v", err)
}
}()

server := network.NewServer(
context.TODO(),
network.Server{
Expand All @@ -77,20 +88,36 @@ func Test_Healthchecker(t *testing.T) {
PluginRegistry: pluginRegistry,
PluginTimeout: config.DefaultPluginTimeout,
HandshakeTimeout: config.DefaultHandshakeTimeout,
RaftNode: raftHelper.Node,
},
)

healthchecker := HealthChecker{
Servers: map[string]*network.Server{
config.Default: server,
},
raftNode: raftHelper.Node,
}
assert.NotNil(t, healthchecker)
hcr, err := healthchecker.Check(context.TODO(), &grpc_health_v1.HealthCheckRequest{})
assert.NoError(t, err)
assert.NotNil(t, hcr)
assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, hcr.GetStatus())

go func(t *testing.T, server *network.Server) {
t.Helper()
if err := server.Run(); err != nil {
t.Errorf("server.Run() error = %v", err)
}
}(t, server)

time.Sleep(1 * time.Second)
// Test for SERVING status
hcr, err = healthchecker.Check(context.TODO(), &grpc_health_v1.HealthCheckRequest{})
assert.NoError(t, err)
assert.NotNil(t, hcr)
assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, hcr.GetStatus())

err = healthchecker.Watch(&grpc_health_v1.HealthCheckRequest{}, nil)
assert.Error(t, err)
assert.Equal(t, "rpc error: code = Unimplemented desc = not implemented", err.Error())
Expand Down
2 changes: 1 addition & 1 deletion api/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func createHTTPAPI(options *Options) *http.Server {
mux := http.NewServeMux()
mux.Handle("/", rmux)
mux.HandleFunc("/healthz", func(writer http.ResponseWriter, _ *http.Request) {
if liveness(options.Servers) {
if liveness(options.Servers, options.RaftNode) {
writer.Header().Set("Content-Type", "application/json")
writer.WriteHeader(http.StatusOK)
if err := json.NewEncoder(writer).Encode(Healthz{Status: "SERVING"}); err != nil {
Expand Down
8 changes: 7 additions & 1 deletion api/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ package api

import (
"github.com/gatewayd-io/gatewayd/network"
"github.com/gatewayd-io/gatewayd/raft"
)

func liveness(servers map[string]*network.Server) bool {
func liveness(servers map[string]*network.Server, raftNode *raft.Node) bool {
for _, v := range servers {
if !v.IsRunning() {
return false
}
}
if raftNode != nil {
if !raftNode.GetHealthStatus().IsHealthy {
return false
}
}
return true
}
1 change: 1 addition & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,7 @@ var runCmd = &cobra.Command{
GRPCAddress: conf.Global.API.GRPCAddress,
HTTPAddress: conf.Global.API.HTTPAddress,
Servers: servers,
RaftNode: raftNode,
}

apiObj := &api.API{
Expand Down
40 changes: 39 additions & 1 deletion docker-compose-raft.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ services:
- "19090:19090"
- "42223:2223"
- "50051:50051"
- "29090:9090"
volumes:
- ./gatewayd-files:/gatewayd-files:ro
- ./raft-data-1:/var/lib/gatewayd/raft
Expand Down Expand Up @@ -124,9 +125,28 @@ services:
- "19091:19090"
- "42224:2223"
- "50052:50051"
- "29091:9090"
volumes:
- ./gatewayd-files:/gatewayd-files:ro
- ./raft-data-2:/var/lib/gatewayd/raft
links:
- write-postgres
- read-postgres
- redis
healthcheck:
test: ["CMD", "curl", "-f", "http://gatewayd-2:18080/healthz"]
interval: 5s
timeout: 5s
retries: 5
depends_on:
write-postgres:
condition: service_healthy
read-postgres:
condition: service_healthy
redis:
condition: service_healthy
install_plugins:
condition: service_completed_successfully

gatewayd-3:
image: gatewaydio/gatewayd:latest
Expand Down Expand Up @@ -156,10 +176,28 @@ services:
- "19092:19090"
- "42225:2223"
- "50053:50051"
- "29092:9090"
volumes:
- ./gatewayd-files:/gatewayd-files:ro
- ./raft-data-3:/var/lib/gatewayd/raft

links:
- write-postgres
- read-postgres
- redis
healthcheck:
test: ["CMD", "curl", "-f", "http://gatewayd-3:18080/healthz"]
interval: 5s
timeout: 5s
retries: 5
depends_on:
write-postgres:
condition: service_healthy
read-postgres:
condition: service_healthy
redis:
condition: service_healthy
install_plugins:
condition: service_completed_successfully
prometheus:
image: prom/prometheus:latest
volumes:
Expand Down
24 changes: 24 additions & 0 deletions metrics/builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,28 @@ var (
Name: "api_requests_errors_total",
Help: "Number of API request errors",
}, []string{"method", "endpoint", "error"})
RaftHealthStatus = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "gatewayd",
Name: "raft_health_status",
Help: "Current health status of the Raft node (1 if healthy, 0 if unhealthy)",
},
[]string{"node_id"},
)
RaftLeaderStatus = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "gatewayd",
Name: "raft_leader_status",
Help: "Indicates if this node is the current Raft leader (1) or follower (0)",
},
[]string{"node_id"},
)
RaftLastContactLatency = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "gatewayd",
Name: "raft_last_contact_seconds",
Help: "Time since last contact with the Raft leader in seconds",
},
[]string{"node_id"},
)
)
77 changes: 77 additions & 0 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/gatewayd-io/gatewayd/config"
"github.com/gatewayd-io/gatewayd/metrics"
pb "github.com/gatewayd-io/gatewayd/raft/proto"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
Expand Down Expand Up @@ -565,3 +566,79 @@ func (n *Node) startRPCServer() error {

return nil
}

type HealthStatus struct {
IsHealthy bool
HasLeader bool
IsLeader bool
LastContact time.Duration
Error error
}

// GetHealthStatus returns the health status of the Raft node.
func (n *Node) GetHealthStatus() HealthStatus {
// Handle uninitialized raft node
if n == nil || n.raft == nil {
return HealthStatus{
IsHealthy: false,
Error: errors.New("raft node not initialized"),
}
}

raftState := n.raft.State()
stats := n.raft.Stats()
nodeID := string(n.config.LocalID)

// Determine leadership status
_, leaderID := n.raft.LeaderWithID()
isLeader := raftState == raft.Leader
hasLeader := leaderID != ""

// Update metrics for leadership status
metrics.RaftLeaderStatus.WithLabelValues(nodeID).Set(boolToFloat(isLeader))

// Parse last contact with leader
lastContact, lastContactErr := parseLastContact(stats["last_contact"])
communicatesWithLeader := isLeader || (hasLeader && lastContactErr == nil && lastContact <= LeaderElectionTimeout)

// Determine health status
isHealthy := communicatesWithLeader
metrics.RaftHealthStatus.WithLabelValues(nodeID).Set(boolToFloat(isHealthy))

// Update latency metric if last contact is valid
if lastContactErr == nil && lastContact >= 0 {
metrics.RaftLastContactLatency.WithLabelValues(nodeID).Set(float64(lastContact.Milliseconds()))
}

return HealthStatus{
IsHealthy: isHealthy,
HasLeader: hasLeader,
IsLeader: isLeader,
LastContact: lastContact,
Error: lastContactErr,
}
}

// Helper function to parse last contact time safely.
func parseLastContact(value string) (time.Duration, error) {
switch value {
case "", "never":
return -1, errors.New("no contact with leader")
case "0":
return 0, nil
default:
duration, err := time.ParseDuration(value)
if err != nil {
return 0, fmt.Errorf("invalid last_contact format: %w", err)
}
return duration, nil
}
}

// Convert bool to float for metric values.
func boolToFloat(val bool) float64 {
if val {
return 1
}
return 0
}
Loading
Loading