
Commit c3da5bd

feat(metric_alerts): Make metric alerts work in local dev. (#21801)
This makes the snuba query subscription consumers run in the devserver when `SENTRY_DEV_PROCESS_SUBSCRIPTIONS` is true. It also makes devservices start Kafka in that case, and fixes some bugs around automatic topic creation that don't show up in prod.
1 parent f027182
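To exercise this locally, the flag has to be switched on in your dev config before starting the devserver. A minimal sketch, assuming the usual `~/.sentry/sentry.conf.py` overrides file that `sentry init` generates:

```python
# ~/.sentry/sentry.conf.py -- local dev overrides (path assumed)

# The subscription consumers read results from Kafka, so the Kafka eventstream
# must be enabled as well; devserver.py below enforces this pairing.
SENTRY_EVENTSTREAM = "sentry.eventstream.kafka.KafkaEventStream"
SENTRY_DEV_PROCESS_SUBSCRIPTIONS = True
```

With both set, `sentry devservices up` brings Kafka up (via the `only_if` change below) and `sentry devserver` spawns one query-subscription consumer per result topic.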

File tree

4 files changed (+83, -39 lines)

src/sentry/conf/server.py

Lines changed: 11 additions & 1 deletion
```diff
@@ -1442,6 +1442,10 @@ def create_partitioned_queues(name):
 SENTRY_USE_RELAY = True
 SENTRY_RELAY_PORT = 7899

+# Controls whether we'll run the snuba subscription processor. If enabled, we'll run
+# it as a worker, and devservices will run Kafka.
+SENTRY_DEV_PROCESS_SUBSCRIPTIONS = False
+
 # The chunk size for attachments in blob store. Should be a power of two.
 SENTRY_ATTACHMENT_BLOB_SIZE = 8 * 1024 * 1024  # 8MB

@@ -1525,7 +1529,9 @@ def create_partitioned_queues(name):
         },
         "volumes": {"kafka": {"bind": "/var/lib/kafka"}},
         "only_if": lambda settings, options: (
-            "kafka" in settings.SENTRY_EVENTSTREAM or settings.SENTRY_USE_RELAY
+            "kafka" in settings.SENTRY_EVENTSTREAM
+            or settings.SENTRY_USE_RELAY
+            or settings.SENTRY_DEV_PROCESS_SUBSCRIPTIONS
         ),
     },
     "clickhouse": {
@@ -1883,6 +1889,10 @@ def get_sentry_sdk_config():
 KAFKA_OUTCOMES = "outcomes"
 KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS = "events-subscription-results"
 KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS = "transactions-subscription-results"
+KAFKA_SUBSCRIPTION_RESULT_TOPICS = {
+    "events": KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS,
+    "transactions": KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
+}
 KAFKA_INGEST_EVENTS = "ingest-events"
 KAFKA_INGEST_ATTACHMENTS = "ingest-attachments"
 KAFKA_INGEST_TRANSACTIONS = "ingest-transactions"
```
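For context, devservices consults each service's `only_if` predicate against the current settings and options and skips the container when it returns False. Conceptually it behaves like the sketch below (a paraphrase for illustration; `should_start` and its signature are invented here, not the actual devservices code):

```python
def should_start(service_config, settings, options):
    # Skip containers whose only_if predicate rejects the current settings.
    # With the change above, Kafka now qualifies whenever
    # SENTRY_DEV_PROCESS_SUBSCRIPTIONS is set, even if neither the Kafka
    # eventstream nor Relay is in use.
    only_if = service_config.get("only_if", lambda settings, options: True)
    return only_if(settings, options)
```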

src/sentry/runner/commands/devserver.py

Lines changed: 22 additions & 2 deletions
```diff
@@ -21,11 +21,22 @@
     "ingest": ["sentry", "run", "ingest-consumer", "--all-consumer-types"],
     "server": ["sentry", "run", "web"],
     "storybook": ["yarn", "storybook"],
+    "subscription-consumer": [
+        "sentry",
+        "run",
+        "query-subscription-consumer",
+        "--commit-batch-size",
+        "1",
+    ],
 }


-def _get_daemon(name):
-    return (name, _DEFAULT_DAEMONS[name])
+def _get_daemon(name, *args, **kwargs):
+    display_name = name
+    if "suffix" in kwargs:
+        display_name = u"{}-{}".format(name, kwargs["suffix"])
+
+    return (display_name, _DEFAULT_DAEMONS[name] + list(args))


 @click.command()
@@ -204,6 +215,15 @@ def devserver(
     if eventstream.requires_post_process_forwarder():
         daemons += [_get_daemon("post-process-forwarder")]

+    if settings.SENTRY_DEV_PROCESS_SUBSCRIPTIONS:
+        if not settings.SENTRY_EVENTSTREAM == "sentry.eventstream.kafka.KafkaEventStream":
+            raise click.ClickException(
+                "`SENTRY_DEV_PROCESS_SUBSCRIPTIONS` can only be used when "
+                "`SENTRY_EVENTSTREAM=sentry.eventstream.kafka.KafkaEventStream`."
+            )
+        for name, topic in settings.KAFKA_SUBSCRIPTION_RESULT_TOPICS.items():
+            daemons += [_get_daemon("subscription-consumer", "--topic", topic, suffix=name)]
+
     if settings.SENTRY_USE_RELAY:
         daemons += [_get_daemon("ingest")]
```

src/sentry/snuba/query_subscription_consumer.py

Lines changed: 13 additions & 0 deletions
```diff
@@ -6,13 +6,15 @@
 import sentry_sdk
 import six
 from confluent_kafka import Consumer, KafkaException, OFFSET_INVALID, TopicPartition
+from confluent_kafka.admin import AdminClient
 from dateutil.parser import parse as parse_date
 from django.conf import settings

 from sentry.snuba.json_schemas import SUBSCRIPTION_PAYLOAD_VERSIONS, SUBSCRIPTION_WRAPPER_SCHEMA
 from sentry.snuba.models import QueryDatasets, QuerySubscription
 from sentry.snuba.tasks import _delete_from_snuba
 from sentry.utils import metrics, json
+from sentry.utils.batching_kafka_consumer import wait_for_topics

 logger = logging.getLogger(__name__)

@@ -107,6 +109,17 @@ def on_revoke(consumer, partitions):
             },
         )

+        if settings.KAFKA_CONSUMER_AUTO_CREATE_TOPICS:
+            # This is required for confluent-kafka>=1.5.0, otherwise the topics will
+            # not be automatically created.
+            admin_client = AdminClient(
+                {
+                    "bootstrap.servers": conf["bootstrap.servers"],
+                    "allow.auto.create.topics": "true",
+                }
+            )
+            wait_for_topics(admin_client, [self.topic])
+
         self.consumer = Consumer(conf)
         self.consumer.subscribe([self.topic], on_assign=on_assign, on_revoke=on_revoke)
```
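The guard matters because, per the comment above, confluent-kafka >= 1.5.0 no longer triggers topic auto-creation from the consumer side, so on a fresh dev broker the consumer would spin on a missing topic instead of creating it. A rough sketch of that failure mode (the broker address and topic name are assumptions for local dev):

```python
from confluent_kafka import Consumer, KafkaError

# Without the AdminClient warm-up above, subscribing to a topic the broker has
# never seen just yields error events from poll() instead of creating it.
consumer = Consumer(
    {
        "bootstrap.servers": "127.0.0.1:9092",  # assumed devservices broker
        "group.id": "demo",
    }
)
consumer.subscribe(["events-subscription-results"])

message = consumer.poll(5.0)
if message is not None and message.error():
    # On a broker that has never seen this topic, the error code is
    # KafkaError.UNKNOWN_TOPIC_OR_PART rather than the topic appearing.
    print(message.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART)
```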

src/sentry/utils/batching_kafka_consumer.py

Lines changed: 37 additions & 36 deletions
```diff
@@ -25,6 +25,42 @@
 DEFAULT_QUEUED_MIN_MESSAGES = 10000


+def wait_for_topics(admin_client, topics, timeout=10):
+    """
+    Make sure that the provided topics exist and have non-zero partitions in them.
+    """
+    for topic in topics:
+        start = time.time()
+        last_error = None
+
+        while True:
+            if time.time() > start + timeout:
+                raise RuntimeError(
+                    "Timeout when waiting for Kafka topic '%s' to become available, last error: %s"
+                    % (topic, last_error)
+                )
+
+            result = admin_client.list_topics(topic=topic)
+            topic_metadata = result.topics.get(topic)
+            if topic_metadata and topic_metadata.partitions and not topic_metadata.error:
+                logger.debug("Topic '%s' is ready", topic)
+                break
+            elif topic_metadata.error in {
+                KafkaError.UNKNOWN_TOPIC_OR_PART,
+                KafkaError.LEADER_NOT_AVAILABLE,
+            }:
+                last_error = topic_metadata.error
+                logger.warn("Topic '%s' or its partitions are not ready, retrying...", topic)
+                time.sleep(0.1)
+                continue
+            else:
+                raise RuntimeError(
+                    "Unknown error when waiting for Kafka topic '%s': %s"
+                    % (topic, topic_metadata.error)
+                )
+
+
 @six.add_metaclass(abc.ABCMeta)
 class AbstractBatchWorker(object):
     """The `BatchingKafkaConsumer` requires an instance of this class to
@@ -172,41 +208,6 @@ def __record_timing(self, metric, value, tags=None):
         sample_rate = self.__metrics_sample_rates.get(metric, settings.SENTRY_METRICS_SAMPLE_RATE)
         return self.__metrics.timing(metric, value, tags=tags, sample_rate=sample_rate)

-    def _wait_for_topics(self, admin_client, topics, timeout=10):
-        """
-        Make sure that the provided topics exist and have non-zero partitions in them.
-        """
-        for topic in topics:
-            start = time.time()
-            last_error = None
-
-            while True:
-                if time.time() > start + timeout:
-                    raise RuntimeError(
-                        "Timeout when waiting for Kafka topic '%s' to become available, last error: %s".format(
-                            topic, last_error
-                        )
-                    )
-
-                result = admin_client.list_topics(topic=topic)
-                topic_metadata = result.topics.get(topic)
-                if topic_metadata and topic_metadata.partitions and not topic_metadata.error:
-                    logger.debug("Topic '%s' is ready", topic)
-                    break
-                elif topic_metadata.error in {
-                    KafkaError.UNKNOWN_TOPIC_OR_PART,
-                    KafkaError.LEADER_NOT_AVAILABLE,
-                }:
-                    last_error = topic_metadata.error
-                    logger.warn("Topic '%s' or its partitions are not ready, retrying...", topic)
-                    time.sleep(0.1)
-                    continue
-                else:
-                    raise RuntimeError(
-                        "Unknown error when waiting for Kafka topic '%s': %s"
-                        % (topic, topic_metadata.error)
-                    )
-
     def create_consumer(
         self,
         topics,
@@ -236,7 +237,7 @@ def create_consumer(
                     "allow.auto.create.topics": "true",
                 }
             )
-            self._wait_for_topics(admin_client, topics)
+            wait_for_topics(admin_client, topics)

         consumer = Consumer(consumer_config)
```

Note: the old method's timeout message called `str.format()` on a `%s`-style template, so it raised with the literal, unformatted string; the moved, module-level version above uses `%` interpolation so the topic and last error actually appear in the message.