diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py
index 6ef2a4ce02d6a0..558071745a5d5d 100644
--- a/src/sentry/options/defaults.py
+++ b/src/sentry/options/defaults.py
@@ -3113,6 +3113,13 @@
     flags=FLAG_AUTOMATOR_MODIFIABLE,
 )
 
+register(
+    "taskworker.try_compress.profile_metrics.rollout",
+    default=0.0,
+    type=Float,
+    flags=FLAG_AUTOMATOR_MODIFIABLE,
+)
+
 # Taskbroker flags
 register(
     "taskworker.try_compress.profile_metrics.level",
diff --git a/src/sentry/profiles/consumers/process/factory.py b/src/sentry/profiles/consumers/process/factory.py
index f87ff191518d2e..2b21cdc2da4b61 100644
--- a/src/sentry/profiles/consumers/process/factory.py
+++ b/src/sentry/profiles/consumers/process/factory.py
@@ -18,32 +18,39 @@
 def process_message(message: Message[KafkaPayload]) -> None:
     sampled = is_sampled(message.payload.headers)
 
     if sampled or options.get("profiling.profile_metrics.unsampled_profiles.enabled"):
-        b64encoded = b64encode(message.payload.value).decode("utf-8")
-        process_profile_task.delay(payload=b64encoded, sampled=sampled, compressed_profile=False)
+        b64encoded_uncompressed = b64encode(message.payload.value).decode("utf-8")
 
-        if random.random() < options.get("taskworker.try_compress.profile_metrics"):
+        if random.random() < options.get("taskworker.try_compress.profile_metrics.rollout"):
             import time
             import zlib
 
-            metrics.distribution("profiling.profile_metrics.uncompressed_bytes", len(b64encoded))
+            metrics.distribution(
+                "profiling.profile_metrics.uncompressed_bytes", len(b64encoded_uncompressed)
+            )
             start_time = time.perf_counter()
+            b64encoded_compressed = b64encode(
+                zlib.compress(
+                    message.payload.value,
+                    level=options.get("taskworker.try_compress.profile_metrics.level"),
+                )
+            ).decode("utf-8")
             metrics.distribution(
                 "profiling.profile_metrics.compressed_bytes",
-                len(
-                    b64encode(
-                        zlib.compress(
-                            message.payload.value,
-                            level=options.get("taskworker.try_compress.profile_metrics.level"),
-                        )
-                    )
-                ),
+                len(b64encoded_compressed),
             )
             end_time = time.perf_counter()
             metrics.distribution(
                 "profiling.profile_metrics.compression_time",
                 end_time - start_time,
             )
+            process_profile_task.delay(
+                payload=b64encoded_compressed, sampled=sampled, compressed_profile=True
+            )
+        else:
+            process_profile_task.delay(
+                payload=b64encoded_uncompressed, sampled=sampled, compressed_profile=False
+            )
 
 
 class ProcessProfileStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
diff --git a/tests/sentry/processing/backpressure/test_checking.py b/tests/sentry/processing/backpressure/test_checking.py
index 19bba3612c8274..6fa411bc9f7fef 100644
--- a/tests/sentry/processing/backpressure/test_checking.py
+++ b/tests/sentry/processing/backpressure/test_checking.py
@@ -30,7 +30,7 @@
         "backpressure.checking.interval": 5,
         "backpressure.monitoring.enabled": True,
         "backpressure.status_ttl": 60,
-        "taskworker.try_compress.profile_metrics": False,
+        "taskworker.try_compress.profile_metrics.rollout": 0,
     }
 )
 def test_backpressure_unhealthy_profiles():
@@ -54,7 +54,7 @@
         "backpressure.checking.interval": 5,
         "backpressure.monitoring.enabled": False,
         "backpressure.status_ttl": 60,
-        "taskworker.try_compress.profile_metrics": False,
+        "taskworker.try_compress.profile_metrics.rollout": 0,
     }
 )
 def test_bad_config():
@@ -69,7 +69,7 @@
         "backpressure.checking.interval": 5,
         "backpressure.monitoring.enabled": True,
         "backpressure.status_ttl": 60,
-        "taskworker.try_compress.profile_metrics": False,
+        "taskworker.try_compress.profile_metrics.rollout": 0,
     }
 )
 def test_backpressure_healthy_profiles(process_profile_task):
@@ -141,7 +141,7 @@
     {
         "backpressure.checking.enabled": False,
         "backpressure.checking.interval": 5,
-        "taskworker.try_compress.profile_metrics": False,
+        "taskworker.try_compress.profile_metrics.rollout": 0,
     }
 )
 def test_backpressure_not_enabled(process_profile_task):