Skip to content

Commit

Permalink
fix(libbeat): mitigate race condition in ratelimit processor (#42966)
Browse files Browse the repository at this point in the history
* fix(libbeat): mitigate race condition in ratelimit processor

* use right changelog file

* remove allocation on the hot path

* use past tense in changelog entry

* add test for allocs
  • Loading branch information
mauri870 authored Mar 7, 2025
1 parent ca4ad40 commit acce29c
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ Dropped fields: `syslog.priority` and `syslog.facility` while keeping their dupl
already present as `container.id` and `container.log.tag` is dropped because it is already present as
`log.syslog.appname`. The field `container.partial` is replaced by the tag `partial_message` if it was `true`,
otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
- Fixed race conditions in the global ratelimit processor that could drop events or apply rate limiting incorrectly. {pull}42966[42966]

*Heartbeat*

Expand Down
28 changes: 28 additions & 0 deletions libbeat/processors/ratelimit/rate_limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,31 @@ func TestRateLimit(t *testing.T) {
})
}
}

func TestAllocs(t *testing.T) {
p, err := new(conf.MustNewConfigFrom(mapstr.M{
"limit": "100/s",
}))
require.NoError(t, err)
event := beat.Event{Fields: mapstr.M{"field": 1}}

allocs := testing.AllocsPerRun(1000, func() {
p.Run(&event) //nolint:errcheck // ignore
})
if allocs > 0 {
t.Errorf("allocs = %v; want 0", allocs)
}
}

func BenchmarkRateLimit(b *testing.B) {
p, err := new(conf.MustNewConfigFrom(mapstr.M{
"limit": "100/s",
}))
require.NoError(b, err)
event := beat.Event{Fields: mapstr.M{"field": 1}}

b.ResetTimer()
for i := 0; i < b.N; i++ {
p.Run(&event) //nolint:errcheck // ignore
}
}
66 changes: 38 additions & 28 deletions libbeat/processors/ratelimit/token_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,36 @@ func init() {
}

// bucket holds the token state tracked for one rate-limit key.
type bucket struct {
// mu is held by withdraw and replenish while they read or update the
// fields below.
mu sync.Mutex

// tokens is the current token balance: withdraw removes one token per
// successful call and replenish adds to it.
tokens float64
// lastReplenish is the clock reading stored by the most recent replenish
// (or the time the bucket was created, if it has not been replenished yet).
lastReplenish time.Time
}

// withdraw tries to take one token out of the bucket while holding the
// bucket's mutex. It reports true and decrements the balance when the
// withdrawal happened, and false (leaving the balance untouched) when the
// balance is below one token.
func (b *bucket) withdraw() bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	exhausted := b.tokens < 1
	if !exhausted {
		b.tokens--
	}
	return !exhausted
}

// replenish credits the bucket with the tokens accrued since the last
// replenish and returns the resulting balance.
//
// The number of tokens added is the elapsed time in seconds multiplied by
// rate.valuePerSecond(). The balance is not clamped here; callers that care
// about a maximum compare the returned value themselves.
//
// The whole update runs under the bucket's mutex.
func (b *bucket) replenish(rate rate, clock clockwork.Clock) float64 {
	b.mu.Lock()
	defer b.mu.Unlock()

	// Read the clock exactly once. Using one reading both to measure the
	// elapsed interval and as the new lastReplenish guarantees that no time
	// falls between the two and goes uncredited.
	now := clock.Now()
	elapsedSecs := now.Sub(b.lastReplenish).Seconds()

	b.tokens += elapsedSecs * rate.valuePerSecond()
	b.lastReplenish = now
	return b.tokens
}

type tokenBucket struct {
mu unison.Mutex

Expand Down Expand Up @@ -122,35 +148,20 @@ func (t *tokenBucket) setClock(c clockwork.Clock) {
}

func (t *tokenBucket) getBucket(key uint64) *bucket {
v, exists := t.buckets.LoadOrStore(key, &bucket{
tokens: t.depth,
lastReplenish: t.clock.Now(),
})
//nolint:errcheck // ignore
b := v.(*bucket)

v, exists := t.buckets.Load(key)
if exists {
//nolint:errcheck // ignore
b := v.(*bucket)
b.replenish(t.limit, t.clock)
return b
}

return b
}

func (b *bucket) withdraw() bool {
if b.tokens < 1 {
return false
}
b.tokens--
return true
}

func (b *bucket) replenish(rate rate, clock clockwork.Clock) {
secsSinceLastReplenish := clock.Now().Sub(b.lastReplenish).Seconds()
tokensToReplenish := secsSinceLastReplenish * rate.valuePerSecond()

b.tokens += tokensToReplenish
b.lastReplenish = clock.Now()
v, _ = t.buckets.LoadOrStore(key, &bucket{
tokens: t.depth,
lastReplenish: t.clock.Now(),
})
//nolint:errcheck // ignore
return v.(*bucket)
}

func (t *tokenBucket) runGC() {
Expand All @@ -177,9 +188,8 @@ func (t *tokenBucket) runGC() {
//nolint:errcheck // ignore
b := v.(*bucket)

b.replenish(t.limit, t.clock)

if b.tokens >= t.depth {
tokens := b.replenish(t.limit, t.clock)
if tokens >= t.depth {
toDelete = append(toDelete, key)
}

Expand All @@ -193,7 +203,7 @@ func (t *tokenBucket) runGC() {
}

// Reset GC metrics
t.gc.metrics.numCalls = atomic.Uint64{}
t.gc.metrics.numCalls.Store(0)

gcDuration := time.Since(gcStartTime)
numBucketsDeleted := len(toDelete)
Expand Down

0 comments on commit acce29c

Please sign in to comment.