Skip to content

Commit

Permalink
Add policy monitor debounce time (#3234)
Browse files Browse the repository at this point in the history
Add a policy_debounce_time configuration to add a forced delay to the
policy index monitor when it successfully gathers new documents. This
is needed to stop fleet-server from dispatching all policies in cases
where policy changes occur frequently and fleet-server is unable to
dispatch all policy change actions before the policy is updated.

---------

Co-authored-by: Julia Bardi <julia.bardi@elastic.co>
Co-authored-by: Julia Bardi <90178898+juliaElastic@users.noreply.github.com>
  • Loading branch information
3 people authored Feb 7, 2024
1 parent 3a77b9c commit 659dafd
Show file tree
Hide file tree
Showing 16 changed files with 693 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Add policy_debounce_time to monitor

# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
Add a policy_debounce_time configuration to add a forced delay to the
policy index monitor when it successfully gathers new documents. This
is needed to stop fleet-server from dispatching all policies in cases
where policy changes occur frequently and fleet-server is unable to
dispatch all policy change actions before the policy is updated.
# Affected component; a word indicating the component this changeset affects.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 3234

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
5 changes: 5 additions & 0 deletions fleet-server.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,11 @@ fleet:
# upstream_url: "https://artifacts.elastic.co/GPG-KEY-elastic-agent"
# # By default dir is the directory containing the fleet-server executable (following symlinks) joined with elastic-agent-upgrade-keys
# dir: ./elastic-agent-upgrade-keys
# # monitor options are advanced configuration and should not be adjusted in most cases
# monitor:
# fetch_size: 1000 # The number of documents that each monitor may fetch at once
# poll_timeout: 4m # The poll timeout for each monitor's wait_for_advancement request
# policy_debounce_time: 1s # The debounce duration for the policy index monitor on successful document retrievals.

##############################
# Logging configuration
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/bulk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Bulk interface {
Search(ctx context.Context, index string, body []byte, opts ...Opt) (*es.ResultT, error)
HasTracer() bool
StartTransaction(name, transactionType string) *apm.Transaction
StartTransactionOptions(name, transactionType string, opts apm.TransactionOptions) *apm.Transaction

// Multi Operation API's run in the bulk engine
MCreate(ctx context.Context, ops []MultiOp, opts ...Opt) ([]BulkIndexerResponseItem, error)
Expand Down
4 changes: 4 additions & 0 deletions internal/pkg/bulk/opBulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,7 @@ func (b *Bulker) HasTracer() bool {
// StartTransaction begins a new APM transaction with the given name and
// type using the bulker's tracer.
// NOTE(review): b.tracer is assumed non-nil here — callers appear expected
// to check HasTracer() first; confirm against call sites.
func (b *Bulker) StartTransaction(name, transactionType string) *apm.Transaction {
	return b.tracer.StartTransaction(name, transactionType)
}

// StartTransactionOptions begins a new APM transaction with the given name,
// type, and explicit transaction options using the bulker's tracer.
// NOTE(review): like StartTransaction, this assumes b.tracer is non-nil.
func (b *Bulker) StartTransactionOptions(name, transactionType string, opts apm.TransactionOptions) *apm.Transaction {
	return b.tracer.StartTransactionOptions(name, transactionType, opts)
}
25 changes: 15 additions & 10 deletions internal/pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ func TestConfig(t *testing.T) {
Server: defaultServer(),
Cache: defaultCache(),
Monitor: Monitor{
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
PolicyDebounceTime: defaultPolicyDebounceTime,
},
},
},
Expand All @@ -67,8 +68,9 @@ func TestConfig(t *testing.T) {
Server: defaultServer(),
Cache: defaultCache(),
Monitor: Monitor{
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
PolicyDebounceTime: defaultPolicyDebounceTime,
},
},
},
Expand All @@ -88,8 +90,9 @@ func TestConfig(t *testing.T) {
Server: defaultServer(),
Cache: defaultCache(),
Monitor: Monitor{
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
PolicyDebounceTime: defaultPolicyDebounceTime,
},
},
},
Expand Down Expand Up @@ -137,8 +140,9 @@ func TestConfig(t *testing.T) {
},
Cache: generateCache(0),
Monitor: Monitor{
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
PolicyDebounceTime: defaultPolicyDebounceTime,
},
},
},
Expand Down Expand Up @@ -211,8 +215,9 @@ func TestConfig(t *testing.T) {
Server: defaultServer(),
Cache: generateCache(2500),
Monitor: Monitor{
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
PolicyDebounceTime: defaultPolicyDebounceTime,
},
},
},
Expand Down
11 changes: 7 additions & 4 deletions internal/pkg/config/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ package config
import "time"

const (
	// Default values applied by InitDefaults when options are unset.
	defaultFetchSize          = 1000
	defaultPollTimeout        = 4 * time.Minute
	defaultPolicyDebounceTime = time.Second
)

// Monitor holds the advanced index-monitor tuning options.
// These are advanced settings and should not need adjustment in most cases.
type Monitor struct {
	// FetchSize is the number of documents each monitor may fetch at once.
	FetchSize int `config:"fetch_size"`
	// PollTimeout is the poll timeout for each monitor's
	// wait_for_advancement request.
	PollTimeout time.Duration `config:"poll_timeout"`
	// PolicyDebounceTime is the forced delay the policy index monitor
	// applies after it successfully gathers new documents, so fleet-server
	// can finish dispatching policy change actions before the next batch.
	PolicyDebounceTime time.Duration `config:"policy_debounce_time"`
}

// InitDefaults sets every monitor option to its default value.
func (m *Monitor) InitDefaults() {
	m.FetchSize = defaultFetchSize
	m.PollTimeout = defaultPollTimeout
	m.PolicyDebounceTime = defaultPolicyDebounceTime
}
18 changes: 18 additions & 0 deletions internal/pkg/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type simpleMonitorT struct {
pollTimeout time.Duration
withExpiration bool
fetchSize int
debounceTime time.Duration

checkpoint sqn.SeqNo // index global checkpoint
mx sync.RWMutex // checkpoint mutex
Expand All @@ -111,6 +112,7 @@ func NewSimple(index string, esCli, monCli *elasticsearch.Client, opts ...Option
pollTimeout: defaultPollTimeout,
withExpiration: defaultWithExpiration,
fetchSize: defaultFetchSize,
debounceTime: 0,
checkpoint: sqn.DefaultSeqNo,
outCh: make(chan []es.HitT, 1),
}
Expand Down Expand Up @@ -170,6 +172,12 @@ func WithAPMTracer(tracer *apm.Tracer) Option {
}
}

// WithDebounceTime sets the debounce duration the monitor applies after it
// successfully gathers new documents.
func WithDebounceTime(dur time.Duration) Option {
	return func(m SimpleMonitor) {
		sm := m.(*simpleMonitorT)
		sm.debounceTime = dur
	}
}

// Output returns the output channel for the monitor.
func (m *simpleMonitorT) Output() <-chan []es.HitT {
return m.outCh
Expand Down Expand Up @@ -352,6 +360,16 @@ func (m *simpleMonitorT) Run(ctx context.Context) (err error) {
if m.tracer != nil {
trans.End()
}
if m.debounceTime > 0 {
m.log.Debug().Dur("debounce_time", m.debounceTime).Msg("monitor debounce start")
// Introduce a debounce time before wait advance (the signal for new docs in the index)
// This is specifically done so we can introduce a delay in for cases like rapid policy changes
// where fleet-server may not have finished dispatching policies to all agents when a new change is detected.
err := sleep.WithContext(ctx, m.debounceTime)
if err != nil {
return err
}
}
}
}

Expand Down
61 changes: 61 additions & 0 deletions internal/pkg/monitor/monitor_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package monitor

import (
"context"
"errors"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -47,6 +48,66 @@ func TestSimpleMonitorNonEmptyIndex(t *testing.T) {
runSimpleMonitorTest(t, ctx, index, bulker)
}

// TestSimpleMonitorWithDebounce checks that a monitor configured with a 1s
// debounce time waits at least that long between emitting successive
// batches of documents.
func TestSimpleMonitorWithDebounce(t *testing.T) {
	ctx, cn := context.WithCancel(context.Background())
	defer cn()
	ctx = testlog.SetLogger(t).WithContext(ctx)

	index, bulker := ftesting.SetupCleanIndex(ctx, t, ".fleet-actions")

	ch := make(chan model.Action)
	readyCh := make(chan error)
	// Monitor under test: 1s debounce after each successful fetch.
	mon, err := NewSimple(index, bulker.Client(), bulker.Client(),
		WithReadyChan(readyCh),
		WithDebounceTime(time.Second),
	)
	require.NoError(t, err)

	g, ctx := errgroup.WithContext(ctx)
	g.Go(func() error {
		// ready function will add two actions, one immediately, and one after 100ms
		return runSimpleMonitor(t, ctx, mon, readyCh, ch, func(ctx context.Context) error {
			_, err := ftesting.StoreRandomAction(ctx, bulker, index)
			if err != nil {
				return err
			}
			go func(ctx context.Context) {
				time.Sleep(100 * time.Millisecond)
				// NOTE(review): both the bare time.Sleep above and this
				// context-aware sleep execute, so the second action lands
				// ~200ms after the first, not 100ms as the comment above
				// says — confirm whether both delays are intended.
				err := sleep.WithContext(ctx, 100*time.Millisecond)
				if err != nil {
					return
				}
				ftesting.StoreRandomAction(ctx, bulker, index) //nolint:errcheck // test case
			}(ctx)
			return nil
		})
	})

	// read 2 actions and check that the time between both is at least 1s
	g.Go(func() error {
		var ts time.Time
		for {
			select {
			case <-ctx.Done():
				return nil
			case <-ch:
				if ts.IsZero() {
					// First batch: record arrival time and wait for the next.
					ts = time.Now()
					continue
				}
				dur := time.Since(ts)
				assert.GreaterOrEqual(t, dur, time.Second)
				// Cancel to unwind both goroutines once the gap is verified.
				cn()
			}
		}
	})

	err = g.Wait()
	if err != nil && !errors.Is(err, context.Canceled) {
		t.Fatalf("unexpected error: %v", err)
	}
}

func TestSimpleMonitorCheckpointOutOfSync(t *testing.T) {
ctx, cn := context.WithCancel(context.Background())
defer cn()
Expand Down
9 changes: 7 additions & 2 deletions internal/pkg/monitor/subscription_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

const (
	// defaultSubscriptionTimeout is the max amount of time a subscription
	// has to read a notification from its channel before it is dropped.
	defaultSubscriptionTimeout = 60 * time.Second
)

var gCounter uint64
Expand Down Expand Up @@ -70,8 +70,8 @@ func New(index string, esCli, monCli *elasticsearch.Client, opts ...Option) (Mon

m := &monitorT{
sm: sm,
subTimeout: defaultSubscriptionTimeout,
subs: make(map[uint64]*subT),
subTimeout: defaultSubscriptionTimeout,
}

return m, nil
Expand Down Expand Up @@ -147,10 +147,15 @@ func (m *monitorT) notify(ctx context.Context, hits []es.HitT) {
defer cn()
select {
case s.c <- hits:
zerolog.Ctx(ctx).Info().
Str("ctx", "subscription monitor").
Any("hits", hits).
Msg("received notification")
case <-lc.Done():
zerolog.Ctx(ctx).Error().
Err(lc.Err()).
Str("ctx", "subscription monitor").
Any("hits", hits).
Dur("timeout", m.subTimeout).
Msg("dropped notification")
}
Expand Down
Loading

0 comments on commit 659dafd

Please sign in to comment.