ref(span-buffer/flusher): More metrics and more lenient backpressure. #92195

Merged · 3 commits · May 23, 2025
Changes from 1 commit
5 changes: 5 additions & 0 deletions src/sentry/options/defaults.py
@@ -2663,6 +2663,11 @@
default=False,
flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
)
register(
"standalone-spans.buffer.flusher.backpressure_seconds",
default=10,
flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
)
register(
"indexed-spans.agg-span-waterfall.enable",
default=False,
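The new option controls how long the flusher has to stay saturated before the consumer starts rejecting messages. A minimal sketch of exercising it in a test, assuming the usual override_options helper (the import path and the test body are assumptions, not part of this PR):

# Illustrative sketch only; not part of this change.
from sentry.testutils.helpers import override_options  # import path assumed


def test_backpressure_grace_period():
    # With a zero-second grace period the consumer should reject messages as
    # soon as the flusher reports sustained saturation.
    with override_options({"standalone-spans.buffer.flusher.backpressure_seconds": 0}):
        ...  # drive the flusher past max_flush_segments and expect MessageRejected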
13 changes: 9 additions & 4 deletions src/sentry/spans/buffer.py
@@ -344,15 +344,17 @@ def flush_segments(self, now: int, max_segments: int = 0) -> dict[SegmentKey, Fl

result = p.execute()

segment_keys: list[tuple[QueueKey, SegmentKey]] = []
segment_keys: list[tuple[int, QueueKey, SegmentKey]] = []

with metrics.timer("spans.buffer.flush_segments.load_segment_data"):
with self.client.pipeline(transaction=False) as p:
# ZRANGEBYSCORE output
for queue_key, segment_span_ids in zip(queue_keys, result):
for shard, queue_key, segment_span_ids in zip(
self.assigned_shards, queue_keys, result
):
# process return value of zrevrangebyscore
for segment_key in segment_span_ids:
segment_keys.append((queue_key, segment_key))
segment_keys.append((shard, queue_key, segment_key))
p.smembers(segment_key)

segments = p.execute()
@@ -361,7 +363,7 @@ def flush_segments(self, now: int, max_segments: int = 0) -> dict[SegmentKey, Fl

num_has_root_spans = 0

for (queue_key, segment_key), segment in zip(segment_keys, segments):
for (shard, queue_key, segment_key), segment in zip(segment_keys, segments):
segment_span_id = _segment_key_to_span_id(segment_key).decode("ascii")

output_spans = []
@@ -396,6 +398,9 @@ def flush_segments(self, now: int, max_segments: int = 0) -> dict[SegmentKey, Fl

output_spans.append(OutputSpan(payload=val))

metrics.incr(
"spans.buffer.flush_segments.num_segments_per_shard", tags={"shard_i": shard}
)
return_segments[segment_key] = FlushedSegment(queue_key=queue_key, spans=output_spans)
num_has_root_spans += int(has_root_span)

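The buffer change threads the shard index through flush_segments so each flushed segment can be counted per shard. A standalone sketch of how the three lists line up, with invented key names (the Redis key format here is illustrative, not the buffer's real scheme):

# Illustrative sketch only; key names and data are invented.
from sentry.utils import metrics

assigned_shards = [0, 1]
queue_keys = ["span-buf:q:0", "span-buf:q:1"]
result = [["segment:a", "segment:b"], ["segment:c"]]  # ZRANGEBYSCORE output per shard

for shard, queue_key, segment_ids in zip(assigned_shards, queue_keys, result):
    for segment_id in segment_ids:
        # One increment per flushed segment, tagged with the shard it came from,
        # so uneven load across shards shows up directly in the metric.
        metrics.incr(
            "spans.buffer.flush_segments.num_segments_per_shard", tags={"shard_i": shard}
        )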
80 changes: 45 additions & 35 deletions src/sentry/spans/consumers/process/flusher.py
@@ -11,6 +11,7 @@
from arroyo.processing.strategies.abstract import MessageRejected, ProcessingStrategy
from arroyo.types import FilteredPayload, Message

from sentry import options
from sentry.conf.types.kafka_definition import Topic
from sentry.spans.buffer import SpansBuffer
from sentry.utils import metrics
@@ -51,7 +52,7 @@ def __init__(
self.stopped = multiprocessing.Value("i", 0)
self.redis_was_full = False
self.current_drift = multiprocessing.Value("i", 0)
self.should_backpressure = multiprocessing.Value("i", 0)
self.backpressure_since = multiprocessing.Value("i", 0)
self.produce_to_pipe = produce_to_pipe

self._create_process()
@@ -73,7 +74,7 @@ def _create_process(self):
initializer,
self.stopped,
self.current_drift,
self.should_backpressure,
self.backpressure_since,
self.buffer,
self.max_flush_segments,
self.produce_to_pipe,
@@ -89,7 +90,7 @@ def main(
initializer: Callable | None,
stopped,
current_drift,
should_backpressure,
backpressure_since,
buffer: SpansBuffer,
max_flush_segments: int,
produce_to_pipe: Callable[[KafkaPayload], None] | None,
@@ -118,41 +119,50 @@ def produce(payload: KafkaPayload) -> None:
producer_futures.append(producer.produce(topic, payload))

while not stopped.value:
now = int(time.time()) + current_drift.value
flushed_segments = buffer.flush_segments(max_segments=max_flush_segments, now=now)
with metrics.timer("spans.buffer.flusher.loop_body"):
Member: This measurement includes the time.sleep below, which seems unfortunate since we're just idling - it doesn't tell us how fast the loop can actually process data. Otherwise, all the relevant measurements are already covered by flusher.produce below. I'd opt to keep only the produce timings below, but not the entire loop.

Member Author: done

now = int(time.time()) + current_drift.value
flushed_segments = buffer.flush_segments(
max_segments=max_flush_segments, now=now
)

should_backpressure.value = len(flushed_segments) >= max_flush_segments * len(
buffer.assigned_shards
)
if len(flushed_segments) >= max_flush_segments * len(buffer.assigned_shards):
if backpressure_since.value == 0:
backpressure_since.value = int(time.time())
else:
backpressure_since.value = 0

if not flushed_segments:
time.sleep(1)
continue

for _, flushed_segment in flushed_segments.items():
if not flushed_segment.spans:
# This is a bug, most likely the input topic is not
# partitioned by trace_id so multiple consumers are writing
# over each other. The consequence is duplicated segments,
# worst-case.
metrics.incr("sentry.spans.buffer.empty_segments")
if not flushed_segments:
time.sleep(1)
continue

spans = [span.payload for span in flushed_segment.spans]
with metrics.timer("spans.buffer.flusher.produce"):
Member: Isn't the call to produce fully async below, or can it block when the buffer is full? Either way, a significant portion of this is the dumps - can we split this timing into serializing and producing?

Member Author: It's mostly about the dumps, yeah - spawning the futures and everything that is necessary. Open to suggestions to rename the metric. wait_produce is then joining on those futures.

for _, flushed_segment in flushed_segments.items():
if not flushed_segment.spans:
# This is a bug, most likely the input topic is not
# partitioned by trace_id so multiple consumers are writing
# over each other. The consequence is duplicated segments,
# worst-case.
metrics.incr("sentry.spans.buffer.empty_segments")
continue

kafka_payload = KafkaPayload(
None, rapidjson.dumps({"spans": spans}).encode("utf8"), []
)
spans = [span.payload for span in flushed_segment.spans]

metrics.timing("spans.buffer.segment_size_bytes", len(kafka_payload.value))
produce(kafka_payload)
kafka_payload = KafkaPayload(
None, rapidjson.dumps({"spans": spans}).encode("utf8"), []
Member: While we're at it - most other code uses orjson. Can you double-check and update this, if it's valid?

Member Author: done

)

for future in producer_futures:
future.result()
metrics.timing(
"spans.buffer.segment_size_bytes", len(kafka_payload.value)
)
produce(kafka_payload)

producer_futures.clear()
with metrics.timer("spans.buffer.flusher.wait_produce"):
for future in producer_futures:
future.result()

buffer.done_flush_segments(flushed_segments)
producer_futures.clear()

buffer.done_flush_segments(flushed_segments)

if producer is not None:
producer.close()
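On the orjson suggestion above: orjson.dumps returns bytes directly, so the explicit .encode("utf8") drops out. A minimal sketch of what the swapped serialization could look like (payload contents are placeholders; the KafkaPayload import path is assumed):

# Illustrative sketch only.
import orjson
from arroyo.backends.kafka import KafkaPayload  # import path assumed

spans = [{"span_id": "ab12cd34", "trace_id": "ef56"}]  # placeholder span payloads
# orjson.dumps already returns bytes, so no .encode("utf8") is needed.
kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), [])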
@@ -187,12 +197,12 @@ def submit(self, message: Message[FilteredPayload | int]) -> None:
# efforts, it is still always going to be less durable than Kafka.
# Minimizing our Redis memory usage also makes COGS easier to reason
# about.
#
# should_backpressure is true if there are many segments to flush, but
# the flusher can't get all of them out.
if self.should_backpressure.value:
metrics.incr("sentry.spans.buffer.flusher.backpressure")
raise MessageRejected()
if self.backpressure_since.value > 0:
if int(time.time()) - self.backpressure_since.value > options.get(
"standalone-spans.buffer.flusher.backpressure_seconds"
):
metrics.incr("sentry.spans.buffer.flusher.backpressure")
raise MessageRejected()

# We set the drift. The backpressure based on redis memory comes after.
# If Redis is full for a long time, the drift will grow into a large
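Taken together, the flusher now records when it first became saturated, and the consumer only rejects messages once that state has persisted for the configured number of seconds. A condensed, self-contained sketch of the pattern (names follow the diff; the surrounding classes and process wiring are omitted):

# Condensed sketch of the lenient backpressure; illustrative only.
import multiprocessing
import time

from sentry import options

backpressure_since = multiprocessing.Value("i", 0)


def record_flush(num_flushed: int, max_flush_segments: int, num_shards: int) -> None:
    # Flusher side: remember when saturation started, reset once we catch up.
    if num_flushed >= max_flush_segments * num_shards:
        if backpressure_since.value == 0:
            backpressure_since.value = int(time.time())
    else:
        backpressure_since.value = 0


def should_reject() -> bool:
    # Consumer side: apply backpressure only after the grace period has elapsed.
    if backpressure_since.value == 0:
        return False
    grace = options.get("standalone-spans.buffer.flusher.backpressure_seconds")
    return int(time.time()) - backpressure_since.value > grace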
2 changes: 1 addition & 1 deletion tests/sentry/spans/consumers/process/test_flusher.py
@@ -80,4 +80,4 @@ def append(msg):

assert messages

assert flusher.should_backpressure.value
assert flusher.backpressure_since.value