Commit a00d034

feat(replays): Update delete script to query clickhouse less (#92163)
1 parent d858161

3 files changed: +130 -79 lines changed

src/sentry/replays/scripts/delete_replays.py

Lines changed: 70 additions & 29 deletions
@@ -1,16 +1,18 @@
 from __future__ import annotations

-import contextlib
 import logging
 from collections.abc import Sequence
 from datetime import datetime, timezone

+from snuba_sdk import Column, Condition, Entity, Function, Granularity, Limit, Offset, Op, Query
+
 from sentry.api.event_search import QueryToken, parse_search_query
 from sentry.models.organization import Organization
 from sentry.replays.lib.kafka import initialize_replays_publisher
-from sentry.replays.post_process import generate_normalized_output
-from sentry.replays.query import query_replays_collection_paginated, replay_url_parser_config
-from sentry.replays.tasks import archive_replay, delete_replay_recording_async
+from sentry.replays.query import replay_url_parser_config
+from sentry.replays.tasks import archive_replay, delete_replays_script_async
+from sentry.replays.usecases.query import execute_query, handle_search_filters
+from sentry.replays.usecases.query.configs.scalar import scalar_search_config

 logger = logging.getLogger()

@@ -33,21 +35,15 @@ def delete_replays(

     has_more = True
     while has_more:
-        response = query_replays_collection_paginated(
-            project_ids=[project_id],
+        replays, has_more = _get_rows_matching_deletion_pattern(
+            project_id=project_id,
             start=start_utc,
             end=end_utc,
-            fields=["id"],
             limit=batch_size,
-            environment=environment,
             offset=offset,
             search_filters=search_filters,
-            sort="started_at",
-            organization=Organization.objects.filter(project__id=project_id).get(),
-            preferred_source="scalar",
+            environment=environment,
         )
-        replays = list(generate_normalized_output(response.response))
-        has_more = response.has_more

         # Exit early if no replays were found.
         if not replays:
@@ -58,16 +54,16 @@ def delete_replays(
         if dry_run:
             print(f"Replays to be deleted (dry run): {len(replays)}")  # NOQA
         else:
-            delete_replay_ids(project_id, replay_ids=[r["id"] for r in replays])
+            delete_replay_ids(project_id, replays)


 def translate_cli_tags_param_to_snuba_tag_param(tags: list[str]) -> Sequence[QueryToken]:
     return parse_search_query(" AND ".join(tags), config=replay_url_parser_config)


-def delete_replay_ids(project_id: int, replay_ids: list[str]) -> None:
+def delete_replay_ids(project_id: int, rows: list[tuple[int, str, int]]) -> None:
     """Delete a set of replay-ids for a specific project."""
-    logger.info("Archiving %d replays.", len(replay_ids))
+    logger.info("Archiving %d replays.", len(rows))

     # Bulk produce archived replay rows to the ingest-replay-events topic before flushing.
     #
@@ -79,30 +75,75 @@ def delete_replay_ids(project_id: int, replay_ids: list[str]) -> None:
     #
     # This also gives us reasonable assurances that if the script ran to completion the customer
     # will not be able to access their deleted data even if the actual deletion takes place some
-    # time later.
-    with _bulk_produce_then_flush() as publisher:
-        for replay_id in replay_ids:
-            archive_replay(publisher, project_id, replay_id)
+    # time later
+    publisher = initialize_replays_publisher(is_async=True)
+    for _, replay_id, _ in rows:
+        archive_replay(publisher, project_id, replay_id)
+    publisher.flush()

-    logger.info("Scheduling %d replays for deletion.", len(replay_ids))
+    logger.info("Scheduling %d replays for deletion.", len(rows))

     # Asynchronously delete RRWeb recording data.
     #
     # Because this operation could involve millions of requests to the blob storage provider we
     # schedule the tasks to run on a cluster of workers. This allows us to parallelize the work
     # and complete the task as quickly as possible.
-    for replay_id in replay_ids:
-        delete_replay_recording_async.delay(project_id, replay_id)
+    for retention_days, replay_id, max_segment_id in rows:
+        delete_replays_script_async.delay(retention_days, project_id, replay_id, max_segment_id)

-    logger.info("%d replays were successfully deleted.", len(replay_ids))
+    logger.info("%d replays were successfully deleted.", len(rows))
     logger.info(
         "The customer will no longer have access to the replays passed to this function. Deletion "
         "of RRWeb data will complete asynchronously."
     )


-@contextlib.contextmanager
-def _bulk_produce_then_flush():
-    publisher = initialize_replays_publisher(is_async=True)
-    yield publisher
-    publisher.flush()
+def _get_rows_matching_deletion_pattern(
+    project_id: int,
+    limit: int,
+    offset: int,
+    end: datetime,
+    start: datetime,
+    search_filters: Sequence[QueryToken],
+    environment: list[str],
+) -> tuple[list[tuple[int, str, int]], bool]:
+    where = handle_search_filters(scalar_search_config, search_filters)
+
+    if environment:
+        where.append(Condition(Column("environment"), Op.IN, environment))
+
+    query = Query(
+        match=Entity("replays"),
+        select=[
+            Function("any", parameters=[Column("retention_days")], alias="retention_days"),
+            Column("replay_id"),
+            Function("max", parameters=[Column("segment_id")], alias="max_segment_id"),
+        ],
+        where=[
+            Condition(Column("project_id"), Op.EQ, project_id),
+            Condition(Column("timestamp"), Op.LT, end),
+            Condition(Column("timestamp"), Op.GTE, start),
+            *where,
+        ],
+        groupby=[Column("replay_id")],
+        granularity=Granularity(3600),
+        limit=Limit(limit),
+        offset=Offset(offset),
+    )
+
+    response = execute_query(
+        query,
+        {"tenant_id": Organization.objects.filter(project__id=project_id).get().id},
+        "replays.scripts.delete_replays",
+    )
+
+    data = response.get("data", [])
+    has_more = len(data) == limit
+
+    return (
+        [
+            (item["retention_days"], item["replay_id"].replace("-", ""), item["max_segment_id"])
+            for item in data
+        ],
+        has_more,
+    )
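
For orientation: the loop in delete_replays treats a page that comes back with exactly `limit` rows as a signal that more data may exist (has_more = len(data) == limit above). A minimal sketch of that offset-pagination pattern, with fetch_page standing in for _get_rows_matching_deletion_pattern; the name and the offset bookkeeping are illustrative assumptions, not part of this commit:

from collections.abc import Callable, Iterator

# Shape of each row returned by the ClickHouse query above.
Row = tuple[int, str, int]  # (retention_days, replay_id, max_segment_id)


def iterate_rows(
    fetch_page: Callable[[int, int], tuple[list[Row], bool]],  # (limit, offset) -> (rows, has_more)
    batch_size: int,
) -> Iterator[Row]:
    """Drive offset pagination until a short page signals the end."""
    offset = 0
    has_more = True
    while has_more:
        rows, has_more = fetch_page(batch_size, offset)
        yield from rows
        offset += batch_size  # illustrative; the script's own offset handling may differ

When the row count is an exact multiple of batch_size, has_more stays true and one final empty page is fetched; the script guards the same case with its early "if not replays" exit.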

src/sentry/replays/tasks.py

Lines changed: 56 additions & 1 deletion
@@ -6,7 +6,14 @@
 from google.cloud.exceptions import NotFound

 from sentry.replays.lib.kafka import initialize_replays_publisher
-from sentry.replays.lib.storage import filestore, make_video_filename, storage, storage_kv
+from sentry.replays.lib.storage import (
+    RecordingSegmentStorageMeta,
+    filestore,
+    make_recording_filename,
+    make_video_filename,
+    storage,
+    storage_kv,
+)
 from sentry.replays.models import ReplayRecordingSegment
 from sentry.replays.usecases.events import archive_event
 from sentry.replays.usecases.reader import fetch_segments_metadata
@@ -59,6 +66,54 @@ def delete_replay_recording_async(project_id: int, replay_id: str) -> None:
     delete_replay_recording(project_id, replay_id)


+@instrumented_task(
+    name="sentry.replays.tasks.delete_recording_async",
+    queue="replays.delete_replay",
+    default_retry_delay=5,
+    max_retries=5,
+    silo_mode=SiloMode.REGION,
+    taskworker_config=TaskworkerConfig(
+        namespace=replays_tasks,
+        retry=Retry(
+            times=5,
+            delay=5,
+        ),
+    ),
+)
+def delete_replays_script_async(
+    retention_days: int,
+    project_id: int,
+    replay_id: str,
+    max_segment_id: int,
+) -> None:
+    segments = [
+        RecordingSegmentStorageMeta(
+            project_id=project_id,
+            replay_id=replay_id,
+            segment_id=i,
+            retention_days=retention_days,
+        )
+        for i in range(0, max_segment_id)
+    ]
+
+    rrweb_filenames = []
+    video_filenames = []
+    for segment in segments:
+        video_filenames.append(make_video_filename(segment))
+        rrweb_filenames.append(make_recording_filename(segment))
+
+    with cf.ThreadPoolExecutor(max_workers=100) as pool:
+        pool.map(_delete_if_exists, video_filenames)
+        pool.map(_delete_if_exists, rrweb_filenames)
+
+    # Backwards compatibility. Should be deleted one day.
+    segments_from_django_models = ReplayRecordingSegment.objects.filter(
+        replay_id=replay_id, project_id=project_id
+    ).all()
+    for segment_model in segments_from_django_models:
+        segment_model.delete()
+
+
 def delete_replay_recording(project_id: int, replay_id: str) -> None:
     """Delete all recording-segments associated with a Replay."""
     segments_from_metadata = fetch_segments_metadata(project_id, replay_id, offset=0, limit=10000)
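
delete_replays_script_async fans blob deletions out over a 100-thread pool through _delete_if_exists, a helper whose body sits outside this diff. A hedged sketch of what such a helper plausibly looks like, assuming the storage_kv driver exposes a delete() that raises the NotFound imported at the top of tasks.py when the blob is already gone:

from google.cloud.exceptions import NotFound

from sentry.replays.lib.storage import storage_kv


def _delete_if_exists(filename: str) -> None:
    """Delete a blob, treating an already-missing blob as success.

    Sketch only: assumes storage_kv.delete() surfaces google.cloud's
    NotFound for absent keys, mirroring the import in this file.
    """
    try:
        storage_kv.delete(filename)
    except NotFound:
        pass

Swallowing NotFound keeps the task idempotent, which matters here because the task is configured to retry up to five times.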

tests/sentry/replays/scripts/test_delete_replays.py

Lines changed: 4 additions & 49 deletions
@@ -6,9 +6,8 @@
 from zlib import compress

 from sentry.models.file import File
-from sentry.replays.lib.storage import RecordingSegmentStorageMeta, storage
 from sentry.replays.models import ReplayRecordingSegment
-from sentry.replays.scripts.delete_replays import delete_replay_ids, delete_replays
+from sentry.replays.scripts.delete_replays import delete_replays
 from sentry.replays.testutils import (
     mock_replay,
     mock_rrweb_div_helloworld,
@@ -35,7 +34,9 @@ def store_replay_segments(
             tags = {}

         self.store_replays(
-            mock_replay(timestamp, project_id, replay_id, environment=environment, tags=tags)
+            mock_replay(
+                timestamp, project_id, replay_id, environment=environment, tags=tags, segment_id=5
+            )
         )

         segments = [
@@ -280,49 +281,3 @@ def test_deletion_replays_batch_size_all_deleted(self):

         replay_recordings = ReplayRecordingSegment.objects.all()
         assert len(replay_recordings) == 0
-
-    def test_delete_replays_by_id(self):
-        # Deleted.
-        deleted_replay_id = uuid4().hex
-        self.store_replays(
-            mock_replay(
-                datetime.datetime.now() - datetime.timedelta(seconds=10),
-                self.project.id,
-                deleted_replay_id,
-            )
-        )
-
-        metadata1 = RecordingSegmentStorageMeta(
-            project_id=self.project.id,
-            replay_id=deleted_replay_id,
-            segment_id=0,
-            retention_days=30,
-            file_id=None,
-        )
-        storage.set(metadata1, b"hello, world!")
-
-        # Kept
-        kept_replay_id = uuid4().hex
-        self.store_replays(
-            mock_replay(
-                datetime.datetime.now() - datetime.timedelta(seconds=10),
-                self.project.id,
-                kept_replay_id,
-            )
-        )
-
-        metadata2 = RecordingSegmentStorageMeta(
-            project_id=self.project.id,
-            replay_id=kept_replay_id,
-            segment_id=0,
-            retention_days=30,
-            file_id=None,
-        )
-        storage.set(metadata2, b"hello, world!")
-
-        with TaskRunner():
-            delete_replay_ids(project_id=self.project.id, replay_ids=[deleted_replay_id])
-
-        # Assert stored data was deleted.
-        assert storage.get(metadata1) is None
-        assert storage.get(metadata2) is not None

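The deleted test exercised the old delete_replay_ids(project_id, replay_ids=[...]) signature, which this commit removes. For illustration only, a call against the new tuple-based signature; all values below are hypothetical:

from sentry.replays.scripts.delete_replays import delete_replay_ids

# Each row is (retention_days, replay_id, max_segment_id). Replay IDs are
# dash-stripped, matching the .replace("-", "") normalization in
# _get_rows_matching_deletion_pattern. Values are hypothetical.
rows: list[tuple[int, str, int]] = [
    (30, "54a5e39b1d504856b5b4a48b138480b9", 5),
    (90, "a318b1ad74e14b7db7736e7b5f8b8f29", 12),
]
delete_replay_ids(project_id=1, rows=rows)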