Skip to content

Commit 6e2436e

Browse files
authored
feat(taskworker): Make crons taskworker compatible (#88727)
This work is required to migrate tasks from celery to the new taskbroker system. The sentry option will be used to control the rollout of these tasks. The full migration plan is describe in this [document](https://www.notion.so/sentry/Rollout-Planning-1bd8b10e4b5d80aeaaa7dba0efca83bc).
1 parent a9223ea commit 6e2436e

File tree

5 files changed

+20
-1
lines changed

5 files changed

+20
-1
lines changed

src/sentry/conf/server.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1401,6 +1401,8 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
14011401
"sentry.deletions.tasks.hybrid_cloud",
14021402
"sentry.deletions.tasks.scheduled",
14031403
"sentry.incidents.tasks",
1404+
"sentry.monitors.tasks.clock_pulse",
1405+
"sentry.monitors.tasks.detect_broken_monitor_envs",
14041406
"sentry.snuba.tasks",
14051407
"sentry.tasks.auth.auth",
14061408
"sentry.tasks.auth.check_auth",

src/sentry/monitors/tasks/clock_pulse.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
from sentry.monitors.clock_dispatch import try_monitor_clock_tick
1818
from sentry.silo.base import SiloMode
1919
from sentry.tasks.base import instrumented_task
20+
from sentry.taskworker.config import TaskworkerConfig
21+
from sentry.taskworker.namespaces import crons_tasks
2022
from sentry.utils.arroyo_producer import SingletonProducer
2123
from sentry.utils.kafka_config import (
2224
get_kafka_admin_cluster_options,
@@ -54,7 +56,11 @@ def _get_partitions() -> Mapping[int, PartitionMetadata]:
5456
return topic_metadata.partitions
5557

5658

57-
@instrumented_task(name="sentry.monitors.tasks.clock_pulse", silo_mode=SiloMode.REGION)
59+
@instrumented_task(
60+
name="sentry.monitors.tasks.clock_pulse",
61+
silo_mode=SiloMode.REGION,
62+
taskworker_config=TaskworkerConfig(namespace=crons_tasks),
63+
)
5864
def clock_pulse(current_datetime=None):
5965
"""
6066
This task is run once a minute when to produce 'clock pulses' into the

src/sentry/monitors/tasks/detect_broken_monitor_envs.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
from sentry.notifications.services import notifications_service
2727
from sentry.notifications.types import NotificationSettingEnum
2828
from sentry.tasks.base import instrumented_task
29+
from sentry.taskworker.config import TaskworkerConfig
30+
from sentry.taskworker.namespaces import crons_tasks
2931
from sentry.types.actor import Actor
3032
from sentry.utils.email import MessageBuilder
3133
from sentry.utils.email.manager import get_email_addresses
@@ -162,6 +164,7 @@ def build_open_incidents_queryset():
162164
time_limit=15 * 60,
163165
soft_time_limit=10 * 60,
164166
record_timing=True,
167+
taskworker_config=TaskworkerConfig(namespace=crons_tasks, processing_deadline_duration=15 * 60),
165168
)
166169
def detect_broken_monitor_envs():
167170
open_incidents_qs = build_open_incidents_queryset()
@@ -179,6 +182,7 @@ def detect_broken_monitor_envs():
179182
time_limit=15 * 60,
180183
soft_time_limit=10 * 60,
181184
record_timing=True,
185+
taskworker_config=TaskworkerConfig(namespace=crons_tasks, processing_deadline_duration=15 * 60),
182186
)
183187
def detect_broken_monitor_envs_for_org(org_id: int):
184188
current_time = django_timezone.now()

src/sentry/options/defaults.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3235,3 +3235,8 @@
32353235
default={},
32363236
flags=FLAG_AUTOMATOR_MODIFIABLE,
32373237
)
3238+
register(
3239+
"taskworker.crons.rollout",
3240+
default={},
3241+
flags=FLAG_AUTOMATOR_MODIFIABLE,
3242+
)

src/sentry/taskworker/namespaces.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
auth_control_tasks = taskregistry.create_namespace("auth.control")
99

10+
crons_tasks = taskregistry.create_namespace("crons")
11+
1012
deletion_tasks = taskregistry.create_namespace(
1113
"deletions",
1214
processing_deadline_duration=60 * 3,

0 commit comments

Comments
 (0)