diff --git a/src/sentry/workflow_engine/processors/delayed_workflow.py b/src/sentry/workflow_engine/processors/delayed_workflow.py
index d4afbf6a917f90..10e9739a7b09b6 100644
--- a/src/sentry/workflow_engine/processors/delayed_workflow.py
+++ b/src/sentry/workflow_engine/processors/delayed_workflow.py
@@ -4,6 +4,7 @@
 from datetime import timedelta
 from typing import Any
 
+import sentry_sdk
 from celery import Task
 from django.utils import timezone
 
@@ -512,21 +513,24 @@ def process_delayed_workflows(
     """
     Grab workflows, groups, and data condition groups from the Redis buffer, evaluate the "slow" conditions in a bulk snuba query, and fire them if they pass
     """
-    project = fetch_project(project_id)
-    if not project:
-        return
+    with sentry_sdk.start_span(op="delayed_workflow.prepare_data"):
+        project = fetch_project(project_id)
+        if not project:
+            return
 
-    workflow_event_dcg_data = fetch_group_to_event_data(project_id, Workflow, batch_key)
+        workflow_event_dcg_data = fetch_group_to_event_data(project_id, Workflow, batch_key)
 
-    # Get mappings from DataConditionGroups to other info
-    dcg_to_groups, trigger_group_to_dcg_model = get_dcg_group_workflow_detector_data(
-        workflow_event_dcg_data
-    )
-    dcg_to_workflow = trigger_group_to_dcg_model[DataConditionHandler.Group.WORKFLOW_TRIGGER].copy()
-    dcg_to_workflow.update(trigger_group_to_dcg_model[DataConditionHandler.Group.ACTION_FILTER])
+        # Get mappings from DataConditionGroups to other info
+        dcg_to_groups, trigger_group_to_dcg_model = get_dcg_group_workflow_detector_data(
+            workflow_event_dcg_data
+        )
+        dcg_to_workflow = trigger_group_to_dcg_model[
+            DataConditionHandler.Group.WORKFLOW_TRIGGER
+        ].copy()
+        dcg_to_workflow.update(trigger_group_to_dcg_model[DataConditionHandler.Group.ACTION_FILTER])
 
-    _, workflows_to_envs = fetch_workflows_envs(list(dcg_to_workflow.values()))
-    data_condition_groups = fetch_data_condition_groups(list(dcg_to_groups.keys()))
+        _, workflows_to_envs = fetch_workflows_envs(list(dcg_to_workflow.values()))
+        data_condition_groups = fetch_data_condition_groups(list(dcg_to_groups.keys()))
     logger.info(
         "delayed_workflow.workflows",
         extra={
@@ -537,10 +541,11 @@ def process_delayed_workflows(
         },
     )
 
-    # Get unique query groups to query Snuba
-    condition_groups = get_condition_query_groups(
-        data_condition_groups, dcg_to_groups, dcg_to_workflow, workflows_to_envs
-    )
+    with sentry_sdk.start_span(op="delayed_workflow.get_condition_query_groups"):
+        # Get unique query groups to query Snuba
+        condition_groups = get_condition_query_groups(
+            data_condition_groups, dcg_to_groups, dcg_to_workflow, workflows_to_envs
+        )
 
     if not condition_groups:
         return
@@ -556,7 +561,8 @@ def process_delayed_workflows(
             "project_id": project_id,
         },
     )
-    condition_group_results = get_condition_group_results(condition_groups)
+    with sentry_sdk.start_span(op="delayed_workflow.get_condition_group_results"):
+        condition_group_results = get_condition_group_results(condition_groups)
 
     serialized_results = {
         str(query): count_dict for query, count_dict in condition_group_results.items()
@@ -566,30 +572,40 @@ def process_delayed_workflows(
         extra={"condition_group_results": serialized_results, "project_id": project_id},
     )
 
-    # Evaluate DCGs
-    groups_to_dcgs = get_groups_to_fire(
-        data_condition_groups,
-        workflows_to_envs,
-        dcg_to_workflow,
-        dcg_to_groups,
-        condition_group_results,
-    )
+    with sentry_sdk.start_span(op="delayed_workflow.get_groups_to_fire"):
+        # Evaluate DCGs
+        groups_to_dcgs = get_groups_to_fire(
+            data_condition_groups,
+            workflows_to_envs,
+            dcg_to_workflow,
+            dcg_to_groups,
+            condition_group_results,
+        )
 
     logger.info(
         "delayed_workflow.groups_to_fire",
         extra={"groups_to_dcgs": groups_to_dcgs, "project_id": project_id},
    )
 
-    dcg_group_to_event_data, event_ids, occurrence_ids = parse_dcg_group_event_data(
-        workflow_event_dcg_data, groups_to_dcgs
-    )
-    group_to_groupevent = get_group_to_groupevent(
-        dcg_group_to_event_data, list(groups_to_dcgs.keys()), event_ids, occurrence_ids, project_id
-    )
+    with sentry_sdk.start_span(op="delayed_workflow.get_group_to_groupevent"):
+        dcg_group_to_event_data, event_ids, occurrence_ids = parse_dcg_group_event_data(
+            workflow_event_dcg_data, groups_to_dcgs
+        )
+        group_to_groupevent = get_group_to_groupevent(
+            dcg_group_to_event_data,
+            list(groups_to_dcgs.keys()),
+            event_ids,
+            occurrence_ids,
+            project_id,
+        )
 
-    fire_actions_for_groups(groups_to_dcgs, trigger_group_to_dcg_model, group_to_groupevent)
+    with sentry_sdk.start_span(op="delayed_workflow.fire_actions"):
+        fire_actions_for_groups(groups_to_dcgs, trigger_group_to_dcg_model, group_to_groupevent)
 
-    cleanup_redis_buffer(project_id, workflow_event_dcg_data, batch_key)
+    with sentry_sdk.start_span(
+        op="delayed_workflow.cleanup_redis_buffer",
+    ):
+        cleanup_redis_buffer(project_id, workflow_event_dcg_data, batch_key)
 
 
 @delayed_processing_registry.register("delayed_workflow")