|
1 | 1 | from __future__ import annotations
|
2 | 2 |
|
3 | 3 | import io
|
| 4 | +import logging |
| 5 | +import zlib |
4 | 6 | from base64 import b64decode, b64encode
|
5 | 7 | from copy import deepcopy
|
6 | 8 | from datetime import datetime, timezone
|
@@ -86,8 +88,29 @@ def _get_profiles_producer_from_topic(topic: Topic) -> KafkaProducer:
|
86 | 88 | max_futures=settings.SENTRY_PROFILE_CHUNKS_FUTURES_MAX_LIMIT,
|
87 | 89 | )
|
88 | 90 |
|
| 91 | +logger = logging.getLogger(__name__) |
89 | 92 |
|
90 |
| -def decode_payload(encoded: str) -> dict[str, Any]: |
| 93 | + |
| 94 | +def decode_payload(encoded: str | bytes) -> dict[str, Any]: |
| 95 | + if isinstance(encoded, bytes): |
| 96 | + return msgpack.unpackb(encoded, use_list=False) |
| 97 | + |
| 98 | + # It's been b64encoded for taskworker |
| 99 | + |
| 100 | + # compressed |
| 101 | + if encoded.startswith("!"): |
| 102 | + try: |
| 103 | + res = msgpack.unpackb( |
| 104 | + zlib.decompress(b64decode(encoded[1:].encode("utf-8"))), use_list=False |
| 105 | + ) |
| 106 | + metrics.incr("profiling.profile_metrics.decompress", tags={"status": "ok"}) |
| 107 | + return res |
| 108 | + except Exception as e: |
| 109 | + logger.exception("Failed to decompress compressed profile", extra={"error": e}) |
| 110 | + metrics.incr("profiling.profile_metrics.decompress", tags={"status": "err"}) |
| 111 | + raise |
| 112 | + |
| 113 | + # not compressed |
91 | 114 | return msgpack.unpackb(b64decode(encoded.encode("utf-8")), use_list=False)
|
92 | 115 |
|
93 | 116 |
|
@@ -125,10 +148,7 @@ def process_profile_task(
|
125 | 148 | return
|
126 | 149 |
|
127 | 150 | if payload:
|
128 |
| - if isinstance(payload, str): # It's been b64encoded for taskworker |
129 |
| - message_dict = decode_payload(payload) |
130 |
| - else: |
131 |
| - message_dict = msgpack.unpackb(payload, use_list=False) |
| 151 | + message_dict = decode_payload(payload) |
132 | 152 |
|
133 | 153 | profile = json.loads(message_dict["payload"], use_rapid_json=True)
|
134 | 154 |
|
|
0 commit comments