Skip to content

Commit 07e915f

Browse files
authored
chore(aci): manually add spans for delayed workflow processing (#91908)
1 parent 896150c commit 07e915f

File tree

1 file changed

+49
-33
lines changed

1 file changed

+49
-33
lines changed

src/sentry/workflow_engine/processors/delayed_workflow.py

Lines changed: 49 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from datetime import timedelta
55
from typing import Any
66

7+
import sentry_sdk
78
from celery import Task
89
from django.utils import timezone
910

@@ -512,21 +513,24 @@ def process_delayed_workflows(
512513
"""
513514
Grab workflows, groups, and data condition groups from the Redis buffer, evaluate the "slow" conditions in a bulk snuba query, and fire them if they pass
514515
"""
515-
project = fetch_project(project_id)
516-
if not project:
517-
return
516+
with sentry_sdk.start_span(op="delayed_workflow.prepare_data"):
517+
project = fetch_project(project_id)
518+
if not project:
519+
return
518520

519-
workflow_event_dcg_data = fetch_group_to_event_data(project_id, Workflow, batch_key)
521+
workflow_event_dcg_data = fetch_group_to_event_data(project_id, Workflow, batch_key)
520522

521-
# Get mappings from DataConditionGroups to other info
522-
dcg_to_groups, trigger_group_to_dcg_model = get_dcg_group_workflow_detector_data(
523-
workflow_event_dcg_data
524-
)
525-
dcg_to_workflow = trigger_group_to_dcg_model[DataConditionHandler.Group.WORKFLOW_TRIGGER].copy()
526-
dcg_to_workflow.update(trigger_group_to_dcg_model[DataConditionHandler.Group.ACTION_FILTER])
523+
# Get mappings from DataConditionGroups to other info
524+
dcg_to_groups, trigger_group_to_dcg_model = get_dcg_group_workflow_detector_data(
525+
workflow_event_dcg_data
526+
)
527+
dcg_to_workflow = trigger_group_to_dcg_model[
528+
DataConditionHandler.Group.WORKFLOW_TRIGGER
529+
].copy()
530+
dcg_to_workflow.update(trigger_group_to_dcg_model[DataConditionHandler.Group.ACTION_FILTER])
527531

528-
_, workflows_to_envs = fetch_workflows_envs(list(dcg_to_workflow.values()))
529-
data_condition_groups = fetch_data_condition_groups(list(dcg_to_groups.keys()))
532+
_, workflows_to_envs = fetch_workflows_envs(list(dcg_to_workflow.values()))
533+
data_condition_groups = fetch_data_condition_groups(list(dcg_to_groups.keys()))
530534

531535
logger.info(
532536
"delayed_workflow.workflows",
@@ -537,10 +541,11 @@ def process_delayed_workflows(
537541
},
538542
)
539543

540-
# Get unique query groups to query Snuba
541-
condition_groups = get_condition_query_groups(
542-
data_condition_groups, dcg_to_groups, dcg_to_workflow, workflows_to_envs
543-
)
544+
with sentry_sdk.start_span(op="delayed_workflow.get_condition_query_groups"):
545+
# Get unique query groups to query Snuba
546+
condition_groups = get_condition_query_groups(
547+
data_condition_groups, dcg_to_groups, dcg_to_workflow, workflows_to_envs
548+
)
544549

545550
if not condition_groups:
546551
return
@@ -556,7 +561,8 @@ def process_delayed_workflows(
556561
"project_id": project_id,
557562
},
558563
)
559-
condition_group_results = get_condition_group_results(condition_groups)
564+
with sentry_sdk.start_span(op="delayed_workflow.get_condition_group_results"):
565+
condition_group_results = get_condition_group_results(condition_groups)
560566

561567
serialized_results = {
562568
str(query): count_dict for query, count_dict in condition_group_results.items()
@@ -566,30 +572,40 @@ def process_delayed_workflows(
566572
extra={"condition_group_results": serialized_results, "project_id": project_id},
567573
)
568574

569-
# Evaluate DCGs
570-
groups_to_dcgs = get_groups_to_fire(
571-
data_condition_groups,
572-
workflows_to_envs,
573-
dcg_to_workflow,
574-
dcg_to_groups,
575-
condition_group_results,
576-
)
575+
with sentry_sdk.start_span(op="delayed_workflow.get_groups_to_fire"):
576+
# Evaluate DCGs
577+
groups_to_dcgs = get_groups_to_fire(
578+
data_condition_groups,
579+
workflows_to_envs,
580+
dcg_to_workflow,
581+
dcg_to_groups,
582+
condition_group_results,
583+
)
577584

578585
logger.info(
579586
"delayed_workflow.groups_to_fire",
580587
extra={"groups_to_dcgs": groups_to_dcgs, "project_id": project_id},
581588
)
582589

583-
dcg_group_to_event_data, event_ids, occurrence_ids = parse_dcg_group_event_data(
584-
workflow_event_dcg_data, groups_to_dcgs
585-
)
586-
group_to_groupevent = get_group_to_groupevent(
587-
dcg_group_to_event_data, list(groups_to_dcgs.keys()), event_ids, occurrence_ids, project_id
588-
)
590+
with sentry_sdk.start_span(op="delayed_workflow.get_group_to_groupevent"):
591+
dcg_group_to_event_data, event_ids, occurrence_ids = parse_dcg_group_event_data(
592+
workflow_event_dcg_data, groups_to_dcgs
593+
)
594+
group_to_groupevent = get_group_to_groupevent(
595+
dcg_group_to_event_data,
596+
list(groups_to_dcgs.keys()),
597+
event_ids,
598+
occurrence_ids,
599+
project_id,
600+
)
589601

590-
fire_actions_for_groups(groups_to_dcgs, trigger_group_to_dcg_model, group_to_groupevent)
602+
with sentry_sdk.start_span(op="delayed_workflow.fire_actions"):
603+
fire_actions_for_groups(groups_to_dcgs, trigger_group_to_dcg_model, group_to_groupevent)
591604

592-
cleanup_redis_buffer(project_id, workflow_event_dcg_data, batch_key)
605+
with sentry_sdk.start_span(
606+
op="delayed_workflow.cleanup_redis_buffer",
607+
):
608+
cleanup_redis_buffer(project_id, workflow_event_dcg_data, batch_key)
593609

594610

595611
@delayed_processing_registry.register("delayed_workflow")

0 commit comments

Comments
 (0)