From 96443004bddb7424e1b9302dc231567fe2c3b2ef Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Fri, 23 May 2025 12:55:42 +0200 Subject: [PATCH 1/2] ref(span-buffer/flusher): More metrics and more lenient backpressure. --- src/sentry/options/defaults.py | 5 ++ src/sentry/spans/buffer.py | 13 ++- src/sentry/spans/consumers/process/flusher.py | 80 +++++++++++-------- .../spans/consumers/process/test_flusher.py | 2 +- 4 files changed, 60 insertions(+), 40 deletions(-) diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index 558071745a5d5d..17ec2a416ea1d5 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -2663,6 +2663,11 @@ default=False, flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE, ) +register( + "standalone-spans.buffer.flusher.backpressure_seconds", + default=10, + flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE, +) register( "indexed-spans.agg-span-waterfall.enable", default=False, diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py index b60ccea8455ffb..7f6088214d3a5f 100644 --- a/src/sentry/spans/buffer.py +++ b/src/sentry/spans/buffer.py @@ -344,15 +344,17 @@ def flush_segments(self, now: int, max_segments: int = 0) -> dict[SegmentKey, Fl result = p.execute() - segment_keys: list[tuple[QueueKey, SegmentKey]] = [] + segment_keys: list[tuple[int, QueueKey, SegmentKey]] = [] with metrics.timer("spans.buffer.flush_segments.load_segment_data"): with self.client.pipeline(transaction=False) as p: # ZRANGEBYSCORE output - for queue_key, segment_span_ids in zip(queue_keys, result): + for shard, queue_key, segment_span_ids in zip( + self.assigned_shards, queue_keys, result + ): # process return value of zrevrangebyscore for segment_key in segment_span_ids: - segment_keys.append((queue_key, segment_key)) + segment_keys.append((shard, queue_key, segment_key)) p.smembers(segment_key) segments = p.execute() @@ -361,7 +363,7 @@ def flush_segments(self, now: int, max_segments: 
int = 0) -> dict[SegmentKey, Fl num_has_root_spans = 0 - for (queue_key, segment_key), segment in zip(segment_keys, segments): + for (shard, queue_key, segment_key), segment in zip(segment_keys, segments): segment_span_id = _segment_key_to_span_id(segment_key).decode("ascii") output_spans = [] @@ -396,6 +398,9 @@ def flush_segments(self, now: int, max_segments: int = 0) -> dict[SegmentKey, Fl output_spans.append(OutputSpan(payload=val)) + metrics.incr( + "spans.buffer.flush_segments.num_segments_per_shard", tags={"shard_i": shard} + ) return_segments[segment_key] = FlushedSegment(queue_key=queue_key, spans=output_spans) num_has_root_spans += int(has_root_span) diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index 914f644fd065d6..b1b878d0398e81 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -11,6 +11,7 @@ from arroyo.processing.strategies.abstract import MessageRejected, ProcessingStrategy from arroyo.types import FilteredPayload, Message +from sentry import options from sentry.conf.types.kafka_definition import Topic from sentry.spans.buffer import SpansBuffer from sentry.utils import metrics @@ -51,7 +52,7 @@ def __init__( self.stopped = multiprocessing.Value("i", 0) self.redis_was_full = False self.current_drift = multiprocessing.Value("i", 0) - self.should_backpressure = multiprocessing.Value("i", 0) + self.backpressure_since = multiprocessing.Value("i", 0) self.produce_to_pipe = produce_to_pipe self._create_process() @@ -73,7 +74,7 @@ def _create_process(self): initializer, self.stopped, self.current_drift, - self.should_backpressure, + self.backpressure_since, self.buffer, self.max_flush_segments, self.produce_to_pipe, @@ -89,7 +90,7 @@ def main( initializer: Callable | None, stopped, current_drift, - should_backpressure, + backpressure_since, buffer: SpansBuffer, max_flush_segments: int, produce_to_pipe: Callable[[KafkaPayload], None] 
| None, @@ -118,41 +119,50 @@ def produce(payload: KafkaPayload) -> None: producer_futures.append(producer.produce(topic, payload)) while not stopped.value: - now = int(time.time()) + current_drift.value - flushed_segments = buffer.flush_segments(max_segments=max_flush_segments, now=now) + with metrics.timer("spans.buffer.flusher.loop_body"): + now = int(time.time()) + current_drift.value + flushed_segments = buffer.flush_segments( + max_segments=max_flush_segments, now=now + ) - should_backpressure.value = len(flushed_segments) >= max_flush_segments * len( - buffer.assigned_shards - ) + if len(flushed_segments) >= max_flush_segments * len(buffer.assigned_shards): + if backpressure_since.value == 0: + backpressure_since.value = int(time.time()) + else: + backpressure_since.value = 0 - if not flushed_segments: - time.sleep(1) - continue - - for _, flushed_segment in flushed_segments.items(): - if not flushed_segment.spans: - # This is a bug, most likely the input topic is not - # partitioned by trace_id so multiple consumers are writing - # over each other. The consequence is duplicated segments, - # worst-case. - metrics.incr("sentry.spans.buffer.empty_segments") + if not flushed_segments: + time.sleep(1) continue - spans = [span.payload for span in flushed_segment.spans] + with metrics.timer("spans.buffer.flusher.produce"): + for _, flushed_segment in flushed_segments.items(): + if not flushed_segment.spans: + # This is a bug, most likely the input topic is not + # partitioned by trace_id so multiple consumers are writing + # over each other. The consequence is duplicated segments, + # worst-case. 
+ metrics.incr("sentry.spans.buffer.empty_segments") + continue - kafka_payload = KafkaPayload( - None, rapidjson.dumps({"spans": spans}).encode("utf8"), [] - ) + spans = [span.payload for span in flushed_segment.spans] - metrics.timing("spans.buffer.segment_size_bytes", len(kafka_payload.value)) - produce(kafka_payload) + kafka_payload = KafkaPayload( + None, rapidjson.dumps({"spans": spans}).encode("utf8"), [] + ) - for future in producer_futures: - future.result() + metrics.timing( + "spans.buffer.segment_size_bytes", len(kafka_payload.value) + ) + produce(kafka_payload) - producer_futures.clear() + with metrics.timer("spans.buffer.flusher.wait_produce"): + for future in producer_futures: + future.result() - buffer.done_flush_segments(flushed_segments) + producer_futures.clear() + + buffer.done_flush_segments(flushed_segments) if producer is not None: producer.close() @@ -187,12 +197,12 @@ def submit(self, message: Message[FilteredPayload | int]) -> None: # efforts, it is still always going to be less durable than Kafka. # Minimizing our Redis memory usage also makes COGS easier to reason # about. - # - # should_backpressure is true if there are many segments to flush, but - # the flusher can't get all of them out. - if self.should_backpressure.value: - metrics.incr("sentry.spans.buffer.flusher.backpressure") - raise MessageRejected() + if self.backpressure_since.value > 0: + if int(time.time()) - self.backpressure_since.value > options.get( + "standalone-spans.buffer.flusher.backpressure_seconds" + ): + metrics.incr("sentry.spans.buffer.flusher.backpressure") + raise MessageRejected() # We set the drift. The backpressure based on redis memory comes after. 
# If Redis is full for a long time, the drift will grow into a large diff --git a/tests/sentry/spans/consumers/process/test_flusher.py b/tests/sentry/spans/consumers/process/test_flusher.py index 4654865f8284fd..49f9982dcb29cc 100644 --- a/tests/sentry/spans/consumers/process/test_flusher.py +++ b/tests/sentry/spans/consumers/process/test_flusher.py @@ -80,4 +80,4 @@ def append(msg): assert messages - assert flusher.should_backpressure.value + assert flusher.backpressure_since.value From cb34e6ef282d9377c8a48f74994ed642a189da79 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Fri, 23 May 2025 13:53:50 +0200 Subject: [PATCH 2/2] address review feedback --- src/sentry/spans/consumers/process/flusher.py | 83 +++++++++---------- 1 file changed, 38 insertions(+), 45 deletions(-) diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index b1b878d0398e81..be4129875d8e65 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -4,7 +4,7 @@ import time from collections.abc import Callable -import rapidjson +import orjson import sentry_sdk from arroyo import Topic as ArroyoTopic from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration @@ -119,50 +119,43 @@ def produce(payload: KafkaPayload) -> None: producer_futures.append(producer.produce(topic, payload)) while not stopped.value: - with metrics.timer("spans.buffer.flusher.loop_body"): - now = int(time.time()) + current_drift.value - flushed_segments = buffer.flush_segments( - max_segments=max_flush_segments, now=now - ) - - if len(flushed_segments) >= max_flush_segments * len(buffer.assigned_shards): - if backpressure_since.value == 0: - backpressure_since.value = int(time.time()) - else: - backpressure_since.value = 0 - - if not flushed_segments: - time.sleep(1) - continue - - with metrics.timer("spans.buffer.flusher.produce"): - for _, flushed_segment in 
flushed_segments.items(): - if not flushed_segment.spans: - # This is a bug, most likely the input topic is not - # partitioned by trace_id so multiple consumers are writing - # over each other. The consequence is duplicated segments, - # worst-case. - metrics.incr("sentry.spans.buffer.empty_segments") - continue - - spans = [span.payload for span in flushed_segment.spans] - - kafka_payload = KafkaPayload( - None, rapidjson.dumps({"spans": spans}).encode("utf8"), [] - ) - - metrics.timing( - "spans.buffer.segment_size_bytes", len(kafka_payload.value) - ) - produce(kafka_payload) - - with metrics.timer("spans.buffer.flusher.wait_produce"): - for future in producer_futures: - future.result() - - producer_futures.clear() - - buffer.done_flush_segments(flushed_segments) + now = int(time.time()) + current_drift.value + flushed_segments = buffer.flush_segments(max_segments=max_flush_segments, now=now) + + if len(flushed_segments) >= max_flush_segments * len(buffer.assigned_shards): + if backpressure_since.value == 0: + backpressure_since.value = int(time.time()) + else: + backpressure_since.value = 0 + + if not flushed_segments: + time.sleep(1) + continue + + with metrics.timer("spans.buffer.flusher.produce"): + for _, flushed_segment in flushed_segments.items(): + if not flushed_segment.spans: + # This is a bug, most likely the input topic is not + # partitioned by trace_id so multiple consumers are writing + # over each other. The consequence is duplicated segments, + # worst-case. 
+ metrics.incr("sentry.spans.buffer.empty_segments") + continue + + spans = [span.payload for span in flushed_segment.spans] + + kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), []) + + metrics.timing("spans.buffer.segment_size_bytes", len(kafka_payload.value)) + produce(kafka_payload) + + with metrics.timer("spans.buffer.flusher.wait_produce"): + for future in producer_futures: + future.result() + + producer_futures.clear() + + buffer.done_flush_segments(flushed_segments) if producer is not None: producer.close()