-
-
Notifications
You must be signed in to change notification settings - Fork 4.4k
ref(span-buffer/flusher): More metrics and more lenient backpressure. #92195
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,7 @@ | |
from arroyo.processing.strategies.abstract import MessageRejected, ProcessingStrategy | ||
from arroyo.types import FilteredPayload, Message | ||
|
||
from sentry import options | ||
from sentry.conf.types.kafka_definition import Topic | ||
from sentry.spans.buffer import SpansBuffer | ||
from sentry.utils import metrics | ||
|
@@ -51,7 +52,7 @@ def __init__( | |
self.stopped = multiprocessing.Value("i", 0) | ||
self.redis_was_full = False | ||
self.current_drift = multiprocessing.Value("i", 0) | ||
self.should_backpressure = multiprocessing.Value("i", 0) | ||
self.backpressure_since = multiprocessing.Value("i", 0) | ||
self.produce_to_pipe = produce_to_pipe | ||
|
||
self._create_process() | ||
|
@@ -73,7 +74,7 @@ def _create_process(self): | |
initializer, | ||
self.stopped, | ||
self.current_drift, | ||
self.should_backpressure, | ||
self.backpressure_since, | ||
self.buffer, | ||
self.max_flush_segments, | ||
self.produce_to_pipe, | ||
|
@@ -89,7 +90,7 @@ def main( | |
initializer: Callable | None, | ||
stopped, | ||
current_drift, | ||
should_backpressure, | ||
backpressure_since, | ||
buffer: SpansBuffer, | ||
max_flush_segments: int, | ||
produce_to_pipe: Callable[[KafkaPayload], None] | None, | ||
|
@@ -118,41 +119,50 @@ def produce(payload: KafkaPayload) -> None: | |
producer_futures.append(producer.produce(topic, payload)) | ||
|
||
while not stopped.value: | ||
now = int(time.time()) + current_drift.value | ||
flushed_segments = buffer.flush_segments(max_segments=max_flush_segments, now=now) | ||
with metrics.timer("spans.buffer.flusher.loop_body"): | ||
now = int(time.time()) + current_drift.value | ||
flushed_segments = buffer.flush_segments( | ||
max_segments=max_flush_segments, now=now | ||
) | ||
|
||
should_backpressure.value = len(flushed_segments) >= max_flush_segments * len( | ||
buffer.assigned_shards | ||
) | ||
if len(flushed_segments) >= max_flush_segments * len(buffer.assigned_shards): | ||
if backpressure_since.value == 0: | ||
backpressure_since.value = int(time.time()) | ||
else: | ||
backpressure_since.value = 0 | ||
|
||
if not flushed_segments: | ||
time.sleep(1) | ||
continue | ||
|
||
for _, flushed_segment in flushed_segments.items(): | ||
if not flushed_segment.spans: | ||
# This is a bug, most likely the input topic is not | ||
# partitioned by trace_id so multiple consumers are writing | ||
# over each other. The consequence is duplicated segments, | ||
# worst-case. | ||
metrics.incr("sentry.spans.buffer.empty_segments") | ||
if not flushed_segments: | ||
time.sleep(1) | ||
continue | ||
|
||
spans = [span.payload for span in flushed_segment.spans] | ||
with metrics.timer("spans.buffer.flusher.produce"): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Isn't the call to There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's mostly about the dumps, yeah — spawning the futures and everything that is necessary. Open to suggestions to rename the metric.
|
||
for _, flushed_segment in flushed_segments.items(): | ||
if not flushed_segment.spans: | ||
# This is a bug, most likely the input topic is not | ||
# partitioned by trace_id so multiple consumers are writing | ||
# over each other. The consequence is duplicated segments, | ||
# worst-case. | ||
metrics.incr("sentry.spans.buffer.empty_segments") | ||
continue | ||
|
||
kafka_payload = KafkaPayload( | ||
None, rapidjson.dumps({"spans": spans}).encode("utf8"), [] | ||
) | ||
spans = [span.payload for span in flushed_segment.spans] | ||
|
||
metrics.timing("spans.buffer.segment_size_bytes", len(kafka_payload.value)) | ||
produce(kafka_payload) | ||
kafka_payload = KafkaPayload( | ||
None, rapidjson.dumps({"spans": spans}).encode("utf8"), [] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While we're at it - most other code uses There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
) | ||
|
||
for future in producer_futures: | ||
future.result() | ||
metrics.timing( | ||
"spans.buffer.segment_size_bytes", len(kafka_payload.value) | ||
) | ||
produce(kafka_payload) | ||
|
||
producer_futures.clear() | ||
with metrics.timer("spans.buffer.flusher.wait_produce"): | ||
for future in producer_futures: | ||
future.result() | ||
|
||
buffer.done_flush_segments(flushed_segments) | ||
producer_futures.clear() | ||
|
||
buffer.done_flush_segments(flushed_segments) | ||
|
||
if producer is not None: | ||
producer.close() | ||
|
@@ -187,12 +197,12 @@ def submit(self, message: Message[FilteredPayload | int]) -> None: | |
# efforts, it is still always going to be less durable than Kafka. | ||
# Minimizing our Redis memory usage also makes COGS easier to reason | ||
# about. | ||
# | ||
# should_backpressure is true if there are many segments to flush, but | ||
# the flusher can't get all of them out. | ||
if self.should_backpressure.value: | ||
metrics.incr("sentry.spans.buffer.flusher.backpressure") | ||
raise MessageRejected() | ||
if self.backpressure_since.value > 0: | ||
if int(time.time()) - self.backpressure_since.value > options.get( | ||
"standalone-spans.buffer.flusher.backpressure_seconds" | ||
): | ||
metrics.incr("sentry.spans.buffer.flusher.backpressure") | ||
raise MessageRejected() | ||
|
||
# We set the drift. The backpressure based on redis memory comes after. | ||
# If Redis is full for a long time, the drift will grow into a large | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
This measurement includes the `time.sleep` below, which seems unfortunate, since we're just idling — it doesn't tell us how fast the loop can actually process data. Otherwise, all the relevant measurements are already covered by `flusher.produce` below. I'd opt to keep only the produce timings below, but not the entire loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
done