From 99a3539e0f673dde7e0d0a22b41a10794e6f1c3e Mon Sep 17 00:00:00 2001 From: David Buckley Date: Wed, 21 Aug 2024 17:54:20 +0100 Subject: [PATCH 1/2] fix aggregator benchmark ``` == warmup == 1 2.564000169513747e-06 1000 0.0003257179996580817 10000 0.003369195001141634 100000 0.03210452299936151 1000000 0.31846129199948336 10000000 3.215345189999425 == noop == 1 1.984999471460469e-06 1000 0.00032863300111785065 10000 0.0032671120006853016 100000 0.03219444500064128 1000000 0.32194886400066025 10000000 3.2092841119992954 == sum == 1 1.1226000424358062e-05 1000 0.000391326000681147 10000 0.0033218999997188803 100000 0.031986795000193524 1000000 0.32090956100000767 10000000 3.2212803769998573 == fake == 1 4.441999408300035e-06 1000 0.0003171700009261258 10000 0.0033069499986595474 100000 0.03180618599981244 1000000 0.3161616289999074 10000000 3.176653272999829 ``` --- lib/carbon/tests/benchmark_aggregator.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/lib/carbon/tests/benchmark_aggregator.py b/lib/carbon/tests/benchmark_aggregator.py index 786a7ef84..ad1c1f102 100644 --- a/lib/carbon/tests/benchmark_aggregator.py +++ b/lib/carbon/tests/benchmark_aggregator.py @@ -3,7 +3,6 @@ from carbon.aggregator.processor import AggregationProcessor, RuleManager from carbon.aggregator.buffers import BufferManager -from carbon.tests.util import print_stats from carbon.conf import settings from carbon import state @@ -43,21 +42,23 @@ def _bench_aggregator(name): buf = None for n in [1, 1000, 10000, 100000, 1000000, 10000000]: processor = AggregationProcessor() - processor.process(METRIC, (now, 1)) + for x in processor.process(METRIC, (now, 1)): + pass def _process(): processor.process(METRIC, (now + _process.i, 1)) if (_process.i % FREQUENCY) == 0 and buf is not None: - buf.compute_values() + buf.compute_value() _process.i += 1 _process.i = 0 if buf is None: - buf = BufferManager.get_buffer(METRIC_AGGR, 1, None) - + buf = BufferManager.buffers.get(METRIC_AGGR) t = timeit.timeit(_process, number=n) - buf.close() - print_stats(n, t) + + if buf: + buf.close() + print(n, t) print("") From 7ea6ce9b2802ecf8c050e208c0abe30e0e2fee24 Mon Sep 17 00:00:00 2001 From: David Buckley Date: Wed, 21 Aug 2024 17:54:22 +0100 Subject: [PATCH 2/2] aggregator -- allow batches of metrics from the past to be sent ``` == warmup == 1 3.7179997889325023e-06 1000 0.00032810999982757494 10000 0.003592125000068336 100000 0.03169123900079285 1000000 0.31713802899867005 10000000 3.157578217000264 == noop == 1 2.0410006982274354e-06 1000 0.00042683100036811084 10000 0.003553029999238788 100000 0.031913518001601915 1000000 0.31525603199952457 10000000 3.1432045359997574 == sum == 1 9.91100023384206e-06 1000 0.00038492900057462975 10000 0.003426478999244864 100000 0.031299194999519386 1000000 0.3173040299989225 10000000 3.1426827399991453 == fake == 1 4.47000093117822e-06 1000 0.00035055999978794716 10000 0.0035677849991770927 100000 0.031690031000835006 1000000 0.31694292799875257 10000000 3.1394117309992 ``` No appreciable difference in benchmark output. If anything, it's faster. --- conf/carbon.conf.example | 13 ++++++-- lib/carbon/aggregator/buffers.py | 33 +++++++++++++-------- lib/carbon/tests/test_aggregator_buffers.py | 32 ++++++++++++++++---- 3 files changed, 56 insertions(+), 22 deletions(-) diff --git a/conf/carbon.conf.example b/conf/carbon.conf.example index e919de1d9..91a90ec2f 100644 --- a/conf/carbon.conf.example +++ b/conf/carbon.conf.example @@ -637,9 +637,16 @@ USE_FLOW_CONTROL = True # You shouldn't need to tune this unless you really know what you're doing. MAX_DATAPOINTS_PER_MESSAGE = 500 -# This defines how many datapoints the aggregator remembers for -# each metric. Aggregation only happens for datapoints that fall in -# the past MAX_AGGREGATION_INTERVALS * intervalSize seconds. +# This defines how many intervals behind the current one the aggregator +# remembers for each metric. Intervals are expired when either: +# * They have seen no new datapoints in the last +# (MAX_AGGREGATION_INTERVALS * configured_frequency) seconds, or +# * There are more than (MAX_AGGREGATION_INTERVALS + 2) intervals with +# datapoints in them. (This allows an application to replay past metrics with +# sensible behaviour.) +# Intervals are expired only when they are aggregated. +# Expired intervals will be treated as empty if new datapoints arrive. +# See WRITE_BACK_FREQUENCY to control the aggregation frequency. MAX_AGGREGATION_INTERVALS = 5 # Limit the number of open connections the receiver can handle as any time. diff --git a/lib/carbon/aggregator/buffers.py b/lib/carbon/aggregator/buffers.py index 096dd6c7b..baeb707f8 100644 --- a/lib/carbon/aggregator/buffers.py +++ b/lib/carbon/aggregator/buffers.py @@ -61,23 +61,30 @@ def configure_aggregation(self, frequency, func): def compute_value(self): now = int(time.time()) current_interval = now - (now % self.aggregation_frequency) + max_aggregation_intervals = settings['MAX_AGGREGATION_INTERVALS'] age_threshold = current_interval - ( - settings['MAX_AGGREGATION_INTERVALS'] * self.aggregation_frequency) + max_aggregation_intervals * self.aggregation_frequency) for buffer in list(self.interval_buffers.values()): - if buffer.active: + if buffer.inactive_since is None: value = self.aggregation_func(buffer.values) datapoint = (buffer.interval, value) state.events.metricGenerated(self.metric_path, datapoint) state.instrumentation.increment('aggregateDatapointsSent') - buffer.mark_inactive() + buffer.mark_inactive(current_interval) - if buffer.interval < age_threshold: + elif buffer.inactive_since < age_threshold: del self.interval_buffers[buffer.interval] - if not self.interval_buffers: - self.close() - self.configured = False - del BufferManager.buffers[self.metric_path] + + if len(self.interval_buffers) > max_aggregation_intervals + 2: + ordered_intervals = sorted(self.interval_buffers) + for interval in ordered_intervals[:-max_aggregation_intervals - 2]: + del self.interval_buffers[interval] + + if not self.interval_buffers: + self.close() + self.configured = False + del BufferManager.buffers[self.metric_path] def close(self): if self.compute_task and self.compute_task.running: @@ -89,19 +96,19 @@ def size(self): class IntervalBuffer: - __slots__ = ('interval', 'values', 'active') + __slots__ = ('interval', 'values', 'inactive_since') def __init__(self, interval): self.interval = interval self.values = [] - self.active = True + self.inactive_since = None def input(self, datapoint): self.values.append(datapoint[1]) - self.active = True + self.inactive_since = None - def mark_inactive(self): - self.active = False + def mark_inactive(self, interval): + self.inactive_since = interval # Shared importable singleton diff --git a/lib/carbon/tests/test_aggregator_buffers.py b/lib/carbon/tests/test_aggregator_buffers.py index ffac75e5c..44a6669e2 100644 --- a/lib/carbon/tests/test_aggregator_buffers.py +++ b/lib/carbon/tests/test_aggregator_buffers.py @@ -109,7 +109,7 @@ def test_compute_value_marks_buffer_inactive(self): with patch.object(IntervalBuffer, 'mark_inactive') as mark_inactive_mock: self.metric_buffer.compute_value() - mark_inactive_mock.assert_called_once_with() + mark_inactive_mock.assert_called_once_with(600) @patch("time.time", new=Mock(return_value=600)) @patch("carbon.state.events.metricGenerated", new=Mock()) @@ -129,7 +129,7 @@ def test_compute_value_computes_aggregate(self): def test_compute_value_skips_inactive_buffers(self, metric_generated_mock): interval_buffer = IntervalBuffer(600) interval_buffer.input((600, 1.0)) - interval_buffer.mark_inactive() + interval_buffer.mark_inactive(600) self.metric_buffer.interval_buffers[600] = interval_buffer self.metric_buffer.compute_value() @@ -177,26 +177,46 @@ def test_compute_value_deletes_expired_buffers(self): interval_buffer = IntervalBuffer(600) interval_buffer.input((600, 1.0)) - interval_buffer.mark_inactive() + interval_buffer.mark_inactive(600) self.metric_buffer.interval_buffers[600] = interval_buffer # 2nd interval for current time interval_buffer = IntervalBuffer(current_interval) interval_buffer.input((current_interval, 1.0)) - interval_buffer.mark_inactive() + interval_buffer.mark_inactive(current_interval) self.metric_buffer.interval_buffers[current_interval] = interval_buffer with patch("time.time", new=Mock(return_value=current_interval + 60)): self.metric_buffer.compute_value() self.assertFalse(600 in self.metric_buffer.interval_buffers) + @patch("carbon.state.events.metricGenerated") + def test_compute_value_deletes_too_many_buffers(self, metric_generated_mock): + from carbon.conf import settings + + # We should keep 2 more than MAX_AGGREGATION_INTERVALS. + calls = [] + for i in range(settings['MAX_AGGREGATION_INTERVALS'] + 4): + interval = 600 + i * 60 + interval_buffer = IntervalBuffer(interval) + interval_buffer.input((interval, 1.0)) + self.metric_buffer.interval_buffers[interval] = interval_buffer + calls = [call("carbon.foo.bar", (interval, 1.0))] + + with patch("time.time", new=Mock(return_value=600)): + self.metric_buffer.compute_value() + metric_generated_mock.assert_has_calls(calls) + self.assertFalse(600 in self.metric_buffer.interval_buffers) + self.assertFalse(660 in self.metric_buffer.interval_buffers) + self.assertTrue(720 in self.metric_buffer.interval_buffers) + def test_compute_value_closes_metric_if_last_buffer_deleted(self): from carbon.conf import settings current_interval = 600 + 60 * settings['MAX_AGGREGATION_INTERVALS'] interval_buffer = IntervalBuffer(600) interval_buffer.input((600, 1.0)) - interval_buffer.mark_inactive() + interval_buffer.mark_inactive(600) self.metric_buffer.interval_buffers[600] = interval_buffer BufferManager.buffers['carbon.foo.bar'] = self.metric_buffer @@ -211,7 +231,7 @@ def test_compute_value_unregisters_metric_if_last_buffer_deleted(self): interval_buffer = IntervalBuffer(600) interval_buffer.input((600, 1.0)) - interval_buffer.mark_inactive() + interval_buffer.mark_inactive(600) self.metric_buffer.interval_buffers[600] = interval_buffer BufferManager.buffers['carbon.foo.bar'] = self.metric_buffer