@@ -570,10 +570,10 @@ def save_error_events(
570
570
job ["event" ].data .bind_ref (job ["event" ])
571
571
572
572
_get_or_create_environment_many (jobs , projects )
573
- _get_or_create_group_environment_many (jobs , projects )
573
+ _get_or_create_group_environment_many (jobs )
574
574
_get_or_create_release_associated_models (jobs , projects )
575
575
_increment_release_associated_counts_many (jobs , projects )
576
- _get_or_create_group_release_many (jobs , projects )
576
+ _get_or_create_group_release_many (jobs )
577
577
_tsdb_record_all_metrics (jobs )
578
578
579
579
UserReport .objects .filter (project_id = project .id , event_id = job ["event" ].event_id ).update (
@@ -946,7 +946,7 @@ def _get_or_create_environment_many(jobs: Sequence[Job], projects: ProjectsMappi
946
946
947
947
948
948
@sentry_sdk .tracing .trace
949
- def _get_or_create_group_environment_many (jobs : Sequence [Job ], projects : ProjectsMapping ) -> None :
949
+ def _get_or_create_group_environment_many (jobs : Sequence [Job ]) -> None :
950
950
for job in jobs :
951
951
_get_or_create_group_environment (job ["environment" ], job ["release" ], job ["groups" ])
952
952
@@ -1029,7 +1029,7 @@ def _increment_release_associated_counts(
1029
1029
)
1030
1030
1031
1031
1032
- def _get_or_create_group_release_many (jobs : Sequence [Job ], projects : ProjectsMapping ) -> None :
1032
+ def _get_or_create_group_release_many (jobs : Sequence [Job ]) -> None :
1033
1033
for job in jobs :
1034
1034
_get_or_create_group_release (
1035
1035
job ["environment" ], job ["release" ], job ["event" ], job ["groups" ]
@@ -1443,19 +1443,35 @@ def _save_aggregate(
1443
1443
metrics .timer ("event_manager.create_group_transaction" ) as metric_tags ,
1444
1444
transaction .atomic (router .db_for_write (GroupHash )),
1445
1445
):
1446
+ # These values will get overridden with whatever happens inside the lock if we do manage
1447
+ # to acquire it, so it should only end up with `wait_for_lock` if we don't
1446
1448
span .set_tag ("outcome" , "wait_for_lock" )
1447
1449
metric_tags ["outcome" ] = "wait_for_lock"
1448
1450
1449
1451
all_grouphash_ids = [h .id for h in flat_grouphashes ]
1450
1452
if root_hierarchical_grouphash is not None :
1451
1453
all_grouphash_ids .append (root_hierarchical_grouphash .id )
1452
1454
1455
+ # If we're in this branch, we checked our grouphashes and didn't find one with a group
1456
+ # attached. We thus want to create a new group, but we need to guard against another
1457
+ # event with the same hash coming in before we're done here and also thinking it needs
1458
+ # to create a new group. To prevent this, we're using double-checked locking
1459
+ # (https://en.wikipedia.org/wiki/Double-checked_locking).
1460
+
1461
+ # First, try to lock the relevant rows in the `GroupHash` table. If another (identically
1462
+ # hashed) event is also in the process of creating a group and has grabbed the lock
1463
+ # before us, we'll block here until it's done. If not, we've now got the lock and other
1464
+ # identically-hashed events will have to wait for us.
1453
1465
all_grouphashes = list (
1454
1466
GroupHash .objects .filter (id__in = all_grouphash_ids ).select_for_update ()
1455
1467
)
1456
1468
1457
1469
flat_grouphashes = [gh for gh in all_grouphashes if gh .hash in hashes .hashes ]
1458
1470
1471
+ # Now check again to see if any of our grouphashes have a group. In the first race
1472
+ # condition scenario above, we'll have been blocked long enough for the other event to
1473
+ # have created the group and updated our grouphashes with a group id, which means this
1474
+ # time, we'll find something.
1459
1475
existing_grouphash , root_hierarchical_hash = find_existing_grouphash (
1460
1476
project , flat_grouphashes , hashes .hierarchical_hashes
1461
1477
)
@@ -1467,6 +1483,8 @@ def _save_aggregate(
1467
1483
else :
1468
1484
root_hierarchical_grouphash = None
1469
1485
1486
+ # If we still haven't found a matching grouphash, we're now safe to go ahead and create
1487
+ # the group.
1470
1488
if existing_grouphash is None :
1471
1489
group = _create_group (project , event , ** group_creation_kwargs )
1472
1490
@@ -1606,7 +1624,7 @@ def _save_aggregate_new(
1606
1624
group_processing_kwargs = _get_group_processing_kwargs (job )
1607
1625
1608
1626
# Try looking for an existing group using the current grouping config
1609
- primary = create_and_seek_grouphashes (job , run_primary_grouping , metric_tags )
1627
+ primary = get_hashes_and_grouphashes (job , run_primary_grouping , metric_tags )
1610
1628
1611
1629
# If we've found one, great. No need to do any more calculations
1612
1630
if primary .existing_grouphash :
@@ -1616,7 +1634,7 @@ def _save_aggregate_new(
1616
1634
result = "found_primary"
1617
1635
# If we haven't, try again using the secondary config
1618
1636
else :
1619
- secondary = create_and_seek_grouphashes (job , maybe_run_secondary_grouping , metric_tags )
1637
+ secondary = get_hashes_and_grouphashes (job , maybe_run_secondary_grouping , metric_tags )
1620
1638
all_grouphashes = primary .grouphashes + secondary .grouphashes
1621
1639
1622
1640
# Now we know for sure whether or not a group already exists, so handle both cases
@@ -1659,7 +1677,7 @@ def _save_aggregate_new(
1659
1677
return group_info
1660
1678
1661
1679
1662
- def create_and_seek_grouphashes (
1680
+ def get_hashes_and_grouphashes (
1663
1681
job : Job ,
1664
1682
hash_calculation_function : Callable [
1665
1683
[Project , Job , MutableTags ],
@@ -1768,6 +1786,8 @@ def create_group_with_grouphashes(
1768
1786
metrics .timer ("event_manager.create_group_transaction" ) as metrics_timer_tags ,
1769
1787
transaction .atomic (router .db_for_write (GroupHash )),
1770
1788
):
1789
+ # These values will get overridden with whatever happens inside the lock if we do manage to
1790
+ # acquire it, so it should only end up with `wait_for_lock` if we don't
1771
1791
span .set_tag ("outcome" , "wait_for_lock" )
1772
1792
metrics_timer_tags ["outcome" ] = "wait_for_lock"
1773
1793
0 commit comments