Skip to content

Commit

Permalink
feat(raft): Add peer removal
Browse files Browse the repository at this point in the history
Enhance Raft cluster management with new peer-related features:
- Implement RemovePeer RPC endpoint for removing peers from the cluster
- Add GetPeerInfo RPC method to retrieve peer information
- Update Raft configuration to support secure and non-secure modes
- Introduce new metrics for peer management operations
- Remove compatibility policy from plugin and global configurations
- Update protobuf and gRPC service definitions

The changes improve cluster management by providing more flexible peer
manipulation and discovery mechanisms.
  • Loading branch information
sinadarbouy committed Feb 18, 2025
2 parents 0e49f89 + c01cd47 commit 4bbd0c2
Show file tree
Hide file tree
Showing 79 changed files with 6,826 additions and 2,307 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ jobs:
test:
name: Test GatewayD
runs-on: ubuntu-latest
# Timeout after 5 minutes, to avoid hanging tests
timeout-minutes: 5
# Timeout after 6 minutes, to avoid hanging tests
timeout-minutes: 6
services:
postgres:
image: postgres
Expand Down Expand Up @@ -135,7 +135,6 @@ jobs:
cd plugin-template-go && make build && cp plugin-template-go ../ptg && cd ..
export SHA256SUM=$(sha256sum ptg | awk '{print $1}')
cat <<EOF > gatewayd_plugins.yaml
compatibilityPolicy: "strict"
metricsMergerPeriod: 1s
healthCheckPeriod: 1s
reloadOnCrash: true
Expand Down
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ gatewayd-files/
cmd/gatewayd-plugin-cache-linux-amd64-*
tempo-data

raft/node*
# Raft files
raft/node*/

# Accidental installation of plugin
plugins/gatewayd-plugin-cache
5 changes: 2 additions & 3 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ linters:
enable-all: true
disable:
- cyclop
- dupl
- wsl
- godox
- gochecknoglobals
Expand Down Expand Up @@ -53,10 +54,8 @@ linters-settings:
- "github.com/hashicorp/go-plugin"
- "github.com/mitchellh/mapstructure"
- "github.com/google/go-cmp"
- "github.com/google/go-github/v53/github"
- "github.com/google/go-github/v68/github"
- "github.com/codingsince1985/checksum"
- "golang.org/x/exp/maps"
- "golang.org/x/exp/slices"
- "gopkg.in/yaml.v3"
- "github.com/zenizh/go-capturer"
- "gopkg.in/natefinch/lumberjack.v2"
Expand Down
9 changes: 6 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# syntax=docker/dockerfile:1

# Use the official golang image to build the binary.
FROM golang:1.23-alpine3.20 AS builder
FROM golang:1.23-alpine3.21 AS builder

ARG TARGETOS
ARG TARGETARCH
Expand All @@ -10,12 +10,15 @@ ARG TARGETPLATFORM
WORKDIR /gatewayd
COPY . /gatewayd

RUN apk --no-cache add git=2.45.2-r0 make=4.4.1-r2 openssl=3.3.2-r1 && \
RUN apk --no-cache add \
'git~=2.47' \
'make~=4.4' \
'openssl~=3.3' && \
mkdir -p dist && \
make build-platform GOOS=${TARGETOS} GOARCH=${TARGETARCH} OUTPUT_DIR=dist/${TARGETOS}-${TARGETARCH}

# Use alpine to create a minimal image to run the gatewayd binary.
FROM alpine:3.20 AS runner
FROM alpine:3.21 AS runner

ARG TARGETOS
ARG TARGETARCH
Expand Down
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ update-direct-deps:
@go list -f '{{if not (or .Main .Indirect)}}{{.Path}}{{end}}' -m all | xargs -n1 go get
@go mod tidy

install-deps:
@go install github.com/bufbuild/buf/cmd/buf@latest
@go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
@go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
@go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@latest
@go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-openapiv2@latest

lint:
@buf lint

Expand Down
103 changes: 103 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/gatewayd-io/gatewayd/plugin"
"github.com/gatewayd-io/gatewayd/pool"
"github.com/gatewayd-io/gatewayd/raft"
hcraft "github.com/hashicorp/raft"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -307,3 +308,105 @@ func (a *API) GetServers(context.Context, *emptypb.Empty) (*structpb.Struct, err
metrics.APIRequests.WithLabelValues("GET", "/v1/GatewayDPluginService/GetServers").Inc()
return serversConfig, nil
}

// GetPeers returns the raft peers configuration of the GatewayD.
func (a *API) GetPeers(context.Context, *emptypb.Empty) (*structpb.Struct, error) {
_, span := otel.Tracer(config.TracerName).Start(a.ctx, "Get Peers")
defer span.End()

if a.Options.RaftNode == nil {
return nil, status.Errorf(codes.Unavailable, "raft node not initialized")
}

peers := a.Options.RaftNode.GetPeers()
peerMap := make(map[string]any)

// Get current leader ID for comparison
_, leaderID := a.Options.RaftNode.GetState()

for _, peer := range peers {
// Determine peer status
var status string
switch {
case string(peer.ID) == string(leaderID):
status = "Leader"
case peer.Suffrage == hcraft.Voter:
status = "Follower"
case peer.Suffrage == hcraft.Nonvoter:
status = "NonVoter"
default:
status = "Unknown"
}

peerMap[string(peer.ID)] = map[string]any{
"id": string(peer.ID),
"address": string(peer.Address),
"status": status,
"suffrage": peer.Suffrage.String(),
}
}

raftPeers, err := structpb.NewStruct(peerMap)
if err != nil {
metrics.APIRequestsErrors.WithLabelValues(
"GET", "/v1/raft/peers", codes.Internal.String(),
).Inc()
a.Options.Logger.Err(err).Msg("Failed to marshal peers config")
return nil, status.Errorf(codes.Internal, "failed to marshal peers config: %v", err)
}

metrics.APIRequests.WithLabelValues("GET", "/v1/raft/peers").Inc()
return raftPeers, nil
}

// AddPeer adds a new peer to the raft cluster.
func (a *API) AddPeer(ctx context.Context, req *v1.AddPeerRequest) (*v1.AddPeerResponse, error) {
_, span := otel.Tracer(config.TracerName).Start(ctx, "Add Peer")
defer span.End()

if a.Options.RaftNode == nil {
return nil, status.Errorf(codes.Unavailable, "AddPeer: raft node not initialized")
}

if req.GetPeerId() == "" || req.GetAddress() == "" || req.GetGrpcAddress() == "" {
return nil, status.Errorf(codes.InvalidArgument, "AddPeer: peer id, address, and grpc address are required")
}

err := a.Options.RaftNode.AddPeer(ctx, req.GetPeerId(), req.GetAddress(), req.GetGrpcAddress())
if err != nil {
metrics.APIRequestsErrors.WithLabelValues(
"POST", "/v1/raft/peers", codes.Internal.String(),
).Inc()
a.Options.Logger.Err(err).Msg("Failed to add peer")
return nil, status.Errorf(codes.Internal, "AddPeer: failed to add peer: %v", err)
}

metrics.APIRequests.WithLabelValues("POST", "/v1/raft/peers").Inc()
return &v1.AddPeerResponse{Success: true}, nil
}

// RemovePeer removes a peer from the raft cluster.
func (a *API) RemovePeer(ctx context.Context, req *v1.RemovePeerRequest) (*v1.RemovePeerResponse, error) {
_, span := otel.Tracer(config.TracerName).Start(ctx, "Remove Peer")
defer span.End()

if a.Options.RaftNode == nil {
return nil, status.Errorf(codes.Unavailable, "RemovePeer: raft node not initialized")
}

if req.GetPeerId() == "" {
return nil, status.Errorf(codes.InvalidArgument, "RemovePeer: peer id is required")
}

err := a.Options.RaftNode.RemovePeer(ctx, req.GetPeerId())
if err != nil {
metrics.APIRequestsErrors.WithLabelValues(
"DELETE", "/v1/raft/peers", codes.Internal.String(),
).Inc()
a.Options.Logger.Err(err).Msg("Failed to remove peer")
return nil, status.Errorf(codes.Internal, "RemovePeer: failed to remove peer: %v", err)
}

metrics.APIRequests.WithLabelValues("DELETE", "/v1/raft/peers").Inc()
return &v1.RemovePeerResponse{Success: true}, nil
}
13 changes: 6 additions & 7 deletions api/api_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

// getAPIConfig returns a new API configuration with all the necessary components.
func getAPIConfig() *API {
func getAPIConfig(httpAddr, grpcAddr string) *API {
logger := zerolog.New(nil)
defaultPool := pool.NewPool(context.Background(), config.DefaultPoolSize)
pluginReg := plugin.NewRegistry(
Expand All @@ -27,10 +27,9 @@ func getAPIConfig() *API {
Actions: act.BuiltinActions(),
DefaultPolicyName: config.DefaultPolicy,
}),
DevMode: true,
Logger: logger,
Compatibility: config.DefaultCompatibilityPolicy,
StartTimeout: config.DefaultPluginStartTimeout,
DevMode: true,
Logger: logger,
StartTimeout: config.DefaultPluginStartTimeout,
},
)
defaultProxy := network.NewProxy(
Expand Down Expand Up @@ -60,8 +59,8 @@ func getAPIConfig() *API {
return &API{
Options: &Options{
GRPCNetwork: "tcp",
GRPCAddress: "localhost:19090",
HTTPAddress: "localhost:18080",
GRPCAddress: grpcAddr,
HTTPAddress: httpAddr,
Logger: logger,
Servers: servers,
},
Expand Down
Loading

0 comments on commit 4bbd0c2

Please sign in to comment.