Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding more resilitent defaults to ES client #4515

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,36 @@ IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.


--------------------------------------------------------------------------------
Dependency : github.com/cenkalti/backoff/v4
Version: v4.3.0
Licence type (autodetected): MIT
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/cenkalti/backoff/v4@v4.3.0/LICENSE:

The MIT License (MIT)

Copyright (c) 2014 Cenk Altı

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.


--------------------------------------------------------------------------------
Dependency : github.com/dgraph-io/ristretto
Version: v0.2.0
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.23

require (
github.com/Pallinder/go-randomdata v1.2.0
github.com/cenkalti/backoff/v4 v4.3.0
github.com/dgraph-io/ristretto v0.2.0
github.com/docker/go-units v0.5.0
github.com/elastic/elastic-agent-client/v7 v7.17.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgI
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
Expand Down
18 changes: 8 additions & 10 deletions internal/pkg/api/handleFileDelivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,18 @@ import (
)

type FileDeliveryT struct {
bulker bulk.Bulk
cache cache.Cache
chunkClient *elasticsearch.Client
deliverer *delivery.Deliverer
authAgent func(*http.Request, *string, bulk.Bulk, cache.Cache) (*model.Agent, error) // injectable for testing purposes
bulker bulk.Bulk
cache cache.Cache
deliverer *delivery.Deliverer
authAgent func(*http.Request, *string, bulk.Bulk, cache.Cache) (*model.Agent, error) // injectable for testing purposes
}

func NewFileDeliveryT(cfg *config.Server, bulker bulk.Bulk, chunkClient *elasticsearch.Client, cache cache.Cache) *FileDeliveryT {
return &FileDeliveryT{
chunkClient: chunkClient,
bulker: bulker,
cache: cache,
deliverer: delivery.New(chunkClient, bulker, maxFileSize),
authAgent: authAgent,
bulker: bulker,
cache: cache,
deliverer: delivery.New(chunkClient, bulker, maxFileSize),
authAgent: authAgent,
}
}

Expand Down
7 changes: 3 additions & 4 deletions internal/pkg/api/handleFileDelivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,9 @@ func prepareFileDeliveryMock(t *testing.T) (http.Handler, apiServer, *MockTransp

si := apiServer{
ft: &FileDeliveryT{
bulker: fakebulk,
chunkClient: mockES,
cache: c,
deliverer: delivery.New(mockES, fakebulk, maxFileSize),
bulker: fakebulk,
cache: c,
deliverer: delivery.New(mockES, fakebulk, maxFileSize),
authAgent: func(r *http.Request, id *string, bulker bulk.Bulk, c cache.Cache) (*model.Agent, error) {
return &model.Agent{
ESDocument: model.ESDocument{
Expand Down
102 changes: 102 additions & 0 deletions internal/pkg/es/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,60 @@ package es

import (
"context"
"errors"
"fmt"
"net/http"
"runtime"
"sync"
"syscall"
"time"

"go.elastic.co/apm/module/apmelasticsearch/v2"

"github.com/elastic/fleet-server/v7/internal/pkg/build"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/rs/zerolog"

backoff "github.com/cenkalti/backoff/v4"
"github.com/elastic/go-elasticsearch/v8"
)

const (
initialRetryBackoff = 500 * time.Millisecond
maxRetryBackoff = 10 * time.Second
randomizationFactor = 0.5
defaultMaxRetries = 5
)

type ConfigOption func(config *elasticsearch.Config)

var retriableErrors []error
var retriableErrorsLock sync.RWMutex

func applyDefaultOptions(escfg *elasticsearch.Config) {
exp := backoff.NewExponentialBackOff()
exp.InitialInterval = initialRetryBackoff
exp.RandomizationFactor = randomizationFactor
exp.MaxInterval = maxRetryBackoff

opts := []ConfigOption{
WithRetryOnErr(syscall.ECONNREFUSED), // server not ready
WithRetryOnErr(syscall.ECONNRESET), // server may be restarting

WithRetryOnStatus(http.StatusTooManyRequests),
WithRetryOnStatus(http.StatusBadGateway),
WithRetryOnStatus(http.StatusServiceUnavailable),
WithRetryOnStatus(http.StatusGatewayTimeout),

WithBackoff(exp),
WithMaxRetries(defaultMaxRetries),
}

for _, opt := range opts {
opt(escfg)
}
}

func NewClient(ctx context.Context, cfg *config.Config, longPoll bool, opts ...ConfigOption) (*elasticsearch.Client, error) {
escfg, err := cfg.Output.Elasticsearch.ToESConfig(longPoll)
if err != nil {
Expand All @@ -29,6 +68,9 @@ func NewClient(ctx context.Context, cfg *config.Config, longPoll bool, opts ...C
addr := cfg.Output.Elasticsearch.Hosts
mcph := cfg.Output.Elasticsearch.MaxConnPerHost

// apply defauly config
applyDefaultOptions(&escfg)

// Apply configuration options
for _, opt := range opts {
opt(&escfg)
Expand Down Expand Up @@ -78,6 +120,66 @@ func InstrumentRoundTripper() ConfigOption {
}
}

func WithRetryOnErr(err error) ConfigOption {
retriableErrorsLock.Lock()
defer retriableErrorsLock.Unlock()

if retriableErrors == nil {
retriableErrors = make([]error, 0, 1)
}
retriableErrors = append(retriableErrors, err)

return func(config *elasticsearch.Config) {
config.RetryOnError = func(_ *http.Request, err error) bool {
retriableErrorsLock.RLock()
defer retriableErrorsLock.RUnlock()

for _, e := range retriableErrors {
if errors.Is(err, e) {
return true
}
}
return false
}
}
}

func WithMaxRetries(retries int) ConfigOption {
return func(config *elasticsearch.Config) {
config.MaxRetries = retries
}
}

func WithRetryOnStatus(status int) ConfigOption {
return func(config *elasticsearch.Config) {
for _, s := range config.RetryOnStatus {
// check for duplicities
if s == status {
return
}
}

config.RetryOnStatus = append(config.RetryOnStatus, status)
}
}

func WithBackoff(exp *backoff.ExponentialBackOff) ConfigOption {
return func(config *elasticsearch.Config) {
if exp == nil {
// no retry backoff
config.RetryBackoff = nil
return
}

config.RetryBackoff = func(attempt int) time.Duration {
if attempt == 1 {
exp.Reset()
}
return exp.NextBackOff()
}
}
}

func userAgent(name string, bi build.Info) string {
return fmt.Sprintf("Elastic-%s/%s (%s; %s; %s; %s)",
name,
Expand Down
28 changes: 24 additions & 4 deletions testing/e2e/scaffold/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (s *Scaffold) IsFleetServerPortFree() bool {
func (s *Scaffold) FleetServerStatusOK(ctx context.Context, url string) {
s.FleetServerStatusCondition(ctx, url, func(resp *http.Response) bool {
return resp.StatusCode == http.StatusOK
})
}, true)
}

// FleetServerStatusIs will poll fleet-server's status endpoint every second and return when it returns the expected state.
Expand All @@ -172,18 +172,38 @@ func (s *Scaffold) FleetServerStatusIs(ctx context.Context, url string, state cl
s.Require().NoError(err)

return status.Status == state.String()
})
}, true)
}

// FleetServerStatusIs will poll fleet-server's status endpoint every second and return when it returns the expected state.
// If the passed context terminates before a 200 is returned the current test will be marked as failed.
func (s *Scaffold) FleetServerStatusNeverBecomes(ctx context.Context, url string, state client.UnitState) {
s.FleetServerStatusCondition(ctx, url, func(resp *http.Response) bool {
var status struct {
Status string `json:"status"`
}
d, err := io.ReadAll(resp.Body)
s.Require().NoError(err)

err = json.Unmarshal(d, &status)
s.Require().NoError(err)

s.NotEqual(state.String(), status.Status)
return false
}, false)
}

// FleetServerStatusCondition will poll fleet-server's status till the response satisfies the given
// condition.
// If the passed context terminates before, the current test will be marked as failed.
func (s *Scaffold) FleetServerStatusCondition(ctx context.Context, url string, condition func(resp *http.Response) bool) {
func (s *Scaffold) FleetServerStatusCondition(ctx context.Context, url string, condition func(resp *http.Response) bool, failOnDone bool) {
timer := time.NewTimer(time.Second)
for {
select {
case <-ctx.Done():
s.Require().NoError(ctx.Err(), "context expired before status endpoint returned 200")
if failOnDone {
s.Require().NoError(ctx.Err(), "context expired before status endpoint returned 200")
}
return
case <-timer.C:
// ping /api/status
Expand Down
53 changes: 53 additions & 0 deletions testing/e2e/stand_alone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,59 @@ func (suite *StandAloneSuite) TestWithElasticsearchConnectionFailures() {
cmd.Wait()
}

// TestWithElasticsearchConnectionFlakyness checks the behaviour of stand alone Fleet Server
// when Elasticsearch is not reachable portion of the time.
func (suite *StandAloneSuite) TestWithElasticsearchConnectionFlakyness() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)

proxy, err := suite.StartToxiproxy(ctx).CreateProxy("es", "localhost:0", suite.ESHosts)
suite.Require().NoError(err)

// Create a config file from a template in the test temp dir
dir := suite.T().TempDir()
tpl, err := template.ParseFiles(filepath.Join("testdata", "stand-alone-http.tpl"))
suite.Require().NoError(err)
f, err := os.Create(filepath.Join(dir, "config.yml"))
suite.Require().NoError(err)
err = tpl.Execute(f, map[string]string{
"Hosts": "http://" + proxy.Listen,
"ServiceToken": suite.ServiceToken,
})
f.Close()
suite.Require().NoError(err)

// Run the fleet-server binary
cmd := exec.CommandContext(ctx, suite.binaryPath, "-c", filepath.Join(dir, "config.yml"))
cmd.Cancel = func() error {
return cmd.Process.Signal(syscall.SIGTERM)
}
cmd.Env = []string{"GOCOVERDIR=" + suite.CoverPath}
err = cmd.Start()
suite.Require().NoError(err)

// Wait to check that it is healthy.
suite.FleetServerStatusIs(ctx, "http://localhost:8220", client.UnitStateHealthy)

// Provoke timeouts and wait for the healthcheck to fail.
_, err = proxy.AddToxic("force_timeout", "timeout", "upstream", 0.6, toxiproxy.Attributes{}) // we have 5 retries, test with failure 6 out of 10 should be ok
suite.Require().NoError(err)

// wait for unit state degraded
timeoutCtx, tCancel := context.WithTimeout(ctx, 30*time.Second)
suite.FleetServerStatusNeverBecomes(timeoutCtx, "http://localhost:8220", client.UnitStateDegraded)

// test should not fail at this point
tCancel()

// Recover the network and wait for the healthcheck to be healthy again.
err = proxy.RemoveToxic("force_timeout")
suite.Require().NoError(err)
suite.FleetServerStatusIs(ctx, "http://localhost:8220", client.UnitStateHealthy)

cancel()
cmd.Wait()
}

// TestWithSecretFiles tests starting an HTTPS server using a service-token file, public/private keys + passphrase file.
func (suite *StandAloneSuite) TestWithSecretFiles() {
// Create a service token file in the temp test dir
Expand Down
2 changes: 1 addition & 1 deletion testing/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/Microsoft/hcsshim v0.11.4 // indirect
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/containerd/containerd v1.7.15 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions testing/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP
github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/containerd/containerd v1.7.15 h1:afEHXdil9iAm03BmhjzKyXnnEBtjaLJefdU7DV0IFes=
github.com/containerd/containerd v1.7.15/go.mod h1:ISzRRTMF8EXNpJlTzyr2XMhN+j9K302C21/+cr3kUnY=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
Expand Down