Skip to content

perf: move assemble status to redis #70344

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def env(
SENTRY_METRIC_META_REDIS_CLUSTER = "default"
SENTRY_ESCALATION_THRESHOLDS_REDIS_CLUSTER = "default"
SENTRY_SPAN_BUFFER_CLUSTER = "default"
SENTRY_ASSEMBLE_CLUSTER = "default"

# Hosts that are allowed to use system token authentication.
# http://en.wikipedia.org/wiki/Reserved_IP_addresses
Expand Down
4 changes: 4 additions & 0 deletions src/sentry/options/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -2298,6 +2298,10 @@
flags=FLAG_BOOL | FLAG_AUTOMATOR_MODIFIABLE,
)


# Switch to read assemble status from Redis instead of memcache
register("assemble.read_from_redis", default=False, flags=FLAG_AUTOMATOR_MODIFIABLE)

# Sampling rates for testing Rust-based grouping enhancers

# Rate at which to run the Rust implementation of `assemble_stacktrace_component`
Expand Down
36 changes: 32 additions & 4 deletions src/sentry/tasks/assemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
from abc import ABC, abstractmethod
from datetime import datetime
from os import path
from typing import IO, Generic, NamedTuple, Protocol, TypeVar
from typing import IO, TYPE_CHECKING, Generic, NamedTuple, Protocol, TypeVar

import orjson
import sentry_sdk
from django.conf import settings
from django.db import IntegrityError, router
from django.db.models import Q
from django.utils import timezone
Expand Down Expand Up @@ -39,13 +41,16 @@
from sentry.models.releasefile import ReleaseArchive, ReleaseFile, update_artifact_index
from sentry.silo.base import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils import metrics
from sentry.utils import metrics, redis
from sentry.utils.db import atomic_transaction
from sentry.utils.files import get_max_file_size
from sentry.utils.sdk import bind_organization_context, configure_scope

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
from rediscluster import RedisCluster


class ChunkFileState:
OK = "ok" # File in database
Expand Down Expand Up @@ -164,12 +169,18 @@ def _get_cache_key(task, scope, checksum):
% (
str(scope).encode("ascii"),
checksum.encode("ascii"),
str(task).encode("utf-8"),
str(task).encode(),
)
).hexdigest()
)


def _get_redis_cluster_for_assemble() -> RedisCluster:
cluster_key = settings.SENTRY_ASSEMBLE_CLUSTER
return redis.redis_clusters.get(cluster_key) # type: ignore[return-value]


@sentry_sdk.tracing.trace
def get_assemble_status(task, scope, checksum):
"""
Checks the current status of an assembling task.
Expand All @@ -179,26 +190,43 @@ def get_assemble_status(task, scope, checksum):
notice or error message.
"""
cache_key = _get_cache_key(task, scope, checksum)
rv = default_cache.get(cache_key)

if options.get("assemble.read_from_redis"):
client = _get_redis_cluster_for_assemble()
rv = client.get(cache_key)

# It is stored as bytes with [state, detail] on Redis.
if rv:
[state, detail] = orjson.loads(rv)
rv = (state, detail)
else:
rv = default_cache.get(cache_key)

if rv is None:
return None, None
return tuple(rv)


@sentry_sdk.tracing.trace
def set_assemble_status(task, scope, checksum, state, detail=None):
"""
Updates the status of an assembling task. It is cached for 10 minutes.
"""
cache_key = _get_cache_key(task, scope, checksum)
default_cache.set(cache_key, (state, detail), 600)
redis_client = _get_redis_cluster_for_assemble()
redis_client.set(name=cache_key, value=orjson.dumps([state, detail]), ex=600)


@sentry_sdk.tracing.trace
def delete_assemble_status(task, scope, checksum):
"""
Deletes the status of an assembling task.
"""
cache_key = _get_cache_key(task, scope, checksum)
default_cache.delete(cache_key)
redis_client = _get_redis_cluster_for_assemble()
redis_client.delete(cache_key)


@instrumented_task(
Expand Down
4 changes: 4 additions & 0 deletions src/sentry/testutils/helpers/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def use_redis_cluster(
cluster_id: str = "cluster",
high_watermark: int = 100,
with_settings: dict[str, Any] | None = None,
with_options: dict[str, Any] | None = None,
) -> Generator[None, None, None]:
# Cluster id needs to be different than "default" to distinguish redis instance with redis cluster.

Expand All @@ -32,6 +33,9 @@ def use_redis_cluster(
},
}

if with_options:
options.update(with_options)

settings = dict(with_settings or {})
settings["SENTRY_PROCESSING_SERVICES"] = {"redis": {"redis": cluster_id}}

Expand Down
25 changes: 25 additions & 0 deletions tests/sentry/tasks/test_assemble.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import io
import os
import uuid
from datetime import UTC, datetime, timedelta
from hashlib import sha1
from unittest import mock
Expand Down Expand Up @@ -28,10 +29,13 @@
assemble_artifacts,
assemble_dif,
assemble_file,
delete_assemble_status,
get_assemble_status,
set_assemble_status,
)
from sentry.testutils.cases import TestCase
from sentry.testutils.helpers.datetime import freeze_time
from sentry.testutils.helpers.redis import use_redis_cluster


class BaseAssembleTest(TestCase):
Expand Down Expand Up @@ -1047,3 +1051,24 @@ def test_index_if_needed_with_newer_bundle_already_stored(
organization_id=self.organization.id,
artifact_bundles=[(artifact_bundle_1, mock.ANY)],
)


@use_redis_cluster(with_options={"assemble.read_from_redis": True})
def test_redis_assemble_status():
task = AssembleTask.DIF
project_id = uuid.uuid4().hex
checksum = uuid.uuid4().hex

# If it doesn't exist, it should return correct values.
assert get_assemble_status(task=task, scope=project_id, checksum=checksum) == (None, None)

# Test setter
set_assemble_status(task, project_id, checksum, ChunkFileState.CREATED, detail="cylons")
assert get_assemble_status(task=task, scope=project_id, checksum=checksum) == (
"created",
"cylons",
)

# Deleting should actually delete it.
delete_assemble_status(task, project_id, checksum=checksum)
assert get_assemble_status(task=task, scope=project_id, checksum=checksum) == (None, None)
Loading