[7.17](backport #42966) fix(libbeat): mitigate race condition in ratelimit processor #43117

Status: Open. Wants to merge 1 commit into base branch `7.17`.
41 changes: 41 additions & 0 deletions CHANGELOG.next.asciidoc
@@ -15,6 +15,47 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD diff]

*Filebeat*

- Convert netflow input to API v2 and disable event normalisation {pull}37901[37901]
Contributor comment on this hunk:
broken changelog merge should be fixed (along with the explicit broken merges below)

- Removed deprecated Squid from Beats. See <<migrate-from-deprecated-module>> for migration options. {pull}38037[38037]
- Removed deprecated Sonicwall from Beats. Use the https://docs.elastic.co/integrations/sonicwall[SonicWall Firewall] Elastic integration instead. {pull}38037[38037]
- Removed deprecated Radware from Beats. See <<migrate-from-deprecated-module>> for migration options. {pull}38037[38037]
- Removed deprecated Netscout from Beats. See <<migrate-from-deprecated-module>> for migration options. {pull}38037[38037]
- Removed deprecated Juniper Netscreen from Beats. See <<migrate-from-deprecated-module>> for migration options. {pull}38037[38037]
- Removed deprecated Imperva from Beats. See <<migrate-from-deprecated-module>> for migration options. {pull}38037[38037]
- Removed deprecated Cylance from Beats. See <<migrate-from-deprecated-module>> for migration options. {pull}38037[38037]
- Removed deprecated Bluecoat from Beats. See <<migrate-from-deprecated-module>> for migration options. {pull}38037[38037]
- Introduce input/netmetrics and refactor netflow input metrics {pull}38055[38055]
- Update Salesforce module to use new Salesforce input. {pull}37509[37509]
- Tag events that come from a filestream in "take over" mode. {pull}39828[39828]
- Fix high IO and handling of a corrupted registry log file. {pull}35893[35893]
- Enable file ingestion to report detailed status to Elastic Agent {pull}40075[40075]
- Filebeat, when running with Elastic-Agent, reports status for Filestream input. {pull}40121[40121]
- Fix filestream's registry GC: registry entries will never be removed if clean_inactive is set to "-1". {pull}40258[40258]
- Added `ignore_empty_values` flag in `decode_cef` Filebeat processor. {pull}40268[40268]
- Added support for hyphens in extension keys in `decode_cef` Filebeat processor. {pull}40427[40427]
- Journald: removed configuration options `include_matches.or`, `include_matches.and`, `backoff`, `max_backoff`, `cursor_seek_fallback`. {pull}40061[40061]
- Journald: `include_matches.match` now behaves in the same way as matchers in `journalctl`. Users should carefully update their input configuration. {pull}40061[40061]
- Journald: `seek` and `since` behaviour have been simplified, if there is a cursor (state) `seek` and `since` are ignored and the cursor is used. {pull}40061[40061]
- Redis: Added replication role as a field to submitted slowlogs.
- Added `container.image.name` to `journald` Filebeat input's Docker-specific translated fields. {pull}40450[40450]
- Change log.file.path field in awscloudwatch input to nested object. {pull}41099[41099]
- Remove deprecated awscloudwatch field from Filebeat. {pull}41089[41089]
- The performance of ingesting SQS data with the S3 input has improved by up to 60x for queues with many small events. `max_number_of_messages` config for SQS mode is now ignored, as the new design no longer needs a manual cap on messages. Instead, use `number_of_workers` to scale ingestion rate in both S3 and SQS modes. The increased efficiency may increase network bandwidth consumption, which can be throttled by lowering `number_of_workers`. It may also increase number of events stored in memory, which can be throttled by lowering the configured size of the internal queue. {pull}40699[40699]
- Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]
- Add Kafka compression support for ZSTD.
- Filebeat fails to start if there is any input with a duplicated ID. It logs the duplicated IDs and the offending inputs' configurations. {pull}41731[41731]
- Filestream inputs with duplicated IDs will fail to start. An error is logged showing the ID and the full input configuration. {issue}41938[41938] {pull}41954[41954]
- Filestream inputs can define `allow_deprecated_id_duplication: true` to keep the previous behaviour of running inputs with duplicated IDs. {issue}41938[41938] {pull}41954[41954]
- The Filestream input only starts to ingest a file when it is >= 1024 bytes in size. This happens because `fingerprint` is now the default file identity. To restore the previous behaviour, set `file_identity.native: ~` and `prospector.scanner.fingerprint.enabled: false` {issue}40197[40197] {pull}41762[41762]
- Filebeat fails to start when its configuration contains usage of the deprecated `log` or `container` inputs. However, they can still be used by setting `allow_deprecated_use: true` in their configuration. {pull}42295[42295]
- The fields produced by the Journald input are updated to better match ECS. Dropped fields: `syslog.priority` and `syslog.facility`, keeping their duplicated equivalents `log.syslog.priority` and `log.syslog.facility.code`. Renamed fields: `syslog.identifier` -> `log.syslog.appname`, `syslog.pid` -> `log.syslog.procid`. `container.id_truncated` is dropped because the full container ID is already present as `container.id`, and `container.log.tag` is dropped because it is already present as `log.syslog.appname`. The field `container.partial` is replaced by the tag `partial_message` if it was `true`; otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
- Fixed race conditions in the global ratelimit processor that could drop events or apply rate limiting incorrectly.

*Heartbeat*

28 changes: 28 additions & 0 deletions libbeat/processors/ratelimit/rate_limit_test.go
@@ -175,3 +175,31 @@ func TestRateLimit(t *testing.T) {
})
}
}

func TestAllocs(t *testing.T) {
	p, err := new(conf.MustNewConfigFrom(mapstr.M{
		"limit": "100/s",
	}))
	require.NoError(t, err)
	event := beat.Event{Fields: mapstr.M{"field": 1}}

	// The steady-state path must not allocate: a hit in the bucket map
	// should reuse the existing bucket rather than build a new one.
	allocs := testing.AllocsPerRun(1000, func() {
		p.Run(&event) //nolint:errcheck // ignore
	})
	if allocs > 0 {
		t.Errorf("allocs = %v; want 0", allocs)
	}
}

func BenchmarkRateLimit(b *testing.B) {
	p, err := new(conf.MustNewConfigFrom(mapstr.M{
		"limit": "100/s",
	}))
	require.NoError(b, err)
	event := beat.Event{Fields: mapstr.M{"field": 1}}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		p.Run(&event) //nolint:errcheck // ignore
	}
}
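Why the zero-allocation assertion matters: `sync.Map.LoadOrStore` evaluates its value argument before the lookup, so the pre-fix `getBucket` (visible in the HEAD side of the conflict below), which called `LoadOrStore` unconditionally, allocated a throwaway `bucket` on every event even when the key was already present. The fix checks `Load` first and only falls back to `LoadOrStore` on a miss. A standalone sketch, not the Beats code, with the `bucket` type reduced to the fields relevant here:

```go
package main

import (
	"fmt"
	"sync"
	"testing"
)

// bucket is a stripped-down stand-in for the processor's bucket type.
type bucket struct {
	mu     sync.Mutex
	tokens float64
}

func main() {
	var m sync.Map
	key := uint64(42)
	m.Store(key, &bucket{tokens: 100})

	// LoadOrStore evaluates &bucket{...} eagerly, so a candidate bucket
	// escapes to the heap on every call, even when the key already exists.
	slow := testing.AllocsPerRun(1000, func() {
		m.LoadOrStore(key, &bucket{tokens: 100})
	})

	// Load first: a hit allocates nothing; only a miss pays for LoadOrStore.
	fast := testing.AllocsPerRun(1000, func() {
		if _, ok := m.Load(key); !ok {
			m.LoadOrStore(key, &bucket{tokens: 100})
		}
	})

	fmt.Printf("LoadOrStore only: %v allocs/op; Load first: %v allocs/op\n", slow, fast)
}
```

On a hit, the Load-first pattern reports zero allocations per run, which is exactly what `TestAllocs` pins down for the processor's steady state.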
64 changes: 44 additions & 20 deletions libbeat/processors/ratelimit/token_bucket.go
@@ -35,10 +35,36 @@
}

type bucket struct {
	mu sync.Mutex

	tokens        float64
	lastReplenish time.Time
}

// withdraw takes one token from the bucket if available.
// It is safe for concurrent use.
func (b *bucket) withdraw() bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.tokens < 1 {
		return false
	}

	b.tokens--
	return true
}

// replenish adds the tokens accrued since the last replenish, under the
// bucket's lock, and returns the new balance.
func (b *bucket) replenish(rate rate, clock clockwork.Clock) float64 {
	b.mu.Lock()
	defer b.mu.Unlock()

	secsSinceLastReplenish := clock.Now().Sub(b.lastReplenish).Seconds()
	tokensToReplenish := secsSinceLastReplenish * rate.valuePerSecond()

	b.tokens += tokensToReplenish
	b.lastReplenish = clock.Now()
	return b.tokens
}

type tokenBucket struct {
	mu unison.Mutex

@@ -122,34 +148,29 @@
}

func (t *tokenBucket) getBucket(key uint64) *bucket {
	// Fast path: look the bucket up without allocating. Calling LoadOrStore
	// unconditionally would allocate a candidate bucket on every call, even
	// when the key already exists.
	v, exists := t.buckets.Load(key)
	if exists {
		//nolint:errcheck // ignore
		b := v.(*bucket)
		b.replenish(t.limit, t.clock)
		return b
	}

	// Slow path: the key is new (or was just inserted by a concurrent
	// caller); LoadOrStore returns whichever bucket won the race.
	v, _ = t.buckets.LoadOrStore(key, &bucket{
		tokens:        t.depth,
		lastReplenish: t.clock.Now(),
	})
	//nolint:errcheck // ignore
	return v.(*bucket)
}

func (t *tokenBucket) runGC() {
@@ -174,9 +195,8 @@
			key := k.(uint64)
			b := v.(*bucket)

			tokens := b.replenish(t.limit, t.clock)
			if tokens >= t.depth {
				toDelete = append(toDelete, key)
			}

@@ -190,7 +210,11 @@
}

		// Reset GC metrics
		t.gc.metrics.numCalls.Store(0)

		gcDuration := time.Now().Sub(gcStartTime)
		numBucketsDeleted := len(toDelete)
@@ -198,4 +222,4 @@
		t.logger.Debugf("gc duration: %v, buckets: (before: %v, deleted: %v, after: %v)",
			gcDuration, numBucketsBefore, numBucketsDeleted, numBucketsAfter)
	}()
}
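For context on the race being mitigated: the pre-fix `withdraw` and `replenish` mutated `tokens` and `lastReplenish` with no synchronization, so concurrent `Run` calls hitting the same bucket could both pass the `tokens < 1` check (admitting or dropping events incorrectly) and raced on the plain `float64` writes. A minimal standalone reproduction, not the Beats code, that `go run -race` flags:

```go
package main

import "sync"

type bucket struct {
	mu     sync.Mutex
	tokens float64
}

// racyWithdraw mirrors the pre-fix logic: the check and the decrement are
// not atomic, and concurrent float64 writes are a data race.
func (b *bucket) racyWithdraw() bool {
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// safeWithdraw mirrors the fix: the per-bucket mutex makes
// check-then-decrement a single atomic step.
func (b *bucket) safeWithdraw() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

func main() {
	b := &bucket{tokens: 1000}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				b.racyWithdraw() // swap in b.safeWithdraw() to silence -race
			}
		}()
	}
	wg.Wait()
}
```

Switching the goroutines to `safeWithdraw`, the mutex-guarded variant mirroring this PR's `bucket` methods, makes the detector quiet at the cost of one uncontended lock per event.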
