feat(group-attributes): Add migration to backfill group attributes #69071

Merged · 8 commits · May 1, 2024
Changes from 1 commit
2 changes: 1 addition & 1 deletion migrations_lockfile.txt
@@ -9,5 +9,5 @@
 feedback: 0004_index_together
 hybridcloud: 0016_add_control_cacheversion
 nodestore: 0002_nodestore_no_dictfield
 replays: 0004_index_together
-sentry: 0699_update_monitor_owner_team_id_cascsade
+sentry: 0700_backfill_group_priority_to_self_hosted
 social_auth: 0002_default_auto_field
188 changes: 188 additions & 0 deletions src/sentry/migrations/0700_backfill_group_priority_to_self_hosted.py
@@ -0,0 +1,188 @@
# Generated by Django 5.0.3 on 2024-04-16 22:52

from __future__ import annotations

import dataclasses
from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING

from django.conf import settings
from django.db import migrations
from django.db.backends.base.schema import BaseDatabaseSchemaEditor
from django.db.migrations.state import StateApps
from django.db.models import F, Window
from django.db.models.functions import Rank
from sentry_kafka_schemas.schema_types.group_attributes_v1 import GroupAttributesSnapshot

from sentry.issues.attributes import produce_snapshot_to_kafka
from sentry.new_migrations.migrations import CheckedMigration
from sentry.utils import redis
from sentry.utils.iterators import chunked
from sentry.utils.query import RangeQuerySetWrapperWithProgressBarApprox

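# These model imports are for type annotations only; at runtime the migration
# operates on the historical models obtained via apps.get_model() below.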
if TYPE_CHECKING:
    from sentry.models.group import Group
    from sentry.models.groupassignee import GroupAssignee
    from sentry.models.groupowner import GroupOwner

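# How many group ids are fetched and produced to Kafka per batch.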
CHUNK_SIZE = 10000


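# Mirrors the GroupOwnerType values in sentry.models.groupowner; presumably
# duplicated here so the migration avoids importing live model code.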
class GroupOwnerType(Enum):
    SUSPECT_COMMIT = 0
    OWNERSHIP_RULE = 1
    CODEOWNERS = 2


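# Plain-data projection of the Group columns needed to build a snapshot.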
@dataclasses.dataclass
class GroupValues:
    id: int
    project_id: int
    status: int
    substatus: int | None
    first_seen: datetime
    num_comments: int


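# Fetch the selected Group columns for a chunk of ids. The assert below
# guarantees every requested id was found, so no snapshot is silently skipped.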
def _bulk_retrieve_group_values(group_ids: list[int], Group: type[Group]) -> list[GroupValues]:
    group_values_map = {
        group["id"]: group
        for group in Group.objects.filter(id__in=group_ids).values(
            "id", "project_id", "status", "substatus", "first_seen", "num_comments"
        )
    }
    assert len(group_values_map) == len(group_ids)

    results = []
    for group_id in group_ids:
        group_values = group_values_map[group_id]
        results.append(
            GroupValues(
                id=group_id,
                project_id=group_values["project_id"],
                status=group_values["status"],
                substatus=group_values["substatus"],
                first_seen=group_values["first_seen"],
                num_comments=group_values["num_comments"],
            )
        )
    return results


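# Build assignee and owner lookups for the chunk, then assemble one snapshot
# dict per group. The Window/Rank query keeps only the most recent GroupOwner
# row per (group_id, type) partition, i.e. the latest owner of each type.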
def _bulk_retrieve_snapshot_values(
    group_values_list: list[GroupValues],
    GroupAssignee: type[GroupAssignee],
    GroupOwner: type[GroupOwner],
) -> list[GroupAttributesSnapshot]:
    group_assignee_map = {
        ga["group_id"]: ga
        for ga in GroupAssignee.objects.filter(
            group_id__in=[gv.id for gv in group_values_list]
        ).values("group_id", "user_id", "team_id")
    }

    group_owner_map = {}

    for group_owner in (
        GroupOwner.objects.annotate(
            position=Window(Rank(), partition_by=[F("group_id"), F("type")], order_by="-date_added")
        )
        .filter(position=1, group_id__in=[g.id for g in group_values_list])
        .values("group_id", "user_id", "team_id", "type")
    ):
        group_owner_map[(group_owner["group_id"], group_owner["type"])] = group_owner

    snapshots = []
    for group_value in group_values_list:
        assignee = group_assignee_map.get(group_value.id)
        suspect_owner = group_owner_map.get((group_value.id, GroupOwnerType.SUSPECT_COMMIT.value))
        ownership_owner = group_owner_map.get((group_value.id, GroupOwnerType.OWNERSHIP_RULE.value))
        codeowners_owner = group_owner_map.get((group_value.id, GroupOwnerType.CODEOWNERS.value))
        snapshot: GroupAttributesSnapshot = {
            "group_deleted": False,
            "project_id": group_value.project_id,
            "group_id": group_value.id,
            "status": group_value.status,
            "substatus": group_value.substatus,
            "first_seen": group_value.first_seen.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
            "num_comments": group_value.num_comments,
            "timestamp": datetime.now().isoformat(),
            "assignee_user_id": assignee["user_id"] if assignee else None,
            "assignee_team_id": assignee["team_id"] if assignee else None,
            "owner_suspect_commit_user_id": suspect_owner["user_id"] if suspect_owner else None,
            "owner_ownership_rule_user_id": ownership_owner["user_id"] if ownership_owner else None,
            "owner_ownership_rule_team_id": ownership_owner["team_id"] if ownership_owner else None,
            "owner_codeowners_user_id": codeowners_owner["user_id"] if codeowners_owner else None,
            "owner_codeowners_team_id": codeowners_owner["team_id"] if codeowners_owner else None,
        }
        snapshots.append(snapshot)

    return snapshots


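# Resolve group values for a chunk of ids and produce one snapshot per group
# to the group-attributes Kafka topic.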
def bulk_send_snapshot_values(
    group_ids: list[int],
    Group: type[Group],
    GroupAssignee: type[GroupAssignee],
    GroupOwner: type[GroupOwner],
) -> None:
    group_list = []
    if group_ids:
        group_list.extend(_bulk_retrieve_group_values(group_ids, Group))

    snapshots = _bulk_retrieve_snapshot_values(group_list, GroupAssignee, GroupOwner)

    for snapshot in snapshots:
        produce_snapshot_to_kafka(snapshot)


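# Walk every group with an id greater than the last checkpoint, producing
# snapshots in CHUNK_SIZE batches. Progress is checkpointed in Redis (expiring
# after 7 days) so an interrupted run can resume instead of starting over.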
def backfill_group_attributes_to_snuba(
    apps: StateApps, schema_editor: BaseDatabaseSchemaEditor
) -> None:
    Group = apps.get_model("sentry", "Group")
    GroupAssignee = apps.get_model("sentry", "GroupAssignee")
    GroupOwner = apps.get_model("sentry", "GroupOwner")

    backfill_key = "backfill_group_attributes_to_snuba_progress"
    redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

    progress_id = int(redis_client.get(backfill_key) or 0)

    for group_ids in chunked(
        RangeQuerySetWrapperWithProgressBarApprox(
            Group.objects.filter(id__gt=progress_id).values_list("id", flat=True),
            step=CHUNK_SIZE,
            result_value_getter=lambda item: item,
        ),
        CHUNK_SIZE,
    ):
        bulk_send_snapshot_values(group_ids, Group, GroupAssignee, GroupOwner)
        # Save progress to redis in case we have to restart
        redis_client.set(backfill_key, group_ids[-1], ex=60 * 60 * 24 * 7)


class Migration(CheckedMigration):
    # This flag is used to mark that a migration shouldn't be automatically run in production.
    # For the most part, this should only be used for operations where it's safe to run the
    # migration after your code has deployed. So this should not be used for most operations
    # that alter the schema of a table.
    # Here are some things that make sense to mark as post deployment:
    # - Large data migrations. Typically we want these to be run manually by ops so that they
    #   can be monitored and not block the deploy for a long period of time while they run.
    # - Adding indexes to large tables. Since this can take a long time, we'd generally prefer
    #   to have ops run this and not block the deploy. Note that while adding an index is a
    #   schema change, it's completely safe to run the operation after the code has deployed.
    is_post_deployment = True

    dependencies = [
        ("sentry", "0699_update_monitor_owner_team_id_cascsade"),
    ]

    operations = [
        migrations.RunPython(
            backfill_group_attributes_to_snuba,
            reverse_code=migrations.RunPython.noop,
            hints={"tables": ["sentry_groupedmessage"]},
        )
    ]
@@ -0,0 +1,58 @@
from sentry_sdk import Hub
from snuba_sdk.legacy import json_to_snql

from sentry.testutils.cases import SnubaTestCase, TestMigrations
from sentry.utils import json, redis
from sentry.utils.snuba import _snql_query


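# Query the group_attributes dataset in Snuba (consistent=True so rows written
# by the backfill are immediately readable) and assert that exactly the
# expected group ids were backfilled.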
def run_test(expected_groups):
    project = expected_groups[0].project
    json_body = {
        "selected_columns": [
            "group_id",
        ],
        "offset": 0,
        "limit": 100,
        "project": [project.id],
        "dataset": "group_attributes",
        "order_by": ["group_id"],
        "consistent": True,
        "tenant_ids": {
            "referrer": "group_attributes",
            "organization_id": project.organization_id,
        },
    }
    request = json_to_snql(json_body, "group_attributes")
    request.validate()
    identity = lambda x: x
    resp = _snql_query(((request, identity, identity), Hub(Hub.current), {}, "test_api"))[0]
    assert resp.status == 200
    data = json.loads(resp.data)["data"]
    assert {g.id for g in expected_groups} == {d["group_id"] for d in data}


class TestBackfillGroupAttributes(SnubaTestCase, TestMigrations):
    migrate_from = "0699_update_monitor_owner_team_id_cascsade"
    migrate_to = "0700_backfill_group_priority_to_self_hosted"

    def setup_initial_state(self):
        self.group = self.create_group()
        self.group_2 = self.create_group()

    def test(self):
        run_test([self.group, self.group_2])


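# Pre-seeding the Redis progress key simulates an interrupted run: the backfill
# resumes after self.group's id, so only self.group_2 is backfilled.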
class TestBackfillGroupAttributesRetry(SnubaTestCase, TestMigrations):
    migrate_from = "0699_update_monitor_owner_team_id_cascsade"
    migrate_to = "0700_backfill_group_priority_to_self_hosted"

    def setup_initial_state(self):
        self.group = self.create_group()
        self.group_2 = self.create_group()
        redis_client = redis.redis_clusters.get("default")
        redis_client.set("backfill_group_attributes_to_snuba_progress", self.group.id)

    def test_restart(self):
        run_test([self.group_2])