From 01b0a77f30a6aebeab35d9a6d72b106ec3f9a630 Mon Sep 17 00:00:00 2001
From: John Yang
Date: Wed, 14 May 2025 19:41:32 -0700
Subject: [PATCH] tele(taskbroker): check zlib compression viability

Sample a fraction of profile messages, controlled by the float option
"taskworker.try_compress.profile_metrics" (default 0.0), and emit
distributions for the base64 payload size, the base64 size after zlib
compression, and the time spent compressing and encoding. The timer is
stopped before any metric is emitted so that metric overhead is not
counted as compression time.

---
 src/sentry/options/defaults.py                |  6 +++++
 .../profiles/consumers/process/factory.py     | 25 ++++++++++++++++++++---
 .../processing/backpressure/test_checking.py  |  4 ++++
 3 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py
index 9e3a2ec5aa5812..4f65cb1bbf93ec 100644
--- a/src/sentry/options/defaults.py
+++ b/src/sentry/options/defaults.py
@@ -3107,6 +3107,12 @@
 )
 
 # Taskbroker flags
+register(
+    "taskworker.try_compress.profile_metrics",
+    default=0.0,
+    type=Float,
+    flags=FLAG_AUTOMATOR_MODIFIABLE,
+)
 
 register(
     "taskworker.route.overrides",
diff --git a/src/sentry/profiles/consumers/process/factory.py b/src/sentry/profiles/consumers/process/factory.py
index 3799fc31712062..4144e11204f0f9 100644
--- a/src/sentry/profiles/consumers/process/factory.py
+++ b/src/sentry/profiles/consumers/process/factory.py
@@ -1,3 +1,4 @@
+import random
 from base64 import b64encode
 from collections.abc import Iterable, Mapping
 
@@ -10,15 +11,33 @@
 from sentry import options
 from sentry.processing.backpressure.arroyo import HealthChecker, create_backpressure_step
 from sentry.profiles.task import process_profile_task
+from sentry.utils import metrics
 
 
 def process_message(message: Message[KafkaPayload]) -> None:
     sampled = is_sampled(message.payload.headers)
 
     if sampled or options.get("profiling.profile_metrics.unsampled_profiles.enabled"):
-        process_profile_task.delay(
-            payload=b64encode(message.payload.value).decode("utf-8"), sampled=sampled
-        )
+        b64encoded = b64encode(message.payload.value).decode("utf-8")
+        process_profile_task.delay(payload=b64encoded, sampled=sampled)
+        metrics.distribution("profiling.profile_metrics.uncompressed_bytes", len(b64encoded))
+
+        if random.random() < options.get("taskworker.try_compress.profile_metrics"):
+            import time
+            import zlib
+
+            # Time only compress+encode; emitting a metric must not inflate the sample.
+            start_time = time.perf_counter()
+            compressed_b64 = b64encode(zlib.compress(message.payload.value))
+            end_time = time.perf_counter()
+            metrics.distribution(
+                "profiling.profile_metrics.compressed_bytes",
+                len(compressed_b64),
+            )
+            metrics.distribution(
+                "profiling.profile_metrics.compression_time",
+                end_time - start_time,
+            )
 
 
 class ProcessProfileStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
diff --git a/tests/sentry/processing/backpressure/test_checking.py b/tests/sentry/processing/backpressure/test_checking.py
index fe33455f4e6a66..19bba3612c8274 100644
--- a/tests/sentry/processing/backpressure/test_checking.py
+++ b/tests/sentry/processing/backpressure/test_checking.py
@@ -30,6 +30,7 @@
         "backpressure.checking.interval": 5,
         "backpressure.monitoring.enabled": True,
         "backpressure.status_ttl": 60,
+        "taskworker.try_compress.profile_metrics": 0.0,
     }
 )
 def test_backpressure_unhealthy_profiles():
@@ -53,6 +54,7 @@ def test_backpressure_unhealthy_profiles():
         "backpressure.checking.interval": 5,
         "backpressure.monitoring.enabled": False,
         "backpressure.status_ttl": 60,
+        "taskworker.try_compress.profile_metrics": 0.0,
     }
 )
 def test_bad_config():
@@ -67,6 +69,7 @@ def test_bad_config():
         "backpressure.checking.interval": 5,
         "backpressure.monitoring.enabled": True,
         "backpressure.status_ttl": 60,
+        "taskworker.try_compress.profile_metrics": 0.0,
     }
 )
 def test_backpressure_healthy_profiles(process_profile_task):
@@ -138,6 +141,7 @@ def test_backpressure_healthy_events(preprocess_event):
     {
         "backpressure.checking.enabled": False,
         "backpressure.checking.interval": 5,
+        "taskworker.try_compress.profile_metrics": 0.0,
     }
 )
 def test_backpressure_not_enabled(process_profile_task):