Skip to content

Commit 05a7a48

Browse files
ref(uptime): Move helper methods out of the UptimeResultProcessor (#90188)
1 parent 013129c commit 05a7a48

File tree

1 file changed

+141
-143
lines changed

1 file changed

+141
-143
lines changed

src/sentry/uptime/consumers/results_consumer.py

Lines changed: 141 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
5252

5353
logger = logging.getLogger(__name__)
54+
5455
LAST_UPDATE_REDIS_TTL = timedelta(days=7)
5556
ONBOARDING_MONITOR_PERIOD = timedelta(days=3)
5657
# When onboarding a new subscription how many total failures are allowed to happen during
@@ -108,66 +109,148 @@ def get_active_recovery_threshold():
108109
return options.get("uptime.active-recovery-threshold")
109110

110111

111-
class UptimeResultProcessor(ResultProcessor[CheckResult, UptimeSubscription]):
112-
subscription_model = UptimeSubscription
113-
114-
def get_subscription_id(self, result: CheckResult) -> str:
115-
return result["subscription_id"]
112+
def get_host_provider_if_valid(subscription: UptimeSubscription) -> str:
113+
if subscription.host_provider_name in get_top_hosting_provider_names(
114+
TOTAL_PROVIDERS_TO_INCLUDE_AS_TAGS
115+
):
116+
return subscription.host_provider_name
117+
return "other"
116118

117-
def should_run_region_checks(
118-
self, subscription: UptimeSubscription, result: CheckResult
119-
) -> bool:
120-
if not subscription.subscription_id:
121-
# Edge case where we can have no subscription_id here
122-
return False
123-
124-
# XXX: Randomly check for updates once an hour - this is a hack to fix a bug where we're seeing some checks
125-
# not update correctly.
126-
chance_to_run = subscription.interval_seconds / timedelta(hours=1).total_seconds()
127-
if random.random() < chance_to_run:
128-
return True
129-
130-
# Run region checks and updates once an hour
131-
runs_per_hour = UptimeSubscription.IntervalSeconds.ONE_HOUR / subscription.interval_seconds
132-
subscription_run = UUID(subscription.subscription_id).int % runs_per_hour
133-
current_run = (
134-
datetime.fromtimestamp(result["scheduled_check_time_ms"] / 1000, timezone.utc).minute
135-
* 60
136-
) // subscription.interval_seconds
137-
if subscription_run == current_run:
138-
return True
139119

120+
def should_run_region_checks(subscription: UptimeSubscription, result: CheckResult) -> bool:
121+
if not subscription.subscription_id:
122+
# Edge case where we can have no subscription_id here
140123
return False
141124

142-
def check_and_update_regions(
143-
self,
144-
subscription: UptimeSubscription,
145-
result: CheckResult,
146-
regions: list[UptimeSubscriptionRegion],
147-
):
148-
"""
149-
This method will check if regions have been added or removed from our region configuration,
150-
and updates regions associated with this uptime monitor to reflect the new state. This is
151-
done probabilistically, so that the check is performed roughly once an hour for each uptime
152-
monitor.
153-
"""
154-
if not self.should_run_region_checks(subscription, result):
155-
return
125+
# XXX: Randomly check for updates once an hour - this is a hack to fix a bug where we're seeing some checks
126+
# not update correctly.
127+
chance_to_run = subscription.interval_seconds / timedelta(hours=1).total_seconds()
128+
if random.random() < chance_to_run:
129+
return True
130+
131+
# Run region checks and updates once an hour
132+
runs_per_hour = UptimeSubscription.IntervalSeconds.ONE_HOUR / subscription.interval_seconds
133+
subscription_run = UUID(subscription.subscription_id).int % runs_per_hour
134+
current_run = (
135+
datetime.fromtimestamp(result["scheduled_check_time_ms"] / 1000, timezone.utc).minute * 60
136+
) // subscription.interval_seconds
137+
if subscription_run == current_run:
138+
return True
139+
140+
return False
141+
142+
143+
def has_reached_status_threshold(
144+
project_subscription: ProjectUptimeSubscription,
145+
status: str,
146+
metric_tags: dict[str, str],
147+
) -> bool:
148+
pipeline = _get_cluster().pipeline()
149+
key = build_active_consecutive_status_key(project_subscription, status)
150+
pipeline.incr(key)
151+
pipeline.expire(key, ACTIVE_THRESHOLD_REDIS_TTL)
152+
status_count = int(pipeline.execute()[0])
153+
result = (status == CHECKSTATUS_FAILURE and status_count >= get_active_failure_threshold()) or (
154+
status == CHECKSTATUS_SUCCESS and status_count >= get_active_recovery_threshold()
155+
)
156+
if not result:
157+
metrics.incr(
158+
"uptime.result_processor.active.under_threshold",
159+
sample_rate=1.0,
160+
tags=metric_tags,
161+
)
162+
return result
163+
164+
165+
def try_check_and_update_regions(
166+
subscription: UptimeSubscription,
167+
result: CheckResult,
168+
regions: list[UptimeSubscriptionRegion],
169+
):
170+
"""
171+
This method will check if regions have been added or removed from our region configuration,
172+
and updates regions associated with this uptime monitor to reflect the new state. This is
173+
done probabilistically, so that the check is performed roughly once an hour for each uptime
174+
monitor.
175+
"""
176+
if not should_run_region_checks(subscription, result):
177+
return
178+
179+
if not check_and_update_regions(subscription, regions):
180+
return
181+
182+
# Regardless of whether we added or removed regions, we need to send an updated config to all active
183+
    # regions for this subscription so that they all get an updated set of currently active regions.
184+
subscription.update(status=UptimeSubscription.Status.UPDATING.value)
185+
update_remote_uptime_subscription.delay(subscription.id)
186+
187+
188+
def produce_snuba_uptime_result(
189+
project_subscription: ProjectUptimeSubscription,
190+
result: CheckResult,
191+
metric_tags: dict[str, str],
192+
) -> None:
193+
"""
194+
Produces a message to Snuba's Kafka topic for uptime check results.
195+
196+
Args:
197+
project_subscription: The project subscription associated with the result
198+
result: The check result to be sent to Snuba
199+
"""
200+
try:
201+
project = project_subscription.project
202+
retention_days = quotas.backend.get_event_retention(organization=project.organization) or 90
203+
204+
if project_subscription.uptime_status == UptimeStatus.FAILED:
205+
incident_status = IncidentStatus.IN_INCIDENT
206+
else:
207+
incident_status = IncidentStatus.NO_INCIDENT
208+
209+
snuba_message: SnubaUptimeResult = {
210+
# Copy over fields from original result
211+
"guid": result["guid"],
212+
"subscription_id": result["subscription_id"],
213+
"status": result["status"],
214+
"status_reason": result["status_reason"],
215+
"trace_id": result["trace_id"],
216+
"span_id": result["span_id"],
217+
"scheduled_check_time_ms": result["scheduled_check_time_ms"],
218+
"actual_check_time_ms": result["actual_check_time_ms"],
219+
"duration_ms": result["duration_ms"],
220+
"request_info": result["request_info"],
221+
# Add required Snuba-specific fields
222+
"organization_id": project.organization_id,
223+
"project_id": project.id,
224+
"retention_days": retention_days,
225+
"incident_status": incident_status.value,
226+
"region": result["region"],
227+
}
156228

157-
if not check_and_update_regions(subscription, regions):
158-
return
229+
topic = get_topic_definition(Topic.SNUBA_UPTIME_RESULTS)["real_topic_name"]
230+
payload = KafkaPayload(None, SNUBA_UPTIME_RESULTS_CODEC.encode(snuba_message), [])
159231

160-
# Regardless of whether we added or removed regions, we need to send an updated config to all active
161-
        # regions for this subscription so that they all get an updated set of currently active regions.
162-
subscription.update(status=UptimeSubscription.Status.UPDATING.value)
163-
update_remote_uptime_subscription.delay(subscription.id)
232+
_snuba_uptime_checks_producer.produce(ArroyoTopic(topic), payload)
164233

165-
def get_host_provider_if_valid(self, subscription: UptimeSubscription) -> str:
166-
if subscription.host_provider_name in get_top_hosting_provider_names(
167-
TOTAL_PROVIDERS_TO_INCLUDE_AS_TAGS
168-
):
169-
return subscription.host_provider_name
170-
return "other"
234+
metrics.incr(
235+
"uptime.result_processor.snuba_message_produced",
236+
sample_rate=1.0,
237+
tags=metric_tags,
238+
)
239+
240+
except Exception:
241+
logger.exception("Failed to produce Snuba message for uptime result")
242+
metrics.incr(
243+
"uptime.result_processor.snuba_message_failed",
244+
sample_rate=1.0,
245+
tags=metric_tags,
246+
)
247+
248+
249+
class UptimeResultProcessor(ResultProcessor[CheckResult, UptimeSubscription]):
250+
subscription_model = UptimeSubscription
251+
252+
def get_subscription_id(self, result: CheckResult) -> str:
253+
return result["subscription_id"]
171254

172255
def is_shadow_region_result(
173256
self, result: CheckResult, regions: list[UptimeSubscriptionRegion]
@@ -195,7 +278,7 @@ def handle_result(self, subscription: UptimeSubscription | None, result: CheckRe
195278
return
196279

197280
metric_tags = {
198-
"host_provider": self.get_host_provider_if_valid(subscription),
281+
"host_provider": get_host_provider_if_valid(subscription),
199282
"status": result["status"],
200283
"uptime_region": result["region"],
201284
}
@@ -207,7 +290,7 @@ def handle_result(self, subscription: UptimeSubscription | None, result: CheckRe
207290
)
208291
return
209292

210-
self.check_and_update_regions(subscription, result, subscription_regions)
293+
try_check_and_update_regions(subscription, result, subscription_regions)
211294

212295
project_subscriptions = get_project_subscriptions_for_uptime_subscription(subscription.id)
213296

@@ -330,7 +413,7 @@ def handle_result_for_project(
330413

331414
# After processing the result and updating Redis, produce message to Kafka
332415
if options.get("uptime.snuba_uptime_results.enabled"):
333-
self._produce_snuba_uptime_result(project_subscription, result, metric_tags.copy())
416+
produce_snuba_uptime_result(project_subscription, result, metric_tags.copy())
334417

335418
# The amount of time it took for a check result to get from the checker to this consumer and be processed
336419
metrics.distribution(
@@ -427,7 +510,7 @@ def handle_result_for_project_active_mode(
427510
project_subscription.uptime_status == UptimeStatus.OK
428511
and result["status"] == CHECKSTATUS_FAILURE
429512
):
430-
if not self.has_reached_status_threshold(
513+
if not has_reached_status_threshold(
431514
project_subscription, result["status"], metric_tags
432515
):
433516
return
@@ -481,7 +564,7 @@ def handle_result_for_project_active_mode(
481564
project_subscription.uptime_status == UptimeStatus.FAILED
482565
and result["status"] == CHECKSTATUS_SUCCESS
483566
):
484-
if not self.has_reached_status_threshold(
567+
if not has_reached_status_threshold(
485568
project_subscription, result["status"], metric_tags
486569
):
487570
return
@@ -512,91 +595,6 @@ def handle_result_for_project_active_mode(
512595
uptime_status=UptimeStatus.OK, uptime_status_update_date=django_timezone.now()
513596
)
514597

515-
def has_reached_status_threshold(
516-
self,
517-
project_subscription: ProjectUptimeSubscription,
518-
status: str,
519-
metric_tags: dict[str, str],
520-
) -> bool:
521-
pipeline = _get_cluster().pipeline()
522-
key = build_active_consecutive_status_key(project_subscription, status)
523-
pipeline.incr(key)
524-
pipeline.expire(key, ACTIVE_THRESHOLD_REDIS_TTL)
525-
status_count = int(pipeline.execute()[0])
526-
result = (
527-
status == CHECKSTATUS_FAILURE and status_count >= get_active_failure_threshold()
528-
) or (status == CHECKSTATUS_SUCCESS and status_count >= get_active_recovery_threshold())
529-
if not result:
530-
metrics.incr(
531-
"uptime.result_processor.active.under_threshold",
532-
sample_rate=1.0,
533-
tags=metric_tags,
534-
)
535-
return result
536-
537-
def _produce_snuba_uptime_result(
538-
self,
539-
project_subscription: ProjectUptimeSubscription,
540-
result: CheckResult,
541-
metric_tags: dict[str, str],
542-
) -> None:
543-
"""
544-
Produces a message to Snuba's Kafka topic for uptime check results.
545-
546-
Args:
547-
project_subscription: The project subscription associated with the result
548-
result: The check result to be sent to Snuba
549-
"""
550-
try:
551-
project = project_subscription.project
552-
retention_days = (
553-
quotas.backend.get_event_retention(organization=project.organization) or 90
554-
)
555-
556-
if project_subscription.uptime_status == UptimeStatus.FAILED:
557-
incident_status = IncidentStatus.IN_INCIDENT
558-
else:
559-
incident_status = IncidentStatus.NO_INCIDENT
560-
561-
snuba_message: SnubaUptimeResult = {
562-
# Copy over fields from original result
563-
"guid": result["guid"],
564-
"subscription_id": result["subscription_id"],
565-
"status": result["status"],
566-
"status_reason": result["status_reason"],
567-
"trace_id": result["trace_id"],
568-
"span_id": result["span_id"],
569-
"scheduled_check_time_ms": result["scheduled_check_time_ms"],
570-
"actual_check_time_ms": result["actual_check_time_ms"],
571-
"duration_ms": result["duration_ms"],
572-
"request_info": result["request_info"],
573-
# Add required Snuba-specific fields
574-
"organization_id": project.organization_id,
575-
"project_id": project.id,
576-
"retention_days": retention_days,
577-
"incident_status": incident_status.value,
578-
"region": result["region"],
579-
}
580-
581-
topic = get_topic_definition(Topic.SNUBA_UPTIME_RESULTS)["real_topic_name"]
582-
payload = KafkaPayload(None, SNUBA_UPTIME_RESULTS_CODEC.encode(snuba_message), [])
583-
584-
_snuba_uptime_checks_producer.produce(ArroyoTopic(topic), payload)
585-
586-
metrics.incr(
587-
"uptime.result_processor.snuba_message_produced",
588-
sample_rate=1.0,
589-
tags=metric_tags,
590-
)
591-
592-
except Exception:
593-
logger.exception("Failed to produce Snuba message for uptime result")
594-
metrics.incr(
595-
"uptime.result_processor.snuba_message_failed",
596-
sample_rate=1.0,
597-
tags=metric_tags,
598-
)
599-
600598

601599
class UptimeResultsStrategyFactory(ResultsStrategyFactory[CheckResult, UptimeSubscription]):
602600
result_processor_cls = UptimeResultProcessor

0 commit comments

Comments
 (0)