ref(span-buffer/flusher): More metrics and more lenient backpressure. #92195
Conversation
Codecov Report

Attention: Patch coverage is ✅ All tests successful. No failed tests found.

Additional details and impacted files:

```
@@            Coverage Diff            @@
##           master   #92195      +/- ##
========================================
  Coverage   87.93%   87.93%
========================================
  Files       10174    10174
  Lines      583376   583385      +9
  Branches    22596    22596
========================================
+ Hits       512975   512989     +14
+ Misses      69949    69944      -5
  Partials      452      452
```
```diff
@@ -118,41 +119,50 @@ def produce(payload: KafkaPayload) -> None:
     producer_futures.append(producer.produce(topic, payload))

 while not stopped.value:
     now = int(time.time()) + current_drift.value
     flushed_segments = buffer.flush_segments(max_segments=max_flush_segments, now=now)
+    with metrics.timer("spans.buffer.flusher.loop_body"):
```
This measurement includes the `time.sleep` below, which seems unfortunate: since we're just idling, it doesn't tell us how fast the loop can actually process data. Otherwise, all the relevant measurements are already covered by `flusher.produce` below. I'd opt to keep only the produce timings, not the entire loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
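The suggestion above can be sketched as follows. This is a minimal illustration, not the PR's actual code: `timer` is a hypothetical stand-in for the real `metrics.timer`, and the sleeps are placeholders for the flusher's work and idle phases. The point is that the timer closes before the idle sleep, so idling doesn't inflate the metric.

```python
import time
from contextlib import contextmanager

# Hypothetical stand-in for the real metrics backend: records durations in a dict.
timings: dict[str, list[float]] = {}


@contextmanager
def timer(name: str):
    start = time.monotonic()
    try:
        yield
    finally:
        timings.setdefault(name, []).append(time.monotonic() - start)


def flush_once(work_seconds: float, idle_seconds: float) -> None:
    # Time only the productive part of the loop iteration...
    with timer("spans.buffer.flusher.loop_body"):
        time.sleep(work_seconds)  # placeholder for flush_segments + produce
    # ...and sleep outside the timer, so the metric reflects only real work.
    time.sleep(idle_seconds)


flush_once(work_seconds=0.01, idle_seconds=0.05)
recorded = timings["spans.buffer.flusher.loop_body"][0]
# time.sleep waits at least the requested duration, so the recorded work
# time is >= 0.01s even though the iteration idled for another 0.05s.
assert recorded >= 0.009
```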
```diff
         continue

     spans = [span.payload for span in flushed_segment.spans]
+    with metrics.timer("spans.buffer.flusher.produce"):
```
Isn't the call to `produce` fully async below, or can it block when the buffer is full? Either way, a significant portion of this timing is the `dumps` call. Can we split it into separate serializing and producing timings?
It's mostly about the `dumps`, yeah, plus spawning the futures and everything else that is necessary. Open to suggestions for renaming the metric. `wait_produce` is then joining on those futures.
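The proposed split could look like the sketch below. Everything here is a stand-in: `timer` approximates `metrics.timer`, `produce` is a no-op in place of the async Kafka produce, stdlib `json` stands in for `rapidjson`, and the span payloads are invented. The shape is what matters: one timer around serialization, a separate one around the produce call.

```python
import json
import time
from contextlib import contextmanager

timings: dict[str, float] = {}


@contextmanager
def timer(name: str):  # hypothetical stand-in for metrics.timer
    start = time.monotonic()
    try:
        yield
    finally:
        timings[name] = time.monotonic() - start


def produce(value: bytes) -> None:  # stand-in for the async Kafka produce
    pass


spans = ['{"span_id": "a"}', '{"span_id": "b"}']  # invented example payloads

# Time serialization separately, so the dumps cost is visible on its own...
with timer("spans.buffer.flusher.serialize"):
    payload = json.dumps({"spans": spans}).encode("utf8")

# ...and time only the produce call here.
with timer("spans.buffer.flusher.produce"):
    produce(payload)

assert "spans.buffer.flusher.serialize" in timings
assert "spans.buffer.flusher.produce" in timings
```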
```diff
     kafka_payload = KafkaPayload(
         None, rapidjson.dumps({"spans": spans}).encode("utf8"), []
     )
+    metrics.timing("spans.buffer.segment_size_bytes", len(kafka_payload.value))
     produce(kafka_payload)
```
While we're at it: most other code uses `orjson`. Can you double-check and update this, if that's valid here?
done
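A sketch of the swap discussed above: `orjson.dumps` returns `bytes` directly, so the explicit `.encode("utf8")` step goes away. The fallback to stdlib `json` below is purely so the sketch runs without `orjson` installed; it is not part of the change.

```python
try:
    import orjson

    def serialize(obj) -> bytes:
        # orjson.dumps already returns bytes; no .encode("utf8") needed.
        return orjson.dumps(obj)

except ImportError:  # illustration-only fallback when orjson isn't available
    import json

    def serialize(obj) -> bytes:
        return json.dumps(obj).encode("utf8")


payload = serialize({"spans": ["abc", "def"]})
assert isinstance(payload, bytes)
assert b"spans" in payload
```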
VIEPF-30