Skip to content

Commit 4d39f18

Browse files
committed
feat(profiles): accept compressed data
1 parent ee2a56f commit 4d39f18

File tree

4 files changed

+26
-22
lines changed

4 files changed

+26
-22
lines changed

src/sentry/profiles/consumers/process/factory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def process_message(message: Message[KafkaPayload]) -> None:
1919

2020
if sampled or options.get("profiling.profile_metrics.unsampled_profiles.enabled"):
2121
b64encoded = b64encode(message.payload.value).decode("utf-8")
22-
process_profile_task.delay(payload=b64encoded, sampled=sampled)
22+
process_profile_task.delay(payload=b64encoded, sampled=sampled, compressed_profile=False)
2323

2424
if random.random() < options.get("taskworker.try_compress.profile_metrics"):
2525
import time

src/sentry/profiles/task.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from __future__ import annotations
22

33
import io
4+
import logging
5+
import zlib
46
from base64 import b64decode, b64encode
57
from copy import deepcopy
68
from datetime import datetime, timezone
@@ -86,8 +88,23 @@ def _get_profiles_producer_from_topic(topic: Topic) -> KafkaProducer:
8688
max_futures=settings.SENTRY_PROFILE_CHUNKS_FUTURES_MAX_LIMIT,
8789
)
8890

91+
logger = logging.getLogger(__name__)
8992

90-
def decode_payload(encoded: str) -> dict[str, Any]:
93+
94+
def decode_payload(encoded: str, compressed_profile: bool) -> dict[str, Any]:
95+
if compressed_profile:
96+
try:
97+
res = msgpack.unpackb(
98+
zlib.decompress(b64decode(encoded.encode("utf-8"))), use_list=False
99+
)
100+
metrics.incr("profiling.profile_metrics.decompress", tags={"status": "ok"})
101+
return res
102+
except Exception as e:
103+
logger.exception("Failed to decompress compressed profile", extra={"error": e})
104+
metrics.incr("profiling.profile_metrics.decompress", tags={"status": "err"})
105+
raise
106+
107+
# not compressed
91108
return msgpack.unpackb(b64decode(encoded.encode("utf-8")), use_list=False)
92109

93110

@@ -117,18 +134,16 @@ def encode_payload(message: dict[str, Any]) -> str:
117134
)
118135
def process_profile_task(
119136
profile: Profile | None = None,
120-
payload: str | bytes | None = None,
137+
payload: str | None = None,
121138
sampled: bool = True,
139+
compressed_profile: bool = False,
122140
**kwargs: Any,
123141
) -> None:
124142
if not sampled and not options.get("profiling.profile_metrics.unsampled_profiles.enabled"):
125143
return
126144

127145
if payload:
128-
if isinstance(payload, str): # It's been b64encoded for taskworker
129-
message_dict = decode_payload(payload)
130-
else:
131-
message_dict = msgpack.unpackb(payload, use_list=False)
146+
message_dict = decode_payload(payload, compressed_profile)
132147

133148
profile = json.loads(message_dict["payload"], use_rapid_json=True)
134149

tests/sentry/profiles/consumers/test_process.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ def test_basic_profile_to_celery(self, process_profile_task):
5454
processing_strategy.terminate()
5555

5656
process_profile_task.assert_called_with(
57-
payload=b64encode(payload).decode("utf-8"), sampled=True
57+
payload=b64encode(payload).decode("utf-8"),
58+
sampled=True,
59+
compressed_profile=False,
5860
)
5961

6062

tests/sentry/profiles/test_task.py

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from typing import Any
77
from unittest.mock import patch
88

9-
import msgpack
109
import pytest
1110
from django.core.files.uploadedfile import SimpleUploadedFile
1211
from django.urls import reverse
@@ -1033,19 +1032,11 @@ def test_unknown_sdk(
10331032
@patch("sentry.profiles.task._push_profile_to_vroom")
10341033
@patch("sentry.profiles.task._symbolicate_profile")
10351034
@patch("sentry.models.projectsdk.get_sdk_index")
1036-
@pytest.mark.parametrize(
1037-
["should_encode"],
1038-
[
1039-
(True,),
1040-
(False,),
1041-
],
1042-
)
10431035
@django_db_all
10441036
def test_track_latest_sdk_with_payload(
10451037
get_sdk_index: Any,
10461038
_symbolicate_profile: Any,
10471039
_push_profile_to_vroom: Any,
1048-
should_encode: bool,
10491040
organization: Organization,
10501041
project: Project,
10511042
request: Any,
@@ -1065,11 +1056,7 @@ def test_track_latest_sdk_with_payload(
10651056
"received": "2024-01-02T03:04:05",
10661057
"payload": json.dumps(profile),
10671058
}
1068-
payload: str | bytes
1069-
if should_encode:
1070-
payload = encode_payload(kafka_payload)
1071-
else:
1072-
payload = msgpack.packb(kafka_payload)
1059+
payload = encode_payload(kafka_payload)
10731060

10741061
with Feature("organizations:profiling-sdks"):
10751062
process_profile_task(payload=payload)

0 commit comments

Comments
 (0)