Skip to content

Commit 2b93ffe

Browse files
fix(hybrid-cloud): Adds support for cross database tombstones (#69480)
Adds cross-database tombstone deletion support. The current tombstone code performs a join between the model and tombstone tables to construct a minimal set of IDs to update or delete from the target table. Because this isn't possible when the target model lives in a different database (e.g. crons monitors), we have to take a two-step approach: 1. Query the tombstone table for the current list of IDs matching the watermarking batch. 2. Query the model table for all rows with a matching identifier on the target field. There are possible performance concerns with the model-side query, so I've placed this behind a new option named `hybrid_cloud.allow_cross_db_tombstones`. If the option is disabled and a cross-database tombstone cleanup is attempted, an explicit exception is raised noting why the cleanup failed.
1 parent 4420324 commit 2b93ffe

File tree

3 files changed

+281
-24
lines changed

3 files changed

+281
-24
lines changed

src/sentry/options/defaults.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1726,6 +1726,7 @@
17261726
register(
17271727
"hybrid_cloud.disable_relative_upload_urls", default=False, flags=FLAG_AUTOMATOR_MODIFIABLE
17281728
)
1729+
register("hybrid_cloud.allow_cross_db_tombstones", default=False, flags=FLAG_AUTOMATOR_MODIFIABLE)
17291730

17301731
# Retry controls
17311732
register("hybridcloud.regionsiloclient.retries", default=5, flags=FLAG_AUTOMATOR_MODIFIABLE)

src/sentry/tasks/deletion/hybrid_cloud.py

Lines changed: 89 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
from django.db.models.manager import BaseManager
2323
from django.utils import timezone
2424

25+
from sentry import options
26+
from sentry.db.models import Model
2527
from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey
2628
from sentry.models.tombstone import TombstoneBase
2729
from sentry.silo.base import SiloMode
@@ -247,30 +249,14 @@ def _process_tombstone_reconciliation(
247249
prefix, field, watermark_manager, batch_size=get_batch_size()
248250
)
249251
has_more = watermark_batch.has_more
250-
to_delete_ids: list[int] = []
251-
252252
if watermark_batch.low < watermark_batch.up:
253-
oldest_seen: datetime.datetime = timezone.now()
254-
255-
with connections[router.db_for_read(model)].cursor() as conn:
256-
conn.execute(
257-
f"""
258-
SELECT r.id, t.created_at
259-
FROM {model._meta.db_table} r
260-
JOIN {tombstone_cls._meta.db_table} t
261-
ON t.table_name = %(table_name)s AND t.object_identifier = r.{field.name}
262-
WHERE {watermark_target}.id > %(low)s AND {watermark_target}.id <= %(up)s
263-
""",
264-
{
265-
"table_name": field.foreign_table_name,
266-
"low": watermark_batch.low,
267-
"up": watermark_batch.up,
268-
},
269-
)
270-
271-
for (row_id, tomb_created) in conn.fetchall():
272-
to_delete_ids.append(row_id)
273-
oldest_seen = min(oldest_seen, tomb_created)
253+
to_delete_ids, oldest_seen = _get_model_ids_for_tombstone_cascade(
254+
tombstone_cls=tombstone_cls,
255+
model=model,
256+
field=field,
257+
watermark_batch=watermark_batch,
258+
watermark_target=watermark_target,
259+
)
274260

275261
if field.on_delete == "CASCADE":
276262
task = deletions.get(
@@ -306,3 +292,83 @@ def _process_tombstone_reconciliation(
306292
)
307293

308294
return has_more
295+
296+
297+
def _get_model_ids_for_tombstone_cascade(
    tombstone_cls: type[TombstoneBase],
    model: type[Model],
    field: HybridCloudForeignKey,
    watermark_target,
    watermark_batch: WatermarkBatch,
) -> tuple[list[int], datetime.datetime]:
    """
    Queries the database or databases if spanning multiple, and returns
    a tuple with a list of row IDs to delete, and the oldest
    tombstone timestamp for the batch.

    When the model and the tombstone table are read from the same database a
    single JOIN is used. Otherwise the lookup is split into two queries, one
    per database, and the watermark range is applied to whichever table
    ``watermark_target`` designates.

    :param tombstone_cls: Either a RegionTombstone or ControlTombstone, depending on
    which silo the tombstone process is running.
    :param model: The model with a HybridCloudForeignKey.
    :param field: The HybridCloudForeignKey field from the model to process.
    :param watermark_target: SQL alias of the table whose ``id`` column the
    watermark range is applied to: ``"r"`` is the model table and ``"t"`` the
    tombstone table in the query below. Callers are assumed to pass one of
    those two aliases.
    :param watermark_batch: The batch whose ``low`` (exclusive) and ``up``
    (inclusive) bounds limit the ids scanned.
    :return: ``(ids of model rows referencing a tombstoned object, oldest
    tombstone ``created_at`` seen)``. The timestamp is the current time when
    no tombstone matched.
    :raises Exception: if the tables live in different databases and the
    ``hybrid_cloud.allow_cross_db_tombstones`` option is disabled.
    """
    # Resolve the model's database once so the comparison and the connection
    # used for the query cannot disagree.
    model_db = router.db_for_read(model)

    if model_db != router.db_for_read(tombstone_cls):
        if watermark_target == "r":
            # The batch bounds are model row ids here, so they must not be
            # applied to the tombstone table's id column.
            return _get_ids_cross_db_for_row_watermark(
                tombstone_cls=tombstone_cls,
                model=model,
                field=field,
                watermark_batch=watermark_batch,
            )
        return get_ids_for_tombstone_cascade_cross_db(
            tombstone_cls=tombstone_cls, model=model, field=field, watermark_batch=watermark_batch
        )

    to_delete_ids: list[int] = []
    oldest_seen = timezone.now()

    with connections[model_db].cursor() as conn:
        # Table, column and alias names come from model metadata and the
        # caller; every value is bound as a query parameter.
        conn.execute(
            f"""
            SELECT r.id, t.created_at
            FROM {model._meta.db_table} r
            JOIN {tombstone_cls._meta.db_table} t
                ON t.table_name = %(table_name)s AND t.object_identifier = r.{field.name}
            WHERE {watermark_target}.id > %(low)s AND {watermark_target}.id <= %(up)s
            """,
            {
                "table_name": field.foreign_table_name,
                "low": watermark_batch.low,
                "up": watermark_batch.up,
            },
        )

        for row_id, tomb_created in conn.fetchall():
            to_delete_ids.append(row_id)
            oldest_seen = min(oldest_seen, tomb_created)

    return to_delete_ids, oldest_seen


def _get_ids_cross_db_for_row_watermark(
    tombstone_cls: type[TombstoneBase],
    model: type[Model],
    field: HybridCloudForeignKey,
    watermark_batch: WatermarkBatch,
) -> tuple[list[int], datetime.datetime]:
    """
    Cross-database lookup for a watermark expressed in model row ids.

    Reads the model rows inside the batch first, then asks the tombstone
    table which of the identifiers they reference have been tombstoned.

    :return: ``(ids of affected model rows, oldest matching tombstone
    ``created_at`` or the current time when nothing matched)``.
    :raises Exception: if ``hybrid_cloud.allow_cross_db_tombstones`` is disabled.
    """
    if not options.get("hybrid_cloud.allow_cross_db_tombstones"):
        raise Exception("Cannot process tombstones due to model living in separate database.")

    oldest_seen = timezone.now()

    rows_in_batch = model.objects.filter(
        id__gt=watermark_batch.low, id__lte=watermark_batch.up
    ).values_list("id", field.name)

    # Group row ids by the identifier they reference so the tombstone table
    # is only asked about the distinct identifiers present in this batch.
    row_ids_by_identifier: dict[int, list[int]] = {}
    for row_id, object_id in rows_in_batch:
        if object_id is not None:
            row_ids_by_identifier.setdefault(object_id, []).append(row_id)

    if not row_ids_by_identifier:
        return [], oldest_seen

    tombstone_entries = tombstone_cls.objects.filter(
        table_name=field.foreign_table_name,
        object_identifier__in=list(row_ids_by_identifier),
    ).values_list("object_identifier", "created_at")

    affected_rows: list[int] = []
    for object_id, created_at in tombstone_entries:
        # pop() keeps a repeated tombstone for the same identifier from
        # adding the same row ids twice.
        affected_rows.extend(row_ids_by_identifier.pop(object_id, ()))
        oldest_seen = min(oldest_seen, created_at)

    return affected_rows, oldest_seen
347+
348+
def get_ids_for_tombstone_cascade_cross_db(
    tombstone_cls: type[TombstoneBase],
    model: type[Model],
    field: HybridCloudForeignKey,
    watermark_batch: WatermarkBatch,
) -> tuple[list[int], datetime.datetime]:
    """
    Two-query replacement for the model/tombstone JOIN, for use when the two
    tables cannot be joined because they live in different databases.

    The watermark bounds are applied to tombstone ids: ``low`` is exclusive
    and ``up`` is inclusive.

    :param tombstone_cls: The tombstone model to read the batch from.
    :param model: The model with a HybridCloudForeignKey.
    :param field: The HybridCloudForeignKey field from the model to process.
    :param watermark_batch: The batch whose bounds limit the tombstone ids read.
    :return: ``(ids of model rows whose field matches a tombstone in the
    batch, oldest tombstone ``created_at`` in the batch or the current time
    when the batch is empty)``.
    :raises Exception: if ``hybrid_cloud.allow_cross_db_tombstones`` is disabled.
    """
    cross_db_allowed = options.get("hybrid_cloud.allow_cross_db_tombstones")
    if not cross_db_allowed:
        raise Exception("Cannot process tombstones due to model living in separate database.")

    started_at = timezone.now()

    # Step 1: read the tombstones for the referenced table that fall inside
    # the watermark batch.
    batch_tombstones = list(
        tombstone_cls.objects.filter(
            table_name=field.foreign_table_name,
            id__gt=watermark_batch.low,
            id__lte=watermark_batch.up,
        ).values_list("object_identifier", "created_at")
    )
    identifiers = [identifier for identifier, _ in batch_tombstones]
    oldest_seen = min([started_at, *(created_at for _, created_at in batch_tombstones)])

    # Step 2: find the model rows that still point at a tombstoned identifier.
    matching_rows = model.objects.filter(**{f"{field.name}__in": identifiers})

    return list(matching_rows.values_list("id", flat=True)), oldest_seen

tests/sentry/tasks/deletion/test_hybrid_cloud.py

Lines changed: 191 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,29 @@
1212
from sentry.models.integrations.integration import Integration
1313
from sentry.models.outbox import ControlOutbox, OutboxScope, outbox_context
1414
from sentry.models.savedsearch import SavedSearch
15+
from sentry.models.tombstone import RegionTombstone
1516
from sentry.models.user import User
17+
from sentry.monitors.models import Monitor
1618
from sentry.silo.base import SiloMode
1719
from sentry.tasks.deletion.hybrid_cloud import (
20+
WatermarkBatch,
21+
get_ids_for_tombstone_cascade_cross_db,
1822
get_watermark,
1923
schedule_hybrid_cloud_foreign_key_jobs,
2024
schedule_hybrid_cloud_foreign_key_jobs_control,
2125
set_watermark,
2226
)
2327
from sentry.testutils.factories import Factories
28+
from sentry.testutils.helpers import override_options
2429
from sentry.testutils.helpers.task_runner import BurstTaskRunner
2530
from sentry.testutils.outbox import outbox_runner
2631
from sentry.testutils.pytest.fixtures import django_db_all
27-
from sentry.testutils.silo import assume_test_silo_mode, control_silo_test
32+
from sentry.testutils.silo import (
33+
assume_test_silo_mode,
34+
assume_test_silo_mode_of,
35+
control_silo_test,
36+
region_silo_test,
37+
)
2838
from sentry.types.region import find_regions_for_user
2939

3040

@@ -274,3 +284,183 @@ def test_set_null_deletion_behavior(task_runner):
274284
# Deletion set field to null
275285
saved_query = DiscoverSavedQuery.objects.get(id=saved_query.id)
276286
assert saved_query.created_by_id is None
287+
288+
289+
def setup_cross_db_deletion_data():
    """
    Create a user, an organization owned by that user, a project, a group, and
    a saved query plus a monitor that both reference the user by id.

    Returns a dict of the created objects keyed by ``user``, ``organization``,
    ``project``, ``monitor``, ``group`` and ``saved_query``.
    """
    owner = Factories.create_user()
    org = Factories.create_organization(owner=owner, name="Delete Me")
    proj = Factories.create_project(organization=org)
    grp = Factories.create_group(project=proj)

    # Both models are created under the silo mode they belong to.
    with assume_test_silo_mode_of(DiscoverSavedQuery, Monitor):
        query = DiscoverSavedQuery.objects.create(
            name="disco-query",
            organization=org,
            created_by_id=owner.id,
        )
        mon = Monitor.objects.create(
            organization_id=org.id,
            project_id=proj.id,
            slug="test-monitor",
            name="Test Monitor",
            owner_user_id=owner.id,
        )

    return {
        "user": owner,
        "organization": org,
        "project": proj,
        "monitor": mon,
        "group": grp,
        "saved_query": query,
    }
317+
318+
@django_db_all
@region_silo_test
def test_get_ids_for_tombstone_cascade_cross_db(task_runner):
    """Only the monitor owned by the deleted user is reported for cascade."""
    target = setup_cross_db_deletion_data()

    # Additional users and monitors that are never deleted and therefore must
    # not show up in the result.
    for _ in range(3):
        setup_cross_db_deletion_data()

    deleted_user = target["user"]
    deleted_user_id = deleted_user.id
    expected_monitor = target["monitor"]

    with assume_test_silo_mode_of(User), outbox_runner():
        deleted_user.delete()

    user_tombstone = RegionTombstone.objects.get(
        object_identifier=deleted_user_id, table_name=User._meta.db_table
    )
    max_tombstone_id = RegionTombstone.objects.aggregate(Max("id"))["id__max"]
    owner_field = Monitor._meta.get_field("owner_user_id")

    # A batch that spans every tombstone currently in the table.
    whole_table_batch = WatermarkBatch(
        low=0,
        up=max_tombstone_id + 1,
        has_more=False,
        transaction_id="foobar",
    )

    with override_options({"hybrid_cloud.allow_cross_db_tombstones": True}):
        ids, oldest_obj = get_ids_for_tombstone_cascade_cross_db(
            tombstone_cls=RegionTombstone,
            model=Monitor,
            field=owner_field,
            watermark_batch=whole_table_batch,
        )
        assert ids == [expected_monitor.id]
        assert oldest_obj == user_tombstone.created_at
354+
355+
356+
@django_db_all
@region_silo_test
def test_get_ids_for_tombstone_cascade_cross_db_watermark_bounds(task_runner):
    """
    The cross-db lookup honours the watermark bounds: ``low`` is exclusive,
    ``up`` is inclusive, and ranges outside the tombstone ids match nothing.
    """
    cascade_data = [setup_cross_db_deletion_data() for _ in range(3)]

    # Additional users and monitors that are never deleted and therefore must
    # not show up in any result.
    for _ in range(3):
        setup_cross_db_deletion_data()

    in_order_tombstones = []
    for data in cascade_data:
        user = data["user"]
        user_id = user.id
        with assume_test_silo_mode_of(User), outbox_runner():
            user.delete()

        in_order_tombstones.append(
            RegionTombstone.objects.get(object_identifier=user_id, table_name=User._meta.db_table)
        )

    first_tombstone, second_tombstone, third_tombstone = in_order_tombstones
    first_monitor_id, second_monitor_id, third_monitor_id = (
        data["monitor"].id for data in cascade_data
    )

    bounds_with_expected_results = [
        # Upper bound is inclusive.
        (
            {"low": 0, "up": second_tombstone.id},
            [first_monitor_id, second_monitor_id],
        ),
        # Lower bound is exclusive.
        (
            {"low": second_tombstone.id, "up": third_tombstone.id},
            [third_monitor_id],
        ),
        # Entirely below the first tombstone.
        (
            {"low": 0, "up": first_tombstone.id - 1},
            [],
        ),
        # Entirely above the last tombstone.
        (
            {"low": third_tombstone.id + 1, "up": third_tombstone.id + 5},
            [],
        ),
        # Wider than the whole tombstone range.
        (
            {"low": -1, "up": third_tombstone.id + 1},
            [first_monitor_id, second_monitor_id, third_monitor_id],
        ),
    ]

    monitor_owner_field = Monitor._meta.get_field("owner_user_id")

    with override_options({"hybrid_cloud.allow_cross_db_tombstones": True}):
        # The loop targets use their own names so the list being iterated is
        # not rebound mid-loop.
        for bounds, expected_monitor_ids in bounds_with_expected_results:
            ids, _oldest_seen = get_ids_for_tombstone_cascade_cross_db(
                tombstone_cls=RegionTombstone,
                model=Monitor,
                field=monitor_owner_field,
                watermark_batch=WatermarkBatch(
                    low=bounds["low"],
                    up=bounds["up"],
                    has_more=False,
                    transaction_id="foobar",
                ),
            )
            # The function applies no explicit ordering, so compare the ids
            # independent of row order.
            assert sorted(ids) == sorted(expected_monitor_ids), bounds
421+
422+
423+
@django_db_all
@region_silo_test
def test_get_ids_for_tombstone_cascade_cross_db_with_multiple_tombstone_types():
    """Tombstones recorded for a different table name are ignored by the lookup."""
    target = setup_cross_db_deletion_data()
    bystanders = [setup_cross_db_deletion_data() for _ in range(3)]

    # Pollute the tombstone data with references to relationships in other
    # tables matching other User IDs just to ensure we are filtering on the
    # correct table name.
    for bystander in bystanders:
        RegionTombstone.objects.create(
            table_name="something_table", object_identifier=bystander["user"].id
        )

    deleted_user = target["user"]
    deleted_user_id = deleted_user.id
    expected_monitor = target["monitor"]

    with assume_test_silo_mode_of(User), outbox_runner():
        deleted_user.delete()

    user_tombstone = RegionTombstone.objects.get(
        object_identifier=deleted_user_id, table_name=User._meta.db_table
    )
    max_tombstone_id = RegionTombstone.objects.aggregate(Max("id"))["id__max"]

    # A batch that spans every tombstone currently in the table.
    whole_table_batch = WatermarkBatch(
        low=0,
        up=max_tombstone_id + 1,
        has_more=False,
        transaction_id="foobar",
    )

    with override_options({"hybrid_cloud.allow_cross_db_tombstones": True}):
        ids, oldest_obj = get_ids_for_tombstone_cascade_cross_db(
            tombstone_cls=RegionTombstone,
            model=Monitor,
            field=Monitor._meta.get_field("owner_user_id"),
            watermark_batch=whole_table_batch,
        )
        assert ids == [expected_monitor.id]
        assert oldest_obj == user_tombstone.created_at

0 commit comments

Comments
 (0)