Skip to content

Commit

Permalink
Add graceful shutdown to grpc server and http server
Browse files Browse the repository at this point in the history
  • Loading branch information
zeina1i committed Mar 14, 2024
1 parent 4d744d8 commit c88552d
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 20 deletions.
8 changes: 8 additions & 0 deletions api/api_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package api

import "context"

type IAPIServer interface {
Start()
Shutdown(ctx context.Context)
}
42 changes: 40 additions & 2 deletions api/grpc_server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"context"
"net"

v1 "github.com/gatewayd-io/gatewayd/api/v1"
Expand All @@ -9,8 +10,34 @@ import (
"google.golang.org/grpc/reflection"
)

// StartGRPCAPI starts the gRPC API.
func StartGRPCAPI(api *API, healthchecker *HealthChecker) {
type GRPCServer struct {
api *API
grpcServer *grpc.Server
listener net.Listener
}

// NewGRPCServer creates a new gRPC server.
func NewGRPCServer(api *API, healthchecker *HealthChecker) *GRPCServer {
grpcServer, listener := createGRPCAPI(api, healthchecker)
return &GRPCServer{
api: api,
grpcServer: grpcServer,
listener: listener,
}
}

// Start starts the gRPC server.
func (s *GRPCServer) Start() {
s.start(s.api, s.grpcServer, s.listener)
}

// Shutdown shuts down the gRPC server.
func (s *GRPCServer) Shutdown(_ context.Context) {
s.shutdown(s.grpcServer)
}

// createGRPCAPI creates a new gRPC API server and listener.
func createGRPCAPI(api *API, healthchecker *HealthChecker) (*grpc.Server, net.Listener) {
listener, err := net.Listen(api.Options.GRPCNetwork, api.Options.GRPCAddress)
if err != nil {
api.Options.Logger.Err(err).Msg("failed to start gRPC API")
Expand All @@ -20,7 +47,18 @@ func StartGRPCAPI(api *API, healthchecker *HealthChecker) {
reflection.Register(grpcServer)
v1.RegisterGatewayDAdminAPIServiceServer(grpcServer, api)
grpc_health_v1.RegisterHealthServer(grpcServer, healthchecker)

return grpcServer, listener
}

// start starts the gRPC API.
func (s *GRPCServer) start(api *API, grpcServer *grpc.Server, listener net.Listener) {
if err := grpcServer.Serve(listener); err != nil {
api.Options.Logger.Err(err).Msg("failed to start gRPC API")
}
}

// shutdown shuts down the gRPC API.
func (s *GRPCServer) shutdown(grpcServer *grpc.Server) {
grpcServer.GracefulStop()
}
57 changes: 53 additions & 4 deletions api/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,53 @@ package api
import (
"context"
"encoding/json"
"errors"
"io/fs"
"net/http"
"time"

v1 "github.com/gatewayd-io/gatewayd/api/v1"
"github.com/gatewayd-io/gatewayd/config"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const headerReadTimeout = 10 * time.Second

type Healthz struct {
Status string `json:"status"`
}

// StartHTTPAPI starts the HTTP API.
func StartHTTPAPI(options *Options) {
type HTTPServer struct {
httpServer *http.Server
options *Options
logger zerolog.Logger
}

// NewHTTPServer creates a new HTTP server.
func NewHTTPServer(options *Options) *HTTPServer {
httpServer := createHTTPAPI(options)
return &HTTPServer{
httpServer: httpServer,
options: options,
logger: options.Logger,
}
}

// Start starts the HTTP server.
func (s *HTTPServer) Start() {
s.start(s.options, s.httpServer)
}

// Shutdown shuts down the HTTP server.
func (s *HTTPServer) Shutdown(ctx context.Context) {
s.shutdown(ctx, s.httpServer, s.logger)
}

// CreateHTTPAPI creates a new HTTP API.
func createHTTPAPI(options *Options) *http.Server {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -73,13 +104,31 @@ func StartHTTPAPI(options *Options) {
fsys, err := fs.Sub(swaggerUI, "v1/swagger-ui")
if err != nil {
options.Logger.Err(err).Msg("failed to serve swagger-ui")
return
return nil
}
mux.Handle("/swagger-ui/", http.StripPrefix("/swagger-ui/", http.FileServer(http.FS(fsys))))
}

server := &http.Server{
Addr: options.HTTPAddress,
Handler: mux,
ReadHeaderTimeout: headerReadTimeout,
}

return server
}

// start starts the HTTP API.
func (s *HTTPServer) start(options *Options, server *http.Server) {
// Start HTTP server (and proxy calls to gRPC server endpoint)
if err := http.ListenAndServe(options.HTTPAddress, mux); err != nil { //nolint:gosec
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
options.Logger.Err(err).Msg("failed to start HTTP API")
}
}

// shutdown shuts down the HTTP API.
func (s *HTTPServer) shutdown(ctx context.Context, server *http.Server, logger zerolog.Logger) {
if err := server.Shutdown(ctx); err != nil {
logger.Err(err).Msg("failed to shutdown HTTP API")
}
}
54 changes: 40 additions & 14 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"github.com/getsentry/sentry-go"
"io"
"log"
"net/http"
Expand All @@ -31,7 +32,6 @@ import (
"github.com/gatewayd-io/gatewayd/pool"
"github.com/gatewayd-io/gatewayd/tracing"
usage "github.com/gatewayd-io/gatewayd/usagereport/v1"
"github.com/getsentry/sentry-go"
"github.com/go-co-op/gocron"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -93,6 +93,8 @@ func StopGracefully(
logger zerolog.Logger,
servers map[string]*network.Server,
stopChan chan struct{},
httpServer *api.HTTPServer,
grpcServer *api.GRPCServer,
) {
_, span := otel.Tracer(config.TracerName).Start(runCtx, "Shutdown server")
signal := "unknown"
Expand Down Expand Up @@ -155,6 +157,18 @@ func StopGracefully(
}
span.End()

if httpServer != nil {
httpServer.Shutdown(runCtx)
logger.Info().Msg("Stopped HTTP Server")
span.AddEvent("Stopped HTTP Server")
}

if grpcServer != nil {
grpcServer.Shutdown(runCtx)
logger.Info().Msg("Stopped gRPC Server")
span.AddEvent("Stopped gRPC Server")
}

// Close the stop channel to notify the other goroutines to stop.
stopChan <- struct{}{}
close(stopChan)
Expand Down Expand Up @@ -568,6 +582,10 @@ var runCmd = &cobra.Command{
logger.Error().Msg("Failed to get loggers from config")
}

// Declare httpServer and grpcServer here as it is used in the StopGracefully function ahead of their definition.
var httpServer *api.HTTPServer
var grpcServer *api.GRPCServer

_, span = otel.Tracer(config.TracerName).Start(runCtx, "Create pools and clients")
// Create and initialize pools of connections.
for name, cfg := range conf.Global.Pools {
Expand Down Expand Up @@ -729,6 +747,8 @@ var runCmd = &cobra.Command{
logger,
servers,
stopChan,
httpServer,
grpcServer,
)
}
}
Expand Down Expand Up @@ -876,25 +896,27 @@ var runCmd = &cobra.Command{
Servers: servers,
}

go api.StartGRPCAPI(
&api.API{
Options: &apiOptions,
Config: conf,
PluginRegistry: pluginRegistry,
Pools: pools,
Proxies: proxies,
Servers: servers,
},
&api.HealthChecker{Servers: servers})
apiObj := &api.API{
Options: &apiOptions,
Config: conf,
PluginRegistry: pluginRegistry,
Pools: pools,
Proxies: proxies,
Servers: servers,
}
grpcServer = api.NewGRPCServer(apiObj, &api.HealthChecker{Servers: servers})
go grpcServer.Start()
logger.Info().Str("address", apiOptions.HTTPAddress).Msg("Started the HTTP API")

go api.StartHTTPAPI(&apiOptions)
httpServer = api.NewHTTPServer(&apiOptions)
go httpServer.Start()

logger.Info().Fields(
map[string]interface{}{
"network": apiOptions.GRPCNetwork,
"address": apiOptions.GRPCAddress,
},
).Msg("Started the gRPC API")
).Msg("Started the gRPC Server")
}

// Report usage statistics.
Expand Down Expand Up @@ -960,6 +982,8 @@ var runCmd = &cobra.Command{
metricsMerger *metrics.Merger,
metricsServer *http.Server,
stopChan chan struct{},
httpServer *api.HTTPServer,
grpcServer *api.GRPCServer,
) {
for sig := range signalsCh {
for _, s := range signals {
Expand All @@ -973,12 +997,14 @@ var runCmd = &cobra.Command{
logger,
servers,
stopChan,
httpServer,
grpcServer,
)
os.Exit(0)
}
}
}
}(pluginRegistry, logger, servers, metricsMerger, metricsServer, stopChan)
}(pluginRegistry, logger, servers, metricsMerger, metricsServer, stopChan, httpServer, grpcServer)

_, span = otel.Tracer(config.TracerName).Start(runCtx, "Start servers")
// Start the server.
Expand Down
8 changes: 8 additions & 0 deletions cmd/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func Test_runCmd(t *testing.T) {
loggers[config.Default],
servers,
stopChan,
nil,
nil,
)

waitGroup.Done()
Expand Down Expand Up @@ -118,6 +120,8 @@ func Test_runCmdWithTLS(t *testing.T) {
loggers[config.Default],
servers,
stopChan,
nil,
nil,
)

waitGroup.Done()
Expand Down Expand Up @@ -175,6 +179,8 @@ func Test_runCmdWithMultiTenancy(t *testing.T) {
loggers[config.Default],
servers,
stopChan,
nil,
nil,
)

waitGroup.Done()
Expand Down Expand Up @@ -252,6 +258,8 @@ func Test_runCmdWithCachePlugin(t *testing.T) {
loggers[config.Default],
servers,
stopChan,
nil,
nil,
)

waitGroup.Done()
Expand Down

0 comments on commit c88552d

Please sign in to comment.