|
22 | 22 | from django.db.models.manager import BaseManager
|
23 | 23 | from django.utils import timezone
|
24 | 24 |
|
| 25 | +from sentry.db.models import Model |
25 | 26 | from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey
|
26 | 27 | from sentry.models.tombstone import TombstoneBase
|
27 | 28 | from sentry.silo import SiloMode
|
@@ -247,30 +248,14 @@ def _process_tombstone_reconciliation(
|
247 | 248 | prefix, field, watermark_manager, batch_size=get_batch_size()
|
248 | 249 | )
|
249 | 250 | has_more = watermark_batch.has_more
|
250 |
| - to_delete_ids: list[int] = [] |
251 |
| - |
252 | 251 | if watermark_batch.low < watermark_batch.up:
|
253 |
| - oldest_seen: datetime.datetime = timezone.now() |
254 |
| - |
255 |
| - with connections[router.db_for_read(model)].cursor() as conn: |
256 |
| - conn.execute( |
257 |
| - f""" |
258 |
| - SELECT r.id, t.created_at |
259 |
| - FROM {model._meta.db_table} r |
260 |
| - JOIN {tombstone_cls._meta.db_table} t |
261 |
| - ON t.table_name = %(table_name)s AND t.object_identifier = r.{field.name} |
262 |
| - WHERE {watermark_target}.id > %(low)s AND {watermark_target}.id <= %(up)s |
263 |
| - """, |
264 |
| - { |
265 |
| - "table_name": field.foreign_table_name, |
266 |
| - "low": watermark_batch.low, |
267 |
| - "up": watermark_batch.up, |
268 |
| - }, |
269 |
| - ) |
270 |
| - |
271 |
| - for (row_id, tomb_created) in conn.fetchall(): |
272 |
| - to_delete_ids.append(row_id) |
273 |
| - oldest_seen = min(oldest_seen, tomb_created) |
| 252 | + to_delete_ids, oldest_seen = _get_ids_to_delete( |
| 253 | + tombstone_cls=tombstone_cls, |
| 254 | + model=model, |
| 255 | + field=field, |
| 256 | + watermark_batch=watermark_batch, |
| 257 | + watermark_target=watermark_target, |
| 258 | + ) |
274 | 259 |
|
275 | 260 | if field.on_delete == "CASCADE":
|
276 | 261 | task = deletions.get(
|
@@ -306,3 +291,48 @@ def _process_tombstone_reconciliation(
|
306 | 291 | )
|
307 | 292 |
|
308 | 293 | return has_more
|
| 294 | + |
| 295 | + |
| 296 | +def _get_ids_to_delete( |
| 297 | + tombstone_cls: type[TombstoneBase], |
| 298 | + model: type[Model], |
| 299 | + field: HybridCloudForeignKey, |
| 300 | + watermark_target, |
| 301 | + watermark_batch: WatermarkBatch, |
| 302 | +) -> tuple[list[int], datetime]: |
| 303 | + """ |
| 304 | + Queries the database or databases if spanning multiple), and returns |
| 305 | + a list of tuples containing row ids and tombstone creation time for |
| 306 | + any rows requiring cleanup. |
| 307 | +
|
| 308 | + :param self: |
| 309 | + :param tombstone_cls: Either a RegionTombstone or ControlTombstone, depending on |
| 310 | + which silo the tombstone process is running. |
| 311 | + :param model: The model with a HybridCloudForeignKey. |
| 312 | + :param field: The HybridCloudForeignKey field from the model to process. |
| 313 | + :return: |
| 314 | + """ |
| 315 | + |
| 316 | + to_delete_ids = [] |
| 317 | + oldest_seen: datetime.datetime = timezone.now() |
| 318 | + with connections[router.db_for_read(model)].cursor() as conn: |
| 319 | + conn.execute( |
| 320 | + f""" |
| 321 | + SELECT r.id, t.created_at |
| 322 | + FROM {model._meta.db_table} r |
| 323 | + JOIN {tombstone_cls._meta.db_table} t |
| 324 | + ON t.table_name = %(table_name)s AND t.object_identifier = r.{field.name} |
| 325 | + WHERE {watermark_target}.id > %(low)s AND {watermark_target}.id <= %(up)s |
| 326 | + """, |
| 327 | + { |
| 328 | + "table_name": field.foreign_table_name, |
| 329 | + "low": watermark_batch.low, |
| 330 | + "up": watermark_batch.up, |
| 331 | + }, |
| 332 | + ) |
| 333 | + |
| 334 | + for (row_id, tomb_created) in conn.fetchall(): |
| 335 | + to_delete_ids.append(row_id) |
| 336 | + oldest_seen = min(oldest_seen, tomb_created) |
| 337 | + |
| 338 | + return to_delete_ids, oldest_seen |
0 commit comments