From c0a5c3a74c8fd25f20aa6f9d563109af7454f02f Mon Sep 17 00:00:00 2001 From: Sina Darbouy Date: Sun, 22 Dec 2024 01:24:18 +0100 Subject: [PATCH 1/7] Add Raft health monitoring and metrics Add functionality to monitor Raft node health status and expose key metrics: - Add new Prometheus metrics for Raft health status, leader status, and last contact latency - Implement GetHealthStatus() method to track node health state - Monitor leadership status and communication with leader - Track last contact time with leader - Add helper functions for metric value conversion and time parsing The metrics will help monitor cluster health and leadership changes in production. --- metrics/builtins.go | 24 ++++++++++++++ raft/raft.go | 77 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+) diff --git a/metrics/builtins.go b/metrics/builtins.go index 0e48bdae..8a4a2d39 100644 --- a/metrics/builtins.go +++ b/metrics/builtins.go @@ -105,4 +105,28 @@ var ( Name: "api_requests_errors_total", Help: "Number of API request errors", }, []string{"method", "endpoint", "error"}) + RaftHealthStatus = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "gatewayd", + Name: "raft_health_status", + Help: "Current health status of the Raft node (1 if healthy, 0 if unhealthy)", + }, + []string{"node_id"}, + ) + RaftLeaderStatus = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "gatewayd", + Name: "raft_leader_status", + Help: "Indicates if this node is the current Raft leader (1) or follower (0)", + }, + []string{"node_id"}, + ) + RaftLastContactLatency = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "gatewayd", + Name: "raft_last_contact_seconds", + Help: "Time since last contact with the Raft leader in seconds", + }, + []string{"node_id"}, + ) ) diff --git a/raft/raft.go b/raft/raft.go index 8554e078..c8f3670e 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -14,6 +14,7 @@ import ( "time" "github.com/gatewayd-io/gatewayd/config" + "github.com/gatewayd-io/gatewayd/metrics" pb "github.com/gatewayd-io/gatewayd/raft/proto" "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" @@ -565,3 +566,79 @@ func (n *Node) startRPCServer() error { return nil } + +type HealthStatus struct { + IsHealthy bool + HasLeader bool + IsLeader bool + LastContact time.Duration + Error error +} + +func (n *Node) GetHealthStatus() HealthStatus { + // Handle uninitialized raft node + if n == nil || n.raft == nil { + return HealthStatus{ + IsHealthy: false, + Error: errors.New("raft node not initialized"), + } + } + + // Cache commonly used values + raftState := n.raft.State() + stats := n.raft.Stats() + nodeID := string(n.config.LocalID) + + // Determine leadership status + _, leaderId := n.raft.LeaderWithID() + isLeader := raftState == raft.Leader + hasLeader := leaderId != "" + + // Update metrics for leadership status + metrics.RaftLeaderStatus.WithLabelValues(nodeID).Set(boolToFloat(isLeader)) + + // Parse last contact with leader + lastContact, lastContactErr := parseLastContact(stats["last_contact"]) + communicatesWithLeader := isLeader || (hasLeader && lastContactErr == nil && lastContact <= LeaderElectionTimeout) + + // Determine health status + isHealthy := communicatesWithLeader + metrics.RaftHealthStatus.WithLabelValues(nodeID).Set(boolToFloat(isHealthy)) + + // Update latency metric if last contact is valid + if lastContactErr == nil && lastContact >= 0 { + metrics.RaftLastContactLatency.WithLabelValues(nodeID).Set(float64(lastContact.Milliseconds())) + } + + return HealthStatus{ + IsHealthy: isHealthy, + HasLeader: hasLeader, + IsLeader: isLeader, + LastContact: lastContact, + Error: lastContactErr, + } +} + +// Helper function to parse last contact time safely +func parseLastContact(value string) (time.Duration, error) { + switch value { + case "", "never": + return -1, errors.New("no contact with leader") + case "0": + return 0, nil + default: + duration, err := time.ParseDuration(value) + if err != nil { + return 0, fmt.Errorf("invalid last_contact format: %v", err) + } + return duration, nil + } +} + +// Convert bool to float for metric values +func boolToFloat(val bool) float64 { + if val { + return 1 + } + return 0 +} From ffde96131371cccbbccbd66d6efd7ed3dea7edef Mon Sep 17 00:00:00 2001 From: Sina Darbouy Date: Sun, 22 Dec 2024 15:36:24 +0100 Subject: [PATCH 2/7] feat: Integrate Raft node health checks into API - Added Raft node dependency to `Options` and `HealthChecker` structs. - Updated `liveness` function to include Raft node health status. - Modified health check logic in `healthcheck.go` and `http_server.go` to consider Raft node status. - Enhanced `healthcheck_test.go` to include tests for Raft node integration. - Ensured Raft node is properly initialized and cleaned up in tests. --- api/api.go | 2 ++ api/healthcheck.go | 6 ++++-- api/healthcheck_test.go | 27 +++++++++++++++++++++++++++ api/http_server.go | 2 +- api/utils.go | 8 +++++++- 5 files changed, 41 insertions(+), 4 deletions(-) diff --git a/api/api.go b/api/api.go index e7235676..8e0763b0 100644 --- a/api/api.go +++ b/api/api.go @@ -13,6 +13,7 @@ import ( "github.com/gatewayd-io/gatewayd/network" "github.com/gatewayd-io/gatewayd/plugin" "github.com/gatewayd-io/gatewayd/pool" + "github.com/gatewayd-io/gatewayd/raft" "github.com/rs/zerolog" "go.opentelemetry.io/otel" "google.golang.org/grpc/codes" @@ -27,6 +28,7 @@ type Options struct { GRPCAddress string HTTPAddress string Servers map[string]*network.Server + RaftNode *raft.Node } type API struct { diff --git a/api/healthcheck.go b/api/healthcheck.go index 074c2e5c..62ffa8d1 100644 --- a/api/healthcheck.go +++ b/api/healthcheck.go @@ -4,6 +4,7 @@ import ( "context" "github.com/gatewayd-io/gatewayd/network" + "github.com/gatewayd-io/gatewayd/raft" "google.golang.org/grpc/codes" "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/status" @@ -12,14 +13,15 @@ import ( type HealthChecker struct { grpc_health_v1.UnimplementedHealthServer - Servers map[string]*network.Server + Servers map[string]*network.Server + raftNode *raft.Node } func (h *HealthChecker) Check( context.Context, *grpc_health_v1.HealthCheckRequest, ) (*grpc_health_v1.HealthCheckResponse, error) { // Check if all servers are running - if liveness(h.Servers) { + if liveness(h.Servers, h.raftNode) { return &grpc_health_v1.HealthCheckResponse{ Status: grpc_health_v1.HealthCheckResponse_SERVING, }, nil diff --git a/api/healthcheck_test.go b/api/healthcheck_test.go index bea86883..74b0988f 100644 --- a/api/healthcheck_test.go +++ b/api/healthcheck_test.go @@ -3,6 +3,7 @@ package api import ( "context" "testing" + "time" "github.com/gatewayd-io/gatewayd/act" "github.com/gatewayd-io/gatewayd/config" @@ -63,6 +64,16 @@ func Test_Healthchecker(t *testing.T) { }, ) + raftHelper, err := testhelpers.NewTestRaftNode(t) + if err != nil { + t.Fatalf("Failed to create test raft node: %v", err) + } + defer func() { + if err := raftHelper.Cleanup(); err != nil { + t.Errorf("Failed to cleanup raft: %v", err) + } + }() + server := network.NewServer( context.TODO(), network.Server{ @@ -77,6 +88,7 @@ func Test_Healthchecker(t *testing.T) { PluginRegistry: pluginRegistry, PluginTimeout: config.DefaultPluginTimeout, HandshakeTimeout: config.DefaultHandshakeTimeout, + RaftNode: raftHelper.Node, }, ) @@ -84,6 +96,7 @@ func Test_Healthchecker(t *testing.T) { Servers: map[string]*network.Server{ config.Default: server, }, + raftNode: raftHelper.Node, } assert.NotNil(t, healthchecker) hcr, err := healthchecker.Check(context.TODO(), &grpc_health_v1.HealthCheckRequest{}) @@ -91,6 +104,20 @@ func Test_Healthchecker(t *testing.T) { assert.NotNil(t, hcr) assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, hcr.GetStatus()) + go func(t *testing.T, server *network.Server) { + t.Helper() + if err := server.Run(); err != nil { + t.Errorf("server.Run() error = %v", err) + } + }(t, server) + + time.Sleep(1 * time.Second) + // Test for SERVING status + hcr, err = healthchecker.Check(context.TODO(), &grpc_health_v1.HealthCheckRequest{}) + assert.NoError(t, err) + assert.NotNil(t, hcr) + assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, hcr.GetStatus()) + err = healthchecker.Watch(&grpc_health_v1.HealthCheckRequest{}, nil) assert.Error(t, err) assert.Equal(t, "rpc error: code = Unimplemented desc = not implemented", err.Error()) diff --git a/api/http_server.go b/api/http_server.go index 6f05b911..596214b3 100644 --- a/api/http_server.go +++ b/api/http_server.go @@ -63,7 +63,7 @@ func createHTTPAPI(options *Options) *http.Server { mux := http.NewServeMux() mux.Handle("/", rmux) mux.HandleFunc("/healthz", func(writer http.ResponseWriter, _ *http.Request) { - if liveness(options.Servers) { + if liveness(options.Servers, options.RaftNode) { writer.Header().Set("Content-Type", "application/json") writer.WriteHeader(http.StatusOK) if err := json.NewEncoder(writer).Encode(Healthz{Status: "SERVING"}); err != nil { diff --git a/api/utils.go b/api/utils.go index 8a1d09c2..cd9bf0e9 100644 --- a/api/utils.go +++ b/api/utils.go @@ -2,13 +2,19 @@ package api import ( "github.com/gatewayd-io/gatewayd/network" + "github.com/gatewayd-io/gatewayd/raft" ) -func liveness(servers map[string]*network.Server) bool { +func liveness(servers map[string]*network.Server, raftNode *raft.Node) bool { for _, v := range servers { if !v.IsRunning() { return false } } + if raftNode != nil { + if !raftNode.GetHealthStatus().IsHealthy { + return false + } + } return true } From bc6092bf1445dd927bcea5c1263d5bcb5d1767bd Mon Sep 17 00:00:00 2001 From: Sina Darbouy Date: Sun, 22 Dec 2024 15:53:57 +0100 Subject: [PATCH 3/7] Add TestGetHealthStatus to verify Raft node health - Introduced a new test `TestGetHealthStatus` to check the health status of Raft nodes. - The test covers three scenarios: 1. When the node is the leader, it should be healthy and recognize itself as the leader. 2. When the node is a follower, it should be healthy and recognize the leader. 3. When no leader is available, the node should not be healthy and should report an error. - Utilized temporary directories and a test logger for setup. - Ensured proper shutdown of nodes after tests to prevent resource leaks. --- raft/raft_test.go | 85 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/raft/raft_test.go b/raft/raft_test.go index a2a90849..02c07b47 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -456,3 +456,88 @@ func TestNodeShutdown(t *testing.T) { err = node.Shutdown() assert.NoError(t, err) // Should not error on multiple shutdowns } + +func TestGetHealthStatus(t *testing.T) { + logger := setupTestLogger() + tempDir := t.TempDir() + + // Create a Raft node configuration for the first node + nodeConfig1 := config.Raft{ + NodeID: "testGetHealthStatusNode1", + Address: "127.0.0.1:0", + IsBootstrap: true, + Directory: tempDir, + Peers: []config.RaftPeer{ + {ID: "testGetHealthStatusNode2", Address: "127.0.0.1:5678"}, + }, + } + + // Create a Raft node configuration for the second node + nodeConfig2 := config.Raft{ + NodeID: "testGetHealthStatusNode2", + Address: "127.0.0.1:5678", + IsBootstrap: false, + Directory: tempDir, + } + + // Initialize the first Raft node + node1, err := NewRaftNode(logger, nodeConfig1) + require.NoError(t, err) + defer func() { + _ = node1.Shutdown() + }() + + // Initialize the second Raft node + node2, err := NewRaftNode(logger, nodeConfig2) + require.NoError(t, err) + defer func() { + _ = node2.Shutdown() + }() + + // Wait for leader election + time.Sleep(3 * time.Second) + + // Test 1: Check health status when node1 is the leader + if node1.GetState() == raft.Leader { + healthStatus := node1.GetHealthStatus() + assert.True(t, healthStatus.IsHealthy, "Leader node should be healthy") + assert.True(t, healthStatus.IsLeader, "Node should be the leader") + assert.True(t, healthStatus.HasLeader, "Node should have a leader (itself)") + assert.NoError(t, healthStatus.Error, "There should be no error in health status") + assert.Equal(t, healthStatus.LastContact, time.Duration(0), "Last contact should be 0") + } + + // Test 2: Check health status when node2 is a follower + if node2.GetState() == raft.Follower { + healthStatus := node2.GetHealthStatus() + assert.True(t, healthStatus.IsHealthy, "Follower node should be healthy") + assert.False(t, healthStatus.IsLeader, "Node should not be the leader") + assert.True(t, healthStatus.HasLeader, "Node should have a leader") + assert.NoError(t, healthStatus.Error, "There should be no error in health status") + assert.Greater(t, healthStatus.LastContact.Milliseconds(), int64(0), "Last contact should be greater than 0") + } + + // Test 3: Check health status when no leader is available + // Simulate no leader by not bootstrapping any node + nodeConfig3 := config.Raft{ + NodeID: "testGetHealthStatusNode3", + Address: "127.0.0.1:0", + IsBootstrap: false, + Directory: tempDir, + } + node3, err := NewRaftNode(logger, nodeConfig3) + require.NoError(t, err) + defer func() { + _ = node3.Shutdown() + }() + + // Wait for the node to realize there's no leader + time.Sleep(3 * time.Second) + + healthStatus := node3.GetHealthStatus() + assert.False(t, healthStatus.IsHealthy, "Node should not be healthy without a leader") + assert.False(t, healthStatus.IsLeader, "Node should not be the leader") + assert.False(t, healthStatus.HasLeader, "Node should not have a leader") + assert.Error(t, healthStatus.Error, "There should be an error indicating no leader") + assert.Equal(t, healthStatus.LastContact, time.Duration(-1), "Last contact should be -1") +} From 66e1a94aaf7fa8e7e31d4aa7c92ae80a3ce6eb6a Mon Sep 17 00:00:00 2001 From: Sina Darbouy Date: Sun, 22 Dec 2024 15:58:26 +0100 Subject: [PATCH 4/7] fix lint issues - Corrected variable naming from `leaderId` to `leaderID` for consistency. - Added missing periods to comments for proper punctuation. - Used `%w` in `fmt.Errorf` for error wrapping. --- raft/raft.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index c8f3670e..9618b654 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -590,9 +590,9 @@ func (n *Node) GetHealthStatus() HealthStatus { nodeID := string(n.config.LocalID) // Determine leadership status - _, leaderId := n.raft.LeaderWithID() + _, leaderID := n.raft.LeaderWithID() isLeader := raftState == raft.Leader - hasLeader := leaderId != "" + hasLeader := leaderID != "" // Update metrics for leadership status metrics.RaftLeaderStatus.WithLabelValues(nodeID).Set(boolToFloat(isLeader)) @@ -619,7 +619,7 @@ func (n *Node) GetHealthStatus() HealthStatus { } } -// Helper function to parse last contact time safely +// Helper function to parse last contact time safely. func parseLastContact(value string) (time.Duration, error) { switch value { case "", "never": @@ -629,13 +629,13 @@ func parseLastContact(value string) (time.Duration, error) { default: duration, err := time.ParseDuration(value) if err != nil { - return 0, fmt.Errorf("invalid last_contact format: %v", err) + return 0, fmt.Errorf("invalid last_contact format: %w", err) } return duration, nil } } -// Convert bool to float for metric values +// Convert bool to float for metric values. func boolToFloat(val bool) float64 { if val { return 1 From 4112975816d971a0bd5f6e2b2be98a65b7213996 Mon Sep 17 00:00:00 2001 From: Sina Darbouy Date: Sun, 22 Dec 2024 20:36:22 +0100 Subject: [PATCH 5/7] Add RaftNode to API configuration in run command - Updated the `runCmd` in `cmd/run.go` to include `RaftNode` in the API configuration. - This change ensures that the RaftNode is properly initialized and passed to the API object, which may be necessary for distributed consensus or state management. --- cmd/run.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/run.go b/cmd/run.go index 0dad18a8..51e61241 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -1003,6 +1003,7 @@ var runCmd = &cobra.Command{ GRPCAddress: conf.Global.API.GRPCAddress, HTTPAddress: conf.Global.API.HTTPAddress, Servers: servers, + RaftNode: raftNode, } apiObj := &api.API{ From 0ac936a003b2279fde0d8fc18c8802c61958982b Mon Sep 17 00:00:00 2001 From: Sina Darbouy Date: Sun, 22 Dec 2024 20:50:28 +0100 Subject: [PATCH 6/7] feat: Update gatewayd services with additional ports and health checks - Added port mappings for 9090 to each gatewayd service. - Introduced health checks for gatewayd-2 and gatewayd-3 services to ensure they are running correctly. - Added service dependencies for write-postgres, read-postgres, redis, and install_plugins to ensure proper startup order. - Linked gatewayd-2 and gatewayd-3 services to write-postgres, read-postgres, and redis for improved connectivity. --- docker-compose-raft.yaml | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/docker-compose-raft.yaml b/docker-compose-raft.yaml index f02b958b..3fa4408f 100644 --- a/docker-compose-raft.yaml +++ b/docker-compose-raft.yaml @@ -74,6 +74,7 @@ services: - "19090:19090" - "42223:2223" - "50051:50051" + - "29090:9090" volumes: - ./gatewayd-files:/gatewayd-files:ro - ./raft-data-1:/var/lib/gatewayd/raft @@ -124,9 +125,28 @@ services: - "19091:19090" - "42224:2223" - "50052:50051" + - "29091:9090" volumes: - ./gatewayd-files:/gatewayd-files:ro - ./raft-data-2:/var/lib/gatewayd/raft + links: + - write-postgres + - read-postgres + - redis + healthcheck: + test: ["CMD", "curl", "-f", "http://gatewayd-2:18080/healthz"] + interval: 5s + timeout: 5s + retries: 5 + depends_on: + write-postgres: + condition: service_healthy + read-postgres: + condition: service_healthy + redis: + condition: service_healthy + install_plugins: + condition: service_completed_successfully gatewayd-3: image: gatewaydio/gatewayd:latest @@ -156,10 +176,28 @@ services: - "19092:19090" - "42225:2223" - "50053:50051" + - "29092:9090" volumes: - ./gatewayd-files:/gatewayd-files:ro - ./raft-data-3:/var/lib/gatewayd/raft - + links: + - write-postgres + - read-postgres + - redis + healthcheck: + test: ["CMD", "curl", "-f", "http://gatewayd-3:18080/healthz"] + interval: 5s + timeout: 5s + retries: 5 + depends_on: + write-postgres: + condition: service_healthy + read-postgres: + condition: service_healthy + redis: + condition: service_healthy + install_plugins: + condition: service_completed_successfully prometheus: image: prom/prometheus:latest volumes: From ed25ae155777b9129a6bbb7ceceb923425640741 Mon Sep 17 00:00:00 2001 From: Sina Darbouy Date: Sun, 22 Dec 2024 21:15:26 +0100 Subject: [PATCH 7/7] Add GetHealthStatus method documentation and refactor - Added a comment to document the GetHealthStatus method, explaining its purpose. - Removed an unnecessary comment about caching commonly used values, as it was redundant with the code's functionality. --- raft/raft.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft/raft.go b/raft/raft.go index 9618b654..8315bb4f 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -575,6 +575,7 @@ type HealthStatus struct { Error error } +// GetHealthStatus returns the health status of the Raft node. func (n *Node) GetHealthStatus() HealthStatus { // Handle uninitialized raft node if n == nil || n.raft == nil { @@ -584,7 +585,6 @@ func (n *Node) GetHealthStatus() HealthStatus { } } - // Cache commonly used values raftState := n.raft.State() stats := n.raft.Stats() nodeID := string(n.config.LocalID)