|
3 | 3 | import logging
|
4 | 4 | from datetime import datetime, timedelta, timezone
|
5 | 5 |
|
6 |
| -import sentry_sdk |
7 | 6 | from arroyo import Topic as ArroyoTopic
|
8 | 7 | from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
|
9 | 8 | from django.conf import settings
|
10 | 9 | from sentry_kafka_schemas.codecs import Codec
|
11 | 10 | from sentry_kafka_schemas.schema_types.monitors_clock_tick_v1 import ClockTick
|
12 | 11 |
|
13 |
| -from sentry import options |
14 | 12 | from sentry.conf.types.kafka_definition import Topic, get_topic_codec
|
15 |
| -from sentry.monitors.tasks.check_missed import check_missing |
16 |
| -from sentry.monitors.tasks.check_timeout import check_timeout |
17 | 13 | from sentry.utils import metrics, redis
|
18 | 14 | from sentry.utils.arroyo_producer import SingletonProducer
|
19 | 15 | from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
|
@@ -63,19 +59,15 @@ def _dispatch_tick(ts: datetime):
|
63 | 59 | sentry.io, when we deploy we restart the celery beat worker and it will
|
64 | 60 | skip any tasks it missed)
|
65 | 61 | """
|
66 |
| - if options.get("crons.use_clock_pulse_consumer"): |
67 |
| - if settings.SENTRY_EVENTSTREAM != "sentry.eventstream.kafka.KafkaEventStream": |
68 |
| - # XXX(epurkhiser): Unclear what we want to do if we're not using kafka |
69 |
| - return |
| 62 | + if settings.SENTRY_EVENTSTREAM != "sentry.eventstream.kafka.KafkaEventStream": |
| 63 | + # XXX(epurkhiser): Unclear what we want to do if we're not using kafka |
| 64 | + return |
70 | 65 |
|
71 |
| - message: ClockTick = {"ts": ts.timestamp()} |
72 |
| - payload = KafkaPayload(None, CLOCK_TICK_CODEC.encode(message), []) |
| 66 | + message: ClockTick = {"ts": ts.timestamp()} |
| 67 | + payload = KafkaPayload(None, CLOCK_TICK_CODEC.encode(message), []) |
73 | 68 |
|
74 |
| - topic = get_topic_definition(Topic.MONITORS_CLOCK_TICK)["real_topic_name"] |
75 |
| - _clock_tick_producer.produce(ArroyoTopic(topic), payload) |
76 |
| - else: |
77 |
| - check_missing.delay(current_datetime=ts) |
78 |
| - check_timeout.delay(current_datetime=ts) |
| 69 | + topic = get_topic_definition(Topic.MONITORS_CLOCK_TICK)["real_topic_name"] |
| 70 | + _clock_tick_producer.produce(ArroyoTopic(topic), payload) |
79 | 71 |
|
80 | 72 |
|
81 | 73 | def try_monitor_clock_tick(ts: datetime, partition: int):
|
@@ -150,20 +142,14 @@ def try_monitor_clock_tick(ts: datetime, partition: int):
|
150 | 142 | # task run, backfill those ticks. This can happen when one partition has
|
151 | 143 | # slowed down too much and is missing a minutes worth of check-ins
|
152 | 144 | if last_ts is not None and slowest_part_ts > last_ts + 60:
|
153 |
| - if options.get("crons.use_clock_pulse_consumer"): |
154 |
| - # We only want to do backfills when we're using the clock tick |
155 |
| - # consumer, otherwise the celery tasks may process out of order |
156 |
| - backfill_tick = datetime.fromtimestamp(last_ts + 60, tz=timezone.utc) |
157 |
| - while backfill_tick < tick: |
158 |
| - extra = {"reference_datetime": str(backfill_tick)} |
159 |
| - logger.info("monitors.consumer.clock_tick_backfill", extra=extra) |
160 |
| - |
161 |
| - _dispatch_tick(backfill_tick) |
162 |
| - backfill_tick = backfill_tick + timedelta(minutes=1) |
163 |
| - else: |
164 |
| - with sentry_sdk.push_scope() as scope: |
165 |
| - scope.set_extra("last_ts", last_ts) |
166 |
| - scope.set_extra("slowest_part_ts", slowest_part_ts) |
167 |
| - sentry_sdk.capture_message("Monitor task dispatch minute skipped") |
| 145 | + # We only want to do backfills when we're using the clock tick |
| 146 | + # consumer, otherwise the celery tasks may process out of order |
| 147 | + backfill_tick = datetime.fromtimestamp(last_ts + 60, tz=timezone.utc) |
| 148 | + while backfill_tick < tick: |
| 149 | + extra = {"reference_datetime": str(backfill_tick)} |
| 150 | + logger.info("monitors.consumer.clock_tick_backfill", extra=extra) |
| 151 | + |
| 152 | + _dispatch_tick(backfill_tick) |
| 153 | + backfill_tick = backfill_tick + timedelta(minutes=1) |
168 | 154 |
|
169 | 155 | _dispatch_tick(tick)
|
0 commit comments