From 622e8c6348b81ebea0cf777c2994b32c447aa380 Mon Sep 17 00:00:00 2001 From: schew2381 Date: Fri, 5 Apr 2024 12:21:56 -0700 Subject: [PATCH 01/20] feat(issue-alert): Create initial delayed processor function --- .../api/endpoints/project_rule_actions.py | 2 +- src/sentry/api/endpoints/project_rules.py | 2 +- src/sentry/rules/history/preview.py | 2 +- src/sentry/rules/processing/__init__.py | 0 .../rules/processing/delayed_processing.py | 34 ++++++++++++++ .../rules/{ => processing}/processor.py | 0 src/sentry/tasks/post_process.py | 2 +- tests/sentry/rules/processing/__init__.py | 0 .../processing/test_delayed_processing.py | 45 +++++++++++++++++++ .../rules/{ => processing}/test_processor.py | 18 ++++---- tests/sentry/tasks/test_post_process.py | 34 +++++++------- 11 files changed, 110 insertions(+), 29 deletions(-) create mode 100644 src/sentry/rules/processing/__init__.py create mode 100644 src/sentry/rules/processing/delayed_processing.py rename src/sentry/rules/{ => processing}/processor.py (100%) create mode 100644 tests/sentry/rules/processing/__init__.py create mode 100644 tests/sentry/rules/processing/test_delayed_processing.py rename tests/sentry/rules/{ => processing}/test_processor.py (97%) diff --git a/src/sentry/api/endpoints/project_rule_actions.py b/src/sentry/api/endpoints/project_rule_actions.py index 457bda65643596..15049bec8035e7 100644 --- a/src/sentry/api/endpoints/project_rule_actions.py +++ b/src/sentry/api/endpoints/project_rule_actions.py @@ -8,7 +8,7 @@ from sentry.api.bases import ProjectAlertRulePermission, ProjectEndpoint from sentry.api.serializers.rest_framework import RuleActionSerializer from sentry.models.rule import Rule -from sentry.rules.processor import RuleProcessor +from sentry.rules.processing.processor import RuleProcessor from sentry.utils.safe import safe_execute from sentry.utils.samples import create_sample_event diff --git a/src/sentry/api/endpoints/project_rules.py b/src/sentry/api/endpoints/project_rules.py index 2e76f98c32551b..6d5ae511a5d75a 100644 --- a/src/sentry/api/endpoints/project_rules.py +++ b/src/sentry/api/endpoints/project_rules.py @@ -32,7 +32,7 @@ from sentry.models.user import User from sentry.rules.actions import trigger_sentry_app_action_creators_for_issues from sentry.rules.actions.base import instantiate_action -from sentry.rules.processor import is_condition_slow +from sentry.rules.processing.processor import is_condition_slow from sentry.signals import alert_rule_created from sentry.tasks.integrations.slack import find_channel_id_for_rule from sentry.utils import metrics diff --git a/src/sentry/rules/history/preview.py b/src/sentry/rules/history/preview.py index 1afc7677d2b344..de4e465b56b7d4 100644 --- a/src/sentry/rules/history/preview.py +++ b/src/sentry/rules/history/preview.py @@ -18,7 +18,7 @@ get_update_kwargs_for_group, get_update_kwargs_for_groups, ) -from sentry.rules.processor import get_match_function +from sentry.rules.processing.processor import get_match_function from sentry.snuba.dataset import Dataset from sentry.types.condition_activity import ( FREQUENCY_CONDITION_BUCKET_SIZE, diff --git a/src/sentry/rules/processing/__init__.py b/src/sentry/rules/processing/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py new file mode 100644 index 00000000000000..31b645a6c69caa --- /dev/null +++ b/src/sentry/rules/processing/delayed_processing.py @@ -0,0 +1,34 @@ +import logging 
+from collections.abc import Mapping + +from sentry.buffer.redis import RedisBuffer +from sentry.models.project import Project +from sentry.utils.safe import safe_execute + +logger = logging.getLogger("sentry.rules.delayed_processing") + + +# TODO(schew2381): Import once available +PROJECT_ID_BUFFER_LIST_KEY = "project_id_buffer_list" + + +# TODO: Add redis buffer registry decorator +def process_delayed_alert_conditions(buffer: RedisBuffer) -> None: + project_ids = buffer.get_list(PROJECT_ID_BUFFER_LIST_KEY) + + # Get actual projects in bulk query, is this too many projects + # to fetch at once and keep in memory? + project_mapping = Project.objects.in_bulk(set(project_ids)) + + for project_id, project in project_mapping.items(): + rulegroup_event_mapping: dict[str, int] = buffer.get_queue(Project, {id: project_id}) + safe_execute( + apply_delayed, + project, + rulegroup_event_mapping, + _with_transaction=False, + ) + + +def apply_delayed(project: Project, rule_group_pairs: Mapping[str, int]): + pass diff --git a/src/sentry/rules/processor.py b/src/sentry/rules/processing/processor.py similarity index 100% rename from src/sentry/rules/processor.py rename to src/sentry/rules/processing/processor.py diff --git a/src/sentry/tasks/post_process.py b/src/sentry/tasks/post_process.py index 8812eab86375c7..bd0d2e4ae903f8 100644 --- a/src/sentry/tasks/post_process.py +++ b/src/sentry/tasks/post_process.py @@ -1081,7 +1081,7 @@ def process_rules(job: PostProcessJob) -> None: if job["is_reprocessed"]: return - from sentry.rules.processor import RuleProcessor + from sentry.rules.processing.processor import RuleProcessor group_event = job["event"] if isinstance(group_event, Event): diff --git a/tests/sentry/rules/processing/__init__.py b/tests/sentry/rules/processing/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/tests/sentry/rules/processing/test_delayed_processing.py b/tests/sentry/rules/processing/test_delayed_processing.py new file mode 100644 index 00000000000000..26b205c26645fa --- /dev/null +++ b/tests/sentry/rules/processing/test_delayed_processing.py @@ -0,0 +1,45 @@ +from unittest.mock import Mock, patch + +from sentry.rules.processing.delayed_processing import ( + apply_delayed, + process_delayed_alert_conditions, +) +from sentry.testutils.cases import TestCase + + +class ProcessDelayedAlertConditionsTest(TestCase): + def get_rulegroup_event_mapping_from_input(self, proj_model, proj_id_map): + proj_id = proj_id_map.popitem()[1] + return self.buffer_mapping[proj_id] + + @patch("sentry.rules.processing.delayed_processing.safe_execute") + def test_fetches_from_buffer_and_executes(self, mock_safe_execute): + self.project_two = self.create_project() + + project_id_mapping = { + self.project.id: self.project, + self.project_two.id: self.project_two, + } + rulegroup_event_mapping_one = {"1:1": 1, "2:2": 2} + rulegroup_event_mapping_two = {"3:3": 3, "4:4": 4} + self.buffer_mapping = { + self.project.id: rulegroup_event_mapping_one, + self.project_two.id: rulegroup_event_mapping_two, + } + + mock_buffer = Mock() + mock_buffer.get_list.return_value = self.buffer_mapping.keys() + # To get the correct mapping, we need to return the correct + # rulegroup_event mapping based on the project_id input + mock_buffer.get_queue.side_effect = self.get_rulegroup_event_mapping_from_input + + process_delayed_alert_conditions(mock_buffer) + + for project_id, rule_group_event_mapping in self.buffer_mapping.items(): + project = project_id_mapping[project_id] + 
mock_safe_execute.assert_any_call( + apply_delayed, + project, + rule_group_event_mapping, + _with_transaction=False, + ) diff --git a/tests/sentry/rules/test_processor.py b/tests/sentry/rules/processing/test_processor.py similarity index 97% rename from tests/sentry/rules/test_processor.py rename to tests/sentry/rules/processing/test_processor.py index 12ac4fb5ca8954..d0c8e5419ccdb6 100644 --- a/tests/sentry/rules/test_processor.py +++ b/tests/sentry/rules/processing/test_processor.py @@ -17,7 +17,7 @@ from sentry.rules import init_registry from sentry.rules.conditions import EventCondition from sentry.rules.filters.base import EventFilter -from sentry.rules.processor import RuleProcessor +from sentry.rules.processing.processor import RuleProcessor from sentry.testutils.cases import TestCase from sentry.testutils.helpers import install_slack from sentry.testutils.helpers.features import with_feature @@ -317,7 +317,9 @@ def test_multiple_rules(self): # Test that we don't get errors if we try to create statuses that already exist due to a # race condition - with mock.patch("sentry.rules.processor.GroupRuleStatus") as mocked_GroupRuleStatus: + with mock.patch( + "sentry.rules.processing.processor.GroupRuleStatus" + ) as mocked_GroupRuleStatus: call_count = 0 def mock_filter(*args, **kwargs): @@ -356,7 +358,7 @@ def test_slow_conditions_evaluate_last(self): }, ) with ( - patch("sentry.rules.processor.rules", init_registry()), + patch("sentry.rules.processing.processor.rules", init_registry()), patch( "sentry.rules.conditions.event_frequency.BaseEventFrequencyCondition.passes" ) as passes, @@ -418,7 +420,7 @@ def test_filter_passes(self): }, ) # patch the rule registry to contain the mocked rules - with patch("sentry.rules.processor.rules", init_registry()): + with patch("sentry.rules.processing.processor.rules", init_registry()): rp = RuleProcessor( self.group_event, is_new=True, @@ -447,7 +449,7 @@ def test_filter_fails(self): }, ) # patch the rule registry to contain the mocked rules - with patch("sentry.rules.processor.rules", init_registry()): + with patch("sentry.rules.processing.processor.rules", init_registry()): rp = RuleProcessor( self.group_event, is_new=True, @@ -553,13 +555,13 @@ def test_last_active_too_recent(self): ) with mock.patch( - "sentry.rules.processor.RuleProcessor.bulk_get_rule_status", + "sentry.rules.processing.processor.RuleProcessor.bulk_get_rule_status", return_value={self.rule.id: grs}, ): results = list(rp.apply()) assert len(results) == 0 - @mock.patch("sentry.rules.processor.RuleProcessor.logger") + @mock.patch("sentry.rules.processing.processor.RuleProcessor.logger") def test_invalid_predicate(self, mock_logger): filter_data = {"id": "tests.sentry.rules.test_processor.MockFilterTrue"} @@ -573,7 +575,7 @@ def test_invalid_predicate(self, mock_logger): }, ) - with patch("sentry.rules.processor.get_match_function", return_value=None): + with patch("sentry.rules.processing.processor.get_match_function", return_value=None): rp = RuleProcessor( self.group_event, is_new=True, diff --git a/tests/sentry/tasks/test_post_process.py b/tests/sentry/tasks/test_post_process.py index 4e1eb7bdc0121e..970f7a199c3437 100644 --- a/tests/sentry/tasks/test_post_process.py +++ b/tests/sentry/tasks/test_post_process.py @@ -110,7 +110,7 @@ def call_post_process_group( class CorePostProcessGroupTestMixin(BasePostProgressGroupMixin): - @patch("sentry.rules.processor.RuleProcessor") + @patch("sentry.rules.processing.processor.RuleProcessor") 
@patch("sentry.tasks.servicehooks.process_service_hook") @patch("sentry.tasks.sentry_apps.process_resource_change_bound.delay") @patch("sentry.signals.event_processed.send_robust") @@ -147,7 +147,7 @@ def test_issueless( # transaction events do not call event.processed assert mock_signal.call_count == 0 - @patch("sentry.rules.processor.RuleProcessor") + @patch("sentry.rules.processing.processor.RuleProcessor") def test_no_cache_abort(self, mock_processor): event = self.create_event(data={}, project_id=self.project.id) @@ -354,7 +354,7 @@ def test_skipping_an_issue_doesnt_mark_it_processed(self, mock_derive_code_mappi class RuleProcessorTestMixin(BasePostProgressGroupMixin): - @patch("sentry.rules.processor.RuleProcessor") + @patch("sentry.rules.processing.processor.RuleProcessor") def test_rule_processor_backwards_compat(self, mock_processor): event = self.create_event(data={}, project_id=self.project.id) @@ -375,7 +375,7 @@ def test_rule_processor_backwards_compat(self, mock_processor): mock_callback.assert_called_once_with(EventMatcher(event), mock_futures) - @patch("sentry.rules.processor.RuleProcessor") + @patch("sentry.rules.processing.processor.RuleProcessor") def test_rule_processor(self, mock_processor): event = self.create_event(data={"message": "testing"}, project_id=self.project.id) @@ -451,7 +451,7 @@ def test_rule_processor_buffer_values(self): ) assert MockAction.return_value.after.call_count == 1 - @patch("sentry.rules.processor.RuleProcessor") + @patch("sentry.rules.processing.processor.RuleProcessor") def test_group_refresh(self, mock_processor): event = self.create_event(data={"message": "testing"}, project_id=self.project.id) @@ -480,7 +480,7 @@ def test_group_refresh(self, mock_processor): EventMatcher(event, group=group2), True, False, True, False, False ) - @patch("sentry.rules.processor.RuleProcessor") + @patch("sentry.rules.processing.processor.RuleProcessor") def test_group_last_seen_buffer(self, mock_processor): first_event_date = timezone.now() - timedelta(days=90) event1 = self.create_event( @@ -541,7 +541,7 @@ def test_service_hook_fires_on_new_event(self, mock_process_service_hook): ) @patch("sentry.tasks.servicehooks.process_service_hook") - @patch("sentry.rules.processor.RuleProcessor") + @patch("sentry.rules.processing.processor.RuleProcessor") def test_service_hook_fires_on_alert(self, mock_processor, mock_process_service_hook): event = self.create_event(data={}, project_id=self.project.id) @@ -570,7 +570,7 @@ def test_service_hook_fires_on_alert(self, mock_processor, mock_process_service_ ) @patch("sentry.tasks.servicehooks.process_service_hook") - @patch("sentry.rules.processor.RuleProcessor") + @patch("sentry.rules.processing.processor.RuleProcessor") def test_service_hook_does_not_fire_without_alert( self, mock_processor, mock_process_service_hook ): @@ -734,7 +734,7 @@ def test_processes_resource_change_task_not_called_without_error_created(self, d class InboxTestMixin(BasePostProgressGroupMixin): - @patch("sentry.rules.processor.RuleProcessor") + @patch("sentry.rules.processing.processor.RuleProcessor") def test_group_inbox_regression(self, mock_processor): new_event = self.create_event(data={"message": "testing"}, project_id=self.project.id) @@ -1562,7 +1562,7 @@ def test_does_not_skip_when_is_new(self, mock_get_commit_context): class SnoozeTestSkipSnoozeMixin(BasePostProgressGroupMixin): @patch("sentry.signals.issue_unignored.send_robust") - @patch("sentry.rules.processor.RuleProcessor") + @patch("sentry.rules.processing.processor.RuleProcessor") 
def test_invalidates_snooze_issue_platform(self, mock_processor, mock_send_unignored_robust): event = self.create_event(data={"message": "testing"}, project_id=self.project.id) group = event.group @@ -1628,7 +1628,7 @@ def test_invalidates_snooze_issue_platform(self, mock_processor, mock_send_unign class SnoozeTestMixin(BasePostProgressGroupMixin): @patch("sentry.signals.issue_unignored.send_robust") - @patch("sentry.rules.processor.RuleProcessor") + @patch("sentry.rules.processing.processor.RuleProcessor") def test_invalidates_snooze(self, mock_processor, mock_send_unignored_robust): event = self.create_event(data={"message": "testing"}, project_id=self.project.id) @@ -1677,7 +1677,7 @@ def test_invalidates_snooze(self, mock_processor, mock_send_unignored_robust): @override_settings(SENTRY_BUFFER="sentry.buffer.redis.RedisBuffer") @patch("sentry.signals.issue_unignored.send_robust") - @patch("sentry.rules.processor.RuleProcessor") + @patch("sentry.rules.processing.processor.RuleProcessor") def test_invalidates_snooze_with_buffers(self, mock_processor, send_robust): redis_buffer = RedisBuffer() with ( @@ -1714,7 +1714,7 @@ def test_invalidates_snooze_with_buffers(self, mock_processor, send_robust): ) assert not GroupSnooze.objects.filter(id=snooze.id).exists() - @patch("sentry.rules.processor.RuleProcessor") + @patch("sentry.rules.processing.processor.RuleProcessor") def test_maintains_valid_snooze(self, mock_processor): event = self.create_event(data={}, project_id=self.project.id) group = event.group @@ -2380,7 +2380,7 @@ def call_post_process_group( @patch("sentry.sentry_metrics.client.generic_metrics_backend.counter") @patch("sentry.tasks.post_process.run_post_process_job") - @patch("sentry.rules.processor.RuleProcessor") + @patch("sentry.rules.processing.processor.RuleProcessor") @patch("sentry.signals.transaction_processed.send_robust") @patch("sentry.signals.event_processed.send_robust") def test_process_transaction_event_with_no_group( @@ -2420,7 +2420,7 @@ def test_process_transaction_event_with_no_group( @patch("sentry.tasks.post_process.handle_auto_assignment") @patch("sentry.tasks.post_process.process_rules") @patch("sentry.tasks.post_process.run_post_process_job") - @patch("sentry.rules.processor.RuleProcessor") + @patch("sentry.rules.processing.processor.RuleProcessor") @patch("sentry.signals.transaction_processed.send_robust") @patch("sentry.signals.event_processed.send_robust") def test_full_pipeline_with_group_states( @@ -2603,7 +2603,7 @@ def test_no_cache_abort(self): # We don't use the cache for generic issues, so skip this test pass - @patch("sentry.rules.processor.RuleProcessor") + @patch("sentry.rules.processing.processor.RuleProcessor") def test_occurrence_deduping(self, mock_processor): event = self.create_event(data={"message": "testing"}, project_id=self.project.id) @@ -2631,7 +2631,7 @@ def test_occurrence_deduping(self, mock_processor): @patch("sentry.tasks.post_process.handle_auto_assignment") @patch("sentry.tasks.post_process.process_rules") @patch("sentry.tasks.post_process.run_post_process_job") - @patch("sentry.rules.processor.RuleProcessor") + @patch("sentry.rules.processing.processor.RuleProcessor") @patch("sentry.signals.event_processed.send_robust") @patch("sentry.utils.snuba.raw_query") def test_full_pipeline_with_group_states( From 0c5ac9cd0a28a48607d984654276207b83ebd15d Mon Sep 17 00:00:00 2001 From: schew2381 Date: Fri, 5 Apr 2024 12:26:52 -0700 Subject: [PATCH 02/20] remove comments and add more typing --- 
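Note (not part of the commit): the per-project hash that process_delayed_alert_conditions reads is keyed by "rule_id:group_id" strings (see the "1:1", "2:2" keys in the new test), and apply_delayed — still a stub at this point in the series — is expected to split those keys back into each rule and the groups buffered against it before evaluating slow conditions. A minimal standalone sketch of that regrouping, with the helper name chosen only for illustration:

    from collections import defaultdict

    def group_buffer_keys(rule_group_pairs: dict[str, str]) -> dict[str, set[str]]:
        # Each buffer key encodes "rule_id:group_id"; the value identifies the buffered event.
        rule_to_groups: dict[str, set[str]] = defaultdict(set)
        for key in rule_group_pairs:
            rule_id, group_id = key.split(":")
            rule_to_groups[rule_id].add(group_id)
        return dict(rule_to_groups)

    # e.g. two rules buffered against three groups
    assert group_buffer_keys({"1:10": "e1", "1:11": "e2", "2:12": "e3"}) == {
        "1": {"10", "11"},
        "2": {"12"},
    }

A later commit in this series ("feat(issue-alerts): Group unique conditions") implements this same split inside apply_delayed itself.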
src/sentry/rules/processing/delayed_processing.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index 31b645a6c69caa..c9a9dc3d38ef91 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -16,8 +16,6 @@ def process_delayed_alert_conditions(buffer: RedisBuffer) -> None: project_ids = buffer.get_list(PROJECT_ID_BUFFER_LIST_KEY) - # Get actual projects in bulk query, is this too many projects - # to fetch at once and keep in memory? project_mapping = Project.objects.in_bulk(set(project_ids)) for project_id, project in project_mapping.items(): @@ -30,5 +28,5 @@ def process_delayed_alert_conditions(buffer: RedisBuffer) -> None: ) -def apply_delayed(project: Project, rule_group_pairs: Mapping[str, int]): +def apply_delayed(project: Project, rule_group_pairs: Mapping[str, int]) -> None: pass From 8c04206175b7a51cba54f8dac82df65ccb5b51b3 Mon Sep 17 00:00:00 2001 From: schew2381 Date: Fri, 5 Apr 2024 12:28:18 -0700 Subject: [PATCH 03/20] fix more typing --- src/sentry/rules/processing/delayed_processing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index c9a9dc3d38ef91..c7bbeb7170add2 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -14,12 +14,12 @@ # TODO: Add redis buffer registry decorator def process_delayed_alert_conditions(buffer: RedisBuffer) -> None: - project_ids = buffer.get_list(PROJECT_ID_BUFFER_LIST_KEY) + project_ids = buffer.get_list(PROJECT_ID_BUFFER_LIST_KEY) # type: ignore[attr-defined] project_mapping = Project.objects.in_bulk(set(project_ids)) for project_id, project in project_mapping.items(): - rulegroup_event_mapping: dict[str, int] = buffer.get_queue(Project, {id: project_id}) + rulegroup_event_mapping: dict[str, int] = buffer.get_queue(Project, {id: project_id}) # type: ignore[attr-defined] safe_execute( apply_delayed, project, From a8fc21f665e48afa80959576dd3702ec73b08d4a Mon Sep 17 00:00:00 2001 From: schew2381 Date: Fri, 5 Apr 2024 12:30:31 -0700 Subject: [PATCH 04/20] remove stuff from comment --- src/sentry/rules/processing/delayed_processing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index c7bbeb7170add2..fcff3c7f59d02b 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -8,7 +8,7 @@ logger = logging.getLogger("sentry.rules.delayed_processing") -# TODO(schew2381): Import once available +# TODO: Import once available PROJECT_ID_BUFFER_LIST_KEY = "project_id_buffer_list" From 195da43f68d699aacf3cb71b7311ff676e45d926 Mon Sep 17 00:00:00 2001 From: schew2381 Date: Fri, 5 Apr 2024 12:33:35 -0700 Subject: [PATCH 05/20] reduce test verboseness --- .../sentry/rules/processing/test_delayed_processing.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tests/sentry/rules/processing/test_delayed_processing.py b/tests/sentry/rules/processing/test_delayed_processing.py index 26b205c26645fa..38a005fe244bb0 100644 --- a/tests/sentry/rules/processing/test_delayed_processing.py +++ b/tests/sentry/rules/processing/test_delayed_processing.py @@ -16,10 +16,6 @@ def 
get_rulegroup_event_mapping_from_input(self, proj_model, proj_id_map): def test_fetches_from_buffer_and_executes(self, mock_safe_execute): self.project_two = self.create_project() - project_id_mapping = { - self.project.id: self.project, - self.project_two.id: self.project_two, - } rulegroup_event_mapping_one = {"1:1": 1, "2:2": 2} rulegroup_event_mapping_two = {"3:3": 3, "4:4": 4} self.buffer_mapping = { @@ -35,8 +31,10 @@ def test_fetches_from_buffer_and_executes(self, mock_safe_execute): process_delayed_alert_conditions(mock_buffer) - for project_id, rule_group_event_mapping in self.buffer_mapping.items(): - project = project_id_mapping[project_id] + for project, rule_group_event_mapping in ( + (self.project, rulegroup_event_mapping_one), + (self.project_two, rulegroup_event_mapping_two), + ): mock_safe_execute.assert_any_call( apply_delayed, project, From 90bf014cebb5a9028641693e3114e5e79f45a4e6 Mon Sep 17 00:00:00 2001 From: schew2381 Date: Mon, 8 Apr 2024 10:54:22 -0700 Subject: [PATCH 06/20] initial fixes --- src/sentry/rules/processing/delayed_processing.py | 9 ++++----- tests/sentry/buffer/test_redis.py | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index fcff3c7f59d02b..ec5df2aac9bc91 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -1,25 +1,24 @@ import logging from collections.abc import Mapping -from sentry.buffer.redis import RedisBuffer +from sentry.buffer.redis import BufferHookEvent, RedisBuffer, redis_buffer_registry from sentry.models.project import Project from sentry.utils.safe import safe_execute logger = logging.getLogger("sentry.rules.delayed_processing") -# TODO: Import once available PROJECT_ID_BUFFER_LIST_KEY = "project_id_buffer_list" -# TODO: Add redis buffer registry decorator +@redis_buffer_registry.add_handler(BufferHookEvent.FLUSH) def process_delayed_alert_conditions(buffer: RedisBuffer) -> None: - project_ids = buffer.get_list(PROJECT_ID_BUFFER_LIST_KEY) # type: ignore[attr-defined] + project_ids = buffer.get_set(PROJECT_ID_BUFFER_LIST_KEY) project_mapping = Project.objects.in_bulk(set(project_ids)) for project_id, project in project_mapping.items(): - rulegroup_event_mapping: dict[str, int] = buffer.get_queue(Project, {id: project_id}) # type: ignore[attr-defined] + rulegroup_event_mapping: dict[str, int] = buffer.get_hash(Project, {id: project_id}) safe_execute( apply_delayed, project, diff --git a/tests/sentry/buffer/test_redis.py b/tests/sentry/buffer/test_redis.py index abd5c313795fee..d15506e88c7f3b 100644 --- a/tests/sentry/buffer/test_redis.py +++ b/tests/sentry/buffer/test_redis.py @@ -11,6 +11,7 @@ from sentry.buffer.redis import BufferHookEvent, RedisBuffer, redis_buffer_registry from sentry.models.group import Group from sentry.models.project import Project +from sentry.rules.processing.delayed_processing import PROJECT_ID_BUFFER_LIST_KEY from sentry.testutils.helpers.datetime import freeze_time from sentry.testutils.pytest.fixtures import django_db_all from sentry.utils import json @@ -268,7 +269,6 @@ def group_rule_data_by_project_id(self, buffer, project_ids): return project_ids_to_rule_data def test_enqueue(self): - PROJECT_ID_BUFFER_LIST_KEY = "project_id_buffer_list" project_id = 1 rule_id = 2 group_id = 3 From b6565283821a5fca2025ae72f4e978508ddaf23a Mon Sep 17 00:00:00 2001 From: schew2381 Date: Mon, 8 Apr 2024 10:54:41 -0700 Subject: [PATCH 
07/20] typing --- src/sentry/rules/processing/delayed_processing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index ec5df2aac9bc91..dd96318a8875fc 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -27,5 +27,5 @@ def process_delayed_alert_conditions(buffer: RedisBuffer) -> None: ) -def apply_delayed(project: Project, rule_group_pairs: Mapping[str, int]) -> None: +def apply_delayed(project: Project, rule_group_pairs: Mapping[str, str]) -> None: pass From d0688040aae91059b7b8dcc1bc995224a7d5e599 Mon Sep 17 00:00:00 2001 From: schew2381 Date: Mon, 8 Apr 2024 11:36:05 -0700 Subject: [PATCH 08/20] add timers --- .../rules/processing/delayed_processing.py | 27 ++++++++++--------- .../processing/test_delayed_processing.py | 4 +-- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index dd96318a8875fc..dd64f9b1359362 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -3,6 +3,8 @@ from sentry.buffer.redis import BufferHookEvent, RedisBuffer, redis_buffer_registry from sentry.models.project import Project +from sentry.utils import metrics +from sentry.utils.query import RangeQuerySetWrapper from sentry.utils.safe import safe_execute logger = logging.getLogger("sentry.rules.delayed_processing") @@ -13,18 +15,19 @@ @redis_buffer_registry.add_handler(BufferHookEvent.FLUSH) def process_delayed_alert_conditions(buffer: RedisBuffer) -> None: - project_ids = buffer.get_set(PROJECT_ID_BUFFER_LIST_KEY) - - project_mapping = Project.objects.in_bulk(set(project_ids)) - - for project_id, project in project_mapping.items(): - rulegroup_event_mapping: dict[str, int] = buffer.get_hash(Project, {id: project_id}) - safe_execute( - apply_delayed, - project, - rulegroup_event_mapping, - _with_transaction=False, - ) + with metrics.timer("delayed_processing.process_all_conditions.duration"): + project_ids = buffer.get_set(PROJECT_ID_BUFFER_LIST_KEY) + + for project in RangeQuerySetWrapper(Project.objects.filter(id__in=project_ids)): + with metrics.timer("delayed_processing.process_project.duration"): + rulegroup_event_mapping: dict[str, int] = buffer.get_hash(Project, {id: project.id}) + + safe_execute( + apply_delayed, + project, + rulegroup_event_mapping, + _with_transaction=False, + ) def apply_delayed(project: Project, rule_group_pairs: Mapping[str, str]) -> None: diff --git a/tests/sentry/rules/processing/test_delayed_processing.py b/tests/sentry/rules/processing/test_delayed_processing.py index 38a005fe244bb0..9f86e8f0466ea1 100644 --- a/tests/sentry/rules/processing/test_delayed_processing.py +++ b/tests/sentry/rules/processing/test_delayed_processing.py @@ -24,10 +24,10 @@ def test_fetches_from_buffer_and_executes(self, mock_safe_execute): } mock_buffer = Mock() - mock_buffer.get_list.return_value = self.buffer_mapping.keys() + mock_buffer.get_set.return_value = self.buffer_mapping.keys() # To get the correct mapping, we need to return the correct # rulegroup_event mapping based on the project_id input - mock_buffer.get_queue.side_effect = self.get_rulegroup_event_mapping_from_input + mock_buffer.get_hash.side_effect = self.get_rulegroup_event_mapping_from_input process_delayed_alert_conditions(mock_buffer) From 
f6dc255b9e55c5070a0df8c222f3c1364f21a1bf Mon Sep 17 00:00:00 2001 From: schew2381 Date: Mon, 8 Apr 2024 12:06:33 -0700 Subject: [PATCH 09/20] fix typing --- src/sentry/rules/processing/delayed_processing.py | 2 +- tests/sentry/rules/processing/test_delayed_processing.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index dd64f9b1359362..ffdd9c8ad1696a 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -20,7 +20,7 @@ def process_delayed_alert_conditions(buffer: RedisBuffer) -> None: for project in RangeQuerySetWrapper(Project.objects.filter(id__in=project_ids)): with metrics.timer("delayed_processing.process_project.duration"): - rulegroup_event_mapping: dict[str, int] = buffer.get_hash(Project, {id: project.id}) + rulegroup_event_mapping = buffer.get_hash(model=Project, field={"id": project.id}) safe_execute( apply_delayed, diff --git a/tests/sentry/rules/processing/test_delayed_processing.py b/tests/sentry/rules/processing/test_delayed_processing.py index 9f86e8f0466ea1..f64026497cde04 100644 --- a/tests/sentry/rules/processing/test_delayed_processing.py +++ b/tests/sentry/rules/processing/test_delayed_processing.py @@ -16,8 +16,8 @@ def get_rulegroup_event_mapping_from_input(self, proj_model, proj_id_map): def test_fetches_from_buffer_and_executes(self, mock_safe_execute): self.project_two = self.create_project() - rulegroup_event_mapping_one = {"1:1": 1, "2:2": 2} - rulegroup_event_mapping_two = {"3:3": 3, "4:4": 4} + rulegroup_event_mapping_one = {"1:1": "event_1", "2:2": "event_2"} + rulegroup_event_mapping_two = {"3:3": "event_3", "4:4": "event_4"} self.buffer_mapping = { self.project.id: rulegroup_event_mapping_one, self.project_two.id: rulegroup_event_mapping_two, From a2402da44007a85d4d12edf9d5c083ee5ce5bda6 Mon Sep 17 00:00:00 2001 From: schew2381 Date: Mon, 8 Apr 2024 12:08:57 -0700 Subject: [PATCH 10/20] fix imports --- .../sentry/rules/processing/test_processor.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/sentry/rules/processing/test_processor.py b/tests/sentry/rules/processing/test_processor.py index d0c8e5419ccdb6..364d9f37f6b1d7 100644 --- a/tests/sentry/rules/processing/test_processor.py +++ b/tests/sentry/rules/processing/test_processor.py @@ -37,7 +37,7 @@ class MockConditionTrue(EventCondition): - id = "tests.sentry.rules.test_processor.MockConditionTrue" + id = "tests.sentry.rules.processing.test_processor.MockConditionTrue" label = "Mock condition which always passes." 
def passes(self, event, state): @@ -341,7 +341,7 @@ def mock_filter(*args, **kwargs): [ "sentry.mail.actions.NotifyEmailAction", "sentry.rules.conditions.event_frequency.EventFrequencyCondition", - "tests.sentry.rules.test_processor.MockConditionTrue", + "tests.sentry.rules.processing.test_processor.MockConditionTrue", ], ) def test_slow_conditions_evaluate_last(self): @@ -351,7 +351,7 @@ def test_slow_conditions_evaluate_last(self): data={ "conditions": [ {"id": "sentry.rules.conditions.event_frequency.EventFrequencyCondition"}, - {"id": "tests.sentry.rules.test_processor.MockConditionTrue"}, + {"id": "tests.sentry.rules.processing.test_processor.MockConditionTrue"}, ], "action_match": "any", "actions": [EMAIL_ACTION_DATA], @@ -378,7 +378,7 @@ def test_slow_conditions_evaluate_last(self): class MockFilterTrue(EventFilter): - id = "tests.sentry.rules.test_processor.MockFilterTrue" + id = "tests.sentry.rules.processing.test_processor.MockFilterTrue" label = "Mock filter which always passes." def passes(self, event, state): @@ -386,7 +386,7 @@ def passes(self, event, state): class MockFilterFalse(EventFilter): - id = "tests.sentry.rules.test_processor.MockFilterFalse" + id = "tests.sentry.rules.processing.test_processor.MockFilterFalse" label = "Mock filter which never passes." def passes(self, event, state): @@ -397,8 +397,8 @@ class RuleProcessorTestFilters(TestCase): MOCK_SENTRY_RULES_WITH_FILTERS = ( "sentry.mail.actions.NotifyEmailAction", "sentry.rules.conditions.every_event.EveryEventCondition", - "tests.sentry.rules.test_processor.MockFilterTrue", - "tests.sentry.rules.test_processor.MockFilterFalse", + "tests.sentry.rules.processing.test_processor.MockFilterTrue", + "tests.sentry.rules.processing.test_processor.MockFilterFalse", ) def setUp(self): @@ -408,7 +408,7 @@ def setUp(self): @patch("sentry.constants._SENTRY_RULES", MOCK_SENTRY_RULES_WITH_FILTERS) def test_filter_passes(self): # setup a simple alert rule with 1 condition and 1 filter that always pass - filter_data = {"id": "tests.sentry.rules.test_processor.MockFilterTrue"} + filter_data = {"id": "tests.sentry.rules.processing.test_processor.MockFilterTrue"} Rule.objects.filter(project=self.group_event.project).delete() ProjectOwnership.objects.create(project_id=self.project.id, fallthrough=True) @@ -438,7 +438,7 @@ def test_filter_passes(self): @patch("sentry.constants._SENTRY_RULES", MOCK_SENTRY_RULES_WITH_FILTERS) def test_filter_fails(self): # setup a simple alert rule with 1 condition and 1 filter that doesn't pass - filter_data = {"id": "tests.sentry.rules.test_processor.MockFilterFalse"} + filter_data = {"id": "tests.sentry.rules.processing.test_processor.MockFilterFalse"} Rule.objects.filter(project=self.group_event.project).delete() self.rule = Rule.objects.create( @@ -563,7 +563,7 @@ def test_last_active_too_recent(self): @mock.patch("sentry.rules.processing.processor.RuleProcessor.logger") def test_invalid_predicate(self, mock_logger): - filter_data = {"id": "tests.sentry.rules.test_processor.MockFilterTrue"} + filter_data = {"id": "tests.sentry.rules.processing.test_processor.MockFilterTrue"} Rule.objects.filter(project=self.group_event.project).delete() ProjectOwnership.objects.create(project_id=self.project.id, fallthrough=True) From a2d58896ce2dda06bd190ac8893d8ccde3df0943 Mon Sep 17 00:00:00 2001 From: schew2381 Date: Mon, 8 Apr 2024 12:13:21 -0700 Subject: [PATCH 11/20] move timer --- src/sentry/rules/processing/delayed_processing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index ffdd9c8ad1696a..ca6b374658c7ec 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -19,9 +19,9 @@ def process_delayed_alert_conditions(buffer: RedisBuffer) -> None: project_ids = buffer.get_set(PROJECT_ID_BUFFER_LIST_KEY) for project in RangeQuerySetWrapper(Project.objects.filter(id__in=project_ids)): - with metrics.timer("delayed_processing.process_project.duration"): - rulegroup_event_mapping = buffer.get_hash(model=Project, field={"id": project.id}) + rulegroup_event_mapping = buffer.get_hash(model=Project, field={"id": project.id}) + with metrics.timer("delayed_processing.process_project.duration"): safe_execute( apply_delayed, project, From c27c86ed278b83b1a18260c02092967e117a47d9 Mon Sep 17 00:00:00 2001 From: schew2381 Date: Mon, 8 Apr 2024 23:30:52 -0700 Subject: [PATCH 12/20] feat(issue-alerts): Group unique conditions --- src/sentry/models/rule.py | 50 ++++++++++++++++++- src/sentry/rules/base.py | 10 ++-- .../rules/conditions/event_frequency.py | 41 +++++++++++++-- .../rules/processing/delayed_processing.py | 23 ++++++++- src/sentry/rules/processing/processor.py | 29 +++-------- .../sentry/rules/processing/test_processor.py | 11 +++- 6 files changed, 132 insertions(+), 32 deletions(-) diff --git a/src/sentry/models/rule.py b/src/sentry/models/rule.py index fdcaf7f67e8104..5d62a96c16f30f 100644 --- a/src/sentry/models/rule.py +++ b/src/sentry/models/rule.py @@ -1,6 +1,9 @@ +from __future__ import annotations + +import logging from collections.abc import Sequence from enum import Enum, IntEnum -from typing import Any, ClassVar, Self +from typing import TYPE_CHECKING, Any, ClassVar, Self from django.db import models from django.utils import timezone @@ -19,6 +22,14 @@ from sentry.db.models.manager import BaseManager from sentry.utils.cache import cache +if TYPE_CHECKING: + from sentry.rules.conditions.event_frequency import ( + CountComparisonConditionData, + PercentComparisonConditionData, + ) + +logger = logging.getLogger("sentry.rules") + class RuleSource(IntEnum): ISSUE = 0 @@ -34,6 +45,7 @@ def as_choices(cls) -> Sequence[tuple[int, str]]: @region_silo_only_model class Rule(Model): + __relocation_scope__ = RelocationScope.Organization DEFAULT_CONDITION_MATCH = "all" # any, all @@ -90,6 +102,42 @@ def created_by_id(self): return None + @property + def conditions(self) -> list[CountComparisonConditionData | PercentComparisonConditionData]: + from sentry.rules import rules + from sentry.rules.conditions.base import EventCondition + + conditions_and_filters = self.data.get("conditions", []) + conditions = [] + for condition_or_filter in conditions_and_filters: + id = condition_or_filter["id"] + rule_cls = rules.get(id) + if rule_cls is None: + logger.warning("Unregistered condition or filter %r", id) + continue + + if rule_cls.rule_type == EventCondition.rule_type: + conditions.append(condition_or_filter) + return conditions + + @property + def filters(self) -> list[dict[str, Any]]: + from sentry.rules import rules + from sentry.rules.filters.base import EventFilter + + conditions_and_filters = self.data.get("conditions", []) + filters = [] + for condition_or_filter in conditions_and_filters: + id = condition_or_filter["id"] + rule_cls = rules.get(id) + if rule_cls is None: + logger.warning("Unregistered condition or filter %r", id) + continue + + if rule_cls.rule_type == EventFilter.rule_type: + 
filters.append(condition_or_filter) + return filters + def delete(self, *args, **kwargs): rv = super().delete(*args, **kwargs) cache_key = f"project:{self.project_id}:rules" diff --git a/src/sentry/rules/base.py b/src/sentry/rules/base.py index 18fae675c0497c..2033fd9a78fb20 100644 --- a/src/sentry/rules/base.py +++ b/src/sentry/rules/base.py @@ -3,19 +3,21 @@ import abc import logging from collections import namedtuple -from collections.abc import Callable, Sequence -from typing import Any, ClassVar +from collections.abc import Callable, Mapping, Sequence +from typing import TYPE_CHECKING, Any, ClassVar from django import forms from sentry.eventstore.models import GroupEvent from sentry.models.project import Project -from sentry.models.rule import Rule from sentry.models.rulefirehistory import RuleFireHistory from sentry.snuba.dataset import Dataset from sentry.types.condition_activity import ConditionActivity from sentry.types.rules import RuleFuture +if TYPE_CHECKING: + from sentry.models.rule import Rule + """ Rules apply either before an event gets stored, or immediately after. @@ -60,7 +62,7 @@ class RuleBase(abc.ABC): def __init__( self, project: Project, - data: dict[str, Any] | None = None, + data: Mapping[str, Any] | None = None, rule: Rule | None = None, rule_fire_history: RuleFireHistory | None = None, ) -> None: diff --git a/src/sentry/rules/conditions/event_frequency.py b/src/sentry/rules/conditions/event_frequency.py index 697ad2694344ba..cfc584979eeff1 100644 --- a/src/sentry/rules/conditions/event_frequency.py +++ b/src/sentry/rules/conditions/event_frequency.py @@ -6,7 +6,7 @@ import re from collections.abc import Mapping, Sequence from datetime import datetime, timedelta -from typing import Any +from typing import Any, NotRequired, TypedDict from django import forms from django.core.cache import cache @@ -51,6 +51,36 @@ } +class GenericConditionData(TypedDict): + """ + The base typed dict for all condition data representing EventFrequency issue + alert rule conditions + """ + + # The ID of the condition class + id: str + # Either the count or percentage. + value: int + # The interval to compare the value against such as 5m, 1h, 3w, etc. + # e.g. # of issues is more than {value} in {interval}. + interval: str + + +class CountComparisonConditionData(GenericConditionData): + # NOTE: Some of tne earlier alert rules were created without the comparisonType + # field, although modern rules will always have it. + # Specifies the comparison type. Should be set to COMPARISON_TYPE_COUNT. + comparisonType: NotRequired[str] + + +class PercentComparisonConditionData(GenericConditionData): + # Specifies the comparison type. Should be set to COMPARISON_TYPE_PERCENT. + comparisonType: str + # The previous interval to compare the curr interval against. + # e.g. 
# of issues is 50% higher in {interval} compared to {comparisonInterval} + comparisonInterval: str + + class EventFrequencyForm(forms.Form): intervals = standard_intervals interval = forms.ChoiceField( @@ -96,7 +126,12 @@ class BaseEventFrequencyCondition(EventCondition, abc.ABC): intervals = standard_intervals form_cls = EventFrequencyForm - def __init__(self, *args: Any, **kwargs: Any) -> None: + def __init__( + self, + data: CountComparisonConditionData | PercentComparisonConditionData, + *args: Any, + **kwargs: Any, + ) -> None: self.tsdb = kwargs.pop("tsdb", tsdb) self.form_fields = { "value": {"type": "number", "placeholder": 100}, @@ -112,7 +147,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: }, } - super().__init__(*args, **kwargs) + super().__init__(data=data, *args, **kwargs) # type:ignore[misc] def _get_options(self) -> tuple[str | None, float | None]: interval, value = None, None diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index ca6b374658c7ec..cb59294323dc01 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -1,8 +1,11 @@ import logging +from collections import defaultdict from collections.abc import Mapping from sentry.buffer.redis import BufferHookEvent, RedisBuffer, redis_buffer_registry from sentry.models.project import Project +from sentry.models.rule import Rule +from sentry.rules.processing.processor import is_condition_slow from sentry.utils import metrics from sentry.utils.query import RangeQuerySetWrapper from sentry.utils.safe import safe_execute @@ -31,4 +34,22 @@ def process_delayed_alert_conditions(buffer: RedisBuffer) -> None: def apply_delayed(project: Project, rule_group_pairs: Mapping[str, str]) -> None: - pass + rule_group_mapping = defaultdict(set) + # Map each rule to its associated groups + for rule_group in rule_group_pairs.keys(): + rule_id, group_id = rule_group.split(":") + rule_group_mapping[rule_id].add(group_id) + + rules = Rule.objects.filter(id__in=list(rule_group_mapping.keys())) + + condition_groups = defaultdict(set) + for rule in rules: + rule: Rule + rule.conditions + # Exclude filters and slow conditions. We only want to check fast + # conditions because rules are only added to the buffer if we've already + # checked their slow conditions. 
+ conditions = [cond for cond in rule.conditions if not is_condition_slow(cond)] + for condition in conditions: + unique_condition = (condition["id"], condition.interval) + condition_groups[unique_condition].update(rule_group_mapping[rule.id]) diff --git a/src/sentry/rules/processing/processor.py b/src/sentry/rules/processing/processor.py index 3d7a0e60b40925..cc1129fb66d904 100644 --- a/src/sentry/rules/processing/processor.py +++ b/src/sentry/rules/processing/processor.py @@ -5,7 +5,6 @@ from collections.abc import Callable, Collection, Mapping, MutableMapping, Sequence from datetime import timedelta from random import randrange -from typing import Any from django.core.cache import cache from django.utils import timezone @@ -20,6 +19,7 @@ from sentry.rules import EventState, history, rules from sentry.rules.actions.base import instantiate_action from sentry.rules.conditions.base import EventCondition +from sentry.rules.conditions.event_frequency import GenericConditionData from sentry.rules.filters.base import EventFilter from sentry.types.rules import RuleFuture from sentry.utils.hashlib import hash_values @@ -38,7 +38,9 @@ def get_match_function(match_name: str) -> Callable[..., bool] | None: return None -def is_condition_slow(condition: Mapping[str, str]) -> bool: +def is_condition_slow( + condition: GenericConditionData, +) -> bool: for slow_conditions in SLOW_CONDITION_MATCHES: if slow_conditions in condition["id"]: return True @@ -138,7 +140,7 @@ def bulk_get_rule_status(self, rules: Sequence[Rule]) -> Mapping[int, GroupRuleS return rule_statuses def condition_matches( - self, condition: dict[str, Any], state: EventState, rule: Rule + self, condition: GenericConditionData, state: EventState, rule: Rule ) -> bool | None: condition_cls = rules.get(condition["id"]) if condition_cls is None: @@ -157,15 +159,6 @@ def condition_matches( ) return passes - def get_rule_type(self, condition: Mapping[str, Any]) -> str | None: - rule_cls = rules.get(condition["id"]) - if rule_cls is None: - self.logger.warning("Unregistered condition or filter %r", condition["id"]) - return None - - rule_type: str = rule_cls.rule_type - return rule_type - def get_state(self) -> EventState: return EventState( is_new=self.is_new, @@ -196,7 +189,6 @@ def apply_rule(self, rule: Rule, status: GroupRuleStatus) -> None: condition_match = rule.data.get("action_match") or Rule.DEFAULT_CONDITION_MATCH filter_match = rule.data.get("filter_match") or Rule.DEFAULT_FILTER_MATCH - rule_condition_list = rule.data.get("conditions", ()) frequency = rule.data.get("frequency") or Rule.DEFAULT_FREQUENCY try: environment = self.event.get_environment() @@ -213,13 +205,8 @@ def apply_rule(self, rule: Rule, status: GroupRuleStatus) -> None: state = self.get_state() - condition_list = [] - filter_list = [] - for rule_cond in rule_condition_list: - if self.get_rule_type(rule_cond) == "condition/event": - condition_list.append(rule_cond) - else: - filter_list.append(rule_cond) + condition_list = rule.conditions + filter_list = rule.filters # Sort `condition_list` so that most expensive conditions run last. 
condition_list.sort(key=lambda condition: is_condition_slow(condition)) @@ -230,7 +217,7 @@ def apply_rule(self, rule: Rule, status: GroupRuleStatus) -> None: ): if not predicate_list: continue - predicate_iter = (self.condition_matches(f, state, rule) for f in predicate_list) + predicate_iter = [self.condition_matches(f, state, rule) for f in predicate_list] # type: ignore[attr-defined] predicate_func = get_match_function(match) if predicate_func: if not predicate_func(predicate_iter): diff --git a/tests/sentry/rules/processing/test_processor.py b/tests/sentry/rules/processing/test_processor.py index 364d9f37f6b1d7..e9979549693f98 100644 --- a/tests/sentry/rules/processing/test_processor.py +++ b/tests/sentry/rules/processing/test_processor.py @@ -359,6 +359,7 @@ def test_slow_conditions_evaluate_last(self): ) with ( patch("sentry.rules.processing.processor.rules", init_registry()), + patch("sentry.rules.rules", init_registry()), patch( "sentry.rules.conditions.event_frequency.BaseEventFrequencyCondition.passes" ) as passes, @@ -420,7 +421,10 @@ def test_filter_passes(self): }, ) # patch the rule registry to contain the mocked rules - with patch("sentry.rules.processing.processor.rules", init_registry()): + with ( + patch("sentry.rules.processing.processor.rules", init_registry()), + patch("sentry.rules.rules", init_registry()), + ): rp = RuleProcessor( self.group_event, is_new=True, @@ -449,7 +453,10 @@ def test_filter_fails(self): }, ) # patch the rule registry to contain the mocked rules - with patch("sentry.rules.processing.processor.rules", init_registry()): + with ( + patch("sentry.rules.processing.processor.rules", init_registry()), + patch("sentry.rules.rules", init_registry()), + ): rp = RuleProcessor( self.group_event, is_new=True, From 7c01b849ab695913c923beb336d3394e3c67346a Mon Sep 17 00:00:00 2001 From: schew2381 Date: Mon, 8 Apr 2024 23:43:05 -0700 Subject: [PATCH 13/20] fix typing --- src/sentry/rules/base.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/sentry/rules/base.py b/src/sentry/rules/base.py index 2033fd9a78fb20..a33244dc992f9d 100644 --- a/src/sentry/rules/base.py +++ b/src/sentry/rules/base.py @@ -3,7 +3,7 @@ import abc import logging from collections import namedtuple -from collections.abc import Callable, Mapping, Sequence +from collections.abc import Callable, MutableMapping, Sequence from typing import TYPE_CHECKING, Any, ClassVar from django import forms @@ -62,7 +62,7 @@ class RuleBase(abc.ABC): def __init__( self, project: Project, - data: Mapping[str, Any] | None = None, + data: MutableMapping[str, Any] | None = None, rule: Rule | None = None, rule_fire_history: RuleFireHistory | None = None, ) -> None: @@ -83,10 +83,7 @@ def get_option(self, key: str, default: str | None = None) -> str: return self.data.get(key, default) def get_form_instance(self) -> forms.Form: - data: dict[str, Any] | None = None - if self.had_data: - data = self.data - return self.form_cls(data) + return self.form_cls(self.data if self.had_data else None) def render_label(self) -> str: return self.label.format(**self.data) From a34ede786c38c9d507afbf1ee34c2f73c6f40276 Mon Sep 17 00:00:00 2001 From: schew2381 Date: Wed, 10 Apr 2024 17:20:57 -0700 Subject: [PATCH 14/20] remove big stuff --- .../endpoints/project_rules_configuration.py | 2 +- src/sentry/models/rule.py | 47 +-------------- .../rules/conditions/event_frequency.py | 60 +++++++++---------- src/sentry/rules/processing/processor.py | 14 +++-- .../sentry/rules/processing/test_processor.py | 
11 +--- 5 files changed, 41 insertions(+), 93 deletions(-) diff --git a/src/sentry/api/endpoints/project_rules_configuration.py b/src/sentry/api/endpoints/project_rules_configuration.py index 3519f238fc28ac..3de7e95b5608d1 100644 --- a/src/sentry/api/endpoints/project_rules_configuration.py +++ b/src/sentry/api/endpoints/project_rules_configuration.py @@ -39,7 +39,7 @@ def get(self, request: Request, project) -> Response: # TODO: conditions need to be based on actions for rule_type, rule_cls in rules: - node = rule_cls(project) + node = rule_cls(project=project) # skip over conditions if they are not in the migrated set for a project with alert-filters if project_has_filters and node.id in MIGRATED_CONDITIONS: continue diff --git a/src/sentry/models/rule.py b/src/sentry/models/rule.py index 5d62a96c16f30f..e24b08b9669c35 100644 --- a/src/sentry/models/rule.py +++ b/src/sentry/models/rule.py @@ -1,9 +1,8 @@ from __future__ import annotations -import logging from collections.abc import Sequence from enum import Enum, IntEnum -from typing import TYPE_CHECKING, Any, ClassVar, Self +from typing import Any, ClassVar, Self from django.db import models from django.utils import timezone @@ -22,14 +21,6 @@ from sentry.db.models.manager import BaseManager from sentry.utils.cache import cache -if TYPE_CHECKING: - from sentry.rules.conditions.event_frequency import ( - CountComparisonConditionData, - PercentComparisonConditionData, - ) - -logger = logging.getLogger("sentry.rules") - class RuleSource(IntEnum): ISSUE = 0 @@ -102,42 +93,6 @@ def created_by_id(self): return None - @property - def conditions(self) -> list[CountComparisonConditionData | PercentComparisonConditionData]: - from sentry.rules import rules - from sentry.rules.conditions.base import EventCondition - - conditions_and_filters = self.data.get("conditions", []) - conditions = [] - for condition_or_filter in conditions_and_filters: - id = condition_or_filter["id"] - rule_cls = rules.get(id) - if rule_cls is None: - logger.warning("Unregistered condition or filter %r", id) - continue - - if rule_cls.rule_type == EventCondition.rule_type: - conditions.append(condition_or_filter) - return conditions - - @property - def filters(self) -> list[dict[str, Any]]: - from sentry.rules import rules - from sentry.rules.filters.base import EventFilter - - conditions_and_filters = self.data.get("conditions", []) - filters = [] - for condition_or_filter in conditions_and_filters: - id = condition_or_filter["id"] - rule_cls = rules.get(id) - if rule_cls is None: - logger.warning("Unregistered condition or filter %r", id) - continue - - if rule_cls.rule_type == EventFilter.rule_type: - filters.append(condition_or_filter) - return filters - def delete(self, *args, **kwargs): rv = super().delete(*args, **kwargs) cache_key = f"project:{self.project_id}:rules" diff --git a/src/sentry/rules/conditions/event_frequency.py b/src/sentry/rules/conditions/event_frequency.py index cfc584979eeff1..019fce6d13cdd0 100644 --- a/src/sentry/rules/conditions/event_frequency.py +++ b/src/sentry/rules/conditions/event_frequency.py @@ -6,10 +6,11 @@ import re from collections.abc import Mapping, Sequence from datetime import datetime, timedelta -from typing import Any, NotRequired, TypedDict +from typing import Any, Literal, NotRequired, TypedDict from django import forms from django.core.cache import cache +from django.db.models.enums import TextChoices from django.utils import timezone from sentry import release_health, tsdb @@ -43,15 +44,14 @@ "1w": ("one week", 
timedelta(days=7)), "30d": ("30 days", timedelta(days=30)), } -COMPARISON_TYPE_COUNT = "count" -COMPARISON_TYPE_PERCENT = "percent" -comparison_types = { - COMPARISON_TYPE_COUNT: COMPARISON_TYPE_COUNT, - COMPARISON_TYPE_PERCENT: COMPARISON_TYPE_PERCENT, -} -class GenericConditionData(TypedDict): +class ComparisonType(TextChoices): + COUNT = "count" + PERCENT = "percent" + + +class EventFrequencyConditionData(TypedDict): """ The base typed dict for all condition data representing EventFrequency issue alert rule conditions @@ -64,21 +64,13 @@ class GenericConditionData(TypedDict): # The interval to compare the value against such as 5m, 1h, 3w, etc. # e.g. # of issues is more than {value} in {interval}. interval: str - - -class CountComparisonConditionData(GenericConditionData): - # NOTE: Some of tne earlier alert rules were created without the comparisonType - # field, although modern rules will always have it. - # Specifies the comparison type. Should be set to COMPARISON_TYPE_COUNT. - comparisonType: NotRequired[str] - - -class PercentComparisonConditionData(GenericConditionData): - # Specifies the comparison type. Should be set to COMPARISON_TYPE_PERCENT. - comparisonType: str - # The previous interval to compare the curr interval against. + # NOTE: Some of tne earliest COUNT conditions were created without the + # comparisonType field, although modern rules will always have it. + comparisonType: NotRequired[Literal[ComparisonType.COUNT, ComparisonType.PERCENT]] + # The previous interval to compare the curr interval against. This is only + # present in PERCENT conditions. # e.g. # of issues is 50% higher in {interval} compared to {comparisonInterval} - comparisonInterval: str + comparisonInterval: NotRequired[str] class EventFrequencyForm(forms.Form): @@ -93,7 +85,7 @@ class EventFrequencyForm(forms.Form): ) value = forms.IntegerField(widget=forms.TextInput()) comparisonType = forms.ChoiceField( - choices=tuple(comparison_types.items()), + choices=ComparisonType, required=False, ) comparisonInterval = forms.ChoiceField( @@ -112,8 +104,8 @@ def clean(self) -> dict[str, Any] | None: # Don't store an empty string here if the value isn't passed if cleaned_data.get("comparisonInterval") == "": del cleaned_data["comparisonInterval"] - cleaned_data["comparisonType"] = cleaned_data.get("comparisonType") or COMPARISON_TYPE_COUNT - if cleaned_data["comparisonType"] == COMPARISON_TYPE_PERCENT and not cleaned_data.get( + cleaned_data["comparisonType"] = cleaned_data.get("comparisonType") or ComparisonType.COUNT + if cleaned_data["comparisonType"] == ComparisonType.PERCENT and not cleaned_data.get( "comparisonInterval" ): msg = forms.ValidationError("comparisonInterval is required when comparing by percent") @@ -128,7 +120,7 @@ class BaseEventFrequencyCondition(EventCondition, abc.ABC): def __init__( self, - data: CountComparisonConditionData | PercentComparisonConditionData, + data: EventFrequencyConditionData | None = None, *args: Any, **kwargs: Any, ) -> None: @@ -147,7 +139,9 @@ def __init__( }, } - super().__init__(data=data, *args, **kwargs) # type:ignore[misc] + # MyPy refuses to make TypedDict compatible with MutableMapping + # https://github.com/python/mypy/issues/4976 + super().__init__(data=data, *args, **kwargs) # type:ignore[misc, arg-type] def _get_options(self) -> tuple[str | None, float | None]: interval, value = None, None @@ -179,18 +173,18 @@ def passes_activity_frequency( if not (interval and value is not None): return False interval_delta = self.intervals[interval][1] - 
comparison_type = self.get_option("comparisonType", COMPARISON_TYPE_COUNT) + comparison_type = self.get_option("comparisonType", ComparisonType.COUNT) # extrapolate if interval less than bucket size # if comparing percent increase, both intervals will be increased, so do not extrapolate value if interval_delta < FREQUENCY_CONDITION_BUCKET_SIZE: - if comparison_type != COMPARISON_TYPE_PERCENT: + if comparison_type != ComparisonType.PERCENT: value *= int(FREQUENCY_CONDITION_BUCKET_SIZE / interval_delta) interval_delta = FREQUENCY_CONDITION_BUCKET_SIZE result = bucket_count(activity.timestamp - interval_delta, activity.timestamp, buckets) - if comparison_type == COMPARISON_TYPE_PERCENT: + if comparison_type == ComparisonType.PERCENT: comparison_interval = comparison_intervals[self.get_option("comparisonInterval")][1] comparison_end = activity.timestamp - comparison_interval @@ -262,8 +256,8 @@ def get_rate(self, event: GroupEvent, interval: str, environment_id: str) -> int option_override_cm = options_override({"consistent": False}) with option_override_cm: result: int = self.query(event, end - duration, end, environment_id=environment_id) - comparison_type = self.get_option("comparisonType", COMPARISON_TYPE_COUNT) - if comparison_type == COMPARISON_TYPE_PERCENT: + comparison_type = self.get_option("comparisonType", ComparisonType.COUNT) + if comparison_type == ComparisonType.PERCENT: comparison_interval = comparison_intervals[self.get_option("comparisonInterval")][1] comparison_end = end - comparison_interval # TODO: Figure out if there's a way we can do this less frequently. All queries are @@ -375,7 +369,7 @@ def clean(self) -> dict[str, Any] | None: cleaned_data = super().clean() if ( cleaned_data - and cleaned_data["comparisonType"] == COMPARISON_TYPE_COUNT + and cleaned_data["comparisonType"] == ComparisonType.COUNT and cleaned_data.get("value", 0) > 100 ): self.add_error( diff --git a/src/sentry/rules/processing/processor.py b/src/sentry/rules/processing/processor.py index cc1129fb66d904..5c6c3d0f692043 100644 --- a/src/sentry/rules/processing/processor.py +++ b/src/sentry/rules/processing/processor.py @@ -5,6 +5,7 @@ from collections.abc import Callable, Collection, Mapping, MutableMapping, Sequence from datetime import timedelta from random import randrange +from typing import Any from django.core.cache import cache from django.utils import timezone @@ -19,7 +20,7 @@ from sentry.rules import EventState, history, rules from sentry.rules.actions.base import instantiate_action from sentry.rules.conditions.base import EventCondition -from sentry.rules.conditions.event_frequency import GenericConditionData +from sentry.rules.conditions.event_frequency import EventFrequencyConditionData from sentry.rules.filters.base import EventFilter from sentry.types.rules import RuleFuture from sentry.utils.hashlib import hash_values @@ -39,7 +40,7 @@ def get_match_function(match_name: str) -> Callable[..., bool] | None: def is_condition_slow( - condition: GenericConditionData, + condition: EventFrequencyConditionData | Mapping[str, Any], ) -> bool: for slow_conditions in SLOW_CONDITION_MATCHES: if slow_conditions in condition["id"]: @@ -140,14 +141,19 @@ def bulk_get_rule_status(self, rules: Sequence[Rule]) -> Mapping[int, GroupRuleS return rule_statuses def condition_matches( - self, condition: GenericConditionData, state: EventState, rule: Rule + self, + condition: EventFrequencyConditionData | Mapping[str, Any], + state: EventState, + rule: Rule, ) -> bool | None: condition_cls = 
rules.get(condition["id"]) if condition_cls is None: self.logger.warning("Unregistered condition %r", condition["id"]) return None - condition_inst = condition_cls(self.project, data=condition, rule=rule) + # MyPy refuses to make TypedDict compatible with MutableMapping + # https://github.com/python/mypy/issues/4976 + condition_inst = condition_cls(project=self.project, data=condition, rule=rule) # type: ignore[arg-type] if not isinstance(condition_inst, (EventCondition, EventFilter)): self.logger.warning("Unregistered condition %r", condition["id"]) return None diff --git a/tests/sentry/rules/processing/test_processor.py b/tests/sentry/rules/processing/test_processor.py index e9979549693f98..364d9f37f6b1d7 100644 --- a/tests/sentry/rules/processing/test_processor.py +++ b/tests/sentry/rules/processing/test_processor.py @@ -359,7 +359,6 @@ def test_slow_conditions_evaluate_last(self): ) with ( patch("sentry.rules.processing.processor.rules", init_registry()), - patch("sentry.rules.rules", init_registry()), patch( "sentry.rules.conditions.event_frequency.BaseEventFrequencyCondition.passes" ) as passes, @@ -421,10 +420,7 @@ def test_filter_passes(self): }, ) # patch the rule registry to contain the mocked rules - with ( - patch("sentry.rules.processing.processor.rules", init_registry()), - patch("sentry.rules.rules", init_registry()), - ): + with patch("sentry.rules.processing.processor.rules", init_registry()): rp = RuleProcessor( self.group_event, is_new=True, @@ -453,10 +449,7 @@ def test_filter_fails(self): }, ) # patch the rule registry to contain the mocked rules - with ( - patch("sentry.rules.processing.processor.rules", init_registry()), - patch("sentry.rules.rules", init_registry()), - ): + with patch("sentry.rules.processing.processor.rules", init_registry()): rp = RuleProcessor( self.group_event, is_new=True, From 2be0a88fc21a69a6d0ec49d2c7a2aa31646ad82c Mon Sep 17 00:00:00 2001 From: schew2381 Date: Fri, 12 Apr 2024 21:01:12 -0700 Subject: [PATCH 15/20] shift to do stuff --- src/sentry/models/rule.py | 3 - src/sentry/rules/base.py | 4 +- src/sentry/rules/conditions/base.py | 7 ++ .../rules/conditions/event_frequency.py | 32 +++-- .../rules/processing/delayed_processing.py | 117 ++++++++++++++---- src/sentry/rules/processing/processor.py | 53 +++++--- 6 files changed, 151 insertions(+), 65 deletions(-) diff --git a/src/sentry/models/rule.py b/src/sentry/models/rule.py index e24b08b9669c35..fdcaf7f67e8104 100644 --- a/src/sentry/models/rule.py +++ b/src/sentry/models/rule.py @@ -1,5 +1,3 @@ -from __future__ import annotations - from collections.abc import Sequence from enum import Enum, IntEnum from typing import Any, ClassVar, Self @@ -36,7 +34,6 @@ def as_choices(cls) -> Sequence[tuple[int, str]]: @region_silo_only_model class Rule(Model): - __relocation_scope__ = RelocationScope.Organization DEFAULT_CONDITION_MATCH = "all" # any, all diff --git a/src/sentry/rules/base.py b/src/sentry/rules/base.py index a33244dc992f9d..aae7d45845c362 100644 --- a/src/sentry/rules/base.py +++ b/src/sentry/rules/base.py @@ -3,7 +3,7 @@ import abc import logging from collections import namedtuple -from collections.abc import Callable, MutableMapping, Sequence +from collections.abc import Callable, Mapping, Sequence from typing import TYPE_CHECKING, Any, ClassVar from django import forms @@ -62,7 +62,7 @@ class RuleBase(abc.ABC): def __init__( self, project: Project, - data: MutableMapping[str, Any] | None = None, + data: Mapping[str, Any] | None = None, rule: Rule | None = None, rule_fire_history: 
RuleFireHistory | None = None, ) -> None: diff --git a/src/sentry/rules/conditions/base.py b/src/sentry/rules/conditions/base.py index 468147d864980d..09d371e6705d01 100644 --- a/src/sentry/rules/conditions/base.py +++ b/src/sentry/rules/conditions/base.py @@ -1,12 +1,19 @@ import abc from collections.abc import Sequence from datetime import datetime +from typing import TypedDict from sentry.eventstore.models import GroupEvent from sentry.rules.base import EventState, RuleBase from sentry.types.condition_activity import ConditionActivity +class GenericCondition(TypedDict): + # the ID in the rules registry that maps to a condition class + # e.g. "sentry.rules.conditions.every_event.EveryEventCondition" + id: str + + class EventCondition(RuleBase, abc.ABC): rule_type = "condition/event" diff --git a/src/sentry/rules/conditions/event_frequency.py b/src/sentry/rules/conditions/event_frequency.py index 6f06b041f368bf..55c8d19d86b6db 100644 --- a/src/sentry/rules/conditions/event_frequency.py +++ b/src/sentry/rules/conditions/event_frequency.py @@ -7,7 +7,7 @@ from collections import defaultdict from collections.abc import Callable, Mapping, Sequence from datetime import datetime, timedelta -from typing import Any, Literal, NotRequired, TypedDict +from typing import Any, Literal, NotRequired from django import forms from django.core.cache import cache @@ -21,7 +21,7 @@ from sentry.models.group import Group from sentry.receivers.rules import DEFAULT_RULE_LABEL, DEFAULT_RULE_LABEL_NEW from sentry.rules import EventState -from sentry.rules.conditions.base import EventCondition +from sentry.rules.conditions.base import EventCondition, GenericCondition from sentry.tsdb.base import TSDBModel from sentry.types.condition_activity import ( FREQUENCY_CONDITION_BUCKET_SIZE, @@ -58,14 +58,12 @@ class ComparisonType(TextChoices): PERCENT = "percent" -class EventFrequencyConditionData(TypedDict): +class EventFrequencyConditionData(GenericCondition): """ The base typed dict for all condition data representing EventFrequency issue alert rule conditions """ - # The ID of the condition class - id: str # Either the count or percentage. value: int # The interval to compare the value against such as 5m, 1h, 3w, etc. @@ -205,7 +203,7 @@ def passes_activity_frequency( def get_preview_aggregate(self) -> tuple[str, str]: raise NotImplementedError - def query(self, event: GroupEvent, start: datetime, end: datetime, environment_id: str) -> int: + def query(self, event: GroupEvent, start: datetime, end: datetime, environment_id: int) -> int: """ Queries Snuba for a unique condition for a single group. """ @@ -220,7 +218,7 @@ def query(self, event: GroupEvent, start: datetime, end: datetime, environment_i return query_result def query_hook( - self, event: GroupEvent, start: datetime, end: datetime, environment_id: str + self, event: GroupEvent, start: datetime, end: datetime, environment_id: int ) -> int: """ Abstract method that specifies how to query Snuba for a single group @@ -229,7 +227,7 @@ def query_hook( raise NotImplementedError def batch_query( - self, group_ids: Sequence[int], start: datetime, end: datetime, environment_id: str + self, group_ids: Sequence[int], start: datetime, end: datetime, environment_id: int ) -> dict[int, int]: """ Queries Snuba for a unique condition for multiple groups. 
@@ -245,7 +243,7 @@ def batch_query( return batch_query_result def batch_query_hook( - self, group_ids: Sequence[int], start: datetime, end: datetime, environment_id: str + self, group_ids: Sequence[int], start: datetime, end: datetime, environment_id: int ) -> dict[int, int]: """ Abstract method that specifies how to query Snuba for multiple groups @@ -253,7 +251,7 @@ def batch_query_hook( """ raise NotImplementedError - def get_rate(self, event: GroupEvent, interval: str, environment_id: str) -> int: + def get_rate(self, event: GroupEvent, interval: str, environment_id: int) -> int: _, duration = self.intervals[interval] end = timezone.now() # For conditions with interval >= 1 hour we don't need to worry about read your writes @@ -285,7 +283,7 @@ def get_snuba_query_result( model: TSDBModel, start: datetime, end: datetime, - environment_id: str, + environment_id: int, referrer_suffix: str, ) -> Mapping[int, int]: result: Mapping[int, int] = tsdb_function( @@ -308,7 +306,7 @@ def get_chunked_result( groups: list[Group], start: datetime, end: datetime, - environment_id: str, + environment_id: int, referrer_suffix: str, ) -> dict[int, int]: batch_totals: dict[int, int] = defaultdict(int) @@ -349,7 +347,7 @@ class EventFrequencyCondition(BaseEventFrequencyCondition): label = "The issue is seen more than {value} times in {interval}" def query_hook( - self, event: GroupEvent, start: datetime, end: datetime, environment_id: str + self, event: GroupEvent, start: datetime, end: datetime, environment_id: int ) -> int: sums: Mapping[int, int] = self.get_snuba_query_result( tsdb_function=self.tsdb.get_sums, @@ -364,7 +362,7 @@ def query_hook( return sums[event.group_id] def batch_query_hook( - self, group_ids: Sequence[int], start: datetime, end: datetime, environment_id: str + self, group_ids: Sequence[int], start: datetime, end: datetime, environment_id: int ) -> dict[int, int]: batch_sums: dict[int, int] = defaultdict(int) groups = Group.objects.filter(id__in=group_ids) @@ -406,7 +404,7 @@ class EventUniqueUserFrequencyCondition(BaseEventFrequencyCondition): label = "The issue is seen by more than {value} users in {interval}" def query_hook( - self, event: GroupEvent, start: datetime, end: datetime, environment_id: str + self, event: GroupEvent, start: datetime, end: datetime, environment_id: int ) -> int: totals: Mapping[int, int] = self.get_snuba_query_result( tsdb_function=self.tsdb.get_distinct_counts_totals, @@ -421,7 +419,7 @@ def query_hook( return totals[event.group_id] def batch_query_hook( - self, group_ids: Sequence[int], start: datetime, end: datetime, environment_id: str + self, group_ids: Sequence[int], start: datetime, end: datetime, environment_id: int ) -> dict[int, int]: batch_totals: dict[int, int] = defaultdict(int) groups = Group.objects.filter(id__in=group_ids) @@ -527,7 +525,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: } def query_hook( - self, event: GroupEvent, start: datetime, end: datetime, environment_id: str + self, event: GroupEvent, start: datetime, end: datetime, environment_id: int ) -> int: project_id = event.project_id cache_key = f"r.c.spc:{project_id}-{environment_id}" diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index cb59294323dc01..aed37be3ac9d06 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -1,14 +1,18 @@ import logging from collections import defaultdict -from collections.abc import Mapping from 
sentry.buffer.redis import BufferHookEvent, RedisBuffer, redis_buffer_registry from sentry.models.project import Project from sentry.models.rule import Rule -from sentry.rules.processing.processor import is_condition_slow +from sentry.rules.conditions.base import EventCondition +from sentry.rules.conditions.event_frequency import EventFrequencyConditionData +from sentry.rules.processing.processor import is_condition_slow, split_conditions_and_filters +from sentry.silo.base import SiloMode +from sentry.tasks.base import instrumented_task from sentry.utils import metrics from sentry.utils.query import RangeQuerySetWrapper -from sentry.utils.safe import safe_execute + +# from sentry.utils.safe import safe_execute logger = logging.getLogger("sentry.rules.delayed_processing") @@ -16,40 +20,99 @@ PROJECT_ID_BUFFER_LIST_KEY = "project_id_buffer_list" +def get_slow_conditions(rule: Rule) -> list[EventFrequencyConditionData]: + """ + Returns the slow conditions of a rule model instance. + """ + conditions_and_filters = rule.data.get("conditions", ()) + conditions, _ = split_conditions_and_filters(conditions_and_filters) + slow_conditions = [cond for cond in conditions if is_condition_slow(cond)] + + # MyPy refuses to make TypedDict compatible with MutableMapping + # https://github.com/python/mypy/issues/4976 + return slow_conditions # type: ignore[return-value] + + @redis_buffer_registry.add_handler(BufferHookEvent.FLUSH) def process_delayed_alert_conditions(buffer: RedisBuffer) -> None: with metrics.timer("delayed_processing.process_all_conditions.duration"): project_ids = buffer.get_set(PROJECT_ID_BUFFER_LIST_KEY) for project in RangeQuerySetWrapper(Project.objects.filter(id__in=project_ids)): - rulegroup_event_mapping = buffer.get_hash(model=Project, field={"id": project.id}) with metrics.timer("delayed_processing.process_project.duration"): - safe_execute( - apply_delayed, - project, - rulegroup_event_mapping, - _with_transaction=False, - ) - - -def apply_delayed(project: Project, rule_group_pairs: Mapping[str, str]) -> None: - rule_group_mapping = defaultdict(set) - # Map each rule to its associated groups - for rule_group in rule_group_pairs.keys(): + apply_delayed.delay(project=project, buffer=buffer) + + +@instrumented_task( + name="sentry.delayed_processing.tasks.apply_delayed", + queue="dynamicsampling", + default_retry_delay=5, + max_retries=5, + soft_time_limit=50, + time_limit=60, # 1 minute + silo_mode=SiloMode.REGION, +) +def apply_delayed(project: Project, buffer: RedisBuffer) -> None: + """ """ + # STEP 1: Fetch the rulegroup_to_event mapping for the project from redis + + # The mapping looks like: {rule.id}:{group.id} -> {event.id} + rulegroup_to_event = buffer.get_hash(model=Project, field={"id": project.id}) + + # STEP 2: Map each rule to the groups that must be checked for that rule. + rules_to_groups = defaultdict(set) + for rule_group in rulegroup_to_event.keys(): rule_id, group_id = rule_group.split(":") - rule_group_mapping[rule_id].add(group_id) + rules_to_groups[rule_id].add(group_id) + + # STEP 3: Fetch the Rule models we need to check + rules = Rule.objects.filter(id__in=list(rules_to_groups.keys())) - rules = Rule.objects.filter(id__in=list(rule_group_mapping.keys())) + # STEP 4: Create a map of unique conditions to the groups that must be + # checked for that condition, as well as the information needed to. We don't + # query per rule condition because condition of the same class, interval, + # and environment can share a single scan. + # e.g. 
In prod env + # - num issues in a group > 5 over 1 hr + # - num issues in a group < 100 over 1 hr + # Map unique conditions to the group IDs that need to be checked for that + # condition. We also store a pointer to that condition's JSON so we can + # instantiate the class later condition_groups = defaultdict(set) for rule in rules: - rule: Rule - rule.conditions - # Exclude filters and slow conditions. We only want to check fast - # conditions because rules are only added to the buffer if we've already - # checked their slow conditions. - conditions = [cond for cond in rule.conditions if not is_condition_slow(cond)] - for condition in conditions: - unique_condition = (condition["id"], condition.interval) - condition_groups[unique_condition].update(rule_group_mapping[rule.id]) + # We only want a rule's slow conditions because rules are only added + # to the buffer if we've already checked their fast conditions. + slow_conditions = get_slow_conditions(rule) + for condition in slow_conditions: + unique_condition = (condition["id"], condition.interval, rule.environment_id) + condition_groups[unique_condition].update(rules_to_groups[rule.id]) + + # Step 5: Instantiate each unique condition, and evaluate the relevant + # group_ids that apply for that condition + condition_group_results = defaultdict(int) + for (condition_cls_id, cond_interval, environment_id), group_ids in condition_groups.items(): + condition_cls = rules.get(condition_cls_id) + + if condition_cls is None: + logger.warning("Unregistered condition %r", condition["id"]) + return None + + condition_inst = condition_cls(project=project, data=condition, rule=rule) + if not isinstance(condition_inst, EventCondition): + logger.warning("Unregistered condition %r", condition["id"]) + return None + + # have event to pass into when instantiating condition + # condition to execute, group_ids + # group_results: dict[int, int] = cond.batch_query(group_ids) + # for group_id, result in group_results.items(): + # condition_group_results[(cond, group_id)] = result + + # safe_execute( + # apply_delayed, + # project, + # rulegroup_event_mapping, + # _with_transaction=False, + # ) diff --git a/src/sentry/rules/processing/processor.py b/src/sentry/rules/processing/processor.py index 5c6c3d0f692043..565d5d221f06e1 100644 --- a/src/sentry/rules/processing/processor.py +++ b/src/sentry/rules/processing/processor.py @@ -20,12 +20,13 @@ from sentry.rules import EventState, history, rules from sentry.rules.actions.base import instantiate_action from sentry.rules.conditions.base import EventCondition -from sentry.rules.conditions.event_frequency import EventFrequencyConditionData from sentry.rules.filters.base import EventFilter from sentry.types.rules import RuleFuture from sentry.utils.hashlib import hash_values from sentry.utils.safe import safe_execute +logger = logging.getLogger("sentry.rules") + SLOW_CONDITION_MATCHES = ["event_frequency"] @@ -40,17 +41,39 @@ def get_match_function(match_name: str) -> Callable[..., bool] | None: def is_condition_slow( - condition: EventFrequencyConditionData | Mapping[str, Any], + condition: Mapping[str, Any], ) -> bool: + """ + Returns whether a condition is considered slow. Note that slow condition + mappings take the form of EventFrequencyConditionData. 
+ """ for slow_conditions in SLOW_CONDITION_MATCHES: if slow_conditions in condition["id"]: return True return False -class RuleProcessor: - logger = logging.getLogger("sentry.rules") +def split_conditions_and_filters( + data: Mapping[str, Any], +) -> tuple[list[Mapping[str, Any]], list[Mapping[str, Any]]]: + conditions_and_filters = data.get("conditions", []) + conditions = [] + filters = [] + for condition_or_filter in conditions_and_filters: + id = condition_or_filter["id"] + rule_cls = rules.get(id) + if rule_cls is None: + logger.warning("Unregistered condition or filter %r", id) + continue + + if rule_cls.rule_type == EventFilter.rule_type: + filters.append(condition_or_filter) + elif rule_cls.rule_type == EventCondition.rule_type: + conditions.append(condition_or_filter) + return conditions, filters + +class RuleProcessor: def __init__( self, event: GroupEvent, @@ -129,7 +152,7 @@ def bulk_get_rule_status(self, rules: Sequence[Rule]) -> Mapping[int, GroupRuleS if missing_rule_ids: # Shouldn't happen, but log just in case - self.logger.error( + logger.error( "Failed to fetch some GroupRuleStatuses in RuleProcessor", extra={"missing_rule_ids": missing_rule_ids, "group_id": self.group.id}, ) @@ -142,20 +165,18 @@ def bulk_get_rule_status(self, rules: Sequence[Rule]) -> Mapping[int, GroupRuleS def condition_matches( self, - condition: EventFrequencyConditionData | Mapping[str, Any], + condition: Mapping[str, Any], state: EventState, rule: Rule, ) -> bool | None: condition_cls = rules.get(condition["id"]) if condition_cls is None: - self.logger.warning("Unregistered condition %r", condition["id"]) + logger.warning("Unregistered condition %r", condition["id"]) return None - # MyPy refuses to make TypedDict compatible with MutableMapping - # https://github.com/python/mypy/issues/4976 - condition_inst = condition_cls(project=self.project, data=condition, rule=rule) # type: ignore[arg-type] + condition_inst = condition_cls(project=self.project, data=condition, rule=rule) if not isinstance(condition_inst, (EventCondition, EventFilter)): - self.logger.warning("Unregistered condition %r", condition["id"]) + logger.warning("Unregistered condition %r", condition["id"]) return None passes: bool = safe_execute( condition_inst.passes, @@ -195,6 +216,7 @@ def apply_rule(self, rule: Rule, status: GroupRuleStatus) -> None: condition_match = rule.data.get("action_match") or Rule.DEFAULT_CONDITION_MATCH filter_match = rule.data.get("filter_match") or Rule.DEFAULT_FILTER_MATCH + conditions_and_filters = rule.data.get("conditions", ()) frequency = rule.data.get("frequency") or Rule.DEFAULT_FREQUENCY try: environment = self.event.get_environment() @@ -211,8 +233,7 @@ def apply_rule(self, rule: Rule, status: GroupRuleStatus) -> None: state = self.get_state() - condition_list = rule.conditions - filter_list = rule.filters + condition_list, filter_list = split_conditions_and_filters(conditions_and_filters) # Sort `condition_list` so that most expensive conditions run last. 
condition_list.sort(key=lambda condition: is_condition_slow(condition)) @@ -223,13 +244,13 @@ def apply_rule(self, rule: Rule, status: GroupRuleStatus) -> None: ): if not predicate_list: continue - predicate_iter = [self.condition_matches(f, state, rule) for f in predicate_list] # type: ignore[attr-defined] + predicate_iter = [self.condition_matches(f, state, rule) for f in predicate_list] predicate_func = get_match_function(match) if predicate_func: if not predicate_func(predicate_iter): return else: - self.logger.error( + logger.error( f"Unsupported {name}_match {match!r} for rule {rule.id}", filter_match, rule.id, @@ -279,7 +300,7 @@ def activate_downstream_actions( notification_uuid=notification_uuid, ) if results is None: - self.logger.warning("Action %s did not return any futures", action["id"]) + logger.warning("Action %s did not return any futures", action["id"]) continue for future in results: From d69b2685977265b2e4a3aebdecec0cb6f18f4651 Mon Sep 17 00:00:00 2001 From: schew2381 Date: Sat, 13 Apr 2024 23:31:17 -1000 Subject: [PATCH 16/20] fixed most typing and stuffs --- .../rules/conditions/event_frequency.py | 40 +++---- .../rules/processing/delayed_processing.py | 103 ++++++++++++++---- 2 files changed, 102 insertions(+), 41 deletions(-) diff --git a/src/sentry/rules/conditions/event_frequency.py b/src/sentry/rules/conditions/event_frequency.py index 55c8d19d86b6db..2c306dbec2f831 100644 --- a/src/sentry/rules/conditions/event_frequency.py +++ b/src/sentry/rules/conditions/event_frequency.py @@ -5,7 +5,7 @@ import logging import re from collections import defaultdict -from collections.abc import Callable, Mapping, Sequence +from collections.abc import Callable, Mapping from datetime import datetime, timedelta from typing import Any, Literal, NotRequired @@ -32,7 +32,7 @@ from sentry.utils.iterators import chunked from sentry.utils.snuba import options_override -standard_intervals = { +STANDARD_INTERVALS = { "1m": ("one minute", timedelta(minutes=1)), "5m": ("5 minutes", timedelta(minutes=5)), "15m": ("15 minutes", timedelta(minutes=15)), @@ -41,7 +41,7 @@ "1w": ("one week", timedelta(days=7)), "30d": ("30 days", timedelta(days=30)), } -comparison_intervals = { +COMPARISON_INTERVALS = { "5m": ("5 minutes", timedelta(minutes=5)), "15m": ("15 minutes", timedelta(minutes=15)), "1h": ("one hour", timedelta(hours=1)), @@ -79,7 +79,7 @@ class EventFrequencyConditionData(GenericCondition): class EventFrequencyForm(forms.Form): - intervals = standard_intervals + intervals = STANDARD_INTERVALS interval = forms.ChoiceField( choices=[ (key, label) @@ -96,7 +96,7 @@ class EventFrequencyForm(forms.Form): comparisonInterval = forms.ChoiceField( choices=[ (key, label) - for key, (label, _) in sorted(comparison_intervals.items(), key=lambda item: item[1][1]) + for key, (label, _) in sorted(COMPARISON_INTERVALS.items(), key=lambda item: item[1][1]) ], required=False, ) @@ -120,7 +120,7 @@ def clean(self) -> dict[str, Any] | None: class BaseEventFrequencyCondition(EventCondition, abc.ABC): - intervals = standard_intervals + intervals = STANDARD_INTERVALS form_cls = EventFrequencyForm def __init__( @@ -146,7 +146,7 @@ def __init__( # MyPy refuses to make TypedDict compatible with MutableMapping # https://github.com/python/mypy/issues/4976 - super().__init__(data=data, *args, **kwargs) # type:ignore[misc, arg-type] + super().__init__(data=data, *args, **kwargs) # type:ignore[misc] def _get_options(self) -> tuple[str | None, float | None]: interval, value = None, None @@ -190,7 +190,7 @@ def 
passes_activity_frequency( result = bucket_count(activity.timestamp - interval_delta, activity.timestamp, buckets) if comparison_type == ComparisonType.PERCENT: - comparison_interval = comparison_intervals[self.get_option("comparisonInterval")][1] + comparison_interval = COMPARISON_INTERVALS[self.get_option("comparisonInterval")][1] comparison_end = activity.timestamp - comparison_interval comparison_result = bucket_count( @@ -227,7 +227,7 @@ def query_hook( raise NotImplementedError def batch_query( - self, group_ids: Sequence[int], start: datetime, end: datetime, environment_id: int + self, group_ids: set[int], start: datetime, end: datetime, environment_id: int ) -> dict[int, int]: """ Queries Snuba for a unique condition for multiple groups. @@ -243,7 +243,7 @@ def batch_query( return batch_query_result def batch_query_hook( - self, group_ids: Sequence[int], start: datetime, end: datetime, environment_id: int + self, group_ids: set[int], start: datetime, end: datetime, environment_id: int ) -> dict[int, int]: """ Abstract method that specifies how to query Snuba for multiple groups @@ -263,7 +263,7 @@ def get_rate(self, event: GroupEvent, interval: str, environment_id: int) -> int result: int = self.query(event, end - duration, end, environment_id=environment_id) comparison_type = self.get_option("comparisonType", ComparisonType.COUNT) if comparison_type == ComparisonType.PERCENT: - comparison_interval = comparison_intervals[self.get_option("comparisonInterval")][1] + comparison_interval = COMPARISON_INTERVALS[self.get_option("comparisonInterval")][1] comparison_end = end - comparison_interval # TODO: Figure out if there's a way we can do this less frequently. All queries are # automatically cached for 10s. We could consider trying to cache this and the main @@ -362,7 +362,7 @@ def query_hook( return sums[event.group_id] def batch_query_hook( - self, group_ids: Sequence[int], start: datetime, end: datetime, environment_id: int + self, group_ids: set[int], start: datetime, end: datetime, environment_id: int ) -> dict[int, int]: batch_sums: dict[int, int] = defaultdict(int) groups = Group.objects.filter(id__in=group_ids) @@ -419,7 +419,7 @@ def query_hook( return totals[event.group_id] def batch_query_hook( - self, group_ids: Sequence[int], start: datetime, end: datetime, environment_id: int + self, group_ids: set[int], start: datetime, end: datetime, environment_id: int ) -> dict[int, int]: batch_totals: dict[int, int] = defaultdict(int) groups = Group.objects.filter(id__in=group_ids) @@ -456,7 +456,7 @@ def get_preview_aggregate(self) -> tuple[str, str]: return "uniq", "user" -percent_intervals = { +PERCENT_INTERVALS = { "1m": ("1 minute", timedelta(minutes=1)), "5m": ("5 minutes", timedelta(minutes=5)), "10m": ("10 minutes", timedelta(minutes=10)), @@ -464,7 +464,7 @@ def get_preview_aggregate(self) -> tuple[str, str]: "1h": ("1 hour", timedelta(minutes=60)), } -percent_intervals_to_display = { +PERCENT_INTERVALS_TO_DISPLAY = { "5m": ("5 minutes", timedelta(minutes=5)), "10m": ("10 minutes", timedelta(minutes=10)), "30m": ("30 minutes", timedelta(minutes=30)), @@ -474,12 +474,12 @@ def get_preview_aggregate(self) -> tuple[str, str]: class EventFrequencyPercentForm(EventFrequencyForm): - intervals = percent_intervals_to_display + intervals = PERCENT_INTERVALS_TO_DISPLAY interval = forms.ChoiceField( choices=[ (key, label) for key, (label, duration) in sorted( - percent_intervals_to_display.items(), + PERCENT_INTERVALS_TO_DISPLAY.items(), key=lambda key____label__duration: 
key____label__duration[1][1], ) ] @@ -507,7 +507,7 @@ class EventFrequencyPercentCondition(BaseEventFrequencyCondition): logger = logging.getLogger("sentry.rules.event_frequency") def __init__(self, *args: Any, **kwargs: Any) -> None: - self.intervals = percent_intervals + self.intervals = PERCENT_INTERVALS self.form_cls = EventFrequencyPercentForm super().__init__(*args, **kwargs) @@ -518,7 +518,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: "choices": [ (key, label) for key, (label, duration) in sorted( - percent_intervals_to_display.items(), + PERCENT_INTERVALS_TO_DISPLAY.items(), key=lambda key____label__duration: key____label__duration[1][1], ) ], @@ -544,7 +544,7 @@ def query_hook( if session_count_last_hour >= MIN_SESSIONS_TO_FIRE: interval_in_minutes = ( - percent_intervals[self.get_option("interval")][1].total_seconds() // 60 + PERCENT_INTERVALS[self.get_option("interval")][1].total_seconds() // 60 ) avg_sessions_in_interval = session_count_last_hour / (60 / interval_in_minutes) issue_count = self.get_snuba_query_result( diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index aed37be3ac9d06..648ec9a76c9c1d 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -1,17 +1,27 @@ +import contextlib import logging from collections import defaultdict +from datetime import timedelta, timezone +from typing import DefaultDict, NamedTuple from sentry.buffer.redis import BufferHookEvent, RedisBuffer, redis_buffer_registry from sentry.models.project import Project from sentry.models.rule import Rule from sentry.rules.conditions.base import EventCondition -from sentry.rules.conditions.event_frequency import EventFrequencyConditionData +from sentry.rules.conditions.event_frequency import ( + BaseEventFrequencyCondition, + ComparisonType, + EventFrequencyConditionData, + percent_increase, +) from sentry.rules.processing.processor import is_condition_slow, split_conditions_and_filters from sentry.silo.base import SiloMode from sentry.tasks.base import instrumented_task from sentry.utils import metrics from sentry.utils.query import RangeQuerySetWrapper +from sentry.utils.snuba import options_override +# XXX: Where should the safe_execute part go? Is it even needed for tasks? # from sentry.utils.safe import safe_execute logger = logging.getLogger("sentry.rules.delayed_processing") @@ -58,10 +68,11 @@ def apply_delayed(project: Project, buffer: RedisBuffer) -> None: # STEP 1: Fetch the rulegroup_to_event mapping for the project from redis # The mapping looks like: {rule.id}:{group.id} -> {event.id} + # TODO: Updating return type of get_hash rulegroup_to_event = buffer.get_hash(model=Project, field={"id": project.id}) # STEP 2: Map each rule to the groups that must be checked for that rule. - rules_to_groups = defaultdict(set) + rules_to_groups: DefaultDict[str, set[int]] = defaultdict(set) for rule_group in rulegroup_to_event.keys(): rule_id, group_id = rule_group.split(":") rules_to_groups[rule_id].add(group_id) @@ -69,43 +80,93 @@ def apply_delayed(project: Project, buffer: RedisBuffer) -> None: # STEP 3: Fetch the Rule models we need to check rules = Rule.objects.filter(id__in=list(rules_to_groups.keys())) - # STEP 4: Create a map of unique conditions to the groups that must be - # checked for that condition, as well as the information needed to. 
We don't - # query per rule condition because condition of the same class, interval, - # and environment can share a single scan. - # e.g. In prod env - # - num issues in a group > 5 over 1 hr - # - num issues in a group < 100 over 1 hr + # STEP 4: Create a map of unique conditions to a tuple containing the JSON + # information needed to instantiate that condition class and the group_ids that + # must be checked for that condition. We don't query per rule condition because + # condition of the same class, interval, and environment can share a single scan. # Map unique conditions to the group IDs that need to checked for that # condition. We also store a pointer to that condition's JSON so we can # instantiate the class later - condition_groups = defaultdict(set) + class UniqueCondition(NamedTuple): + cls_id: str + interval: str + environment_id: int + + class DataAndGroups(NamedTuple): + data: EventFrequencyConditionData + group_ids: set[int] + + condition_groups: dict[UniqueCondition, DataAndGroups] = {} + for rule in rules: # We only want to a rule's fast conditions because rules are only added # to the buffer if we've already checked their fast conditions. slow_conditions = get_slow_conditions(rule) - for condition in slow_conditions: - unique_condition = (condition["id"], condition.interval, rule.environment_id) - condition_groups[unique_condition].update(rules_to_groups[rule.id]) + for condition_data in slow_conditions: + unique_condition = UniqueCondition( + condition_data["id"], condition_data["interval"], rule.environment_id + ) + + # Add to set the set of group_ids if there are already group_ids for + # that apply to the unique condition + if data_and_groups := condition_groups.get(unique_condition): + data_and_groups.group_ids.update(rules_to_groups[rule.id]) + # Otherwise, create the tuple containing the condition data and the + # set of group_ids that apply to the unique condition + else: + condition_groups[unique_condition] = DataAndGroups( + condition_data, rules_to_groups[rule.id] + ) # Step 5: Instantiate each unique condition, and evaluate the relevant # group_ids that apply for that condition - condition_group_results = defaultdict(int) - for (condition_cls_id, cond_interval, environment_id), group_ids in condition_groups.items(): - condition_cls = rules.get(condition_cls_id) + condition_group_results: dict[UniqueCondition, dict[int, int]] = {} + for unique_condition, (condition_data, group_ids) in condition_groups.items(): + condition_cls = rules.get(unique_condition.cls_id) if condition_cls is None: - logger.warning("Unregistered condition %r", condition["id"]) + logger.warning("Unregistered condition %r", condition_data["id"]) return None - condition_inst = condition_cls(project=project, data=condition, rule=rule) + condition_inst: BaseEventFrequencyCondition = condition_cls( + project=project, data=condition_data, rule=rule + ) if not isinstance(condition_inst, EventCondition): - logger.warning("Unregistered condition %r", condition["id"]) + logger.warning("Unregistered condition %r", condition_data["id"]) return None - # have event to pass into when instantiating condition - # condition to execute, group_ids + _, duration = condition_inst.intervals[unique_condition.interval] + end = timezone.now() + # For conditions with interval >= 1 hour we don't need to worry about read your writes + # consistency. Disable it so that we can scale to more nodes. 
+ option_override_cm: contextlib.AbstractContextManager[object] = contextlib.nullcontext() + if duration >= timedelta(hours=1): + option_override_cm = options_override({"consistent": False}) + + with option_override_cm: + results = condition_inst.batch_query( + group_ids, end - duration, end, environment_id=unique_condition.environment_id + ) + comparison_type = condition_data.get("comparisonType", ComparisonType.COUNT) + if comparison_type == ComparisonType.PERCENT: + comparison_interval = condition_inst.intervals[unique_condition.interval][1] + comparison_end = end - comparison_interval + + comparison_results = condition_inst.batch_query( + group_ids, + comparison_end - duration, + comparison_end, + environment_id=unique_condition.environment_id, + ) + + results = { + group_id: percent_increase(results[group_id], comparison_results[group_id]) + for group_id in group_ids + } + + condition_group_results[unique_condition] = results + # group_results: dict[int, int] = cond.batch_query(group_ids) # for group_id, result in group_results.items(): # condition_group_results[(cond, group_id)] = result From cd2bc097b063befc71c7bbd154982a85239263ad Mon Sep 17 00:00:00 2001 From: schew2381 Date: Sun, 14 Apr 2024 21:48:33 -1000 Subject: [PATCH 17/20] fix all typing --- src/sentry/buffer/redis.py | 2 +- src/sentry/rules/processing/delayed_processing.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/sentry/buffer/redis.py b/src/sentry/buffer/redis.py index ad6bab81e73de9..e042572a8ecd01 100644 --- a/src/sentry/buffer/redis.py +++ b/src/sentry/buffer/redis.py @@ -289,7 +289,7 @@ def push_to_hash( def get_hash( self, model: type[models.Model], field: dict[str, models.Model | str | int] - ) -> dict[str, str]: + ) -> dict[str, int]: key = self._make_key(model, field) return self._execute_redis_operation(key, RedisOperation.HASH_GET_ALL) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index 648ec9a76c9c1d..8278b800bb52ee 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -1,7 +1,7 @@ import contextlib import logging from collections import defaultdict -from datetime import timedelta, timezone +from datetime import datetime, timedelta from typing import DefaultDict, NamedTuple from sentry.buffer.redis import BufferHookEvent, RedisBuffer, redis_buffer_registry @@ -75,7 +75,7 @@ def apply_delayed(project: Project, buffer: RedisBuffer) -> None: rules_to_groups: DefaultDict[str, set[int]] = defaultdict(set) for rule_group in rulegroup_to_event.keys(): rule_id, group_id = rule_group.split(":") - rules_to_groups[rule_id].add(group_id) + rules_to_groups[rule_id].add(int(group_id)) # STEP 3: Fetch the Rule models we need to check rules = Rule.objects.filter(id__in=list(rules_to_groups.keys())) @@ -137,7 +137,7 @@ class DataAndGroups(NamedTuple): return None _, duration = condition_inst.intervals[unique_condition.interval] - end = timezone.now() + end = datetime.now() # For conditions with interval >= 1 hour we don't need to worry about read your writes # consistency. Disable it so that we can scale to more nodes. 
option_override_cm: contextlib.AbstractContextManager[object] = contextlib.nullcontext() From b6bafacc160546ab4e98ce4173a10efa322c82c5 Mon Sep 17 00:00:00 2001 From: schew2381 Date: Sun, 14 Apr 2024 21:53:52 -1000 Subject: [PATCH 18/20] add more typing --- src/sentry/rules/conditions/event_frequency.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/sentry/rules/conditions/event_frequency.py b/src/sentry/rules/conditions/event_frequency.py index 2c306dbec2f831..b66f0c454ae3f2 100644 --- a/src/sentry/rules/conditions/event_frequency.py +++ b/src/sentry/rules/conditions/event_frequency.py @@ -32,7 +32,7 @@ from sentry.utils.iterators import chunked from sentry.utils.snuba import options_override -STANDARD_INTERVALS = { +STANDARD_INTERVALS: dict[str, tuple[str, timedelta]] = { "1m": ("one minute", timedelta(minutes=1)), "5m": ("5 minutes", timedelta(minutes=5)), "15m": ("15 minutes", timedelta(minutes=15)), @@ -41,7 +41,7 @@ "1w": ("one week", timedelta(days=7)), "30d": ("30 days", timedelta(days=30)), } -COMPARISON_INTERVALS = { +COMPARISON_INTERVALS: dict[str, tuple[str, timedelta]] = { "5m": ("5 minutes", timedelta(minutes=5)), "15m": ("15 minutes", timedelta(minutes=15)), "1h": ("one hour", timedelta(hours=1)), @@ -456,7 +456,7 @@ def get_preview_aggregate(self) -> tuple[str, str]: return "uniq", "user" -PERCENT_INTERVALS = { +PERCENT_INTERVALS: dict[str, tuple[str, timedelta]] = { "1m": ("1 minute", timedelta(minutes=1)), "5m": ("5 minutes", timedelta(minutes=5)), "10m": ("10 minutes", timedelta(minutes=10)), @@ -464,7 +464,7 @@ def get_preview_aggregate(self) -> tuple[str, str]: "1h": ("1 hour", timedelta(minutes=60)), } -PERCENT_INTERVALS_TO_DISPLAY = { +PERCENT_INTERVALS_TO_DISPLAY: dict[str, tuple[str, timedelta]] = { "5m": ("5 minutes", timedelta(minutes=5)), "10m": ("10 minutes", timedelta(minutes=10)), "30m": ("30 minutes", timedelta(minutes=30)), From be7ead8274b2a54cafec6e0d9b51533e47392993 Mon Sep 17 00:00:00 2001 From: schew2381 Date: Mon, 15 Apr 2024 00:53:56 -1000 Subject: [PATCH 19/20] finish working --- .../rules/processing/delayed_processing.py | 66 ++++++++++++++----- 1 file changed, 48 insertions(+), 18 deletions(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index 8278b800bb52ee..1f28f9c3596e32 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -21,9 +21,6 @@ from sentry.utils.query import RangeQuerySetWrapper from sentry.utils.snuba import options_override -# XXX: Where should the safe_execute part go? Is it even needed for tasks? 
-# from sentry.utils.safe import safe_execute - logger = logging.getLogger("sentry.rules.delayed_processing") @@ -65,6 +62,18 @@ def process_delayed_alert_conditions(buffer: RedisBuffer) -> None: ) def apply_delayed(project: Project, buffer: RedisBuffer) -> None: """ """ + + # XXX(schew2381): This is from + # https://github.com/getsentry/sentry/blob/fbfd6800cf067f171840c427df7d5c2864b91fb0/src/sentry/rules/processor.py#L209-L212 + # Do we need to check this before we start the steps (ask dan, I think the + # answer is now b/c we check it before triggering actions)) + + # frequency = rule.data.get("frequency") or Rule.DEFAULT_FREQUENCY + # now = datetime.now() + # freq_offset = now - timedelta(minutes=frequency) + # if status.last_active and status.last_active > freq_offset: + # return + # STEP 1: Fetch the rulegroup_to_event mapping for the project from redis # The mapping looks like: {rule.id}:{group.id} -> {event.id} @@ -72,10 +81,10 @@ def apply_delayed(project: Project, buffer: RedisBuffer) -> None: rulegroup_to_event = buffer.get_hash(model=Project, field={"id": project.id}) # STEP 2: Map each rule to the groups that must be checked for that rule. - rules_to_groups: DefaultDict[str, set[int]] = defaultdict(set) + rules_to_groups: DefaultDict[int, set[int]] = defaultdict(set) for rule_group in rulegroup_to_event.keys(): rule_id, group_id = rule_group.split(":") - rules_to_groups[rule_id].add(int(group_id)) + rules_to_groups[int(rule_id)].add(int(group_id)) # STEP 3: Fetch the Rule models we need to check rules = Rule.objects.filter(id__in=list(rules_to_groups.keys())) @@ -121,6 +130,9 @@ class DataAndGroups(NamedTuple): # Step 5: Instantiate each unique condition, and evaluate the relevant # group_ids that apply for that condition + + # XXX: Probably want to safe execute somewhere in this step before making + # the query condition_group_results: dict[UniqueCondition, dict[int, int]] = {} for unique_condition, (condition_data, group_ids) in condition_groups.items(): condition_cls = rules.get(unique_condition.cls_id) @@ -148,6 +160,9 @@ class DataAndGroups(NamedTuple): results = condition_inst.batch_query( group_ids, end - duration, end, environment_id=unique_condition.environment_id ) + + # If the condition is a percent comparison, we need to query the + # previous interval to compare against the current interval comparison_type = condition_data.get("comparisonType", ComparisonType.COUNT) if comparison_type == ComparisonType.PERCENT: comparison_interval = condition_inst.intervals[unique_condition.interval][1] @@ -160,20 +175,35 @@ class DataAndGroups(NamedTuple): environment_id=unique_condition.environment_id, ) - results = { - group_id: percent_increase(results[group_id], comparison_results[group_id]) - for group_id in group_ids - } + results = { + group_id: percent_increase(results[group_id], comparison_results[group_id]) + for group_id in group_ids + } condition_group_results[unique_condition] = results - # group_results: dict[int, int] = cond.batch_query(group_ids) - # for group_id, result in group_results.items(): - # condition_group_results[(cond, group_id)] = result + # Step 6: For each rule and group applying to that rule, check if the group + # meets the conditions of the rule (basically doing BaseEventFrequencyCondition.passes) + for rule in rules: + # don't know why mypy is complaining : (expression has type "int", variable has type "str") + for group_id in rules_to_groups[rule.id]: # type: ignore[assignment] + pass + + # get: + # 1. 
rule conditions + result of condition check in results dict + # 2. the predicate func (any, all) to iterate over for conditions + # 3. The interval rules value to compare the result of the query + # against (e.g. # issues is > 50, 50 is the value). This is + # stored in DataAndGroups.data["value"], see EventFrequencyConditionData + + # Store all rule/group pairs we need to activate + + # Step 8: Bulk fetch the events of the rule/group pairs we need to trigger + # actions for + + # Was thinking we could do something like this where we get futures, + # then safe execute them + # https://github.com/getsentry/sentry/blob/3075c03e0819dd2c974897cc3014764c43151db5/src/sentry/tasks/post_process.py#L1115-L1128 - # safe_execute( - # apply_delayed, - # project, - # rulegroup_event_mapping, - # _with_transaction=False, - # ) + # XXX: Be sure to do this before triggering actions! + # https://github.com/getsentry/sentry/blob/fbfd6800cf067f171840c427df7d5c2864b91fb0/src/sentry/rules/processor.py#L247-L254 From 3c582f94b7642fc7947882863f92ba5ac2ba6853 Mon Sep 17 00:00:00 2001 From: schew2381 Date: Mon, 15 Apr 2024 01:01:53 -1000 Subject: [PATCH 20/20] add extra comment --- src/sentry/rules/processing/delayed_processing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index 1f28f9c3596e32..5bf17432952134 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -199,7 +199,7 @@ class DataAndGroups(NamedTuple): # Store all rule/group pairs we need to activate # Step 8: Bulk fetch the events of the rule/group pairs we need to trigger - # actions for + # actions for, then trigger those actions # Was thinking we could do something like this where we get futures, # then safe execute them
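Not part of the patch series: a minimal, self-contained sketch of how the Step 6 TODO above might consume condition_group_results. The helper below and its inputs (per-rule slow-condition JSON, per-rule environment id, the rule's any/all match setting) are illustrative assumptions rather than the final apply_delayed implementation; the real code would reuse get_match_function and the UniqueCondition tuple introduced in the patches above.

from collections import defaultdict
from typing import NamedTuple


class UniqueCondition(NamedTuple):
    # Mirrors the NamedTuple from PATCH 16: condition class id, interval, environment.
    cls_id: str
    interval: str
    environment_id: int


def get_rules_to_fire(
    condition_group_results: dict[UniqueCondition, dict[int, int]],
    rules_to_slow_conditions: dict[int, list[dict]],
    rules_to_groups: dict[int, set[int]],
    rule_environment_ids: dict[int, int],
    rule_matches: dict[int, str],
) -> dict[int, set[int]]:
    """Return {rule_id: group_ids} whose slow conditions pass their thresholds."""
    rules_to_fire: dict[int, set[int]] = defaultdict(set)
    for rule_id, slow_conditions in rules_to_slow_conditions.items():
        # "any" fires when one slow condition passes; "all" requires every one.
        predicate = any if rule_matches.get(rule_id) == "any" else all
        for group_id in rules_to_groups.get(rule_id, set()):
            outcomes = []
            for condition_data in slow_conditions:
                unique_condition = UniqueCondition(
                    condition_data["id"],
                    condition_data["interval"],
                    rule_environment_ids[rule_id],
                )
                # The Snuba count (or percent increase) computed in Step 5.
                query_value = condition_group_results.get(unique_condition, {}).get(group_id, 0)
                # Compare against the rule's threshold stored in the condition JSON.
                outcomes.append(query_value > int(condition_data["value"]))
            if outcomes and predicate(outcomes):
                rules_to_fire[rule_id].add(group_id)
    return rules_to_fire

For example, a rule with match "all" and a single event-frequency condition whose value is 50 would end up in rules_to_fire for every group whose Step 5 result exceeds 50; the resulting rule/group pairs are what Step 8 would bulk-fetch events for before triggering actions.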