@@ -159,12 +159,12 @@ func (a *Aggregator) AggregateBatch(
159
159
160
160
var err error
161
161
if len (errs ) > 0 {
162
- a .metrics .BytesProcessed .Add (ctx , failBytes , metric .WithAttributeSet (
162
+ a .metrics .BytesProcessed .Add (context . Background () , failBytes , metric .WithAttributeSet (
163
163
attribute .NewSet (append (cmIDAttrs , telemetry .WithFailure ())... ),
164
164
))
165
165
err = fmt .Errorf ("failed batch aggregation:\n %w" , errors .Join (errs ... ))
166
166
}
167
- a .metrics .BytesProcessed .Add (ctx , successBytes , metric .WithAttributeSet (
167
+ a .metrics .BytesProcessed .Add (context . Background () , successBytes , metric .WithAttributeSet (
168
168
attribute .NewSet (append (cmIDAttrs , telemetry .WithSuccess ())... ),
169
169
))
170
170
return err
@@ -199,7 +199,7 @@ func (a *Aggregator) AggregateCombinedMetrics(
199
199
200
200
if cmk .ProcessingTime .Before (a .processingTime .Add (- a .cfg .Lookback )) {
201
201
a .metrics .EventsProcessed .Add (
202
- ctx , cm .EventsTotal ,
202
+ context . Background () , cm .EventsTotal ,
203
203
metric .WithAttributes (append (
204
204
a .cfg .CombinedMetricsIDToKVs (cmk .ID ),
205
205
attribute .String (aggregationIvlKey , formatDuration (cmk .Interval )),
@@ -228,7 +228,7 @@ func (a *Aggregator) AggregateCombinedMetrics(
228
228
229
229
span .SetAttributes (attribute .Int ("bytes_ingested" , bytesIn ))
230
230
a .cachedEvents .add (cmk .Interval , cmk .ID , cm .EventsTotal )
231
- a .metrics .BytesProcessed .Add (ctx , int64 (bytesIn ), attrSetOpt )
231
+ a .metrics .BytesProcessed .Add (context . Background () , int64 (bytesIn ), attrSetOpt )
232
232
return err
233
233
}
234
234
@@ -526,7 +526,7 @@ func (a *Aggregator) harvestForInterval(
526
526
if n == 0 {
527
527
return
528
528
}
529
- a .metrics .MetricsOverflowed .Add (ctx , int64 (n ), commonAttrsOpt , metric .WithAttributes (
529
+ a .metrics .MetricsOverflowed .Add (context . Background () , int64 (n ), commonAttrsOpt , metric .WithAttributes (
530
530
attribute .String (aggregationTypeKey , aggregationType ),
531
531
))
532
532
}
@@ -550,11 +550,11 @@ func (a *Aggregator) harvestForInterval(
550
550
// Negative values are possible at edges due to delays in running the
551
551
// harvest loop or time sync issues between agents and server.
552
552
queuedDelay := time .Since (harvestStats .youngestEventTimestamp ).Seconds ()
553
- a .metrics .MinQueuedDelay .Record (ctx , queuedDelay , commonAttrsOpt , outcomeAttrOpt )
554
- a .metrics .ProcessingLatency .Record (ctx , processingDelay , commonAttrsOpt , outcomeAttrOpt )
553
+ a .metrics .MinQueuedDelay .Record (context . Background () , queuedDelay , commonAttrsOpt , outcomeAttrOpt )
554
+ a .metrics .ProcessingLatency .Record (context . Background () , processingDelay , commonAttrsOpt , outcomeAttrOpt )
555
555
// Events harvested have been successfully processed, publish these
556
556
// as success. Update the map to keep track of events failed.
557
- a .metrics .EventsProcessed .Add (ctx , harvestStats .eventsTotal , commonAttrsOpt , outcomeAttrOpt )
557
+ a .metrics .EventsProcessed .Add (context . Background () , harvestStats .eventsTotal , commonAttrsOpt , outcomeAttrOpt )
558
558
cachedEventsStats [cmk .ID ] -= harvestStats .eventsTotal
559
559
}
560
560
err := a .db .DeleteRange (lb , ub , a .writeOptions )
@@ -589,7 +589,7 @@ func (a *Aggregator) harvestForInterval(
589
589
telemetry .WithFailure (),
590
590
)... ),
591
591
)
592
- a .metrics .EventsProcessed .Add (ctx , eventsTotal , attrSetOpt )
592
+ a .metrics .EventsProcessed .Add (context . Background () , eventsTotal , attrSetOpt )
593
593
}
594
594
return cmCount , err
595
595
}
0 commit comments