test(raft): Add comprehensive LeaveCluster test suite
Implement thorough test cases for the LeaveCluster method, covering:
- Single node cluster
- Follower leaving multi-node cluster
- Leader leaving multi-node cluster
- Handling nil node scenarios
- Verifying cluster state after node departure

Enhance test coverage for Raft cluster management and node removal logic.
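
For orientation, a minimal usage sketch of the method under test. The package path, helper name, and timeout below are assumptions for illustration only and are not part of this commit:

package example

import (
	"context"
	"log"
	"time"

	"github.com/gatewayd-io/gatewayd/raft" // assumed import path; adjust to the repository's actual module
)

// leaveGracefully is illustrative only: it asks an already-running node to
// leave the Raft cluster within a bounded timeout, surfacing any error from
// leader forwarding or shutdown.
func leaveGracefully(node *raft.Node) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	if err := node.LeaveCluster(ctx); err != nil {
		log.Printf("failed to leave raft cluster: %v", err)
		return err
	}
	return nil
}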
sinadarbouy committed Feb 16, 2025
1 parent 0479ed8 commit a216162
Showing 2 changed files with 238 additions and 50 deletions.
82 changes: 32 additions & 50 deletions raft/raft.go
@@ -459,6 +459,19 @@ func (n *Node) waitForLeader() (raft.ServerID, error) {

// AddPeer adds a new peer to the Raft cluster.
func (n *Node) AddPeer(ctx context.Context, peerID, peerAddr, grpcAddr string) error {
if err := validatePeerPayload(PeerPayload{
ID: peerID,
Address: peerAddr,
GRPCAddress: grpcAddr,
}); err != nil {
return fmt.Errorf("invalid peer parameters: %w", err)
}

// Check if node is initialized
if n == nil || n.raft == nil {
return errors.New("raft node not initialized")
}

if n.raft.State() != raft.Leader {
// Get the leader client
client, err := n.getLeaderClient()
@@ -549,17 +562,22 @@ func (n *Node) AddPeerInternal(ctx context.Context, peerID, peerAddress, grpcAdd

// RemovePeer removes a peer from the Raft cluster.
func (n *Node) RemovePeer(ctx context.Context, peerID string) error {
_, leaderID := n.raft.LeaderWithID()
if leaderID == "" {
return errors.New("no leader available")
if peerID == "" {
return errors.New("peer ID cannot be empty")
}

if n == nil || n.raft == nil {
return errors.New("raft node not initialized")
}

if n.raft.State() != raft.Leader {
// Get the leader client with retry logic
client, err := n.getLeaderClient()
if err != nil {
return fmt.Errorf("failed to get leader client: %w", err)
}

// Forward the request to the leader
resp, err := client.RemovePeer(ctx, &pb.RemovePeerRequest{
PeerId: peerID,
})
@@ -583,52 +601,10 @@ func (n *Node) RemovePeerInternal(ctx context.Context, peerID string) error {
return errors.New("only the leader can remove peers")
}

// get peer address and grpc address from FSM
n.Fsm.mu.RLock()
peer, exists := n.Fsm.raftPeers[peerID]
n.Fsm.mu.RUnlock()

if !exists {
return fmt.Errorf("peer %s not found in FSM", peerID)
}

// send remove peer command to peers
// need to remove peer from FSM before removing from Raft to handle the case that leader want to remove itself
cmd := Command{
Type: CommandRemovePeer,
Payload: PeerPayload{
ID: peerID,
},
}

data, err := json.Marshal(cmd)
if err != nil {
return fmt.Errorf("failed to marshal peer command: %w", err)
}

if err := n.Apply(ctx, data, ApplyTimeout); err != nil {
return fmt.Errorf("failed to apply peer command: %w", err)
}

future := n.raft.RemoveServer(raft.ServerID(peerID), 0, 0)
if err := future.Error(); err != nil {
// need to add again peer to FSM by applying add peer command
cmd := Command{
Type: CommandAddPeer,
Payload: PeerPayload{
ID: peerID,
Address: peer.Address,
GRPCAddress: peer.GRPCAddress,
},
}

data, err := json.Marshal(cmd)
if err != nil {
return fmt.Errorf("failed to marshal peer command: %w", err)
}

if err := n.Apply(ctx, data, ApplyTimeout); err != nil {
return fmt.Errorf("failed to apply peer command: %w", err)
if errors.Is(err, raft.ErrNotLeader) {
return n.RemovePeer(ctx, peerID)
}
return fmt.Errorf("failed to remove peer: %w", err)
}
@@ -669,6 +645,11 @@ func (n *Node) LeaveCluster(ctx context.Context) error {
}
}

if err := n.Shutdown(); err != nil {
n.Logger.Error().Err(err).Msg("failed to shutdown raft node")
return fmt.Errorf("failed to leave cluster: %w", err)
}

metrics.RaftPeerRemovals.Inc()
n.Logger.Info().Msg("successfully left raft cluster")
return nil
@@ -744,13 +725,14 @@ func (n *Node) Shutdown() error {
n.peerSyncCancel()
}

if n.rpcServer != nil {
n.rpcServer.GracefulStop()
}
if n.rpcClient != nil {
n.rpcClient.close()
}

if n.rpcServer != nil {
n.rpcServer.GracefulStop()
}

if err := n.raft.Shutdown().Error(); err != nil && !errors.Is(err, raft.ErrRaftShutdown) {
return fmt.Errorf("failed to shutdown raft node: %w", err)
}
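Worth noting from the hunks above: AddPeer and RemovePeer now open with the same defensive checks (empty input, then a nil or uninitialized node) before any leader forwarding. A hypothetical in-package helper that captures the pattern, shown only as a sketch and not part of this commit (the real code inlines the checks, and this assumes it sits in the raft package next to Node with errors already imported):

// guardPeerOp is a hypothetical sketch of the defensive checks this commit
// adds to the top of AddPeer and RemovePeer; the actual code inlines them.
func (n *Node) guardPeerOp(peerID string) error {
	if peerID == "" {
		return errors.New("peer ID cannot be empty")
	}
	// A nil *Node receiver is safe to check here because no field is touched
	// before the nil test, mirroring the inlined guards in AddPeer/RemovePeer.
	if n == nil || n.raft == nil {
		return errors.New("raft node not initialized")
	}
	return nil
}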
206 changes: 206 additions & 0 deletions raft/raft_test.go
@@ -1055,3 +1055,209 @@ func TestFSMPeerOperations(t *testing.T) {
"Leader address should be consistent across all nodes")
}
}

func TestLeaveCluster(t *testing.T) {
logger := setupTestLogger()
tempDir := t.TempDir()

// Test cases for different cluster scenarios
tests := []struct {
name string
setupNodes func() []*Node
testNode int // index of node that will leave
wantErr bool // whether we expect an error
}{
{
name: "single node cluster",
setupNodes: func() []*Node {
node, err := NewRaftNode(logger, config.Raft{
NodeID: "singleNode",
Address: "127.0.0.1:0",
GRPCAddress: "127.0.0.1:0",
IsBootstrap: true,
Directory: tempDir,
})
require.NoError(t, err)
return []*Node{node}
},
testNode: 0,
wantErr: false,
},
{
name: "follower leaves multi-node cluster",
setupNodes: func() []*Node {
// Create a 3-node cluster
nodes := make([]*Node, 0, 3)

// Leader node
node1, err := NewRaftNode(logger, config.Raft{
NodeID: "multiNode1",
Address: "127.0.0.1:7011",
GRPCAddress: "127.0.0.1:7021",
IsBootstrap: true,
Directory: tempDir,
Peers: []config.RaftPeer{
{ID: "multiNode2", Address: "127.0.0.1:7012", GRPCAddress: "127.0.0.1:7022"},
{ID: "multiNode3", Address: "127.0.0.1:7013", GRPCAddress: "127.0.0.1:7023"},
},
})
require.NoError(t, err)
nodes = append(nodes, node1)

// Follower nodes
node2, err := NewRaftNode(logger, config.Raft{
NodeID: "multiNode2",
Address: "127.0.0.1:7012",
GRPCAddress: "127.0.0.1:7022",
IsBootstrap: false,
Directory: tempDir,
Peers: []config.RaftPeer{
{ID: "multiNode1", Address: "127.0.0.1:7011", GRPCAddress: "127.0.0.1:7021"},
{ID: "multiNode3", Address: "127.0.0.1:7013", GRPCAddress: "127.0.0.1:7023"},
},
})
require.NoError(t, err)
nodes = append(nodes, node2)

node3, err := NewRaftNode(logger, config.Raft{
NodeID: "multiNode3",
Address: "127.0.0.1:7013",
GRPCAddress: "127.0.0.1:7023",
IsBootstrap: false,
Directory: tempDir,
Peers: []config.RaftPeer{
{ID: "multiNode1", Address: "127.0.0.1:7011", GRPCAddress: "127.0.0.1:7021"},
{ID: "multiNode2", Address: "127.0.0.1:7012", GRPCAddress: "127.0.0.1:7022"},
},
})
require.NoError(t, err)
nodes = append(nodes, node3)

// Wait for cluster to stabilize
time.Sleep(3 * time.Second)
return nodes
},
testNode: 2, // Test with the last follower
wantErr: false,
},
{
name: "leader leaves multi-node cluster",
setupNodes: func() []*Node {
// Similar setup to previous test case
nodes := make([]*Node, 0, 3)

node1, err := NewRaftNode(logger, config.Raft{
NodeID: "leaderNode1",
Address: "127.0.0.1:7021",
GRPCAddress: "127.0.0.1:7031",
IsBootstrap: true,
Directory: tempDir,
Peers: []config.RaftPeer{
{ID: "leaderNode2", Address: "127.0.0.1:7022", GRPCAddress: "127.0.0.1:7032"},
{ID: "leaderNode3", Address: "127.0.0.1:7023", GRPCAddress: "127.0.0.1:7033"},
},
})
require.NoError(t, err)
nodes = append(nodes, node1)

node2, err := NewRaftNode(logger, config.Raft{
NodeID: "leaderNode2",
Address: "127.0.0.1:7022",
GRPCAddress: "127.0.0.1:7032",
IsBootstrap: false,
Directory: tempDir,
Peers: []config.RaftPeer{
{ID: "leaderNode1", Address: "127.0.0.1:7021", GRPCAddress: "127.0.0.1:7031"},
{ID: "leaderNode3", Address: "127.0.0.1:7023", GRPCAddress: "127.0.0.1:7033"},
},
})
require.NoError(t, err)
nodes = append(nodes, node2)

node3, err := NewRaftNode(logger, config.Raft{
NodeID: "leaderNode3",
Address: "127.0.0.1:7023",
GRPCAddress: "127.0.0.1:7033",
IsBootstrap: false,
Directory: tempDir,
Peers: []config.RaftPeer{
{ID: "leaderNode1", Address: "127.0.0.1:7021", GRPCAddress: "127.0.0.1:7031"},
{ID: "leaderNode2", Address: "127.0.0.1:7022", GRPCAddress: "127.0.0.1:7032"},
},
})
require.NoError(t, err)
nodes = append(nodes, node3)

// Wait for cluster to stabilize
time.Sleep(3 * time.Second)
return nodes
},
testNode: 0, // Test with the leader
wantErr: false,
},
{
name: "nil node",
setupNodes: func() []*Node {
return []*Node{nil}
},
testNode: 0,
wantErr: true,
},
}

for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
nodes := testCase.setupNodes()
defer func() {
for _, node := range nodes {
if node != nil {
_ = node.Shutdown()
}
}
}()

// For multi-node clusters, verify the initial state
if len(nodes) > 1 {
require.Eventually(t, func() bool {
leaderCount := 0
followerCount := 0
for _, node := range nodes {
if node != nil {
state, _ := node.GetState()
if state == raft.Leader {
leaderCount++
} else if state == raft.Follower {
followerCount++
}
}
}
return leaderCount == 1 && followerCount == len(nodes)-1
}, 10*time.Second, 100*time.Millisecond, "Failed to establish initial cluster state")
}

// Execute the leave operation
err := nodes[testCase.testNode].LeaveCluster(context.Background())

// Verify the result
if testCase.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)

// For multi-node clusters, verify the remaining cluster state
if len(nodes) > 1 {
// Verify the node is no longer in the cluster configuration
for i, node := range nodes {
if i != testCase.testNode && node != nil {
config := node.GetPeers()
for _, server := range config {
assert.NotEqual(t, nodes[testCase.testNode].config.LocalID, server.ID,
"Departed node should not be in cluster configuration")
}
}
}
}
}
})
}
}
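
These cases can be run in isolation with go test ./raft -run TestLeaveCluster -v from the repository root, assuming the package sits at raft/ as the file header above indicates.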
