ref(span-buffer/flusher): More metrics and more lenient backpressure. #92195

Merged · 3 commits · May 23, 2025

src/sentry/options/defaults.py (5 additions, 0 deletions)

```diff
@@ -2663,6 +2663,11 @@
     default=False,
     flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
 )
+register(
+    "standalone-spans.buffer.flusher.backpressure_seconds",
+    default=10,
+    flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
+)
 register(
     "indexed-spans.agg-span-waterfall.enable",
     default=False,
```
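
The option is read at runtime through the options system, which is exactly what the flusher change later in this PR does. For reference:

```python
from sentry import options

# Grace period (seconds) of sustained flusher saturation before the
# consumer starts rejecting messages; defaults to 10 per the registration above.
backpressure_seconds = options.get("standalone-spans.buffer.flusher.backpressure_seconds")
```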

src/sentry/spans/buffer.py (9 additions, 4 deletions)

```diff
@@ -344,15 +344,17 @@ def flush_segments(self, now: int, max_segments: int = 0) -> dict[SegmentKey, FlushedSegment]
 
             result = p.execute()
 
-        segment_keys: list[tuple[QueueKey, SegmentKey]] = []
+        segment_keys: list[tuple[int, QueueKey, SegmentKey]] = []
 
         with metrics.timer("spans.buffer.flush_segments.load_segment_data"):
             with self.client.pipeline(transaction=False) as p:
                 # ZRANGEBYSCORE output
-                for queue_key, segment_span_ids in zip(queue_keys, result):
+                for shard, queue_key, segment_span_ids in zip(
+                    self.assigned_shards, queue_keys, result
+                ):
                     # process return value of zrevrangebyscore
                     for segment_key in segment_span_ids:
-                        segment_keys.append((queue_key, segment_key))
+                        segment_keys.append((shard, queue_key, segment_key))
                         p.smembers(segment_key)
 
             segments = p.execute()
@@ -361,7 +363,7 @@ def flush_segments(self, now: int, max_segments: int = 0) -> dict[SegmentKey, FlushedSegment]
 
         num_has_root_spans = 0
 
-        for (queue_key, segment_key), segment in zip(segment_keys, segments):
+        for (shard, queue_key, segment_key), segment in zip(segment_keys, segments):
             segment_span_id = _segment_key_to_span_id(segment_key).decode("ascii")
 
             output_spans = []
@@ -396,6 +398,9 @@ def flush_segments(self, now: int, max_segments: int = 0) -> dict[SegmentKey, FlushedSegment]
 
                 output_spans.append(OutputSpan(payload=val))
 
+            metrics.incr(
+                "spans.buffer.flush_segments.num_segments_per_shard", tags={"shard_i": shard}
+            )
             return_segments[segment_key] = FlushedSegment(queue_key=queue_key, spans=output_spans)
             num_has_root_spans += int(has_root_span)
 
```
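
The shard index now travels alongside each queue and segment key, so the new `num_segments_per_shard` counter can be tagged with the shard a segment came from. A minimal sketch of the threading pattern, with made-up stand-in data in place of the real Redis pipeline output:

```python
# Sketch only: thread a shard id through a fan-out so each result can be
# attributed to its shard. Key names and data are illustrative.
assigned_shards = [0, 1, 2]
queue_keys = [f"queue:{shard}" for shard in assigned_shards]
results = [[b"seg:a"], [b"seg:b", b"seg:c"], []]  # per-queue sorted-set output

segment_keys: list[tuple[int, str, bytes]] = []
for shard, queue_key, segment_span_ids in zip(assigned_shards, queue_keys, results):
    for segment_key in segment_span_ids:
        segment_keys.append((shard, queue_key, segment_key))

# Each flushed segment still knows which shard produced it, so a counter
# can be emitted with tags={"shard_i": shard} as in the diff above.
for shard, queue_key, segment_key in segment_keys:
    print(f"shard={shard} queue={queue_key} segment={segment_key!r}")
```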

src/sentry/spans/consumers/process/flusher.py (32 additions, 29 deletions)

```diff
@@ -4,13 +4,14 @@
 import time
 from collections.abc import Callable
 
-import rapidjson
+import orjson
 import sentry_sdk
 from arroyo import Topic as ArroyoTopic
 from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
 from arroyo.processing.strategies.abstract import MessageRejected, ProcessingStrategy
 from arroyo.types import FilteredPayload, Message
 
+from sentry import options
 from sentry.conf.types.kafka_definition import Topic
 from sentry.spans.buffer import SpansBuffer
 from sentry.utils import metrics
```
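
One detail of the `rapidjson` to `orjson` swap: `orjson.dumps` returns `bytes` rather than `str`, which is why the explicit `.encode("utf8")` disappears from the produce loop further down:

```python
import orjson

# orjson serializes straight to bytes, so the result can be handed to
# KafkaPayload without an intermediate str -> bytes encoding step.
payload = orjson.dumps({"spans": []})
assert isinstance(payload, bytes)
```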

```diff
@@ -51,7 +52,7 @@ def __init__(
         self.stopped = multiprocessing.Value("i", 0)
         self.redis_was_full = False
         self.current_drift = multiprocessing.Value("i", 0)
-        self.should_backpressure = multiprocessing.Value("i", 0)
+        self.backpressure_since = multiprocessing.Value("i", 0)
         self.produce_to_pipe = produce_to_pipe
 
         self._create_process()
@@ -73,7 +74,7 @@ def _create_process(self):
             initializer,
             self.stopped,
             self.current_drift,
-            self.should_backpressure,
+            self.backpressure_since,
             self.buffer,
             self.max_flush_segments,
             self.produce_to_pipe,
@@ -89,7 +90,7 @@ def main(
     initializer: Callable | None,
     stopped,
     current_drift,
-    should_backpressure,
+    backpressure_since,
     buffer: SpansBuffer,
     max_flush_segments: int,
     produce_to_pipe: Callable[[KafkaPayload], None] | None,
```
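
`backpressure_since` is a `multiprocessing.Value("i", 0)`: a single shared integer that the flusher subprocess writes and the consumer's `submit` path reads. A minimal, self-contained sketch of that mechanism (illustrative names, not the PR's code):

```python
import multiprocessing
import time


def flusher(since) -> None:
    # The child process records when saturation began; 0 means "not saturated".
    since.value = int(time.time())


if __name__ == "__main__":
    # "i" is a C signed int shared across the process boundary.
    backpressure_since = multiprocessing.Value("i", 0)
    p = multiprocessing.Process(target=flusher, args=(backpressure_since,))
    p.start()
    p.join()
    print(backpressure_since.value)  # the parent observes the child's write
```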

```diff
@@ -121,34 +122,36 @@ def produce(payload: KafkaPayload) -> None:
                 now = int(time.time()) + current_drift.value
                 flushed_segments = buffer.flush_segments(max_segments=max_flush_segments, now=now)
 
-                should_backpressure.value = len(flushed_segments) >= max_flush_segments * len(
-                    buffer.assigned_shards
-                )
+                if len(flushed_segments) >= max_flush_segments * len(buffer.assigned_shards):
+                    if backpressure_since.value == 0:
+                        backpressure_since.value = int(time.time())
+                else:
+                    backpressure_since.value = 0
 
                 if not flushed_segments:
                     time.sleep(1)
                     continue
 
-                for _, flushed_segment in flushed_segments.items():
-                    if not flushed_segment.spans:
-                        # This is a bug, most likely the input topic is not
-                        # partitioned by trace_id so multiple consumers are writing
-                        # over each other. The consequence is duplicated segments,
-                        # worst-case.
-                        metrics.incr("sentry.spans.buffer.empty_segments")
-                        continue
+                with metrics.timer("spans.buffer.flusher.produce"):
+                    for _, flushed_segment in flushed_segments.items():
+                        if not flushed_segment.spans:
+                            # This is a bug, most likely the input topic is not
+                            # partitioned by trace_id so multiple consumers are writing
+                            # over each other. The consequence is duplicated segments,
+                            # worst-case.
+                            metrics.incr("sentry.spans.buffer.empty_segments")
+                            continue
 
-                    spans = [span.payload for span in flushed_segment.spans]
+                        spans = [span.payload for span in flushed_segment.spans]
 
-                    kafka_payload = KafkaPayload(
-                        None, rapidjson.dumps({"spans": spans}).encode("utf8"), []
-                    )
+                        kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), [])
 
-                    metrics.timing("spans.buffer.segment_size_bytes", len(kafka_payload.value))
-                    produce(kafka_payload)
+                        metrics.timing("spans.buffer.segment_size_bytes", len(kafka_payload.value))
+                        produce(kafka_payload)
 
-                for future in producer_futures:
-                    future.result()
+                with metrics.timer("spans.buffer.flusher.wait_produce"):
+                    for future in producer_futures:
+                        future.result()
 
                 producer_futures.clear()
```
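
The boolean `should_backpressure` flag becomes a timestamp: the flusher records when saturation began (every shard returning the per-flush maximum) and resets it as soon as a flush comes in under the limit. Condensed into a standalone sketch:

```python
import time

backpressure_since = 0  # 0 means "not saturated"


def track_backpressure(num_flushed: int, max_flush_segments: int, num_shards: int) -> None:
    global backpressure_since
    if num_flushed >= max_flush_segments * num_shards:
        # Saturated: remember the onset, but only the first time around.
        if backpressure_since == 0:
            backpressure_since = int(time.time())
    else:
        # Saturation cleared: reset the clock.
        backpressure_since = 0
```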

```diff
@@ -187,12 +190,12 @@ def submit(self, message: Message[FilteredPayload | int]) -> None:
         # efforts, it is still always going to be less durable than Kafka.
         # Minimizing our Redis memory usage also makes COGS easier to reason
         # about.
-        #
-        # should_backpressure is true if there are many segments to flush, but
-        # the flusher can't get all of them out.
-        if self.should_backpressure.value:
-            metrics.incr("sentry.spans.buffer.flusher.backpressure")
-            raise MessageRejected()
+        if self.backpressure_since.value > 0:
+            if int(time.time()) - self.backpressure_since.value > options.get(
+                "standalone-spans.buffer.flusher.backpressure_seconds"
+            ):
+                metrics.incr("sentry.spans.buffer.flusher.backpressure")
+                raise MessageRejected()
 
         # We set the drift. The backpressure based on redis memory comes after.
         # If Redis is full for a long time, the drift will grow into a large
```
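
This check is what makes the backpressure more lenient: the consumer only rejects messages once saturation has persisted past the configured grace period, so brief flush spikes no longer stall consumption. Roughly:

```python
import time


def should_reject(backpressure_since: int, backpressure_seconds: int) -> bool:
    # Reject only when the flusher has been saturated for longer than the
    # grace period; backpressure_since == 0 means "not saturated at all".
    return (
        backpressure_since > 0
        and int(time.time()) - backpressure_since > backpressure_seconds
    )
```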

tests/sentry/spans/consumers/process/test_flusher.py (1 addition, 1 deletion)

```diff
@@ -80,4 +80,4 @@ def append(msg):
 
     assert messages
 
-    assert flusher.should_backpressure.value
+    assert flusher.backpressure_since.value
```