From a216162a595a974b68571c045ea2a106047fcb06 Mon Sep 17 00:00:00 2001 From: Sina Darbouy Date: Sun, 16 Feb 2025 23:41:10 +0100 Subject: [PATCH] test(raft): Add comprehensive LeaveCluster test suite Implement thorough test cases for LeaveCluster method covering: - Single node cluster - Follower leaving multi-node cluster - Leader leaving multi-node cluster - Handling nil node scenarios - Verifying cluster state after node departure Enhance test coverage for Raft cluster management and node removal logic. --- raft/raft.go | 82 +++++++----------- raft/raft_test.go | 206 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 238 insertions(+), 50 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index f34be506..709dfd76 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -459,6 +459,19 @@ func (n *Node) waitForLeader() (raft.ServerID, error) { // AddPeer adds a new peer to the Raft cluster. func (n *Node) AddPeer(ctx context.Context, peerID, peerAddr, grpcAddr string) error { + if err := validatePeerPayload(PeerPayload{ + ID: peerID, + Address: peerAddr, + GRPCAddress: grpcAddr, + }); err != nil { + return fmt.Errorf("invalid peer parameters: %w", err) + } + + // Check if node is initialized + if n == nil || n.raft == nil { + return errors.New("raft node not initialized") + } + if n.raft.State() != raft.Leader { // Get the leader client client, err := n.getLeaderClient() @@ -549,17 +562,22 @@ func (n *Node) AddPeerInternal(ctx context.Context, peerID, peerAddress, grpcAdd // RemovePeer removes a peer from the Raft cluster. func (n *Node) RemovePeer(ctx context.Context, peerID string) error { - _, leaderID := n.raft.LeaderWithID() - if leaderID == "" { - return errors.New("no leader available") + if peerID == "" { + return errors.New("peer ID cannot be empty") + } + + if n == nil || n.raft == nil { + return errors.New("raft node not initialized") } if n.raft.State() != raft.Leader { + // Get the leader client with retry logic client, err := n.getLeaderClient() if err != nil { return fmt.Errorf("failed to get leader client: %w", err) } + // Forward the request to the leader resp, err := client.RemovePeer(ctx, &pb.RemovePeerRequest{ PeerId: peerID, }) @@ -583,52 +601,10 @@ func (n *Node) RemovePeerInternal(ctx context.Context, peerID string) error { return errors.New("only the leader can remove peers") } - // get peer address and grpc address from FSM - n.Fsm.mu.RLock() - peer, exists := n.Fsm.raftPeers[peerID] - n.Fsm.mu.RUnlock() - - if !exists { - return fmt.Errorf("peer %s not found in FSM", peerID) - } - - // send remove peer command to peers - // need to remove peer from FSM before removing from Raft to handle the case that leader want to remove itself - cmd := Command{ - Type: CommandRemovePeer, - Payload: PeerPayload{ - ID: peerID, - }, - } - - data, err := json.Marshal(cmd) - if err != nil { - return fmt.Errorf("failed to marshal peer command: %w", err) - } - - if err := n.Apply(ctx, data, ApplyTimeout); err != nil { - return fmt.Errorf("failed to apply peer command: %w", err) - } - future := n.raft.RemoveServer(raft.ServerID(peerID), 0, 0) if err := future.Error(); err != nil { - // need to add again peer to FSM by applying add peer command - cmd := Command{ - Type: CommandAddPeer, - Payload: PeerPayload{ - ID: peerID, - Address: peer.Address, - GRPCAddress: peer.GRPCAddress, - }, - } - - data, err := json.Marshal(cmd) - if err != nil { - return fmt.Errorf("failed to marshal peer command: %w", err) - } - - if err := n.Apply(ctx, data, ApplyTimeout); err != nil { - return 
fmt.Errorf("failed to apply peer command: %w", err) + if errors.Is(err, raft.ErrNotLeader) { + return n.RemovePeer(ctx, peerID) } return fmt.Errorf("failed to remove peer: %w", err) } @@ -669,6 +645,11 @@ func (n *Node) LeaveCluster(ctx context.Context) error { } } + if err := n.Shutdown(); err != nil { + n.Logger.Error().Err(err).Msg("failed to shutdown raft node") + return fmt.Errorf("failed to leave cluster: %w", err) + } + metrics.RaftPeerRemovals.Inc() n.Logger.Info().Msg("successfully left raft cluster") return nil @@ -744,13 +725,14 @@ func (n *Node) Shutdown() error { n.peerSyncCancel() } - if n.rpcServer != nil { - n.rpcServer.GracefulStop() - } if n.rpcClient != nil { n.rpcClient.close() } + if n.rpcServer != nil { + n.rpcServer.GracefulStop() + } + if err := n.raft.Shutdown().Error(); err != nil && !errors.Is(err, raft.ErrRaftShutdown) { return fmt.Errorf("failed to shutdown raft node: %w", err) } diff --git a/raft/raft_test.go b/raft/raft_test.go index dd363ff4..3cb55c22 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1055,3 +1055,209 @@ func TestFSMPeerOperations(t *testing.T) { "Leader address should be consistent across all nodes") } } + +func TestLeaveCluster(t *testing.T) { + logger := setupTestLogger() + tempDir := t.TempDir() + + // Test cases for different cluster scenarios + tests := []struct { + name string + setupNodes func() []*Node + testNode int // index of node that will leave + wantErr bool // whether we expect an error + }{ + { + name: "single node cluster", + setupNodes: func() []*Node { + node, err := NewRaftNode(logger, config.Raft{ + NodeID: "singleNode", + Address: "127.0.0.1:0", + GRPCAddress: "127.0.0.1:0", + IsBootstrap: true, + Directory: tempDir, + }) + require.NoError(t, err) + return []*Node{node} + }, + testNode: 0, + wantErr: false, + }, + { + name: "follower leaves multi-node cluster", + setupNodes: func() []*Node { + // Create a 3-node cluster + nodes := make([]*Node, 0, 3) + + // Leader node + node1, err := NewRaftNode(logger, config.Raft{ + NodeID: "multiNode1", + Address: "127.0.0.1:7011", + GRPCAddress: "127.0.0.1:7021", + IsBootstrap: true, + Directory: tempDir, + Peers: []config.RaftPeer{ + {ID: "multiNode2", Address: "127.0.0.1:7012", GRPCAddress: "127.0.0.1:7022"}, + {ID: "multiNode3", Address: "127.0.0.1:7013", GRPCAddress: "127.0.0.1:7023"}, + }, + }) + require.NoError(t, err) + nodes = append(nodes, node1) + + // Follower nodes + node2, err := NewRaftNode(logger, config.Raft{ + NodeID: "multiNode2", + Address: "127.0.0.1:7012", + GRPCAddress: "127.0.0.1:7022", + IsBootstrap: false, + Directory: tempDir, + Peers: []config.RaftPeer{ + {ID: "multiNode1", Address: "127.0.0.1:7011", GRPCAddress: "127.0.0.1:7021"}, + {ID: "multiNode3", Address: "127.0.0.1:7013", GRPCAddress: "127.0.0.1:7023"}, + }, + }) + require.NoError(t, err) + nodes = append(nodes, node2) + + node3, err := NewRaftNode(logger, config.Raft{ + NodeID: "multiNode3", + Address: "127.0.0.1:7013", + GRPCAddress: "127.0.0.1:7023", + IsBootstrap: false, + Directory: tempDir, + Peers: []config.RaftPeer{ + {ID: "multiNode1", Address: "127.0.0.1:7011", GRPCAddress: "127.0.0.1:7021"}, + {ID: "multiNode2", Address: "127.0.0.1:7012", GRPCAddress: "127.0.0.1:7022"}, + }, + }) + require.NoError(t, err) + nodes = append(nodes, node3) + + // Wait for cluster to stabilize + time.Sleep(3 * time.Second) + return nodes + }, + testNode: 2, // Test with the last follower + wantErr: false, + }, + { + name: "leader leaves multi-node cluster", + setupNodes: func() []*Node { + // Similar 
setup to previous test case + nodes := make([]*Node, 0, 3) + + node1, err := NewRaftNode(logger, config.Raft{ + NodeID: "leaderNode1", + Address: "127.0.0.1:7021", + GRPCAddress: "127.0.0.1:7031", + IsBootstrap: true, + Directory: tempDir, + Peers: []config.RaftPeer{ + {ID: "leaderNode2", Address: "127.0.0.1:7022", GRPCAddress: "127.0.0.1:7032"}, + {ID: "leaderNode3", Address: "127.0.0.1:7023", GRPCAddress: "127.0.0.1:7033"}, + }, + }) + require.NoError(t, err) + nodes = append(nodes, node1) + + node2, err := NewRaftNode(logger, config.Raft{ + NodeID: "leaderNode2", + Address: "127.0.0.1:7022", + GRPCAddress: "127.0.0.1:7032", + IsBootstrap: false, + Directory: tempDir, + Peers: []config.RaftPeer{ + {ID: "leaderNode1", Address: "127.0.0.1:7021", GRPCAddress: "127.0.0.1:7031"}, + {ID: "leaderNode3", Address: "127.0.0.1:7023", GRPCAddress: "127.0.0.1:7033"}, + }, + }) + require.NoError(t, err) + nodes = append(nodes, node2) + + node3, err := NewRaftNode(logger, config.Raft{ + NodeID: "leaderNode3", + Address: "127.0.0.1:7023", + GRPCAddress: "127.0.0.1:7033", + IsBootstrap: false, + Directory: tempDir, + Peers: []config.RaftPeer{ + {ID: "leaderNode1", Address: "127.0.0.1:7021", GRPCAddress: "127.0.0.1:7031"}, + {ID: "leaderNode2", Address: "127.0.0.1:7022", GRPCAddress: "127.0.0.1:7032"}, + }, + }) + require.NoError(t, err) + nodes = append(nodes, node3) + + // Wait for cluster to stabilize + time.Sleep(3 * time.Second) + return nodes + }, + testNode: 0, // Test with the leader + wantErr: false, + }, + { + name: "nil node", + setupNodes: func() []*Node { + return []*Node{nil} + }, + testNode: 0, + wantErr: true, + }, + } + + for _, testCase := range tests { + t.Run(testCase.name, func(t *testing.T) { + nodes := testCase.setupNodes() + defer func() { + for _, node := range nodes { + if node != nil { + _ = node.Shutdown() + } + } + }() + + // For multi-node clusters, verify the initial state + if len(nodes) > 1 { + require.Eventually(t, func() bool { + leaderCount := 0 + followerCount := 0 + for _, node := range nodes { + if node != nil { + state, _ := node.GetState() + if state == raft.Leader { + leaderCount++ + } else if state == raft.Follower { + followerCount++ + } + } + } + return leaderCount == 1 && followerCount == len(nodes)-1 + }, 10*time.Second, 100*time.Millisecond, "Failed to establish initial cluster state") + } + + // Execute the leave operation + err := nodes[testCase.testNode].LeaveCluster(context.Background()) + + // Verify the result + if testCase.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + + // For multi-node clusters, verify the remaining cluster state + if len(nodes) > 1 { + // Verify the node is no longer in the cluster configuration + for i, node := range nodes { + if i != testCase.testNode && node != nil { + config := node.GetPeers() + for _, server := range config { + assert.NotEqual(t, nodes[testCase.testNode].config.LocalID, server.ID, + "Departed node should not be in cluster configuration") + } + } + } + } + } + }) + } +}
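
Usage note (not part of the patch): the flow the new TestLeaveCluster cases exercise is a caller asking a node to leave the cluster gracefully before process shutdown. Below is a minimal sketch of that call pattern. It assumes only what the diff above shows, namely the (*Node).LeaveCluster(ctx context.Context) error signature and the fact that, with this patch, LeaveCluster also shuts the local node down on success; the helper name and its placement in the raft package are hypothetical.

// Hypothetical helper, assumed to live in the same raft package as Node;
// not part of this patch. It bounds LeaveCluster with a deadline so a
// slow or unreachable leader cannot block process shutdown indefinitely.
package raft

import (
	"context"
	"errors"
	"fmt"
	"time"
)

func leaveClusterGracefully(node *Node, timeout time.Duration) error {
	if node == nil {
		// Mirrors the "nil node" test case: callers may hold a nil *Node
		// if NewRaftNode failed earlier.
		return errors.New("raft node not initialized")
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// LeaveCluster removes the node from the cluster configuration
	// (forwarding the request to the leader when this node is a follower)
	// and, as of this patch, also shuts the local Raft node down on success.
	if err := node.LeaveCluster(ctx); err != nil {
		return fmt.Errorf("graceful leave failed: %w", err)
	}
	return nil
}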