
Commit ee9affd

getsentry-bot authored and ceorourke committed
Revert "ref(ACI): Move get_comparison_aggregation_value out of subscription processor (#91783)"
This reverts commit fbc6095. Co-authored-by: ceorourke <29959063+ceorourke@users.noreply.github.com>
1 parent aa612f6 · commit ee9affd

File tree

6 files changed: +233 −193 lines changed


src/sentry/incidents/subscription_processor.py

Lines changed: 127 additions & 45 deletions
@@ -11,6 +11,7 @@
 from django.db import router, transaction
 from django.utils import timezone
 from sentry_redis_tools.retrying_cluster import RetryingRedisCluster
+from snuba_sdk import Column, Condition, Limit, Op

 from sentry import features
 from sentry.constants import ObjectStatus
@@ -41,7 +42,7 @@
 from sentry.incidents.tasks import handle_trigger_action
 from sentry.incidents.utils.metric_issue_poc import create_or_update_metric_issue
 from sentry.incidents.utils.process_update_helpers import (
-    get_comparison_aggregation_value,
+    get_aggregation_value_helper,
     get_crash_rate_alert_metrics_aggregation_value_helper,
 )
 from sentry.incidents.utils.types import (
@@ -50,14 +51,20 @@
     QuerySubscriptionUpdate,
 )
 from sentry.models.project import Project
+from sentry.search.eap.utils import add_start_end_conditions
 from sentry.seer.anomaly_detection.get_anomaly_data import get_anomaly_data_from_seer
 from sentry.seer.anomaly_detection.utils import anomaly_has_confidence, has_anomaly
 from sentry.snuba.dataset import Dataset
-from sentry.snuba.models import QuerySubscription
+from sentry.snuba.entity_subscription import (
+    ENTITY_TIME_COLUMNS,
+    get_entity_key_from_query_builder,
+    get_entity_subscription_from_snuba_query,
+)
+from sentry.snuba.models import QuerySubscription, SnubaQuery
 from sentry.snuba.subscriptions import delete_snuba_subscription
-from sentry.utils import metrics, redis
+from sentry.utils import metrics, redis, snuba_rpc
 from sentry.utils.dates import to_datetime
-from sentry.workflow_engine.models import DataPacket, Detector
+from sentry.workflow_engine.models import DataPacket
 from sentry.workflow_engine.processors.data_packet import process_data_packets

 logger = logging.getLogger(__name__)
@@ -209,6 +216,106 @@ def calculate_resolve_threshold(self, trigger: AlertRuleTrigger) -> float:
         threshold: float = trigger.alert_threshold + resolve_add
         return threshold

+    def get_comparison_aggregation_value(
+        self, subscription_update: QuerySubscriptionUpdate
+    ) -> float | None:
+        # NOTE (mifu67): we create this helper because we also use it in the new detector processing flow
+        aggregation_value = get_aggregation_value_helper(subscription_update)
+        if self.alert_rule.comparison_delta is None:
+            return aggregation_value
+
+        # For comparison alerts run a query over the comparison period and use it to calculate the
+        # % change.
+        delta = timedelta(seconds=self.alert_rule.comparison_delta)
+        end = subscription_update["timestamp"] - delta
+        snuba_query = self.subscription.snuba_query
+        start = end - timedelta(seconds=snuba_query.time_window)
+
+        entity_subscription = get_entity_subscription_from_snuba_query(
+            snuba_query,
+            self.subscription.project.organization_id,
+        )
+        dataset = Dataset(snuba_query.dataset)
+        query_type = SnubaQuery.Type(snuba_query.type)
+        project_ids = [self.subscription.project_id]
+
+        comparison_aggregate: None | float = None
+        if query_type == SnubaQuery.Type.PERFORMANCE and dataset == Dataset.EventsAnalyticsPlatform:
+            try:
+                rpc_time_series_request = entity_subscription.build_rpc_request(
+                    query=snuba_query.query,
+                    project_ids=project_ids,
+                    environment=snuba_query.environment,
+                    params={
+                        "organization_id": self.subscription.project.organization.id,
+                        "project_id": project_ids,
+                    },
+                    referrer="subscription_processor.comparison_query",
+                )
+
+                rpc_time_series_request = add_start_end_conditions(
+                    rpc_time_series_request, start, end
+                )
+
+                rpc_response = snuba_rpc.timeseries_rpc([rpc_time_series_request])[0]
+                if len(rpc_response.result_timeseries):
+                    comparison_aggregate = rpc_response.result_timeseries[0].data_points[0].data
+
+            except Exception:
+                logger.exception(
+                    "Failed to run RPC comparison query",
+                    extra={
+                        "alert_rule_id": self.alert_rule.id,
+                        "subscription_id": subscription_update.get("subscription_id"),
+                        "organization_id": self.alert_rule.organization_id,
+                    },
+                )
+                return None
+
+        else:
+            try:
+                # TODO: determine whether we need to include the subscription query_extra here
+                query_builder = entity_subscription.build_query_builder(
+                    query=snuba_query.query,
+                    project_ids=project_ids,
+                    environment=snuba_query.environment,
+                    params={
+                        "organization_id": self.subscription.project.organization.id,
+                        "project_id": project_ids,
+                        "start": start,
+                        "end": end,
+                    },
+                )
+                time_col = ENTITY_TIME_COLUMNS[get_entity_key_from_query_builder(query_builder)]
+                query_builder.add_conditions(
+                    [
+                        Condition(Column(time_col), Op.GTE, start),
+                        Condition(Column(time_col), Op.LT, end),
+                    ]
+                )
+                query_builder.limit = Limit(1)
+                results = query_builder.run_query(
+                    referrer="subscription_processor.comparison_query"
+                )
+                comparison_aggregate = list(results["data"][0].values())[0]
+
+            except Exception:
+                logger.exception(
+                    "Failed to run comparison query",
+                    extra={
+                        "alert_rule_id": self.alert_rule.id,
+                        "subscription_id": subscription_update.get("subscription_id"),
+                        "organization_id": self.alert_rule.organization_id,
+                    },
+                )
+                return None
+
+        if not comparison_aggregate:
+            metrics.incr("incidents.alert_rules.skipping_update_comparison_value_invalid")
+            return None
+
+        return (aggregation_value / comparison_aggregate) * 100
+
     def get_crash_rate_alert_metrics_aggregation_value(
         self, subscription_update: QuerySubscriptionUpdate
     ) -> float | None:
@@ -235,22 +342,13 @@ def get_crash_rate_alert_metrics_aggregation_value(
             self.reset_trigger_counts()
         return aggregation_value

-    def get_aggregation_value(
-        self, subscription_update: QuerySubscriptionUpdate, comparison_delta: int | None = None
-    ) -> float | None:
+    def get_aggregation_value(self, subscription_update: QuerySubscriptionUpdate) -> float | None:
         if self.subscription.snuba_query.dataset == Dataset.Metrics.value:
             aggregation_value = self.get_crash_rate_alert_metrics_aggregation_value(
                 subscription_update
             )
         else:
-            aggregation_value = get_comparison_aggregation_value(
-                subscription_update=subscription_update,
-                snuba_query=self.subscription.snuba_query,
-                organization_id=self.subscription.project.organization.id,
-                project_ids=[self.subscription.project_id],
-                comparison_delta=comparison_delta,
-                alert_rule_id=self.alert_rule.id,
-            )
+            aggregation_value = self.get_comparison_aggregation_value(subscription_update)

         return aggregation_value

@@ -268,15 +366,14 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
         if self.subscription.project.status != ObjectStatus.ACTIVE:
             metrics.incr("incidents.alert_rules.ignore_deleted_project")
             return
-
-        organization = self.subscription.project.organization
-
-        if dataset == "events" and not features.has("organizations:incidents", organization):
+        if dataset == "events" and not features.has(
+            "organizations:incidents", self.subscription.project.organization
+        ):
             # They have downgraded since these subscriptions have been created. So we just ignore updates for now.
             metrics.incr("incidents.alert_rules.ignore_update_missing_incidents")
             return
         elif dataset == "transactions" and not features.has(
-            "organizations:performance-view", organization
+            "organizations:performance-view", self.subscription.project.organization
         ):
             # They have downgraded since these subscriptions have been created. So we just ignore updates for now.
             metrics.incr("incidents.alert_rules.ignore_update_missing_incidents_performance")
@@ -293,6 +390,8 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
             metrics.incr("incidents.alert_rules.skipping_already_processed_update")
             return

+        aggregation_value = self.get_aggregation_value(subscription_update)
+
         self.last_update = subscription_update["timestamp"]

         if (
@@ -309,30 +408,11 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
                 },
             )

-        has_metric_alert_processing = features.has(
-            "organizations:workflow-engine-metric-alert-processing", organization
-        )
-        comparison_delta = None
-
-        if has_metric_alert_processing:
-            try:
-                detector = Detector.objects.get(
-                    data_sources__source_id=str(self.subscription.id),
-                    data_sources__type=DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION,
-                )
-            except Detector.DoesNotExist:
-                logger.exception(
-                    "Detector not found", extra={"subscription_id": self.subscription.id}
-                )
-
-            comparison_delta = detector.config.get("comparison_delta")
-        else:
-            comparison_delta = self.alert_rule.comparison_delta
-
-        aggregation_value = self.get_aggregation_value(subscription_update, comparison_delta)
-
         if aggregation_value is not None:
-            if has_metric_alert_processing:
+            if features.has(
+                "organizations:workflow-engine-metric-alert-processing",
+                self.subscription.project.organization,
+            ):
                 packet = MetricDetectorUpdate(
                     entity=subscription_update.get("entity", ""),
                     subscription_id=subscription_update["subscription_id"],
@@ -358,8 +438,10 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
             )

         has_anomaly_detection = features.has(
-            "organizations:anomaly-detection-alerts", organization
-        ) and features.has("organizations:anomaly-detection-rollout", organization)
+            "organizations:anomaly-detection-alerts", self.subscription.project.organization
+        ) and features.has(
+            "organizations:anomaly-detection-rollout", self.subscription.project.organization
+        )

         potential_anomalies = None
         if (

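Note: the restored get_comparison_aggregation_value shifts the update timestamp back by comparison_delta seconds, queries a window of the same time_window size, and returns the current value as a percentage of the comparison value. A minimal, runnable Python sketch of just that arithmetic (the helper names and numbers below are illustrative only, not part of the codebase):

# Sketch of the comparison-window arithmetic performed by the method above.
from datetime import datetime, timedelta, timezone

def comparison_window(timestamp: datetime, comparison_delta_s: int, time_window_s: int) -> tuple[datetime, datetime]:
    # The comparison query covers a window of the same size, shifted back by the delta.
    end = timestamp - timedelta(seconds=comparison_delta_s)
    start = end - timedelta(seconds=time_window_s)
    return start, end

def percent_of_comparison(aggregation_value: float, comparison_aggregate: float) -> float:
    # The processor reports the current value as a percentage of the comparison value,
    # which callers then check against the alert threshold.
    return (aggregation_value / comparison_aggregate) * 100

now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)  # hypothetical update timestamp
print(comparison_window(now, comparison_delta_s=3600, time_window_s=600))
# 10:50-11:00 UTC: the current ten-minute window, shifted back one hour
print(percent_of_comparison(120.0, 100.0))  # 120.0, i.e. 20% above the comparison period
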
src/sentry/incidents/tasks.py

Lines changed: 35 additions & 1 deletion
@@ -12,10 +12,14 @@
     IncidentStatus,
     IncidentStatusMethod,
 )
-from sentry.incidents.utils.constants import INCIDENTS_SNUBA_SUBSCRIPTION_TYPE
+from sentry.incidents.utils.constants import (
+    INCIDENTS_SNUBA_SUBSCRIPTION_TYPE,
+    SUBSCRIPTION_METRICS_LOGGER,
+)
 from sentry.incidents.utils.types import QuerySubscriptionUpdate
 from sentry.models.project import Project
 from sentry.silo.base import SiloMode
+from sentry.snuba.dataset import Dataset
 from sentry.snuba.models import QuerySubscription
 from sentry.snuba.query_subscriptions.consumer import register_subscriber
 from sentry.tasks.base import instrumented_task
@@ -27,6 +31,36 @@
 logger = logging.getLogger(__name__)


+@register_subscriber(SUBSCRIPTION_METRICS_LOGGER)
+def handle_subscription_metrics_logger(
+    subscription_update: QuerySubscriptionUpdate, subscription: QuerySubscription
+) -> None:
+    """
+    Logs results from a `QuerySubscription`.
+    """
+    from sentry.incidents.subscription_processor import SubscriptionProcessor
+
+    try:
+        if subscription.snuba_query.dataset == Dataset.Metrics.value:
+            processor = SubscriptionProcessor(subscription)
+            # XXX: Temporary hack so that we can extract these values without raising an exception
+            processor.reset_trigger_counts = lambda *arg, **kwargs: None  # type: ignore[method-assign]
+            aggregation_value = processor.get_aggregation_value(subscription_update)
+
+            logger.info(
+                "handle_subscription_metrics_logger.message",
+                extra={
+                    "subscription_id": subscription.id,
+                    "dataset": subscription.snuba_query.dataset,
+                    "snuba_subscription_id": subscription.subscription_id,
+                    "result": subscription_update,
+                    "aggregation_value": aggregation_value,
+                },
+            )
+    except Exception:
+        logger.exception("Failed to log subscription results")
+
+
 @register_subscriber(INCIDENTS_SNUBA_SUBSCRIPTION_TYPE)
 def handle_snuba_query_update(
     subscription_update: QuerySubscriptionUpdate, subscription: QuerySubscription
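Note: the new handle_subscription_metrics_logger needs get_aggregation_value's result without the side effect of reset_trigger_counts, so it swaps that method for a no-op lambda on the instance before calling in. A toy sketch of the same pattern (the Processor class below is a hypothetical stand-in, not Sentry's SubscriptionProcessor):

# Stubbing a side-effecting method on one instance so a value can be read safely.
class Processor:
    def reset_trigger_counts(self) -> None:
        raise RuntimeError("would mutate alert state")  # the side effect to avoid

    def get_aggregation_value(self, update: dict) -> float | None:
        if update.get("value") is None:
            # Mirrors the crash-rate path above, which resets trigger counts
            # when no aggregation value comes back.
            self.reset_trigger_counts()
            return None
        return float(update["value"])

processor = Processor()
# Same trick as the XXX hack: replace the method on this instance with a no-op
# so the read path can neither raise nor mutate anything.
processor.reset_trigger_counts = lambda *args, **kwargs: None  # type: ignore[method-assign]
print(processor.get_aggregation_value({"value": 42}))    # 42.0
print(processor.get_aggregation_value({"value": None}))  # None, and no exception
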
src/sentry/incidents/utils/constants.py

Lines changed: 1 addition & 0 deletions

@@ -1,2 +1,3 @@
 INCIDENTS_SNUBA_SUBSCRIPTION_TYPE = "incidents"
 INCIDENT_SNAPSHOT_BATCH_SIZE = 50
+SUBSCRIPTION_METRICS_LOGGER = "subscription_metrics_logger"
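Note: SUBSCRIPTION_METRICS_LOGGER is the key under which register_subscriber files the new handler in tasks.py. A toy registry sketch of that dispatch pattern (an illustration under assumptions; this is not Sentry's actual query_subscriptions consumer):

# Handlers are registered under a subscription-type string, so the constant
# must match the type stored on the subscription rows being consumed.
from typing import Callable

SUBSCRIBER_REGISTRY: dict[str, Callable[[dict], None]] = {}

def register_subscriber(subscriber_key: str) -> Callable:
    def decorator(func: Callable[[dict], None]) -> Callable[[dict], None]:
        SUBSCRIBER_REGISTRY[subscriber_key] = func
        return func
    return decorator

SUBSCRIPTION_METRICS_LOGGER = "subscription_metrics_logger"

@register_subscriber(SUBSCRIPTION_METRICS_LOGGER)
def handle_metrics_logger(update: dict) -> None:
    print("logging update", update)

# The consumer looks up the handler by the subscription's type string:
SUBSCRIBER_REGISTRY[SUBSCRIPTION_METRICS_LOGGER]({"value": 1})
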
