-
-
Notifications
You must be signed in to change notification settings - Fork 4.3k
feat(aci milestone 3): anomaly detection condition handler #88647
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
c1dba54
14d84dd
30eab83
9054cbb
8e20c36
9d9c7a5
a999884
a4eb040
4eec4a9
6181e45
7afc678
aa18e49
9b6bcf4
a749210
fec8351
7c0cdb5
bfbb3a4
1ca68e9
f67cbc5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
__all__ = [ | ||
"AnomalyDetectionHandler", | ||
] | ||
|
||
from .anomaly_detection_handler import AnomalyDetectionHandler |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
import logging | ||
from typing import Any | ||
|
||
from django.conf import settings | ||
|
||
from sentry.net.http import connection_from_url | ||
from sentry.seer.anomaly_detection.get_anomaly_data import get_anomaly_data_from_seer | ||
from sentry.seer.anomaly_detection.types import ( | ||
AnomalyDetectionSeasonality, | ||
AnomalyDetectionSensitivity, | ||
AnomalyDetectionThresholdType, | ||
AnomalyType, | ||
) | ||
from sentry.snuba.models import QuerySubscription | ||
from sentry.workflow_engine.models import Condition, DataPacket | ||
from sentry.workflow_engine.registry import condition_handler_registry | ||
from sentry.workflow_engine.types import DataConditionHandler, DetectorPriorityLevel | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
SEER_ANOMALY_DETECTION_CONNECTION_POOL = connection_from_url( | ||
settings.SEER_ANOMALY_DETECTION_URL, | ||
timeout=settings.SEER_ANOMALY_DETECTION_TIMEOUT, | ||
) | ||
|
||
SEER_EVALUATION_TO_DETECTOR_PRIORITY = { | ||
AnomalyType.HIGH_CONFIDENCE.value: DetectorPriorityLevel.HIGH, | ||
AnomalyType.LOW_CONFIDENCE.value: DetectorPriorityLevel.OK, # Seer doesn't support warning alerts yet | ||
AnomalyType.NONE.value: DetectorPriorityLevel.OK, | ||
} | ||
|
||
|
||
# placeholder until we create this in the workflow engine model | ||
class DetectorError(Exception): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Calling this out as a TODO (need to add it to the workflow engine + build exception handling for it within There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe this could live in either |
||
pass | ||
|
||
|
||
@condition_handler_registry.register(Condition.ANOMALY_DETECTION) | ||
class AnomalyDetectionHandler(DataConditionHandler[DataPacket]): | ||
group = DataConditionHandler.Group.DETECTOR_TRIGGER | ||
comparison_json_schema = { | ||
"type": "object", | ||
"properties": { | ||
"sensitivity": { | ||
"type": "string", | ||
"enum": [*AnomalyDetectionSensitivity], | ||
}, | ||
"seasonality": { | ||
"type": "string", | ||
"enum": [*AnomalyDetectionSeasonality], | ||
}, | ||
"threshold_type": { | ||
"type": "integer", | ||
"enum": [*AnomalyDetectionThresholdType], | ||
}, | ||
}, | ||
"required": ["sensitivity", "seasonality", "threshold_type"], | ||
"additionalProperties": False, | ||
} | ||
|
||
@staticmethod | ||
def evaluate_value(update: DataPacket, comparison: Any) -> DetectorPriorityLevel: | ||
sensitivity = comparison["sensitivity"] | ||
seasonality = comparison["seasonality"] | ||
threshold_type = comparison["threshold_type"] | ||
|
||
subscription: QuerySubscription = QuerySubscription.objects.get(id=int(update.source_id)) | ||
|
||
subscription_update = update.packet | ||
|
||
anomaly_data = get_anomaly_data_from_seer( | ||
sensitivity=sensitivity, | ||
seasonality=seasonality, | ||
threshold_type=threshold_type, | ||
subscription=subscription, | ||
subscription_update=subscription_update, | ||
) | ||
# covers both None and [] | ||
if not anomaly_data: | ||
# something went wrong during evaluation | ||
raise DetectorError("Error during Seer data evaluation process.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this return False instead of raising an exception, if you're going to build exception handling for it? what's the outcome of the exception handling? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We want this to raise an error instead of returning False, because returning False indicates that we should set the detector priority level to OK. I think we actually want to change this condition handler to emit multiple detector priority levels according to the anomaly detection result 🤔 |
||
|
||
anomaly_type = anomaly_data[0].get("anomaly", {}).get("anomaly_type") | ||
if anomaly_type == AnomalyType.NO_DATA.value: | ||
raise DetectorError("Project doesn't have enough data for detector to evaluate") | ||
elif anomaly_type is None: | ||
raise DetectorError("Seer response contained no evaluation data") | ||
|
||
return SEER_EVALUATION_TO_DETECTOR_PRIORITY[anomaly_type] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,17 +5,22 @@ | |
|
||
from sentry.conf.server import SEER_ANOMALY_DETECTION_ENDPOINT_URL | ||
from sentry.incidents.models.alert_rule import AlertRule | ||
from sentry.incidents.utils.types import MetricDetectorUpdate | ||
from sentry.net.http import connection_from_url | ||
from sentry.seer.anomaly_detection.types import ( | ||
AlertInSeer, | ||
AnomalyDetectionConfig, | ||
AnomalyDetectionSeasonality, | ||
AnomalyDetectionSensitivity, | ||
AnomalyDetectionThresholdType, | ||
DataSourceType, | ||
DetectAnomaliesRequest, | ||
DetectAnomaliesResponse, | ||
TimeSeriesPoint, | ||
) | ||
from sentry.seer.anomaly_detection.utils import translate_direction | ||
from sentry.seer.signed_seer_api import make_signed_seer_api_request | ||
from sentry.snuba.models import QuerySubscription | ||
from sentry.snuba.models import QuerySubscription, SnubaQuery | ||
from sentry.utils import json, metrics | ||
from sentry.utils.json import JSONDecodeError | ||
|
||
|
@@ -27,7 +32,8 @@ | |
) | ||
|
||
|
||
def get_anomaly_data_from_seer( | ||
# TODO: delete this once we deprecate the AlertRule model | ||
def get_anomaly_data_from_seer_legacy( | ||
alert_rule: AlertRule, | ||
subscription: QuerySubscription, | ||
last_update: float, | ||
|
@@ -153,3 +159,120 @@ def get_anomaly_data_from_seer( | |
) | ||
return None | ||
return ts | ||
|
||
|
||
def get_anomaly_data_from_seer( | ||
sensitivity: AnomalyDetectionSensitivity, | ||
seasonality: AnomalyDetectionSeasonality, | ||
threshold_type: AnomalyDetectionThresholdType, | ||
subscription: QuerySubscription, | ||
subscription_update: MetricDetectorUpdate, | ||
) -> list[TimeSeriesPoint] | None: | ||
snuba_query: SnubaQuery = subscription.snuba_query | ||
aggregation_value = subscription_update["values"].get("value") | ||
source_id = subscription.id | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because we're passing in the query subscription, we don't need to pass source_id and source_type. |
||
source_type = DataSourceType.SNUBA_QUERY_SUBSCRIPTION | ||
if aggregation_value is None: | ||
logger.error( | ||
"Invalid aggregation value", extra={"source_id": source_id, "source_type": source_type} | ||
) | ||
return None | ||
|
||
extra_data = { | ||
"subscription_id": subscription.id, | ||
"organization_id": subscription.project.organization.id, | ||
"project_id": subscription.project_id, | ||
"source_id": source_id, | ||
mifu67 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"source_type": source_type, | ||
} | ||
|
||
anomaly_detection_config = AnomalyDetectionConfig( | ||
time_period=int(snuba_query.time_window / 60), | ||
sensitivity=sensitivity, | ||
direction=translate_direction(threshold_type), | ||
expected_seasonality=seasonality, | ||
) | ||
context = AlertInSeer( | ||
source_id=source_id, | ||
source_type=source_type, | ||
cur_window=TimeSeriesPoint( | ||
timestamp=subscription_update["timestamp"].timestamp(), value=aggregation_value | ||
), | ||
) | ||
detect_anomalies_request = DetectAnomaliesRequest( | ||
organization_id=subscription.project.organization.id, | ||
project_id=subscription.project_id, | ||
config=anomaly_detection_config, | ||
context=context, | ||
) | ||
extra_data["dataset"] = snuba_query.dataset | ||
try: | ||
logger.info("Sending subscription update data to Seer", extra=extra_data) | ||
response = make_signed_seer_api_request( | ||
SEER_ANOMALY_DETECTION_CONNECTION_POOL, | ||
SEER_ANOMALY_DETECTION_ENDPOINT_URL, | ||
json.dumps(detect_anomalies_request).encode("utf-8"), | ||
) | ||
except (TimeoutError, MaxRetryError): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Everything below this line is error handling (copied from the legacy method) |
||
logger.warning("Timeout error when hitting anomaly detection endpoint", extra=extra_data) | ||
return None | ||
|
||
if response.status > 400: | ||
logger.error( | ||
"Error when hitting Seer detect anomalies endpoint", | ||
extra={ | ||
"response_data": response.data, | ||
**extra_data, | ||
}, | ||
) | ||
return None | ||
try: | ||
decoded_data = response.data.decode("utf-8") | ||
except AttributeError: | ||
logger.exception( | ||
"Failed to parse Seer anomaly detection response", | ||
extra={ | ||
"ad_config": anomaly_detection_config, | ||
"context": context, | ||
"response_data": response.data, | ||
"response_code": response.status, | ||
}, | ||
) | ||
return None | ||
|
||
try: | ||
results: DetectAnomaliesResponse = json.loads(decoded_data) | ||
except JSONDecodeError: | ||
logger.exception( | ||
"Failed to parse Seer anomaly detection response", | ||
extra={ | ||
"ad_config": anomaly_detection_config, | ||
"context": context, | ||
"response_data": decoded_data, | ||
"response_code": response.status, | ||
}, | ||
) | ||
return None | ||
|
||
if not results.get("success"): | ||
logger.error( | ||
"Error when hitting Seer detect anomalies endpoint", | ||
extra={ | ||
"error_message": results.get("message", ""), | ||
**extra_data, | ||
}, | ||
) | ||
return None | ||
|
||
ts = results.get("timeseries") | ||
if not ts: | ||
logger.warning( | ||
"Seer anomaly detection response returned no potential anomalies", | ||
extra={ | ||
"ad_config": anomaly_detection_config, | ||
"context": context, | ||
"response_data": results.get("message"), | ||
}, | ||
) | ||
return None | ||
return ts | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add tests for this method? Probably can mostly copy/paste from the existing ones, just don't wanna lose the coverage when we delete the old stuff. |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved this file to the |
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Imported so that the condition handler gets added to the registry. Maybe there's a better solution?