From 59facd66b493b506d5f932bc08737eebd54a2502 Mon Sep 17 00:00:00 2001 From: Julia Bardi <90178898+juliaElastic@users.noreply.github.com> Date: Mon, 11 Mar 2024 14:53:02 +0100 Subject: [PATCH] =?UTF-8?q?Stop=20self=20monitor=20output=20health=20repor?= =?UTF-8?q?ting=20if=20output=20config=20is=20not=20ack=E2=80=A6=20(#3335)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Stop self monitor output health reporting if output config is not acked by agents * updated changelog * added test to error scenario --- ...157707-fix-self-monitor-output-health.yaml | 32 ++++++++++ internal/pkg/bulk/engine.go | 15 ++++- internal/pkg/policy/self.go | 12 ++++ internal/pkg/policy/self_test.go | 60 +++++++++++++++++++ internal/pkg/testing/bulk.go | 4 ++ 5 files changed, 120 insertions(+), 3 deletions(-) create mode 100644 changelog/fragments/1710157707-fix-self-monitor-output-health.yaml diff --git a/changelog/fragments/1710157707-fix-self-monitor-output-health.yaml b/changelog/fragments/1710157707-fix-self-monitor-output-health.yaml new file mode 100644 index 000000000..dc09f7a1c --- /dev/null +++ b/changelog/fragments/1710157707-fix-self-monitor-output-health.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: Self monitor stops output health reporting if output config is not acked by agents + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: 3335 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: 3334 diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index daf5572e4..5008d37e7 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -74,6 +74,7 @@ type Bulk interface { GetBulker(outputName string) Bulk GetBulkerMap() map[string]Bulk CancelFn() context.CancelFunc + RemoteOutputConfigChanged(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool ReadSecrets(ctx context.Context, secretIds []string) (map[string]string, error) } @@ -247,17 +248,25 @@ func (b *Bulker) Client() *elasticsearch.Client { return client } -// check if remote output cfg changed -func (b *Bulker) hasChangedAndUpdateRemoteOutputConfig(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool { +func (b *Bulker) RemoteOutputConfigChanged(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool { curCfg := b.remoteOutputConfigMap[name] hasChanged := false // when output config first added, not reporting change if curCfg != nil && !reflect.DeepEqual(curCfg, newCfg) { - zlog.Info().Str("name", name).Msg("remote output configuration has changed") hasChanged = true } + return hasChanged +} + +// check if remote output cfg changed +func (b *Bulker) hasChangedAndUpdateRemoteOutputConfig(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool { + hasChanged := b.RemoteOutputConfigChanged(zlog, name, newCfg) + if hasChanged { + zlog.Debug().Str("name", name).Msg("remote output configuration has changed") + } + newCfgCopy := make(map[string]interface{}) for k, v := range newCfg { newCfgCopy[k] = v diff --git a/internal/pkg/policy/self.go b/internal/pkg/policy/self.go index d468b0f39..672904cc8 100644 --- a/internal/pkg/policy/self.go +++ b/internal/pkg/policy/self.go @@ -250,10 +250,22 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error return state, nil } +func isOutputCfgOutdated(ctx context.Context, bulker bulk.Bulk, zlog zerolog.Logger, outputName string) bool { + policy, err := dl.QueryOutputFromPolicy(ctx, bulker, outputName) + if err != nil || policy == nil { + return true + } + hasChanged := bulker.RemoteOutputConfigChanged(zlog, outputName, policy.Data.Outputs[outputName]) + return hasChanged +} + func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, zlog zerolog.Logger) { //pinging logic bulkerMap := bulker.GetBulkerMap() for outputName, outputBulker := range bulkerMap { + if isOutputCfgOutdated(ctx, bulker, zlog, outputName) { + continue + } doc := model.OutputHealth{ Output: outputName, State: client.UnitStateHealthy.String(), diff --git a/internal/pkg/policy/self_test.go b/internal/pkg/policy/self_test.go index 6baf67e56..34ade9fe7 100644 --- a/internal/pkg/policy/self_test.go +++ b/internal/pkg/policy/self_test.go @@ -667,6 +667,14 @@ func TestSelfMonitor_reportOutputHealthyState(t *testing.T) { } return doc.Message == "" && doc.State == client.UnitStateHealthy.String() }), mock.Anything).Return("", nil) + bulker.On("Search", mock.Anything, dl.FleetPolicies, mock.Anything, mock.Anything).Return( + &es.ResultT{ + HitsT: es.HitsT{ + Hits: []es.HitT{ + {Source: []byte(`{"data": {"outputs":{"remote":{"type":"remote_elasticsearch","hosts":["http://localhost:9200"]}}}}`)}, + }, + }, + }, nil) reportOutputHealth(ctx, bulker, logger) @@ -696,6 +704,58 @@ func TestSelfMonitor_reportOutputDegradedState(t *testing.T) { } return doc.Message == "remote ES is not reachable due to error: error connecting" && doc.State == client.UnitStateDegraded.String() }), mock.Anything).Return("", nil) + bulker.On("Search", mock.Anything, dl.FleetPolicies, mock.Anything, mock.Anything).Return( + &es.ResultT{ + HitsT: es.HitsT{ + Hits: []es.HitT{ + {Source: []byte(`{"data": {"outputs":{"remote":{"type":"remote_elasticsearch","hosts":["http://localhost:9200"]}}}}`)}, + }, + }, + }, nil) + + reportOutputHealth(ctx, bulker, logger) + + bulker.AssertExpectations(t) + outputBulker.AssertExpectations(t) +} + +func TestSelfMonitor_reportOutputSkipIfOutdated(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + logger := testlog.SetLogger(t) + + bulker := ftesting.NewMockBulk() + bulkerMap := make(map[string]bulk.Bulk) + outputBulker := ftesting.NewMockBulk() + bulkerMap["outdated"] = outputBulker + bulker.On("GetBulkerMap").Return(bulkerMap) + bulker.On("Search", mock.Anything, dl.FleetPolicies, mock.Anything, mock.Anything).Return( + &es.ResultT{ + HitsT: es.HitsT{ + Hits: []es.HitT{ + {Source: []byte(`{"data": {"outputs":{"outdated":{"type":"remote_elasticsearch","hosts":["http://localhost:9200"]}}}}`)}, + }, + }, + }, nil) + + reportOutputHealth(ctx, bulker, logger) + + bulker.AssertExpectations(t) + outputBulker.AssertExpectations(t) +} + +func TestSelfMonitor_reportOutputSkipIfNotFound(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + logger := testlog.SetLogger(t) + + bulker := ftesting.NewMockBulk() + bulkerMap := make(map[string]bulk.Bulk) + outputBulker := ftesting.NewMockBulk() + bulkerMap["outdated"] = outputBulker + bulker.On("GetBulkerMap").Return(bulkerMap) + bulker.On("Search", mock.Anything, dl.FleetPolicies, mock.Anything, mock.Anything).Return( + &es.ResultT{}, errors.New("output not found")) reportOutputHealth(ctx, bulker, logger) diff --git a/internal/pkg/testing/bulk.go b/internal/pkg/testing/bulk.go index 42cec9903..72d2f409e 100644 --- a/internal/pkg/testing/bulk.go +++ b/internal/pkg/testing/bulk.go @@ -148,4 +148,8 @@ func (m *MockBulk) StartTransactionOptions(name, transactionType string, opts ap return nil } +func (m *MockBulk) RemoteOutputConfigChanged(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool { + return name == "outdated" +} + var _ bulk.Bulk = (*MockBulk)(nil)