@@ -3,6 +3,7 @@
 import uuid
 from collections import defaultdict
 from collections.abc import Sequence
+from dataclasses import dataclass
 from datetime import datetime, timedelta, timezone
 from typing import Any, DefaultDict, NamedTuple

@@ -137,6 +138,29 @@ def get_slow_conditions(rule: Rule) -> list[EventFrequencyConditionData]:
     return slow_conditions  # type: ignore[return-value]


+@dataclass(frozen=True)
+class LogConfig:
+    """
+    LogConfig efficiently caches the results of features.has calls; these are project/org
+    based, should be stable within our task, and caching them helps avoid generating
+    excessive spans and saves a bit of time.
+    """
+
+    # Cached value of features.has("projects:num-events-issue-debugging", project)
+    num_events_issue_debugging: bool
+    # Cached value of features.has("organizations:workflow-engine-process-workflows", organization)
+    workflow_engine_process_workflows: bool
+
+    @classmethod
+    def create(cls, project: Project) -> "LogConfig":
+        return cls(
+            num_events_issue_debugging=features.has("projects:num-events-issue-debugging", project),
+            workflow_engine_process_workflows=features.has(
+                "organizations:workflow-engine-process-workflows", project.organization
+            ),
+        )
+
+
 def generate_unique_queries(
     condition_data: EventFrequencyConditionData, environment_id: int
 ) -> list[UniqueConditionQuery]:
@@ -229,6 +253,7 @@ def parse_rulegroup_to_event_data(


 def build_group_to_groupevent(
+    log_config: LogConfig,
     parsed_rulegroup_to_event_data: dict[tuple[int, int], dict[str, str]],
     bulk_event_id_to_events: dict[str, Event],
     bulk_occurrence_id_to_occurrence: dict[str, IssueOccurrence],
@@ -238,7 +263,7 @@ def build_group_to_groupevent(

     project = fetch_project(project_id)
     if project:
-        if features.has("projects:num-events-issue-debugging", project):
+        if log_config.num_events_issue_debugging:
             logger.info(
                 "delayed_processing.build_group_to_groupevent_input",
                 extra={
@@ -268,7 +293,7 @@ def build_group_to_groupevent(
         group = group_id_to_group.get(int(rule_group[1]))

         if not group or not event:
-            if features.has("projects:num-events-issue-debugging", project):
+            if log_config.num_events_issue_debugging:
                 logger.info(
                     "delayed_processing.missing_event_or_group",
                     extra={
@@ -290,6 +315,7 @@ def build_group_to_groupevent(


 def get_group_to_groupevent(
+    log_config: LogConfig,
     parsed_rulegroup_to_event_data: dict[tuple[int, int], dict[str, str]],
     project_id: int,
     group_ids: set[int],
@@ -330,6 +356,7 @@ def get_group_to_groupevent(
     }

     return build_group_to_groupevent(
+        log_config,
         relevant_rulegroup_to_event_data,
         bulk_event_id_to_events,
         bulk_occurrence_id_to_occurrence,
@@ -457,6 +484,7 @@ def get_rules_to_fire(


 def fire_rules(
+    log_config: LogConfig,
     rules_to_fire: DefaultDict[Rule, set[int]],
     parsed_rulegroup_to_event_data: dict[tuple[int, int], dict[str, str]],
     alert_rules: list[Rule],
@@ -475,11 +503,12 @@ def fire_rules(
         frequency = rule.data.get("frequency") or Rule.DEFAULT_FREQUENCY
         freq_offset = now - timedelta(minutes=frequency)
         group_to_groupevent = get_group_to_groupevent(
-            parsed_rulegroup_to_event_data, project.id, group_ids
+            log_config, parsed_rulegroup_to_event_data, project.id, group_ids
         )
-        if features.has(
-            "organizations:workflow-engine-process-workflows", project.organization
-        ) or features.has("projects:num-events-issue-debugging", project):
+        if (
+            log_config.num_events_issue_debugging
+            or log_config.workflow_engine_process_workflows
+        ):
             serialized_groups = {
                 group.id: group_event.event_id
                 for group, group_event in group_to_groupevent.items()
@@ -530,10 +559,10 @@ def fire_rules(
                 rule, group, groupevent.event_id, notification_uuid
             )

-            if features.has(
-                "organizations:workflow-engine-process-workflows-logs",
-                project.organization,
-            ) or features.has("projects:num-events-issue-debugging", project):
+            if (
+                log_config.workflow_engine_process_workflows
+                or log_config.num_events_issue_debugging
+            ):
                 logger.info(
                     "post_process.delayed_processing.triggered_rule",
                     extra={
@@ -558,7 +587,7 @@ def fire_rules(
             if results is None:
                 not_sent += 1

-    if features.has("projects:num-events-issue-debugging", project):
+    if log_config.num_events_issue_debugging:
         logger.info(
             "delayed_processing.rules_fired",
             extra={
@@ -571,7 +600,10 @@ def fire_rules(


 def cleanup_redis_buffer(
-    project: Project, rules_to_groups: DefaultDict[int, set[int]], batch_key: str | None
+    log_config: LogConfig,
+    project: Project,
+    rules_to_groups: DefaultDict[int, set[int]],
+    batch_key: str | None,
 ) -> None:
     hashes_to_delete = [
         f"{rule}:{group}" for rule, groups in rules_to_groups.items() for group in groups
@@ -581,7 +613,7 @@ def cleanup_redis_buffer(
         filters["batch_key"] = batch_key

     buffer.backend.delete_hash(model=Project, filters=filters, fields=hashes_to_delete)
-    if features.has("projects:num-events-issue-debugging", project):
+    if log_config.num_events_issue_debugging:
         logger.info(
             "delayed_processing.cleanup_redis_buffer",
             extra={"hashes_to_delete": hashes_to_delete, "project_id": project.id},
@@ -616,6 +648,8 @@ def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **k
     if not project:
         return

+    log_config = LogConfig.create(project)
+
     rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id, batch_key)
     rules_to_groups = get_rules_to_groups(rulegroup_to_event_data)
     alert_rules = fetch_alert_rules(list(rules_to_groups.keys()))
@@ -638,10 +672,7 @@ def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **k
     ):
         condition_group_results = get_condition_group_results(condition_groups, project)

-        has_workflow_engine = features.has(
-            "organizations:workflow-engine-process-workflows", project.organization
-        )
-        if has_workflow_engine or features.has("projects:num-events-issue-debugging", project):
+        if log_config.workflow_engine_process_workflows or log_config.num_events_issue_debugging:
             serialized_results = (
                 {str(query): count_dict for query, count_dict in condition_group_results.items()}
                 if condition_group_results
@@ -668,7 +699,10 @@ def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **k
         rules_to_fire = get_rules_to_fire(
             condition_group_results, rules_to_slow_conditions, rules_to_groups, project.id
         )
-        if has_workflow_engine or features.has("projects:num-events-issue-debugging", project):
+        if (
+            log_config.workflow_engine_process_workflows
+            or log_config.num_events_issue_debugging
+        ):
             logger.info(
                 "delayed_processing.rules_to_fire",
                 extra={
@@ -694,13 +728,15 @@ def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **k
     ):
         parsed_rulegroup_to_event_data = parse_rulegroup_to_event_data(rulegroup_to_event_data)
         with metrics.timer("delayed_processing.fire_rules.duration"):
-            fire_rules(rules_to_fire, parsed_rulegroup_to_event_data, alert_rules, project)
+            fire_rules(
+                log_config, rules_to_fire, parsed_rulegroup_to_event_data, alert_rules, project
+            )

     with sentry_sdk.start_span(
         op="delayed_processing.cleanup_redis_buffer",
         name="Clean up redis buffer in delayed processing",
     ):
-        cleanup_redis_buffer(project, rules_to_groups, batch_key)
+        cleanup_redis_buffer(log_config, project, rules_to_groups, batch_key)


 @delayed_processing_registry.register("delayed_processing")  # default delayed processing
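
The LogConfig docstring above captures the core idea of this change: the two `features.has` results are evaluated once per task in `apply_delayed`, frozen in a dataclass, and threaded through `fire_rules`, `get_group_to_groupevent`, `build_group_to_groupevent`, and `cleanup_redis_buffer`, so each debug-logging call site does a plain attribute check instead of re-querying the flag service (and generating a span) every time. Below is a small self-contained sketch of that pattern for reference; the names in it (`flag_enabled`, `FlagSnapshot`, `process`) are made up for illustration and are not part of this change or of Sentry's `features` API.

```python
from dataclasses import dataclass


def flag_enabled(name: str, scope: str) -> bool:
    # Hypothetical stand-in for a feature-flag backend such as features.has();
    # imagine each call being comparatively expensive (lookup, span, etc.).
    return name.endswith("debugging")


@dataclass(frozen=True)
class FlagSnapshot:
    # Flag value captured once at task start; frozen so it stays stable for
    # the lifetime of the task, mirroring what LogConfig does in the diff.
    debug_logging: bool

    @classmethod
    def create(cls, scope: str) -> "FlagSnapshot":
        # One backend call per task instead of one per log statement.
        return cls(debug_logging=flag_enabled("num-events-issue-debugging", scope))


def process(snapshot: FlagSnapshot, items: list[int]) -> int:
    total = sum(items)
    if snapshot.debug_logging:  # cheap attribute access, no flag lookup
        print(f"processed {len(items)} items, total={total}")
    return total


snapshot = FlagSnapshot.create("project-1")
process(snapshot, [1, 2, 3])
```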