From ed1db20c56119705c47536afdf85078cd3630be9 Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Tue, 21 Jan 2025 12:45:35 -0500 Subject: [PATCH 1/4] feat: make drop decision queue size configurable --- collect/cache/cuckoo.go | 9 ++++++--- config/file_config.go | 1 + config/metadata/configMeta.yaml | 10 ++++++++++ route/route.go | 4 +++- 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/collect/cache/cuckoo.go b/collect/cache/cuckoo.go index a8e900e992..dfa517662a 100644 --- a/collect/cache/cuckoo.go +++ b/collect/cache/cuckoo.go @@ -39,7 +39,7 @@ type CuckooTraceChecker struct { const ( // This is how many items can be in the Add Queue before we start blocking on Add. - AddQueueDepth = 1000 + defaultAddQueueDepth = 1000 // This is how long we'll sleep between possible lock cycles. AddQueueSleepTime = 100 * time.Microsecond ) @@ -52,13 +52,16 @@ var cuckooTraceCheckerMetrics = []metrics.Metadata{ {Name: AddQueueLockTime, Type: metrics.Histogram, Unit: metrics.Microseconds, Description: "the time spent holding the add queue lock"}, } -func NewCuckooTraceChecker(capacity uint, m metrics.Metrics) *CuckooTraceChecker { +func NewCuckooTraceChecker(capacity uint, addQueueDepth uint, m metrics.Metrics) *CuckooTraceChecker { + if addQueueDepth == 0 { + addQueueDepth = defaultAddQueueDepth + } c := &CuckooTraceChecker{ capacity: capacity, current: cuckoo.NewFilter(capacity), future: nil, met: m, - addch: make(chan string, AddQueueDepth), + addch: make(chan string, defaultAddQueueDepth), } for _, metric := range cuckooTraceCheckerMetrics { m.Register(metric) diff --git a/config/file_config.go b/config/file_config.go index c82634f7b0..6f65332a49 100644 --- a/config/file_config.go +++ b/config/file_config.go @@ -402,6 +402,7 @@ type GRPCServerParameters struct { type SampleCacheConfig struct { KeptSize uint `yaml:"KeptSize" default:"10_000"` DroppedSize uint `yaml:"DroppedSize" default:"1_000_000"` + DroppedQueueSize 
uint `yaml:"DroppedQueueSize" default: "1000"` SizeCheckInterval Duration `yaml:"SizeCheckInterval" default:"10s"` } diff --git a/config/metadata/configMeta.yaml b/config/metadata/configMeta.yaml index a7ff328e73..3ee9514bcd 100644 --- a/config/metadata/configMeta.yaml +++ b/config/metadata/configMeta.yaml @@ -1774,6 +1774,16 @@ groups: Changing its size with live reload sets a future limit, but does not have an immediate effect. + - name: DroppedQueueSize + type: int + valuetype: nondefault + default: 1000 + reload: true + summary: is the maximum number of in-flight drop decision allowed before adding to the dropped trace cache. + description: > + The dropped decision queue is used to buffer drop decisions before they are stored in the decision cache. + If this queue fills up, then subsequent drop decisions will be discarded. + - name: SizeCheckInterval v1group: SampleCacheConfig/SampleCache v1name: SizeCheckInterval diff --git a/route/route.go b/route/route.go index ef02d9f82c..2183950b66 100644 --- a/route/route.go +++ b/route/route.go @@ -128,6 +128,8 @@ func (r *Router) SetVersion(ver string) { var routerMetrics = []metrics.Metadata{ {Name: "_router_proxied", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of events proxied to another refinery"}, {Name: "_router_event", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of events received"}, + {Name: "_router_batch", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of batches of events received"}, + {Name: "_router_otlp", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of batches of otlp requests received"}, {Name: "_router_span", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of spans received"}, {Name: "_router_dropped", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of events dropped because the channel was full"}, {Name: 
"_router_nonspan", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of non-span events received"}, @@ -537,6 +539,7 @@ func (router *Router) processOTLPRequest( batches []huskyotlp.Batch, apiKey string, incomingUserAgent string) error { + router.Metrics.Increment(router.incomingOrPeer + "_router_otlp") router.Metrics.Increment(router.incomingOrPeer + "_router_otlp") @@ -612,7 +615,6 @@ func (r *Router) processEvent(ev *types.Event, reqID interface{}) error { if processed { if !kept { return nil - } // If the span was kept, we want to generate a probe that we'll forward From 65b9be3d5436924ff6f9bfb61bf0c97d46f8ffb0 Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Tue, 21 Jan 2025 13:27:55 -0500 Subject: [PATCH 2/4] actually pass in the config value --- collect/cache/cuckooSentCache.go | 2 +- collect/cache/cuckoo_test.go | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/collect/cache/cuckooSentCache.go b/collect/cache/cuckooSentCache.go index e03ee8ba93..756569a889 100644 --- a/collect/cache/cuckooSentCache.go +++ b/collect/cache/cuckooSentCache.go @@ -170,7 +170,7 @@ func NewCuckooSentCache(cfg config.SampleCacheConfig, met metrics.Metrics) (Trac if err != nil { return nil, err } - dropped := NewCuckooTraceChecker(cfg.DroppedSize, met) + dropped := NewCuckooTraceChecker(cfg.DroppedSize, cfg.DroppedQueueSize, met) // we want to keep track of the most recent dropped traces so we can avoid // checking them in the dropped filter, which can have contention issues // under high load.
So we use a cache with TTL to keep track of the most diff --git a/collect/cache/cuckoo_test.go b/collect/cache/cuckoo_test.go index 881a715448..9f7ffdc427 100644 --- a/collect/cache/cuckoo_test.go +++ b/collect/cache/cuckoo_test.go @@ -31,7 +31,7 @@ func BenchmarkCuckooTraceChecker_Add(b *testing.B) { traceIDs[i] = genID(32) } - c := NewCuckooTraceChecker(1000000, &metrics.NullMetrics{}) + c := NewCuckooTraceChecker(1000000, 10000, &metrics.NullMetrics{}) b.ResetTimer() for i := 0; i < b.N; i++ { c.Add(traceIDs[i]) @@ -57,7 +57,7 @@ func BenchmarkCuckooTraceChecker_AddParallel(b *testing.B) { } }) - c := NewCuckooTraceChecker(1000000, &metrics.NullMetrics{}) + c := NewCuckooTraceChecker(1000000, 10000, &metrics.NullMetrics{}) ch := make(chan int, numGoroutines) for i := 0; i < numGoroutines; i++ { p.Go(func() { @@ -89,7 +89,7 @@ func BenchmarkCuckooTraceChecker_Check(b *testing.B) { traceIDs[i] = genID(32) } - c := NewCuckooTraceChecker(1000000, &metrics.NullMetrics{}) + c := NewCuckooTraceChecker(1000000, 10000, &metrics.NullMetrics{}) // add every other one to the filter for i := 0; i < b.N; i += 2 { if i%10000 == 0 { @@ -111,7 +111,7 @@ func BenchmarkCuckooTraceChecker_CheckParallel(b *testing.B) { traceIDs[i] = genID(32) } - c := NewCuckooTraceChecker(1000000, &metrics.NullMetrics{}) + c := NewCuckooTraceChecker(1000000, 10000, &metrics.NullMetrics{}) for i := 0; i < b.N; i += 2 { if i%10000 == 0 { c.Maintain() @@ -165,7 +165,7 @@ func BenchmarkCuckooTraceChecker_CheckAddParallel(b *testing.B) { met := &metrics.MockMetrics{} met.Start() - c := NewCuckooTraceChecker(1000000, met) + c := NewCuckooTraceChecker(1000000, 10000, met) const numCheckers = 30 const numAdders = 30 From 553ca397da61e3358c8a919c2a5e0bf9bf21c200 Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Tue, 21 Jan 2025 17:13:37 -0500 Subject: [PATCH 3/4] make default queue size 10000 --- config/file_config.go | 2 +- config/metadata/configMeta.yaml | 2 +- 
2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config/file_config.go b/config/file_config.go index 6f65332a49..280a61d5e6 100644 --- a/config/file_config.go +++ b/config/file_config.go @@ -402,7 +402,7 @@ type GRPCServerParameters struct { type SampleCacheConfig struct { KeptSize uint `yaml:"KeptSize" default:"10_000"` DroppedSize uint `yaml:"DroppedSize" default:"1_000_000"` - DroppedQueueSize uint `yaml:"DroppedQueueSize" default: "1000"` + DroppedQueueSize uint `yaml:"DroppedQueueSize" default: "10000"` SizeCheckInterval Duration `yaml:"SizeCheckInterval" default:"10s"` } diff --git a/config/metadata/configMeta.yaml b/config/metadata/configMeta.yaml index 3ee9514bcd..4896d2e8ba 100644 --- a/config/metadata/configMeta.yaml +++ b/config/metadata/configMeta.yaml @@ -1777,7 +1777,7 @@ groups: - name: DroppedQueueSize type: int valuetype: nondefault - default: 1000 + default: 10000 reload: true summary: is the maximum number of in-flight drop decision allowed before adding to the dropped trace cache. description: > From 1a7ceb7a8b63693941a8fecbabf9b55706178fb5 Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Thu, 23 Jan 2025 10:32:19 -0500 Subject: [PATCH 4/4] actually use the configured value --- collect/cache/cuckoo.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collect/cache/cuckoo.go b/collect/cache/cuckoo.go index dfa517662a..5a7312b2aa 100644 --- a/collect/cache/cuckoo.go +++ b/collect/cache/cuckoo.go @@ -61,7 +61,7 @@ func NewCuckooTraceChecker(capacity uint, addQueueDepth uint, m metrics.Metrics) current: cuckoo.NewFilter(capacity), future: nil, met: m, - addch: make(chan string, defaultAddQueueDepth), + addch: make(chan string, addQueueDepth), } for _, metric := range cuckooTraceCheckerMetrics { m.Register(metric)