@@ -4,13 +4,14 @@
 import time
 from collections.abc import Callable
 
-import rapidjson
+import orjson
 import sentry_sdk
 from arroyo import Topic as ArroyoTopic
 from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
 from arroyo.processing.strategies.abstract import MessageRejected, ProcessingStrategy
 from arroyo.types import FilteredPayload, Message
 
+from sentry import options
 from sentry.conf.types.kafka_definition import Topic
 from sentry.spans.buffer import SpansBuffer
 from sentry.utils import metrics
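One behavioral detail behind the rapidjson → orjson swap further down in this diff: `orjson.dumps` returns `bytes` rather than `str`, which is why the explicit `.encode("utf8")` call disappears from the `KafkaPayload` construction. A minimal illustration (the example dict is made up):

```python
import orjson

# orjson serializes straight to bytes, so the payload can be handed to
# KafkaPayload without an extra .encode("utf8") round trip.
payload = orjson.dumps({"spans": [{"span_id": "abc123"}]})
assert isinstance(payload, bytes)
```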
@@ -51,7 +52,7 @@ def __init__(
         self.stopped = multiprocessing.Value("i", 0)
         self.redis_was_full = False
         self.current_drift = multiprocessing.Value("i", 0)
-        self.should_backpressure = multiprocessing.Value("i", 0)
+        self.backpressure_since = multiprocessing.Value("i", 0)
         self.produce_to_pipe = produce_to_pipe
 
         self._create_process()
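For context, `multiprocessing.Value("i", 0)` allocates a C int in shared memory, so writes from the flusher child process are visible to the parent consumer without any explicit IPC. A rough standalone sketch of that mechanism (names are illustrative, not from the Sentry codebase):

```python
import multiprocessing
import time

def mark(since):
    # Runs in the child process; the write lands in shared memory.
    since.value = int(time.time())

if __name__ == "__main__":
    backpressure_since = multiprocessing.Value("i", 0)
    child = multiprocessing.Process(target=mark, args=(backpressure_since,))
    child.start()
    child.join()
    assert backpressure_since.value > 0  # the parent sees the child's write
```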
@@ -73,7 +74,7 @@ def _create_process(self):
                 initializer,
                 self.stopped,
                 self.current_drift,
-                self.should_backpressure,
+                self.backpressure_since,
                 self.buffer,
                 self.max_flush_segments,
                 self.produce_to_pipe,
@@ -89,7 +90,7 @@ def main(
         initializer: Callable | None,
         stopped,
         current_drift,
-        should_backpressure,
+        backpressure_since,
         buffer: SpansBuffer,
         max_flush_segments: int,
         produce_to_pipe: Callable[[KafkaPayload], None] | None,
@@ -121,34 +122,36 @@ def produce(payload: KafkaPayload) -> None:
             now = int(time.time()) + current_drift.value
             flushed_segments = buffer.flush_segments(max_segments=max_flush_segments, now=now)
 
-            should_backpressure.value = len(flushed_segments) >= max_flush_segments * len(
-                buffer.assigned_shards
-            )
+            if len(flushed_segments) >= max_flush_segments * len(buffer.assigned_shards):
+                if backpressure_since.value == 0:
+                    backpressure_since.value = int(time.time())
+            else:
+                backpressure_since.value = 0
 
             if not flushed_segments:
                 time.sleep(1)
                 continue
 
-            for _, flushed_segment in flushed_segments.items():
-                if not flushed_segment.spans:
-                    # This is a bug, most likely the input topic is not
-                    # partitioned by trace_id so multiple consumers are writing
-                    # over each other. The consequence is duplicated segments,
-                    # worst-case.
-                    metrics.incr("sentry.spans.buffer.empty_segments")
-                    continue
+            with metrics.timer("spans.buffer.flusher.produce"):
+                for _, flushed_segment in flushed_segments.items():
+                    if not flushed_segment.spans:
+                        # This is a bug, most likely the input topic is not
+                        # partitioned by trace_id so multiple consumers are writing
+                        # over each other. The consequence is duplicated segments,
+                        # worst-case.
+                        metrics.incr("sentry.spans.buffer.empty_segments")
+                        continue
 
-                spans = [span.payload for span in flushed_segment.spans]
+                    spans = [span.payload for span in flushed_segment.spans]
 
-                kafka_payload = KafkaPayload(
-                    None, rapidjson.dumps({"spans": spans}).encode("utf8"), []
-                )
+                    kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), [])
 
-                metrics.timing("spans.buffer.segment_size_bytes", len(kafka_payload.value))
-                produce(kafka_payload)
+                    metrics.timing("spans.buffer.segment_size_bytes", len(kafka_payload.value))
+                    produce(kafka_payload)
 
-            for future in producer_futures:
-                future.result()
+            with metrics.timer("spans.buffer.flusher.wait_produce"):
+                for future in producer_futures:
+                    future.result()
 
             producer_futures.clear()
 
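The hunk above replaces the boolean saturation flag with a timestamp: the flusher records when it first became saturated and clears the value on any cycle where it keeps up. A distilled sketch of that bookkeeping (the function and parameter names are hypothetical):

```python
import time

def update_backpressure(backpressure_since, flushed_count: int, capacity: int) -> None:
    if flushed_count >= capacity:
        # Saturated: remember the *first* saturated cycle; don't reset the
        # clock on every subsequent one.
        if backpressure_since.value == 0:
            backpressure_since.value = int(time.time())
    else:
        # Caught up: clear the signal entirely.
        backpressure_since.value = 0
```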
@@ -187,12 +190,12 @@ def submit(self, message: Message[FilteredPayload | int]) -> None:
         # efforts, it is still always going to be less durable than Kafka.
         # Minimizing our Redis memory usage also makes COGS easier to reason
         # about.
-        #
-        # should_backpressure is true if there are many segments to flush, but
-        # the flusher can't get all of them out.
-        if self.should_backpressure.value:
-            metrics.incr("sentry.spans.buffer.flusher.backpressure")
-            raise MessageRejected()
+        if self.backpressure_since.value > 0:
+            if int(time.time()) - self.backpressure_since.value > options.get(
+                "standalone-spans.buffer.flusher.backpressure_seconds"
+            ):
+                metrics.incr("sentry.spans.buffer.flusher.backpressure")
+                raise MessageRejected()
 
         # We set the drift. The backpressure based on redis memory comes after.
         # If Redis is full for a long time, the drift will grow into a large
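On the consumer side, the rewritten check turns backpressure into a grace period: `MessageRejected` is only raised once the flusher has been saturated for longer than the configured number of seconds, rather than on the very first saturated flush. A self-contained sketch of the same predicate (the constant stands in for the `standalone-spans.buffer.flusher.backpressure_seconds` option; the function name is made up):

```python
import time

BACKPRESSURE_SECONDS = 10  # stand-in for the configured option value

def should_apply_backpressure(backpressure_since: int) -> bool:
    # 0 means the flusher is keeping up; any other value is the unix
    # timestamp of the first saturated flush cycle.
    if backpressure_since == 0:
        return False
    return int(time.time()) - backpressure_since > BACKPRESSURE_SECONDS
```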