Skip to content

Commit b75f430

Browse files
committed
tele(taskbroker): check zlib compression viability
1 parent e198bbe commit b75f430

File tree

2 files changed

+26
-3
lines changed

2 files changed

+26
-3
lines changed

src/sentry/options/defaults.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3107,6 +3107,12 @@
31073107
)
31083108

31093109
# Taskbroker flags
3110+
register(
3111+
"taskworker.try_compress.profile_metrics",
3112+
default=0.0,
3113+
type=Float,
3114+
flags=FLAG_AUTOMATOR_MODIFIABLE,
3115+
)
31103116

31113117
register(
31123118
"taskworker.route.overrides",

src/sentry/profiles/consumers/process/factory.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import random
12
from base64 import b64encode
23
from collections.abc import Iterable, Mapping
34

@@ -10,15 +11,31 @@
1011
from sentry import options
1112
from sentry.processing.backpressure.arroyo import HealthChecker, create_backpressure_step
1213
from sentry.profiles.task import process_profile_task
14+
from sentry.utils import metrics
1315

1416

1517
def process_message(message: Message[KafkaPayload]) -> None:
1618
sampled = is_sampled(message.payload.headers)
1719

1820
if sampled or options.get("profiling.profile_metrics.unsampled_profiles.enabled"):
19-
process_profile_task.delay(
20-
payload=b64encode(message.payload.value).decode("utf-8"), sampled=sampled
21-
)
21+
b64encoded = b64encode(message.payload.value).decode("utf-8")
22+
process_profile_task.delay(payload=b64encoded, sampled=sampled)
23+
metrics.distribution("profiling.profile_metrics.uncompressed_bytes", len(b64encoded))
24+
25+
if random.random() < options.get("taskworker.try_compress.profile_metrics"):
26+
import time
27+
import zlib
28+
29+
start_time = time.perf_counter()
30+
metrics.distribution(
31+
"profiling.profile_metrics.compressed_bytes",
32+
len(b64encode(zlib.compress(message.payload.value))),
33+
)
34+
end_time = time.perf_counter()
35+
metrics.distribution(
36+
"profiling.profile_metrics.compression_time",
37+
end_time - start_time,
38+
)
2239

2340

2441
class ProcessProfileStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):

0 commit comments

Comments
 (0)