diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py
index 06046bdab211a2..c94db6cb701a3e 100644
--- a/src/sentry/conf/server.py
+++ b/src/sentry/conf/server.py
@@ -135,6 +135,7 @@ def env(
 SENTRY_METRIC_META_REDIS_CLUSTER = "default"
 SENTRY_ESCALATION_THRESHOLDS_REDIS_CLUSTER = "default"
 SENTRY_SPAN_BUFFER_CLUSTER = "default"
+SENTRY_ASSEMBLE_CLUSTER = "default"
 
 # Hosts that are allowed to use system token authentication.
 # http://en.wikipedia.org/wiki/Reserved_IP_addresses
diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py
index ac39b1a3a94c1e..0eb45b0b25ab23 100644
--- a/src/sentry/options/defaults.py
+++ b/src/sentry/options/defaults.py
@@ -2298,6 +2298,10 @@
     flags=FLAG_BOOL | FLAG_AUTOMATOR_MODIFIABLE,
 )
 
+
+# Switch to read assemble status from Redis instead of memcache
+register("assemble.read_from_redis", default=False, flags=FLAG_AUTOMATOR_MODIFIABLE)
+
 # Sampling rates for testing Rust-based grouping enhancers
 
 # Rate at which to run the Rust implementation of `assemble_stacktrace_component`
diff --git a/src/sentry/tasks/assemble.py b/src/sentry/tasks/assemble.py
index dcfdde8895f3bd..cce0b046e1394a 100644
--- a/src/sentry/tasks/assemble.py
+++ b/src/sentry/tasks/assemble.py
@@ -6,9 +6,11 @@
 from abc import ABC, abstractmethod
 from datetime import datetime
 from os import path
-from typing import IO, Generic, NamedTuple, Protocol, TypeVar
+from typing import IO, TYPE_CHECKING, Generic, NamedTuple, Protocol, TypeVar
 
+import orjson
 import sentry_sdk
+from django.conf import settings
 from django.db import IntegrityError, router
 from django.db.models import Q
 from django.utils import timezone
@@ -39,13 +41,16 @@
 from sentry.models.releasefile import ReleaseArchive, ReleaseFile, update_artifact_index
 from sentry.silo.base import SiloMode
 from sentry.tasks.base import instrumented_task
-from sentry.utils import metrics
+from sentry.utils import metrics, redis
 from sentry.utils.db import atomic_transaction
 from sentry.utils.files import get_max_file_size
 from sentry.utils.sdk import bind_organization_context, configure_scope
 
 logger = logging.getLogger(__name__)
 
+if TYPE_CHECKING:
+    from rediscluster import RedisCluster
+
 
 class ChunkFileState:
     OK = "ok"  # File in database
@@ -164,12 +169,18 @@ def _get_cache_key(task, scope, checksum):
             % (
                 str(scope).encode("ascii"),
                 checksum.encode("ascii"),
-                str(task).encode("utf-8"),
+                str(task).encode(),
             )
         ).hexdigest()
     )
 
 
+def _get_redis_cluster_for_assemble() -> RedisCluster:
+    cluster_key = settings.SENTRY_ASSEMBLE_CLUSTER
+    return redis.redis_clusters.get(cluster_key)  # type: ignore[return-value]
+
+
+@sentry_sdk.tracing.trace
 def get_assemble_status(task, scope, checksum):
     """
     Checks the current status of an assembling task.
@@ -179,26 +190,43 @@ def get_assemble_status(task, scope, checksum):
     notice or error message.
     """
     cache_key = _get_cache_key(task, scope, checksum)
-    rv = default_cache.get(cache_key)
+
+    if options.get("assemble.read_from_redis"):
+        client = _get_redis_cluster_for_assemble()
+        rv = client.get(cache_key)
+
+        # It is stored as bytes with [state, detail] on Redis.
+        if rv:
+            [state, detail] = orjson.loads(rv)
+            rv = (state, detail)
+    else:
+        rv = default_cache.get(cache_key)
+
     if rv is None:
         return None, None
     return tuple(rv)
 
 
+@sentry_sdk.tracing.trace
 def set_assemble_status(task, scope, checksum, state, detail=None):
     """
     Updates the status of an assembling task. It is cached for 10 minutes.
     """
     cache_key = _get_cache_key(task, scope, checksum)
     default_cache.set(cache_key, (state, detail), 600)
+    redis_client = _get_redis_cluster_for_assemble()
+    redis_client.set(name=cache_key, value=orjson.dumps([state, detail]), ex=600)
 
 
+@sentry_sdk.tracing.trace
 def delete_assemble_status(task, scope, checksum):
     """
     Deletes the status of an assembling task.
     """
     cache_key = _get_cache_key(task, scope, checksum)
     default_cache.delete(cache_key)
+    redis_client = _get_redis_cluster_for_assemble()
+    redis_client.delete(cache_key)
 
 
 @instrumented_task(
diff --git a/src/sentry/testutils/helpers/redis.py b/src/sentry/testutils/helpers/redis.py
index 1537f02ade42f9..88da4e737e106c 100644
--- a/src/sentry/testutils/helpers/redis.py
+++ b/src/sentry/testutils/helpers/redis.py
@@ -12,6 +12,7 @@ def use_redis_cluster(
     cluster_id: str = "cluster",
     high_watermark: int = 100,
     with_settings: dict[str, Any] | None = None,
+    with_options: dict[str, Any] | None = None,
 ) -> Generator[None, None, None]:
     # Cluster id needs to be different than "default" to distinguish redis instance with redis cluster.
 
@@ -32,6 +33,9 @@ def use_redis_cluster(
         },
     }
 
+    if with_options:
+        options.update(with_options)
+
     settings = dict(with_settings or {})
     settings["SENTRY_PROCESSING_SERVICES"] = {"redis": {"redis": cluster_id}}
 
diff --git a/tests/sentry/tasks/test_assemble.py b/tests/sentry/tasks/test_assemble.py
index 786d1acd838976..0393c40836664f 100644
--- a/tests/sentry/tasks/test_assemble.py
+++ b/tests/sentry/tasks/test_assemble.py
@@ -1,5 +1,6 @@
 import io
 import os
+import uuid
 from datetime import UTC, datetime, timedelta
 from hashlib import sha1
 from unittest import mock
@@ -28,10 +29,13 @@
     assemble_artifacts,
     assemble_dif,
     assemble_file,
+    delete_assemble_status,
     get_assemble_status,
+    set_assemble_status,
 )
 from sentry.testutils.cases import TestCase
 from sentry.testutils.helpers.datetime import freeze_time
+from sentry.testutils.helpers.redis import use_redis_cluster
 
 
 class BaseAssembleTest(TestCase):
@@ -1047,3 +1051,24 @@ def test_index_if_needed_with_newer_bundle_already_stored(
             organization_id=self.organization.id,
             artifact_bundles=[(artifact_bundle_1, mock.ANY)],
         )
+
+
+@use_redis_cluster(with_options={"assemble.read_from_redis": True})
+def test_redis_assemble_status():
+    task = AssembleTask.DIF
+    project_id = uuid.uuid4().hex
+    checksum = uuid.uuid4().hex
+
+    # If it doesn't exist, it should return correct values.
+    assert get_assemble_status(task=task, scope=project_id, checksum=checksum) == (None, None)
+
+    # Test setter
+    set_assemble_status(task, project_id, checksum, ChunkFileState.CREATED, detail="cylons")
+    assert get_assemble_status(task=task, scope=project_id, checksum=checksum) == (
+        "created",
+        "cylons",
+    )
+
+    # Deleting should actually delete it.
+    delete_assemble_status(task, project_id, checksum=checksum)
+    assert get_assemble_status(task=task, scope=project_id, checksum=checksum) == (None, None)