Skip to content

Commit 6dec121

Browse files
authored
feat(profiles): rollout data compression (#92133)
1 parent 7d1f684 commit 6dec121

File tree

3 files changed

+30
-16
lines changed

3 files changed

+30
-16
lines changed

src/sentry/options/defaults.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3113,6 +3113,13 @@
31133113
flags=FLAG_AUTOMATOR_MODIFIABLE,
31143114
)
31153115

3116+
register(
3117+
"taskworker.try_compress.profile_metrics.rollout",
3118+
default=0.0,
3119+
type=Float,
3120+
flags=FLAG_AUTOMATOR_MODIFIABLE,
3121+
)
3122+
31163123
# Taskbroker flags
31173124
register(
31183125
"taskworker.try_compress.profile_metrics.level",

src/sentry/profiles/consumers/process/factory.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,32 +18,39 @@ def process_message(message: Message[KafkaPayload]) -> None:
1818
sampled = is_sampled(message.payload.headers)
1919

2020
if sampled or options.get("profiling.profile_metrics.unsampled_profiles.enabled"):
21-
b64encoded = b64encode(message.payload.value).decode("utf-8")
22-
process_profile_task.delay(payload=b64encoded, sampled=sampled, compressed_profile=False)
21+
b64encoded_uncompressed = b64encode(message.payload.value).decode("utf-8")
2322

24-
if random.random() < options.get("taskworker.try_compress.profile_metrics"):
23+
if random.random() < options.get("taskworker.try_compress.profile_metrics.rollout"):
2524
import time
2625
import zlib
2726

28-
metrics.distribution("profiling.profile_metrics.uncompressed_bytes", len(b64encoded))
27+
metrics.distribution(
28+
"profiling.profile_metrics.uncompressed_bytes", len(b64encoded_uncompressed)
29+
)
2930

3031
start_time = time.perf_counter()
32+
b64encoded_compressed = b64encode(
33+
zlib.compress(
34+
message.payload.value,
35+
level=options.get("taskworker.try_compress.profile_metrics.level"),
36+
)
37+
)
3138
metrics.distribution(
3239
"profiling.profile_metrics.compressed_bytes",
33-
len(
34-
b64encode(
35-
zlib.compress(
36-
message.payload.value,
37-
level=options.get("taskworker.try_compress.profile_metrics.level"),
38-
)
39-
)
40-
),
40+
len(b64encoded_compressed),
4141
)
4242
end_time = time.perf_counter()
4343
metrics.distribution(
4444
"profiling.profile_metrics.compression_time",
4545
end_time - start_time,
4646
)
47+
process_profile_task.delay(
48+
payload=b64encoded_compressed, sampled=sampled, compressed_profile=True
49+
)
50+
else:
51+
process_profile_task.delay(
52+
payload=b64encoded_uncompressed, sampled=sampled, compressed_profile=False
53+
)
4754

4855

4956
class ProcessProfileStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):

tests/sentry/processing/backpressure/test_checking.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
"backpressure.checking.interval": 5,
3131
"backpressure.monitoring.enabled": True,
3232
"backpressure.status_ttl": 60,
33-
"taskworker.try_compress.profile_metrics": False,
33+
"taskworker.try_compress.profile_metrics.rollout": 0,
3434
}
3535
)
3636
def test_backpressure_unhealthy_profiles():
@@ -54,7 +54,7 @@ def test_backpressure_unhealthy_profiles():
5454
"backpressure.checking.interval": 5,
5555
"backpressure.monitoring.enabled": False,
5656
"backpressure.status_ttl": 60,
57-
"taskworker.try_compress.profile_metrics": False,
57+
"taskworker.try_compress.profile_metrics.rollout": 0,
5858
}
5959
)
6060
def test_bad_config():
@@ -69,7 +69,7 @@ def test_bad_config():
6969
"backpressure.checking.interval": 5,
7070
"backpressure.monitoring.enabled": True,
7171
"backpressure.status_ttl": 60,
72-
"taskworker.try_compress.profile_metrics": False,
72+
"taskworker.try_compress.profile_metrics.rollout": 0,
7373
}
7474
)
7575
def test_backpressure_healthy_profiles(process_profile_task):
@@ -141,7 +141,7 @@ def test_backpressure_healthy_events(preprocess_event):
141141
{
142142
"backpressure.checking.enabled": False,
143143
"backpressure.checking.interval": 5,
144-
"taskworker.try_compress.profile_metrics": False,
144+
"taskworker.try_compress.profile_metrics.rollout": 0,
145145
}
146146
)
147147
def test_backpressure_not_enabled(process_profile_task):

0 commit comments

Comments
 (0)