[8.17](backport #42966) fix(libbeat): mitigate race condition in ratelimit processor #43114

Open · wants to merge 1 commit into base: 8.17
17 changes: 17 additions & 0 deletions CHANGELOG.next.asciidoc
@@ -31,6 +31,23 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Redis: Added replication role as a field to submitted slowlogs
- Change log.file.path field in awscloudwatch input to nested object. {pull}41099[41099]

- Remove deprecated awscloudwatch field from Filebeat. {pull}41089[41089]
Contributor comment: “Changelog needs fixed”

- The performance of ingesting SQS data with the S3 input has improved by up to 60x for queues with many small events. The `max_number_of_messages` config for SQS mode is now ignored, as the new design no longer needs a manual cap on messages. Instead, use `number_of_workers` to scale ingestion rate in both S3 and SQS modes. The increased efficiency may increase network bandwidth consumption, which can be throttled by lowering `number_of_workers`. It may also increase the number of events stored in memory, which can be throttled by lowering the configured size of the internal queue. {pull}40699[40699]
- Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]
- Add Kafka compression support for ZSTD.
- Filebeat fails to start if there is any input with a duplicated ID. It logs the duplicated IDs and the configurations of the offending inputs. {pull}41731[41731]
- Filestream inputs with duplicated IDs will fail to start. An error is logged showing the ID and the full input configuration. {issue}41938[41938] {pull}41954[41954]
- Filestream inputs can define `allow_deprecated_id_duplication: true` to keep the previous behaviour of running inputs with duplicated IDs. {issue}41938[41938] {pull}41954[41954]
- The Filestream input only starts to ingest a file when it is >= 1024 bytes in size. This happens because `fingerprint` is now the default file identity. To restore the previous behaviour, set `file_identity.native: ~` and `prospector.scanner.fingerprint.enabled: false`. {issue}40197[40197] {pull}41762[41762]
- Filebeat fails to start when its configuration contains usage of the deprecated `log` or `container` inputs. However, they can still be used by setting `allow_deprecated_use: true` in their configuration. {pull}42295[42295]
- The fields produced by the Journald input are updated to better match ECS.
Dropped fields: `syslog.priority` and `syslog.facility`, keeping their duplicated equivalents
`log.syslog.priority` and `log.syslog.facility.code`. Renamed fields: `syslog.identifier` -> `log.syslog.appname`,
`syslog.pid` -> `log.syslog.procid`. `container.id_truncated` is dropped because the full container ID is
already present as `container.id`, and `container.log.tag` is dropped because it is already present as
`log.syslog.appname`. The field `container.partial` is replaced by the tag `partial_message` if it was `true`;
otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
- Fixed race conditions in the global ratelimit processor that could drop events or apply rate limiting incorrectly. {pull}42966[42966]

*Heartbeat*

28 changes: 28 additions & 0 deletions libbeat/processors/ratelimit/rate_limit_test.go
@@ -176,3 +176,31 @@ func TestRateLimit(t *testing.T) {
})
}
}

// TestAllocs verifies that Run does not allocate on the hot path once the
// processor is constructed.
func TestAllocs(t *testing.T) {
    p, err := new(conf.MustNewConfigFrom(mapstr.M{
        "limit": "100/s",
    }))
    require.NoError(t, err)
    event := beat.Event{Fields: mapstr.M{"field": 1}}

    allocs := testing.AllocsPerRun(1000, func() {
        p.Run(&event) //nolint:errcheck // ignore
    })
    if allocs > 0 {
        t.Errorf("allocs = %v; want 0", allocs)
    }
}

// BenchmarkRateLimit measures the per-event cost of Run for a single key.
func BenchmarkRateLimit(b *testing.B) {
    p, err := new(conf.MustNewConfigFrom(mapstr.M{
        "limit": "100/s",
    }))
    require.NoError(b, err)
    event := beat.Event{Fields: mapstr.M{"field": 1}}

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        p.Run(&event) //nolint:errcheck // ignore
    }
}
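The race this PR mitigates is easiest to observe under Go's race detector. Below is a minimal concurrent sketch, not part of this diff: TestConcurrentRun is a hypothetical name, and it assumes the same harness as the tests above (new, conf, mapstr, beat, require) plus a sync import. Without the per-bucket mutex, go test -race flags concurrent access to a bucket's tokens field; with this change it should pass cleanly.

// Hypothetical sketch (not in this PR): hammer one processor from several
// goroutines so the race detector can observe concurrent bucket access.
func TestConcurrentRun(t *testing.T) {
    p, err := new(conf.MustNewConfigFrom(mapstr.M{
        "limit": "100/s",
    }))
    require.NoError(t, err)

    var wg sync.WaitGroup
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // A per-goroutine event keeps any detected race on the bucket,
            // not on shared event fields.
            event := beat.Event{Fields: mapstr.M{"field": 1}}
            for j := 0; j < 1000; j++ {
                p.Run(&event) //nolint:errcheck // ignore
            }
        }()
    }
    wg.Wait()
}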
64 changes: 44 additions & 20 deletions libbeat/processors/ratelimit/token_bucket.go
@@ -35,10 +35,36 @@
}

// bucket is the token bucket for a single key. mu guards tokens and
// lastReplenish so that concurrent withdraw and replenish calls do not race.
type bucket struct {
    mu sync.Mutex

    tokens        float64
    lastReplenish time.Time
}

// withdraw takes one token from the bucket, reporting whether one was
// available.
func (b *bucket) withdraw() bool {
    b.mu.Lock()
    defer b.mu.Unlock()

    if b.tokens < 1 {
        return false
    }

    b.tokens--
    return true
}

// replenish credits the bucket with the tokens accrued since the last
// replenish at the given rate, and returns the new token count.
func (b *bucket) replenish(rate rate, clock clockwork.Clock) float64 {
    b.mu.Lock()
    defer b.mu.Unlock()

    secsSinceLastReplenish := clock.Now().Sub(b.lastReplenish).Seconds()
    tokensToReplenish := secsSinceLastReplenish * rate.valuePerSecond()

    b.tokens += tokensToReplenish
    b.lastReplenish = clock.Now()
    return b.tokens
}

type tokenBucket struct {
    mu unison.Mutex

@@ -122,34 +148,29 @@
}

// getBucket returns the bucket for the given key, creating it if it does not
// exist yet. Existing buckets are replenished before being returned; the
// Load-then-LoadOrStore pattern keeps the common (existing-bucket) path
// allocation free.
func (t *tokenBucket) getBucket(key uint64) *bucket {
    v, exists := t.buckets.Load(key)
    if exists {
        //nolint:errcheck // ignore
        b := v.(*bucket)
        b.replenish(t.limit, t.clock)
        return b
    }

    v, _ = t.buckets.LoadOrStore(key, &bucket{
        tokens:        t.depth,
        lastReplenish: t.clock.Now(),
    })
    //nolint:errcheck // ignore
    return v.(*bucket)
}

func (t *tokenBucket) runGC() {
@@ -174,9 +195,8 @@
            key := k.(uint64)
            b := v.(*bucket)

            tokens := b.replenish(t.limit, t.clock)
            if tokens >= t.depth {
                toDelete = append(toDelete, key)
            }

@@ -190,7 +210,11 @@
}

        // Reset GC metrics
        t.gc.metrics.numCalls.Store(0)

        gcDuration := time.Now().Sub(gcStartTime)
        numBucketsDeleted := len(toDelete)
Expand All @@ -198,4 +222,4 @@
        t.logger.Debugf("gc duration: %v, buckets: (before: %v, deleted: %v, after: %v)",
            gcDuration, numBucketsBefore, numBucketsDeleted, numBucketsAfter)
    }()
}
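For intuition about the arithmetic in replenish: tokens accrue in proportion to the time elapsed since the last replenish. Here is a standalone sketch, assuming only that rate.valuePerSecond() returns 100 for a "100/s" limit; sketchBucket and the plain float64 rate are hypothetical stand-ins for the package-internal bucket and rate types. At that rate, 50ms of elapsed time restores 5 tokens, so five more events may pass.

// Standalone sketch of the replenish arithmetic; sketchBucket and
// ratePerSecond are stand-ins for the package-internal bucket and rate types.
package main

import (
    "fmt"
    "time"
)

type sketchBucket struct {
    tokens        float64
    lastReplenish time.Time
}

func (b *sketchBucket) replenish(ratePerSecond float64, now time.Time) float64 {
    secs := now.Sub(b.lastReplenish).Seconds()
    b.tokens += secs * ratePerSecond
    b.lastReplenish = now
    return b.tokens
}

func main() {
    start := time.Now()
    b := &sketchBucket{lastReplenish: start}
    // 100 tokens/s over 50ms accrues 0.05 * 100 = 5 tokens.
    fmt.Println(b.replenish(100, start.Add(50*time.Millisecond))) // prints 5
}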
