Skip to content

Commit

Permalink
Refactor commands (#644)
Browse files Browse the repository at this point in the history
* Refactor global variables into a global struct
* Update tests
* Use param instead of global variable
* Update tests
* Fix formatting
* Add a hack to fix the tests
* Ignore raft files
* Fix tests
* Remove global variables
* Use a global app variable for test only and assign it only in tests
* Add stopGracefully as a method to the GatewayDInstance struct
* Move signal handler to the top to avoid half-states on exit
* Add a new function for creating an instance of GatewayDInstance using parsed flags
* Remove duplicate internal functions
* Update tests
* Use exported functions instead of internal variables
* Remove global variables
* Update tests
* Fix linter error
* Ignore dupl linter
* Fix tests with the actual log output
* Clean up after test
* Remove global variables
* Skip path slip verification
* Fix tests
* Remove backup
* Another try to fix the test
* Fix missing/unknown behavior
* Add a pull-only test
* Declare variable before assignment
* Fix plugin install behavior
* Fix plugin install with no overwrite
* Revert changes
* Rename variable
* Use filepath to join paths
* Remove unnecessary file
* Ignore plugins file if exists
* Remove duplicate code
* Reset the pull-only flag
* Check if the server is properly closed before erroring out
* Refactor run command into a separate file
* Fix bug in handling early exit
* Move left-over functions
* Add comments
* Fix linter errors
* Fix missing log message and span
* Handle errors when stopping the listener for gRPC server
* Graceful shutdown of gRPC server
* Revert changes to path
* Use local variable
* Replace all context.TODO with context.Background
* Split exit codes into a separate file
* Remove unused constant and renumber others
* Use exported function instead of internal variables
* Ignore linter errors
* Rename variable and comment to match the behavior
  • Loading branch information
mostafa authored and sinadarbouy committed Feb 18, 2025
1 parent 486ac52 commit dcfc48b
Show file tree
Hide file tree
Showing 28 changed files with 1,602 additions and 1,358 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ gatewayd-files/
cmd/gatewayd-plugin-cache-linux-amd64-*
tempo-data

raft/node*
# Raft files
raft/node*/

# Accidental installation of plugin
plugins/gatewayd-plugin-cache
1 change: 1 addition & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ linters:
enable-all: true
disable:
- cyclop
- dupl
- wsl
- godox
- gochecknoglobals
Expand Down
6 changes: 3 additions & 3 deletions api/api_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

// getAPIConfig returns a new API configuration with all the necessary components.
func getAPIConfig() *API {
func getAPIConfig(httpAddr, grpcAddr string) *API {
logger := zerolog.New(nil)
defaultPool := pool.NewPool(context.Background(), config.DefaultPoolSize)
pluginReg := plugin.NewRegistry(
Expand Down Expand Up @@ -60,8 +60,8 @@ func getAPIConfig() *API {
return &API{
Options: &Options{
GRPCNetwork: "tcp",
GRPCAddress: "localhost:19090",
HTTPAddress: "localhost:18080",
GRPCAddress: grpcAddr,
HTTPAddress: httpAddr,
Logger: logger,
Servers: servers,
},
Expand Down
38 changes: 19 additions & 19 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ func TestGetVersion(t *testing.T) {

func TestGetGlobalConfig(t *testing.T) {
// Load config from the default config file.
conf := config.NewConfig(context.TODO(),
conf := config.NewConfig(context.Background(),
config.Config{GlobalConfigFile: "../gatewayd.yaml", PluginConfigFile: "../gatewayd_plugins.yaml"})
gerr := conf.InitConfig(context.TODO())
gerr := conf.InitConfig(context.Background())
require.Nil(t, gerr)
assert.NotEmpty(t, conf.Global)

Expand All @@ -61,9 +61,9 @@ func TestGetGlobalConfig(t *testing.T) {

func TestGetGlobalConfigWithGroupName(t *testing.T) {
// Load config from the default config file.
conf := config.NewConfig(context.TODO(),
conf := config.NewConfig(context.Background(),
config.Config{GlobalConfigFile: "../gatewayd.yaml", PluginConfigFile: "../gatewayd_plugins.yaml"})
gerr := conf.InitConfig(context.TODO())
gerr := conf.InitConfig(context.Background())
require.Nil(t, gerr)
assert.NotEmpty(t, conf.Global)

Expand Down Expand Up @@ -94,9 +94,9 @@ func TestGetGlobalConfigWithGroupName(t *testing.T) {

func TestGetGlobalConfigWithNonExistingGroupName(t *testing.T) {
// Load config from the default config file.
conf := config.NewConfig(context.TODO(),
conf := config.NewConfig(context.Background(),
config.Config{GlobalConfigFile: "../gatewayd.yaml", PluginConfigFile: "../gatewayd_plugins.yaml"})
gerr := conf.InitConfig(context.TODO())
gerr := conf.InitConfig(context.Background())
require.Nil(t, gerr)
assert.NotEmpty(t, conf.Global)

Expand All @@ -112,9 +112,9 @@ func TestGetGlobalConfigWithNonExistingGroupName(t *testing.T) {

func TestGetPluginConfig(t *testing.T) {
// Load config from the default config file.
conf := config.NewConfig(context.TODO(),
conf := config.NewConfig(context.Background(),
config.Config{GlobalConfigFile: "../gatewayd.yaml", PluginConfigFile: "../gatewayd_plugins.yaml"})
gerr := conf.InitConfig(context.TODO())
gerr := conf.InitConfig(context.Background())
require.Nil(t, gerr)
assert.NotEmpty(t, conf.Global)

Expand All @@ -141,7 +141,7 @@ func TestGetPlugins(t *testing.T) {
Logger: zerolog.Logger{},
})
pluginRegistry := plugin.NewRegistry(
context.TODO(),
context.Background(),
plugin.Registry{
ActRegistry: actRegistry,
Compatibility: config.Loose,
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestGetPluginsWithEmptyPluginRegistry(t *testing.T) {
Logger: zerolog.Logger{},
})
pluginRegistry := plugin.NewRegistry(
context.TODO(),
context.Background(),
plugin.Registry{
ActRegistry: actRegistry,
Compatibility: config.Loose,
Expand All @@ -218,7 +218,7 @@ func TestGetPluginsWithEmptyPluginRegistry(t *testing.T) {
func TestPools(t *testing.T) {
api := API{
Pools: map[string]map[string]*pool.Pool{
config.Default: {config.DefaultConfigurationBlock: pool.NewPool(context.TODO(), config.EmptyPoolCapacity)},
config.Default: {config.DefaultConfigurationBlock: pool.NewPool(context.Background(), config.EmptyPoolCapacity)},
},
ctx: context.Background(),
}
Expand Down Expand Up @@ -253,13 +253,13 @@ func TestGetProxies(t *testing.T) {
Network: config.DefaultNetwork,
Address: postgresAddress,
}
client := network.NewClient(context.TODO(), clientConfig, zerolog.Logger{}, nil)
client := network.NewClient(context.Background(), clientConfig, zerolog.Logger{}, nil)
require.NotNil(t, client)
newPool := pool.NewPool(context.TODO(), 1)
newPool := pool.NewPool(context.Background(), 1)
assert.Nil(t, newPool.Put(client.ID, client))

proxy := network.NewProxy(
context.TODO(),
context.Background(),
network.Proxy{
AvailableConnections: newPool,
HealthCheckPeriod: config.DefaultHealthCheckPeriod,
Expand Down Expand Up @@ -305,13 +305,13 @@ func TestGetServers(t *testing.T) {
Network: config.DefaultNetwork,
Address: postgresAddress,
}
client := network.NewClient(context.TODO(), clientConfig, zerolog.Logger{}, nil)
newPool := pool.NewPool(context.TODO(), 1)
client := network.NewClient(context.Background(), clientConfig, zerolog.Logger{}, nil)
newPool := pool.NewPool(context.Background(), 1)
require.NotNil(t, newPool)
assert.Nil(t, newPool.Put(client.ID, client))

proxy := network.NewProxy(
context.TODO(),
context.Background(),
network.Proxy{
AvailableConnections: newPool,
HealthCheckPeriod: config.DefaultHealthCheckPeriod,
Expand All @@ -336,7 +336,7 @@ func TestGetServers(t *testing.T) {
})

pluginRegistry := plugin.NewRegistry(
context.TODO(),
context.Background(),
plugin.Registry{
ActRegistry: actRegistry,
Compatibility: config.Loose,
Expand All @@ -346,7 +346,7 @@ func TestGetServers(t *testing.T) {
)

server := network.NewServer(
context.TODO(),
context.Background(),
network.Server{
Network: config.DefaultNetwork,
Address: postgresAddress,
Expand Down
26 changes: 10 additions & 16 deletions api/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"context"
"errors"
"net"

v1 "github.com/gatewayd-io/gatewayd/api/v1"
Expand Down Expand Up @@ -41,18 +42,23 @@ func NewGRPCServer(ctx context.Context, server GRPCServer) *GRPCServer {

// Start starts the gRPC server.
func (s *GRPCServer) Start() {
s.start(s.API, s.grpcServer, s.listener)
if err := s.grpcServer.Serve(s.listener); err != nil && !errors.Is(err, net.ErrClosed) {
s.API.Options.Logger.Err(err).Msg("failed to start gRPC API")
}
}

// Shutdown shuts down the gRPC server.
func (s *GRPCServer) Shutdown(_ context.Context) {
s.shutdown(s.grpcServer)
func (s *GRPCServer) Shutdown(context.Context) {
if err := s.listener.Close(); err != nil && !errors.Is(err, net.ErrClosed) {
s.API.Options.Logger.Err(err).Msg("failed to close listener")
}
s.grpcServer.GracefulStop()
}

// createGRPCAPI creates a new gRPC API server and listener.
func createGRPCAPI(api *API, healthchecker *HealthChecker) (*grpc.Server, net.Listener) {
listener, err := net.Listen(api.Options.GRPCNetwork, api.Options.GRPCAddress)
if err != nil {
if err != nil && !errors.Is(err, net.ErrClosed) {
api.Options.Logger.Err(err).Msg("failed to start gRPC API")
return nil, nil
}
Expand All @@ -64,15 +70,3 @@ func createGRPCAPI(api *API, healthchecker *HealthChecker) (*grpc.Server, net.Li

return grpcServer, listener
}

// start starts the gRPC API.
func (s *GRPCServer) start(api *API, grpcServer *grpc.Server, listener net.Listener) {
if err := grpcServer.Serve(listener); err != nil {
api.Options.Logger.Err(err).Msg("failed to start gRPC API")
}
}

// shutdown shuts down the gRPC API.
func (s *GRPCServer) shutdown(grpcServer *grpc.Server) {
grpcServer.GracefulStop()
}
9 changes: 6 additions & 3 deletions api/grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import (

// Test_GRPC_Server tests the gRPC server.
func Test_GRPC_Server(t *testing.T) {
api := getAPIConfig()
api := getAPIConfig(
"localhost:18081",
"localhost:19091",
)
healthchecker := &HealthChecker{Servers: api.Servers}
grpcServer := NewGRPCServer(
context.Background(), GRPCServer{API: api, HealthChecker: healthchecker})
Expand All @@ -25,7 +28,7 @@ func Test_GRPC_Server(t *testing.T) {
}(grpcServer)

grpcClient, err := grpc.NewClient(
"localhost:19090", grpc.WithTransportCredentials(insecure.NewCredentials()))
"localhost:19091", grpc.WithTransportCredentials(insecure.NewCredentials()))
assert.Nil(t, err)
defer grpcClient.Close()

Expand All @@ -36,5 +39,5 @@ func Test_GRPC_Server(t *testing.T) {
assert.Equal(t, config.Version, resp.GetVersion())
assert.Equal(t, config.VersionInfo(), resp.GetVersionInfo())

grpcServer.Shutdown(context.Background())
grpcServer.Shutdown(nil) //nolint:staticcheck
}
27 changes: 19 additions & 8 deletions api/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ func Test_Healthchecker(t *testing.T) {
Network: config.DefaultNetwork,
Address: postgresAddress,
}
client := network.NewClient(context.TODO(), clientConfig, zerolog.Logger{}, nil)
newPool := pool.NewPool(context.TODO(), 1)
client := network.NewClient(context.Background(), clientConfig, zerolog.Logger{}, nil)
newPool := pool.NewPool(context.Background(), 1)
require.NotNil(t, newPool)
assert.Nil(t, newPool.Put(client.ID, client))

proxy := network.NewProxy(
context.TODO(),
context.Background(),
network.Proxy{
AvailableConnections: newPool,
HealthCheckPeriod: config.DefaultHealthCheckPeriod,
Expand All @@ -55,7 +55,7 @@ func Test_Healthchecker(t *testing.T) {
})

pluginRegistry := plugin.NewRegistry(
context.TODO(),
context.Background(),
plugin.Registry{
ActRegistry: actRegistry,
Compatibility: config.Loose,
Expand All @@ -75,10 +75,10 @@ func Test_Healthchecker(t *testing.T) {
}()

server := network.NewServer(
context.TODO(),
context.Background(),
network.Server{
Network: config.DefaultNetwork,
Address: postgresAddress,
Address: "127.0.0.1:15432",
TickInterval: config.DefaultTickInterval,
Options: network.Option{
EnableTicker: false,
Expand All @@ -99,26 +99,37 @@ func Test_Healthchecker(t *testing.T) {
raftNode: raftHelper.Node,
}
assert.NotNil(t, healthchecker)
hcr, err := healthchecker.Check(context.TODO(), &grpc_health_v1.HealthCheckRequest{})
hcr, err := healthchecker.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{})
assert.NoError(t, err)
assert.NotNil(t, hcr)
assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, hcr.GetStatus())

go func(t *testing.T, server *network.Server) {
t.Helper()

if err := server.Run(); err != nil {
t.Errorf("server.Run() error = %v", err)
}
}(t, server)

time.Sleep(1 * time.Second)
// Test for SERVING status
hcr, err = healthchecker.Check(context.TODO(), &grpc_health_v1.HealthCheckRequest{})
hcr, err = healthchecker.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{})
assert.NoError(t, err)
assert.NotNil(t, hcr)
assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, hcr.GetStatus())

err = healthchecker.Watch(&grpc_health_v1.HealthCheckRequest{}, nil)
assert.Error(t, err)
assert.Equal(t, "rpc error: code = Unimplemented desc = not implemented", err.Error())

server.Shutdown()
pluginRegistry.Shutdown()

// Wait for the server to stop.
<-time.After(100 * time.Millisecond)

// check server status and connections
assert.False(t, server.IsRunning())
assert.Zero(t, server.CountConnections())
}
24 changes: 7 additions & 17 deletions api/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,17 @@ func NewHTTPServer(options *Options) *HTTPServer {

// Start starts the HTTP server.
func (s *HTTPServer) Start() {
s.start(s.options, s.httpServer)
// Start HTTP server (and proxy calls to gRPC server endpoint)
if err := s.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
s.options.Logger.Err(err).Msg("failed to start HTTP API")
}
}

// Shutdown shuts down the HTTP server.
func (s *HTTPServer) Shutdown(ctx context.Context) {
s.shutdown(ctx, s.httpServer, s.logger)
if err := s.httpServer.Shutdown(ctx); err != nil {
s.logger.Err(err).Msg("failed to shutdown HTTP API")
}
}

// CreateHTTPAPI creates a new HTTP API.
Expand Down Expand Up @@ -113,18 +118,3 @@ func createHTTPAPI(options *Options) *http.Server {

return server
}

// start starts the HTTP API.
func (s *HTTPServer) start(options *Options, server *http.Server) {
// Start HTTP server (and proxy calls to gRPC server endpoint)
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
options.Logger.Err(err).Msg("failed to start HTTP API")
}
}

// shutdown shuts down the HTTP API.
func (s *HTTPServer) shutdown(ctx context.Context, server *http.Server, logger zerolog.Logger) {
if err := server.Shutdown(ctx); err != nil {
logger.Err(err).Msg("failed to shutdown HTTP API")
}
}
Loading

0 comments on commit dcfc48b

Please sign in to comment.