Skip to content

Commit c6cafe6

Browse files
committed
feat(profiles): accept compressed data
1 parent ee2a56f commit c6cafe6

File tree

2 files changed

+22
-7
lines changed

2 files changed

+22
-7
lines changed

src/sentry/profiles/consumers/process/factory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def process_message(message: Message[KafkaPayload]) -> None:
1919

2020
if sampled or options.get("profiling.profile_metrics.unsampled_profiles.enabled"):
2121
b64encoded = b64encode(message.payload.value).decode("utf-8")
22-
process_profile_task.delay(payload=b64encoded, sampled=sampled)
22+
process_profile_task.delay(payload=b64encoded, sampled=sampled, compressed_profile=False)
2323

2424
if random.random() < options.get("taskworker.try_compress.profile_metrics"):
2525
import time

src/sentry/profiles/task.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from __future__ import annotations
22

33
import io
4+
import logging
5+
import zlib
46
from base64 import b64decode, b64encode
57
from copy import deepcopy
68
from datetime import datetime, timezone
@@ -86,8 +88,23 @@ def _get_profiles_producer_from_topic(topic: Topic) -> KafkaProducer:
8688
max_futures=settings.SENTRY_PROFILE_CHUNKS_FUTURES_MAX_LIMIT,
8789
)
8890

91+
logger = logging.getLogger(__name__)
8992

90-
def decode_payload(encoded: str) -> dict[str, Any]:
93+
94+
def decode_payload(encoded: str, compressed_profile: bool) -> dict[str, Any]:
95+
if compressed_profile:
96+
try:
97+
res = msgpack.unpackb(
98+
zlib.decompress(b64decode(encoded.encode("utf-8"))), use_list=False
99+
)
100+
metrics.incr("profiling.profile_metrics.decompress", tags={"status": "ok"})
101+
return res
102+
except Exception as e:
103+
logger.exception("Failed to decompress compressed profile", extra={"error": e})
104+
metrics.incr("profiling.profile_metrics.decompress", tags={"status": "err"})
105+
raise
106+
107+
# not compressed
91108
return msgpack.unpackb(b64decode(encoded.encode("utf-8")), use_list=False)
92109

93110

@@ -117,18 +134,16 @@ def encode_payload(message: dict[str, Any]) -> str:
117134
)
118135
def process_profile_task(
119136
profile: Profile | None = None,
120-
payload: str | bytes | None = None,
137+
payload: str | None = None,
121138
sampled: bool = True,
139+
compressed_profile: bool = False,
122140
**kwargs: Any,
123141
) -> None:
124142
if not sampled and not options.get("profiling.profile_metrics.unsampled_profiles.enabled"):
125143
return
126144

127145
if payload:
128-
if isinstance(payload, str): # It's been b64encoded for taskworker
129-
message_dict = decode_payload(payload)
130-
else:
131-
message_dict = msgpack.unpackb(payload, use_list=False)
146+
message_dict = decode_payload(payload, compressed_profile)
132147

133148
profile = json.loads(message_dict["payload"], use_rapid_json=True)
134149

0 commit comments

Comments
 (0)