6
6
from datetime import datetime , timedelta , timezone
7
7
from typing import Any , DefaultDict , NamedTuple
8
8
9
+ import sentry_sdk
9
10
from celery import Task
10
11
from django .db .models import OuterRef , Subquery
11
12
@@ -584,24 +585,33 @@ def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **k
584
585
"""
585
586
Grab rules, groups, and events from the Redis buffer, evaluate the "slow" conditions in a bulk snuba query, and fire them if they pass
586
587
"""
587
- project = fetch_project (project_id )
588
- if not project :
589
- return
590
-
591
- rulegroup_to_event_data = fetch_rulegroup_to_event_data (project_id , batch_key )
592
- rules_to_groups = get_rules_to_groups (rulegroup_to_event_data )
593
- alert_rules = fetch_alert_rules (list (rules_to_groups .keys ()))
594
- condition_groups = get_condition_query_groups (alert_rules , rules_to_groups )
595
- logger .info (
596
- "delayed_processing.condition_groups" ,
597
- extra = {
598
- "condition_groups" : len (condition_groups ),
599
- "project_id" : project_id ,
600
- "rules_to_groups" : rules_to_groups ,
601
- },
602
- )
588
+ with sentry_sdk .start_span (
589
+ op = "delayed_processing.prepare_data" , name = "Fetch data from buffers in delayed processing"
590
+ ):
591
+ project = fetch_project (project_id )
592
+ if not project :
593
+ return
594
+
595
+ rulegroup_to_event_data = fetch_rulegroup_to_event_data (project_id , batch_key )
596
+ rules_to_groups = get_rules_to_groups (rulegroup_to_event_data )
597
+ alert_rules = fetch_alert_rules (list (rules_to_groups .keys ()))
598
+ condition_groups = get_condition_query_groups (alert_rules , rules_to_groups )
599
+ logger .info (
600
+ "delayed_processing.condition_groups" ,
601
+ extra = {
602
+ "condition_groups" : len (condition_groups ),
603
+ "project_id" : project_id ,
604
+ "rules_to_groups" : rules_to_groups ,
605
+ },
606
+ )
603
607
604
- with metrics .timer ("delayed_processing.get_condition_group_results.duration" ):
608
+ with (
609
+ metrics .timer ("delayed_processing.get_condition_group_results.duration" ),
610
+ sentry_sdk .start_span (
611
+ op = "delayed_processing.get_condition_group_results" ,
612
+ name = "Fetch condition group results in delayed processing" ,
613
+ ),
614
+ ):
605
615
condition_group_results = get_condition_group_results (condition_groups , project )
606
616
607
617
has_workflow_engine = features .has (
@@ -625,34 +635,48 @@ def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **k
625
635
for rule in alert_rules :
626
636
rules_to_slow_conditions [rule ].extend (get_slow_conditions (rule ))
627
637
628
- rules_to_fire = defaultdict (set )
629
- if condition_group_results :
630
- rules_to_fire = get_rules_to_fire (
631
- condition_group_results , rules_to_slow_conditions , rules_to_groups , project .id
632
- )
633
- if has_workflow_engine or features .has ("projects:num-events-issue-debugging" , project ):
634
- logger .info (
635
- "delayed_processing.rules_to_fire" ,
636
- extra = {
637
- "rules_to_fire" : {rule .id : groups for rule , groups in rules_to_fire .items ()},
638
- "project_id" : project_id ,
639
- "rules_to_slow_conditions" : {
640
- rule .id : conditions for rule , conditions in rules_to_slow_conditions .items ()
641
- },
642
- "rules_to_groups" : rules_to_groups ,
643
- },
644
- )
645
- if random .random () < 0.01 :
646
- logger .info (
647
- "delayed_processing.rule_to_fire" ,
648
- extra = {"rules_to_fire" : list (rules_to_fire .keys ()), "project_id" : project_id },
638
+ with sentry_sdk .start_span (
639
+ op = "delayed_processing.get_rules_to_fire" ,
640
+ name = "Process rule conditions in delayed processing" ,
641
+ ):
642
+ rules_to_fire = defaultdict (set )
643
+ if condition_group_results :
644
+ rules_to_fire = get_rules_to_fire (
645
+ condition_group_results , rules_to_slow_conditions , rules_to_groups , project .id
649
646
)
647
+ if has_workflow_engine or features .has ("projects:num-events-issue-debugging" , project ):
648
+ logger .info (
649
+ "delayed_processing.rules_to_fire" ,
650
+ extra = {
651
+ "rules_to_fire" : {
652
+ rule .id : groups for rule , groups in rules_to_fire .items ()
653
+ },
654
+ "project_id" : project_id ,
655
+ "rules_to_slow_conditions" : {
656
+ rule .id : conditions
657
+ for rule , conditions in rules_to_slow_conditions .items ()
658
+ },
659
+ "rules_to_groups" : rules_to_groups ,
660
+ },
661
+ )
662
+ if random .random () < 0.01 :
663
+ logger .info (
664
+ "delayed_processing.rule_to_fire" ,
665
+ extra = {"rules_to_fire" : list (rules_to_fire .keys ()), "project_id" : project_id },
666
+ )
650
667
651
- parsed_rulegroup_to_event_data = parse_rulegroup_to_event_data (rulegroup_to_event_data )
652
- with metrics .timer ("delayed_processing.fire_rules.duration" ):
653
- fire_rules (rules_to_fire , parsed_rulegroup_to_event_data , alert_rules , project )
654
-
655
- cleanup_redis_buffer (project , rules_to_groups , batch_key )
668
+ with sentry_sdk .start_span (
669
+ op = "delayed_processing.fire_rules" , name = "Fire rules in delayed processing"
670
+ ):
671
+ parsed_rulegroup_to_event_data = parse_rulegroup_to_event_data (rulegroup_to_event_data )
672
+ with metrics .timer ("delayed_processing.fire_rules.duration" ):
673
+ fire_rules (rules_to_fire , parsed_rulegroup_to_event_data , alert_rules , project )
674
+
675
+ with sentry_sdk .start_span (
676
+ op = "delayed_processing.cleanup_redis_buffer" ,
677
+ name = "Clean up redis buffer in delayed processing" ,
678
+ ):
679
+ cleanup_redis_buffer (project , rules_to_groups , batch_key )
656
680
657
681
658
682
@delayed_processing_registry .register ("delayed_processing" ) # default delayed processing
0 commit comments