@@ -18,32 +18,39 @@ def process_message(message: Message[KafkaPayload]) -> None:
18
18
sampled = is_sampled (message .payload .headers )
19
19
20
20
if sampled or options .get ("profiling.profile_metrics.unsampled_profiles.enabled" ):
21
- b64encoded = b64encode (message .payload .value ).decode ("utf-8" )
22
- process_profile_task .delay (payload = b64encoded , sampled = sampled , compressed_profile = False )
21
+ b64encoded_uncompressed = b64encode (message .payload .value ).decode ("utf-8" )
23
22
24
- if random .random () < options .get ("taskworker.try_compress.profile_metrics" ):
23
+ if random .random () < options .get ("taskworker.try_compress.profile_metrics.rollout " ):
25
24
import time
26
25
import zlib
27
26
28
- metrics .distribution ("profiling.profile_metrics.uncompressed_bytes" , len (b64encoded ))
27
+ metrics .distribution (
28
+ "profiling.profile_metrics.uncompressed_bytes" , len (b64encoded_uncompressed )
29
+ )
29
30
30
31
start_time = time .perf_counter ()
32
+ b64encoded_compressed = b64encode (
33
+ zlib .compress (
34
+ message .payload .value ,
35
+ level = options .get ("taskworker.try_compress.profile_metrics.level" ),
36
+ )
37
+ )
31
38
metrics .distribution (
32
39
"profiling.profile_metrics.compressed_bytes" ,
33
- len (
34
- b64encode (
35
- zlib .compress (
36
- message .payload .value ,
37
- level = options .get ("taskworker.try_compress.profile_metrics.level" ),
38
- )
39
- )
40
- ),
40
+ len (b64encoded_compressed ),
41
41
)
42
42
end_time = time .perf_counter ()
43
43
metrics .distribution (
44
44
"profiling.profile_metrics.compression_time" ,
45
45
end_time - start_time ,
46
46
)
47
+ process_profile_task .delay (
48
+ payload = b64encoded_compressed , sampled = sampled , compressed_profile = True
49
+ )
50
+ else :
51
+ process_profile_task .delay (
52
+ payload = b64encoded_uncompressed , sampled = sampled , compressed_profile = False
53
+ )
47
54
48
55
49
56
class ProcessProfileStrategyFactory (ProcessingStrategyFactory [KafkaPayload ]):
0 commit comments