Skip to content

chore(ACI): Add 1:1 metrics for dual processing #91840

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 19, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions src/sentry/incidents/subscription_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,16 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
"organizations:anomaly-detection-rollout", self.subscription.project.organization
)

alert_triggered_tags = {
"detection_type": self.alert_rule.detection_type,
"organization_id": None,
}
if features.has(
"organizations:workflow-engine-metric-alert-dual-processing-logs",
self.subscription.project.organization,
):
alert_triggered_tags["organization_id"] = self.alert_rule.organization_id

potential_anomalies = None
if (
has_anomaly_detection
Expand Down Expand Up @@ -505,7 +515,7 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
) and not self.check_trigger_matches_status(trigger, TriggerStatus.ACTIVE):
metrics.incr(
"incidents.alert_rules.threshold.alert",
tags={"detection_type": self.alert_rule.detection_type},
tags=alert_triggered_tags,
)
incident_trigger = self.trigger_alert_threshold(
trigger, aggregation_value
Expand All @@ -522,7 +532,7 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
):
metrics.incr(
"incidents.alert_rules.threshold.resolve",
tags={"detection_type": self.alert_rule.detection_type},
tags=alert_triggered_tags,
)
incident_trigger = self.trigger_resolve_threshold(
trigger, aggregation_value
Expand All @@ -544,7 +554,7 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
# And the trigger is not yet active
metrics.incr(
"incidents.alert_rules.threshold.alert",
tags={"detection_type": self.alert_rule.detection_type},
tags=alert_triggered_tags,
)
# triggering a threshold will create an incident and set the status to active
incident_trigger = self.trigger_alert_threshold(trigger, aggregation_value)
Expand All @@ -562,7 +572,7 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
):
metrics.incr(
"incidents.alert_rules.threshold.resolve",
tags={"detection_type": self.alert_rule.detection_type},
tags=alert_triggered_tags,
)
incident_trigger = self.trigger_resolve_threshold(
trigger, aggregation_value
Expand Down
21 changes: 17 additions & 4 deletions src/sentry/workflow_engine/processors/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,14 @@ def process_workflows(event_data: WorkflowEventData) -> set[Workflow]:

# TODO: remove fetching org, only used for feature flag checks
organization = detector.project.organization
workflow_metric_tags = {
"detector_type": detector.type,
"organization_id": None,
}
if features.has(
"organizations:workflow-engine-metric-alert-dual-processing-logs", organization
):
workflow_metric_tags["organization_id"] = organization.id

# Get the workflows, evaluate the when_condition_group, finally evaluate the actions for workflows that are triggered
workflows = set(
Expand All @@ -231,7 +239,7 @@ def process_workflows(event_data: WorkflowEventData) -> set[Workflow]:
metrics.incr(
"workflow_engine.process_workflows",
len(workflows),
tags={"detector_type": detector.type},
tags=workflow_metric_tags,
)

logger.info(
Expand All @@ -254,7 +262,7 @@ def process_workflows(event_data: WorkflowEventData) -> set[Workflow]:
metrics.incr(
"workflow_engine.process_workflows.triggered_workflows",
len(triggered_workflows),
tags={"detector_type": detector.type},
tags=workflow_metric_tags,
)

logger.info(
Expand All @@ -275,7 +283,7 @@ def process_workflows(event_data: WorkflowEventData) -> set[Workflow]:
metrics.incr(
"workflow_engine.process_workflows.actions",
amount=len(actions),
tags={"detector_type": detector.type},
tags=workflow_metric_tags,
)

logger.info(
Expand All @@ -300,7 +308,7 @@ def process_workflows(event_data: WorkflowEventData) -> set[Workflow]:
metrics.incr(
"workflow_engine.process_workflows.triggered_actions",
amount=len(actions),
tags={"detector_type": detector.type},
tags=workflow_metric_tags,
)
logger.info(
"workflow_engine.process_workflows.triggered_actions (batch)",
Expand All @@ -312,5 +320,10 @@ def process_workflows(event_data: WorkflowEventData) -> set[Workflow]:
"detector_type": detector.type,
},
)
# in order to check if workflow engine is firing 1:1 with the old system, we must only count once rather than each action
metrics.incr(
"workflow_engine.process_workflows.fired_actions",
tags=workflow_metric_tags,
)

return triggered_workflows
67 changes: 65 additions & 2 deletions tests/sentry/incidents/test_subscription_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1024,8 +1024,9 @@ def test_seer_call_failed_parse(self, mock_logger, mock_seer_request):
)
assert result is None

@patch("sentry.incidents.subscription_processor.metrics")
@patch("sentry.incidents.utils.metric_issue_poc.create_or_update_metric_issue")
def test_alert(self, create_metric_issue_mock):
def test_alert(self, create_metric_issue_mock, mock_metrics):
# Verify that an alert rule that only expects a single update to be over the
# alert threshold triggers correctly
rule = self.rule
Expand Down Expand Up @@ -1054,6 +1055,31 @@ def test_alert(self, create_metric_issue_mock):
],
)
create_metric_issue_mock.assert_not_called()
mock_metrics.incr.assert_has_calls(
[
call(
"incidents.alert_rules.threshold.alert",
tags={"detection_type": "static", "organization_id": None},
),
call("incidents.alert_rules.trigger", tags={"type": "fire"}),
],
)

@with_feature("organizations:workflow-engine-metric-alert-dual-processing-logs")
@patch("sentry.incidents.subscription_processor.metrics")
def test_alert_metrics(self, mock_metrics):
    """With the dual-processing flag enabled, the threshold.alert metric is
    tagged with the rule's organization_id rather than None."""
    alert_rule = self.rule
    # A single update just over the threshold is enough to fire the rule.
    self.send_update(alert_rule, self.trigger.alert_threshold + 1)
    expected_calls = [
        call(
            "incidents.alert_rules.threshold.alert",
            tags={
                "detection_type": "static",
                "organization_id": alert_rule.organization_id,
            },
        ),
        call("incidents.alert_rules.trigger", tags={"type": "fire"}),
    ]
    mock_metrics.incr.assert_has_calls(expected_calls)

def test_alert_dedupe(self):
# Verify that an alert rule that only expects a single update to be over the
Expand Down Expand Up @@ -1174,8 +1200,9 @@ def test_no_active_incident_resolve(self):
self.assert_trigger_does_not_exist(trigger)
self.assert_action_handler_called_with_actions(None, [])

@patch("sentry.incidents.subscription_processor.metrics")
@patch("sentry.incidents.utils.metric_issue_poc.create_or_update_metric_issue")
def test_resolve(self, create_metric_issue_mock):
def test_resolve(self, create_metric_issue_mock, mock_metrics):
# Verify that an alert rule that only expects a single update to be under the
# resolve threshold triggers correctly
rule = self.rule
Expand Down Expand Up @@ -1218,6 +1245,42 @@ def test_resolve(self, create_metric_issue_mock):
],
)
create_metric_issue_mock.assert_not_called()
mock_metrics.incr.assert_has_calls(
[
call(
"incidents.alert_rules.threshold.alert",
tags={"detection_type": "static", "organization_id": None},
),
call("incidents.alert_rules.trigger", tags={"type": "fire"}),
call(
"incidents.alert_rules.threshold.resolve",
tags={"detection_type": "static", "organization_id": None},
),
call("incidents.alert_rules.trigger", tags={"type": "resolve"}),
]
)

@with_feature("organizations:workflow-engine-metric-alert-dual-processing-logs")
@patch("sentry.incidents.subscription_processor.metrics")
def test_resolve_metrics(self, mock_metrics):
    """Fire and then resolve an alert; with the dual-processing flag on, both
    threshold metrics should carry the organization_id tag."""
    alert_rule = self.rule
    # First update crosses the alert threshold; second drops below resolve.
    self.send_update(alert_rule, self.trigger.alert_threshold + 1, timedelta(minutes=-2))
    self.send_update(alert_rule, alert_rule.resolve_threshold - 1, timedelta(minutes=-1))
    threshold_tags = {
        "detection_type": "static",
        "organization_id": alert_rule.organization_id,
    }
    mock_metrics.incr.assert_has_calls(
        [
            call("incidents.alert_rules.threshold.alert", tags=threshold_tags),
            call("incidents.alert_rules.trigger", tags={"type": "fire"}),
            call("incidents.alert_rules.threshold.resolve", tags=threshold_tags),
            call("incidents.alert_rules.trigger", tags={"type": "resolve"}),
        ]
    )

def test_resolve_multiple_threshold_periods(self):
# Verify that a rule that expects two consecutive updates to be under the
Expand Down
13 changes: 13 additions & 0 deletions tests/sentry/workflow_engine/processors/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,19 @@ def test_metrics_triggered_actions(self, mock_incr):
tags={"detector_type": self.error_detector.type},
)

@with_feature("organizations:workflow-engine-process-workflows")
@with_feature("organizations:workflow-engine-metric-alert-dual-processing-logs")
@patch("sentry.utils.metrics.incr")
def test_metrics_issue_dual_processing_metrics(self, mock_incr):
    """The fired_actions metric should include the organization id tag when
    the dual-processing flag is enabled."""
    process_workflows(self.event_data)
    expected_tags = {
        "detector_type": self.error_detector.type,
        "organization_id": self.error_detector.project.organization_id,
    }
    mock_incr.assert_any_call(
        "workflow_engine.process_workflows.fired_actions",
        tags=expected_tags,
    )


class TestEvaluateWorkflowTriggers(BaseWorkflowTest):
def setUp(self):
Expand Down
Loading