@@ -25,9 +25,40 @@ import (
25
25
)
26
26
27
27
const (
28
- dbCommitThresholdBytes = 10 * 1024 * 1024 // commit every 10MB
29
- aggregationIvlKey = "aggregation_interval"
30
- aggregationTypeKey = "aggregation_type"
28
+ // Batch grows in multiples of 2 based on the initial size. For
29
+ // example, if the initial size is 1MB then the batch will grow as
30
+ // {2, 4, 8, 16, ...}. If a batch of size greater than 4MB is
31
+ // consistently committed then that batch will never be retained
32
+ // if the max retained size is smaller than 8MB as the batch capacity
33
+ // will always grow to 8MB.
34
+ initialPebbleBatchSize = 64 << 10 // 64KB
35
+ maxRetainedPebbleBatchSize = 8 << 20 // 8MB
36
+
37
+ // pebbleMemTableSize defines the max steady state size of a memtable.
38
+ // There can be more than 1 memtable in memory at a time as it takes
39
+ // time for old memtable to flush. The memtable size also defines
40
+ // the size for large batches. A large batch is a batch which will
41
+ // take at least half of the memtable size. Note that the Batch#Len
42
+ // is not the same as the memtable size that the batch will occupy
43
+ // as data in batches are encoded differently. In general, the
44
+ // memtable size of the batch will be higher than the length of the
45
+ // batch data.
46
+ //
47
+ // On commit, data in the large batch may be kept by pebble and thus
48
+ // large batches will need to be reallocated. Note that large batch
49
+ // classification uses the memtable size that a batch will occupy
50
+ // rather than the length of the data slice backing the batch.
51
+ pebbleMemTableSize = 32 << 20 // 32MB
52
+
53
+ // dbCommitThresholdBytes is a soft limit and the batch is committed
54
+ // to the DB as soon as it crosses this threshold. To make sure that
55
+ // the commit threshold plays well with the max retained batch size
56
+ // the threshold should be kept smaller than the sum of max retained
57
+ // batch size and encoded size of aggregated data to be committed.
58
+ dbCommitThresholdBytes = 8000 << 10 // 8000KB
59
+
60
+ aggregationIvlKey = "aggregation_interval"
61
+ aggregationTypeKey = "aggregation_type"
31
62
)
32
63
33
64
var (
@@ -86,6 +117,7 @@ func New(opts ...Option) (*Aggregator, error) {
86
117
return & merger , nil
87
118
},
88
119
},
120
+ MemTableSize : pebbleMemTableSize ,
89
121
}
90
122
writeOptions := pebble .Sync
91
123
if cfg .InMemory {
@@ -111,6 +143,7 @@ func New(opts ...Option) (*Aggregator, error) {
111
143
writeOptions : writeOptions ,
112
144
cfg : cfg ,
113
145
processingTime : time .Now ().Truncate (cfg .AggregationIntervals [0 ]),
146
+ batch : newBatch (pb ),
114
147
closed : make (chan struct {}),
115
148
metrics : metrics ,
116
149
}, nil
@@ -246,6 +279,8 @@ func (a *Aggregator) Run(ctx context.Context) error {
246
279
a .mu .Unlock ()
247
280
defer close (a .runStopped )
248
281
282
+ harvestBatch := newBatch (a .db )
283
+ defer func () { harvestBatch .Close () }()
249
284
to := a .processingTime .Add (a .cfg .AggregationIntervals [0 ])
250
285
timer := time .NewTimer (time .Until (to .Add (a .cfg .HarvestDelay )))
251
286
defer timer .Stop ()
@@ -259,14 +294,16 @@ func (a *Aggregator) Run(ctx context.Context) error {
259
294
}
260
295
261
296
a .mu .Lock ()
262
- batch := a .batch
263
- a .batch = nil
297
+ harvestBatch , a .batch = a .batch , harvestBatch
264
298
a .processingTime = to
265
299
cachedEventsStats := a .cachedEvents .loadAndDelete (to )
266
300
a .mu .Unlock ()
267
301
268
- if err := a .commitAndHarvest (ctx , batch , to , cachedEventsStats ); err != nil {
269
- a .cfg .Logger .Warn ("failed to commit and harvest metrics" , zap .Error (err ))
302
+ if err := commitAndReset (harvestBatch , a .writeOptions ); err != nil {
303
+ a .cfg .Logger .Warn ("failed to commit batch" , zap .Error (err ))
304
+ }
305
+ if err := a .harvest (ctx , harvestBatch , to , cachedEventsStats ); err != nil {
306
+ a .cfg .Logger .Warn ("failed to harvest aggregated metrics" , zap .Error (err ))
270
307
}
271
308
to = to .Add (a .cfg .AggregationIntervals [0 ])
272
309
timer .Reset (time .Until (to .Add (a .cfg .HarvestDelay )))
@@ -300,42 +337,47 @@ func (a *Aggregator) Close(ctx context.Context) error {
300
337
}
301
338
302
339
if a .db != nil {
303
- a .cfg .Logger .Info ("running final aggregation" )
304
340
if a .batch != nil {
305
- if err := a .batch .Commit (a .writeOptions ); err != nil {
341
+ a .cfg .Logger .Info ("running final aggregation" )
342
+ if err := commitAndReset (a .batch , a .writeOptions ); err != nil {
306
343
span .RecordError (err )
307
344
return fmt .Errorf ("failed to commit batch: %w" , err )
308
345
}
309
- if err := a .batch .Close (); err != nil {
310
- span .RecordError (err )
311
- return fmt .Errorf ("failed to close batch: %w" , err )
346
+ var errs []error
347
+ for _ , ivl := range a .cfg .AggregationIntervals {
348
+ // At any particular time there will be 1 harvest candidate for
349
+ // each aggregation interval. We will align the end time and
350
+ // process each of these.
351
+ //
352
+ // TODO (lahsivjar): It is possible to harvest the same
353
+ // time multiple times, not an issue but can be optimized.
354
+ to := a .processingTime .Truncate (ivl ).Add (ivl )
355
+ if err := a .harvest (ctx , a .batch , to , a .cachedEvents .loadAndDelete (to )); err != nil {
356
+ span .RecordError (err )
357
+ errs = append (errs , fmt .Errorf (
358
+ "failed to harvest metrics for interval %s: %w" , formatDuration (ivl ), err ),
359
+ )
360
+ }
312
361
}
313
- a .batch = nil
314
- }
315
- var errs []error
316
- for _ , ivl := range a .cfg .AggregationIntervals {
317
- // At any particular time there will be 1 harvest candidate for
318
- // each aggregation interval. We will align the end time and
319
- // process each of these.
320
- //
321
- // TODO (lahsivjar): It is possible to harvest the same
322
- // time multiple times, not an issue but can be optimized.
323
- to := a .processingTime .Truncate (ivl ).Add (ivl )
324
- if err := a .harvest (ctx , to , a .cachedEvents .loadAndDelete (to )); err != nil {
325
- span .RecordError (err )
326
- errs = append (errs , fmt .Errorf (
327
- "failed to harvest metrics for interval %s: %w" , formatDuration (ivl ), err ),
328
- )
362
+ if len (errs ) > 0 {
363
+ return fmt .Errorf ("failed while running final harvest: %w" , errors .Join (errs ... ))
329
364
}
330
365
}
331
- if len (errs ) > 0 {
332
- return fmt .Errorf ("failed while running final harvest: %w" , errors .Join (errs ... ))
366
+ if err := a .batch .Close (); err != nil {
367
+ // Failing to close batch is a non-fatal error as we are simply failing to return
368
+ // the batch to the pool. This error should not be retried so it is ignored but
369
+ // recorded for telemetry.
370
+ span .RecordError (err )
371
+ a .cfg .Logger .Warn ("failed to close batch, this is non-fatal and doesn't lead to data loss" )
333
372
}
373
+ // No need to retry final aggregation.
374
+ a .batch = nil
375
+
334
376
if err := a .db .Close (); err != nil {
335
377
span .RecordError (err )
336
378
return fmt .Errorf ("failed to close pebble: %w" , err )
337
379
}
338
- // All future operations are invalid after db is closed
380
+ // All future operations are invalid after db is closed.
339
381
a .db = nil
340
382
}
341
383
if err := a .metrics .CleanUp (); err != nil {
@@ -370,12 +412,6 @@ func (a *Aggregator) aggregate(
370
412
cmk CombinedMetricsKey ,
371
413
cm * aggregationpb.CombinedMetrics ,
372
414
) (int , error ) {
373
- if a .batch == nil {
374
- // Batch is backed by a sync pool. After each commit we will release the batch
375
- // back to the pool by calling Batch#Close and subsequently acquire a new batch.
376
- a .batch = a .db .NewBatch ()
377
- }
378
-
379
415
op := a .batch .MergeDeferred (cmk .SizeBinary (), cm .SizeVT ())
380
416
if err := cmk .MarshalBinaryToSizedBuffer (op .Key ); err != nil {
381
417
return 0 , fmt .Errorf ("failed to marshal combined metrics key: %w" , err )
@@ -389,52 +425,19 @@ func (a *Aggregator) aggregate(
389
425
390
426
bytesIn := cm .SizeVT ()
391
427
if a .batch .Len () >= dbCommitThresholdBytes {
392
- if err := a .batch . Commit ( a .writeOptions ); err != nil {
428
+ if err := commitAndReset ( a .batch , a .writeOptions ); err != nil {
393
429
return bytesIn , fmt .Errorf ("failed to commit pebble batch: %w" , err )
394
430
}
395
- if err := a .batch .Close (); err != nil {
396
- return bytesIn , fmt .Errorf ("failed to close pebble batch: %w" , err )
397
- }
398
- a .batch = nil
399
431
}
400
432
return bytesIn , nil
401
433
}
402
434
403
- func (a * Aggregator ) commitAndHarvest (
404
- ctx context.Context ,
405
- batch * pebble.Batch ,
406
- to time.Time ,
407
- cachedEventsStats map [time.Duration ]map [[16 ]byte ]float64 ,
408
- ) error {
409
- ctx , span := a .cfg .Tracer .Start (ctx , "commitAndHarvest" )
410
- defer span .End ()
411
-
412
- var errs []error
413
- if batch != nil {
414
- if err := batch .Commit (a .writeOptions ); err != nil {
415
- span .RecordError (err )
416
- errs = append (errs , fmt .Errorf ("failed to commit batch before harvest: %w" , err ))
417
- }
418
- if err := batch .Close (); err != nil {
419
- span .RecordError (err )
420
- errs = append (errs , fmt .Errorf ("failed to close batch before harvest: %w" , err ))
421
- }
422
- }
423
- if err := a .harvest (ctx , to , cachedEventsStats ); err != nil {
424
- span .RecordError (err )
425
- errs = append (errs , fmt .Errorf ("failed to harvest aggregated metrics: %w" , err ))
426
- }
427
- if len (errs ) > 0 {
428
- return errors .Join (errs ... )
429
- }
430
- return nil
431
- }
432
-
433
435
// harvest collects the mature metrics for all aggregation intervals and
434
436
// deletes the entries in db once the metrics are fully harvested. Harvest
435
437
// takes an end time denoting the exclusive upper bound for harvesting.
436
438
func (a * Aggregator ) harvest (
437
439
ctx context.Context ,
440
+ batch * pebble.Batch ,
438
441
end time.Time ,
439
442
cachedEventsStats map [time.Duration ]map [[16 ]byte ]float64 ,
440
443
) error {
@@ -447,7 +450,7 @@ func (a *Aggregator) harvest(
447
450
if end .Truncate (ivl ).Equal (end ) {
448
451
start := end .Add (- ivl ).Add (- a .cfg .Lookback )
449
452
cmCount , err := a .harvestForInterval (
450
- ctx , snap , start , end , ivl , cachedEventsStats [ivl ],
453
+ ctx , batch , snap , start , end , ivl , cachedEventsStats [ivl ],
451
454
)
452
455
if err != nil {
453
456
errs = append (errs , fmt .Errorf (
@@ -473,6 +476,7 @@ func (a *Aggregator) harvest(
473
476
// combined metrics if some of the combined metrics failed harvest.
474
477
func (a * Aggregator ) harvestForInterval (
475
478
ctx context.Context ,
479
+ batch * pebble.Batch ,
476
480
snap * pebble.Snapshot ,
477
481
start , end time.Time ,
478
482
ivl time.Duration ,
@@ -491,11 +495,14 @@ func (a *Aggregator) harvestForInterval(
491
495
from .MarshalBinaryToSizedBuffer (lb )
492
496
to .MarshalBinaryToSizedBuffer (ub )
493
497
494
- iter := snap .NewIter (& pebble.IterOptions {
498
+ iter , err := snap .NewIter (& pebble.IterOptions {
495
499
LowerBound : lb ,
496
500
UpperBound : ub ,
497
501
KeyTypes : pebble .IterKeyTypePointsOnly ,
498
502
})
503
+ if err != nil {
504
+ return 0 , fmt .Errorf ("failed to create pebble iterator: %w" , err )
505
+ }
499
506
defer iter .Close ()
500
507
501
508
var errs []error
@@ -557,7 +564,12 @@ func (a *Aggregator) harvestForInterval(
557
564
a .metrics .EventsProcessed .Add (context .Background (), harvestStats .eventsTotal , commonAttrsOpt , outcomeAttrOpt )
558
565
cachedEventsStats [cmk .ID ] -= harvestStats .eventsTotal
559
566
}
560
- err := a .db .DeleteRange (lb , ub , a .writeOptions )
567
+ if err := batch .DeleteRange (lb , ub , a .writeOptions ); err != nil {
568
+ errs = append (errs , fmt .Errorf ("failed to delete harvested interval: %w" , err ))
569
+ }
570
+ if err := commitAndReset (batch , a .writeOptions ); err != nil {
571
+ errs = append (errs , fmt .Errorf ("failed to commit batch: %w" , err ))
572
+ }
561
573
if len (errs ) > 0 {
562
574
err = errors .Join (err , fmt .Errorf (
563
575
"failed to process %d out of %d metrics:\n %w" ,
@@ -768,3 +780,21 @@ func (hs *harvestStats) addOverflows(cm *aggregationpb.CombinedMetrics, limits L
768
780
addOverflow (ksm .GetMetrics ().GetOverflowGroups (), ksm )
769
781
}
770
782
}
783
+
784
+ // commitAndReset commits and resets a pebble batch. Note that the batch
785
+ // is reset even if the commit fails, dropping any data in the batch and
786
+ // resetting it for reuse.
787
+ func commitAndReset (b * pebble.Batch , opts * pebble.WriteOptions ) error {
788
+ defer b .Reset ()
789
+ if err := b .Commit (opts ); err != nil {
790
+ return fmt .Errorf ("failed to commit batch: %w" , err )
791
+ }
792
+ return nil
793
+ }
794
+
795
+ func newBatch (db * pebble.DB ) * pebble.Batch {
796
+ return db .NewBatch (
797
+ pebble .WithInitialSizeBytes (initialPebbleBatchSize ),
798
+ pebble .WithMaxRetainedSizeBytes (maxRetainedPebbleBatchSize ),
799
+ )
800
+ }
0 commit comments