
Commit 7d68639

ref(uptime): Extract a few more functions from handle_result (#90294)
Just attempting to make it easier to read through the handle_result function without having to read through the details of every step.
1 parent: 4d5e737 · commit: 7d68639


src/sentry/uptime/consumers/results_consumer.py

Lines changed: 136 additions & 100 deletions
@@ -248,29 +248,80 @@ def produce_snuba_uptime_result(
     )


+def is_shadow_region_result(result: CheckResult, regions: list[UptimeSubscriptionRegion]) -> bool:
+    shadow_region_slugs = {
+        region.region_slug
+        for region in regions
+        if region.mode == UptimeSubscriptionRegion.RegionMode.SHADOW
+    }
+    return result["region"] in shadow_region_slugs
+
+
+def record_check_completion_metrics(result: CheckResult, metric_tags: dict[str, str]) -> None:
+    """
+    Records the amount of time it took for a check result to get from the
+    checker to this consumer and be processed
+    """
+    actual_check_time = result["actual_check_time_ms"]
+    duration = result["duration_ms"] if result["duration_ms"] else 0
+    completion_time = (datetime.now().timestamp() * 1000) - (actual_check_time + duration)
+
+    metrics.distribution(
+        "uptime.result_processor.check_completion_time",
+        completion_time,
+        sample_rate=1.0,
+        unit="millisecond",
+        tags=metric_tags,
+    )
+
+
+def record_check_metrics(
+    result: CheckResult,
+    project_subscription: ProjectUptimeSubscription,
+    metric_tags: dict[str, str],
+) -> None:
+    """
+    Records
+    """
+    if result["status"] == CHECKSTATUS_MISSED_WINDOW:
+        logger.info(
+            "handle_result_for_project.missed",
+            extra={"project_id": project_subscription.project_id, **result},
+        )
+        # Do not log other metrics for missed_window results, this was a
+        # synthetic result
+        return
+
+    if result["duration_ms"]:
+        metrics.distribution(
+            "uptime.result_processor.check_result.duration",
+            result["duration_ms"],
+            sample_rate=1.0,
+            unit="millisecond",
+            tags=metric_tags,
+        )
+    metrics.distribution(
+        "uptime.result_processor.check_result.delay",
+        result["actual_check_time_ms"] - result["scheduled_check_time_ms"],
+        sample_rate=1.0,
+        unit="millisecond",
+        tags=metric_tags,
+    )
+
+
 class UptimeResultProcessor(ResultProcessor[CheckResult, UptimeSubscription]):
     subscription_model = UptimeSubscription

     def get_subscription_id(self, result: CheckResult) -> str:
         return result["subscription_id"]

-    def is_shadow_region_result(
-        self, result: CheckResult, regions: list[UptimeSubscriptionRegion]
-    ) -> bool:
-        shadow_region_slugs = {
-            region.region_slug
-            for region in regions
-            if region.mode == UptimeSubscriptionRegion.RegionMode.SHADOW
-        }
-        return result["region"] in shadow_region_slugs
-
     def handle_result(self, subscription: UptimeSubscription | None, result: CheckResult):
         if random.random() < 0.01:
             logger.info("process_result", extra=result)

+        # If there's no subscription in the database, this subscription has
+        # been orphaned. Remove from the checker
         if subscription is None:
-            # If no subscription in the Postgres, this subscription has been orphaned. Remove
-            # from the checker
             send_uptime_config_deletion(result["region"], result["subscription_id"])
             metrics.incr(
                 "uptime.result_processor.subscription_not_found",
@@ -286,9 +337,12 @@ def handle_result(self, subscription: UptimeSubscription | None, result: CheckResult):
         }
         subscription_regions = load_regions_for_uptime_subscription(subscription.id)

-        if self.is_shadow_region_result(result, subscription_regions):
+        # Discard shadow mode region results
+        if is_shadow_region_result(result, subscription_regions):
             metrics.incr(
-                "uptime.result_processor.dropped_shadow_result", sample_rate=1.0, tags=metric_tags
+                "uptime.result_processor.dropped_shadow_result",
+                sample_rate=1.0,
+                tags=metric_tags,
             )
             return

@@ -301,22 +355,19 @@ def handle_result(self, subscription: UptimeSubscription | None, result: CheckResult):
             remove_uptime_subscription_if_unused(subscription)
             return

-        cluster = _get_cluster()
-        last_update_raw: str | None = cluster.get(build_last_update_key(project_subscription))
-        last_update_ms = 0 if last_update_raw is None else int(last_update_raw)
+        organization = project_subscription.project.organization

-        if features.has(
-            "organizations:uptime-detailed-logging", project_subscription.project.organization
-        ):
+        # Detailed logging for specific organizations, useful for if we need to
+        # debug a specific organizations checks.
+        if features.has("organizations:uptime-detailed-logging", organization):
             logger.info("handle_result_for_project.before_dedupe", extra=result)

-        # Nothing to do if this subscription is disabled. Should mean there are
-        # other ProjectUptimeSubscription's that are not disabled that will use
-        # this result.
+        # Nothing to do if this subscription is disabled.
         if project_subscription.status == ObjectStatus.DISABLED:
             return

-        if not features.has("organizations:uptime", project_subscription.project.organization):
+        # Nothing to do if the feature isn't enabled
+        if not features.has("organizations:uptime", organization):
             metrics.incr("uptime.result_processor.dropped_no_feature")
             return

@@ -331,93 +382,78 @@ def handle_result(self, subscription: UptimeSubscription | None, result: CheckResult):
             tags={"mode": mode_name, "status_reason": status_reason, **metric_tags},
             sample_rate=1.0,
         )
-        try:
-            if result["scheduled_check_time_ms"] <= last_update_ms:
-                # If the scheduled check time is older than the most recent update then we've already processed it.
-                # We can end up with duplicates due to Kafka replaying tuples, or due to the uptime checker processing
-                # the same check multiple times and sending duplicate results.
-                # We only ever want to process the first value related to each check, so we just skip and log here
-                metrics.incr(
-                    "uptime.result_processor.skipping_already_processed_update",
-                    tags={"mode": mode_name, **metric_tags},
-                    sample_rate=1.0,
-                )
-                return

-            if features.has(
-                "organizations:uptime-detailed-logging", project_subscription.project.organization
-            ):
-                logger.info("handle_result_for_project.after_dedupe", extra=result)
+        cluster = _get_cluster()
+        last_update_key = build_last_update_key(project_subscription)
+        last_update_raw: str | None = cluster.get(last_update_key)
+        last_update_ms = 0 if last_update_raw is None else int(last_update_raw)

-            if result["status"] == CHECKSTATUS_MISSED_WINDOW:
-                logger.info(
-                    "handle_result_for_project.missed",
-                    extra={"project_id": project_subscription.project_id, **result},
-                )
-            else:
-                # We log the result stats here after the duplicate check so that we know the "true" duration and delay
-                # of each check. Since during deploys we might have checks run from both the old/new checker
-                # deployments, there will be overlap of when things run. The new deployment will have artificially
-                # inflated delay stats, since it may duplicate checks that already ran on time on the old deployment,
-                # but will have run them later.
-                # Since we process all results for a given uptime monitor in order, we can guarantee that we get the
-                # earliest delay stat for each scheduled check for the monitor here, and so this stat will be a more
-                # accurate measurement of delay/duration.
-                if result["duration_ms"]:
-                    metrics.distribution(
-                        "uptime.result_processor.check_result.duration",
-                        result["duration_ms"],
-                        sample_rate=1.0,
-                        unit="millisecond",
-                        tags={"mode": mode_name, **metric_tags},
-                    )
-                metrics.distribution(
-                    "uptime.result_processor.check_result.delay",
-                    result["actual_check_time_ms"] - result["scheduled_check_time_ms"],
-                    sample_rate=1.0,
-                    unit="millisecond",
-                    tags={"mode": mode_name, **metric_tags},
-                )
+        # Nothing to do if we've already processed this result at an earlier time
+        if result["scheduled_check_time_ms"] <= last_update_ms:
+            # If the scheduled check time is older than the most recent update then we've already processed it.
+            # We can end up with duplicates due to Kafka replaying tuples, or due to the uptime checker processing
+            # the same check multiple times and sending duplicate results.
+            # We only ever want to process the first value related to each check, so we just skip and log here
+            metrics.incr(
+                "uptime.result_processor.skipping_already_processed_update",
+                tags={"mode": mode_name, **metric_tags},
+                sample_rate=1.0,
+            )
+            return

-        if project_subscription.mode == ProjectUptimeSubscriptionMode.AUTO_DETECTED_ONBOARDING:
-            self.handle_result_for_project_auto_onboarding_mode(
-                project_subscription, result, metric_tags.copy()
-            )
-        elif project_subscription.mode in (
-            ProjectUptimeSubscriptionMode.AUTO_DETECTED_ACTIVE,
-            ProjectUptimeSubscriptionMode.MANUAL,
-        ):
-            self.handle_result_for_project_active_mode(
-                project_subscription, result, metric_tags.copy()
-            )
+        if features.has("organizations:uptime-detailed-logging", organization):
+            logger.info("handle_result_for_project.after_dedupe", extra=result)
+
+        # We log the result stats here after the duplicate check so that we
+        # know the "true" duration and delay of each check. Since during
+        # deploys we might have checks run from both the old/new checker
+        # deployments, there will be overlap of when things run. The new
+        # deployment will have artificially inflated delay stats, since it may
+        # duplicate checks that already ran on time on the old deployment, but
+        # will have run them later.
+        #
+        # Since we process all results for a given uptime monitor in order, we
+        # can guarantee that we get the earliest delay stat for each scheduled
+        # check for the monitor here, and so this stat will be a more accurate
+        # measurement of delay/duration.
+        record_check_metrics(result, project_subscription, {"mode": mode_name, **metric_tags})
+
+        Mode = ProjectUptimeSubscriptionMode
+        try:
+            match project_subscription.mode:
+                case Mode.AUTO_DETECTED_ONBOARDING:
+                    self.handle_result_for_project_auto_onboarding_mode(
+                        project_subscription,
+                        result,
+                        metric_tags.copy(),
+                    )
+                case Mode.AUTO_DETECTED_ACTIVE | Mode.MANUAL:
+                    self.handle_result_for_project_active_mode(
+                        project_subscription,
+                        result,
+                        metric_tags.copy(),
+                    )
+                case _:
+                    logger.error(
+                        "Unknown subscription mode",
+                        extra={"mode": project_subscription.mode},
+                    )
         except Exception:
             logger.exception("Failed to process result for uptime project subscription")

-        # Now that we've processed the result for this project subscription we track the last update date
-        cluster = _get_cluster()
+        # Snuba production _must_ happen after handling the result, since we
+        # may mutate the project_subscription when we determine we're in an incident
+        if options.get("uptime.snuba_uptime_results.enabled"):
+            produce_snuba_uptime_result(project_subscription, result, metric_tags.copy())
+
+        # Track the last update date to allow deduplication
         cluster.set(
-            build_last_update_key(project_subscription),
+            last_update_key,
             int(result["scheduled_check_time_ms"]),
             ex=LAST_UPDATE_REDIS_TTL,
         )

-        # After processing the result and updating Redis, produce message to Kafka
-        if options.get("uptime.snuba_uptime_results.enabled"):
-            produce_snuba_uptime_result(project_subscription, result, metric_tags.copy())
-
-        # The amount of time it took for a check result to get from the checker to this consumer and be processed
-        metrics.distribution(
-            "uptime.result_processor.check_completion_time",
-            (datetime.now().timestamp() * 1000)
-            - (
-                result["actual_check_time_ms"] + result["duration_ms"]
-                if result["duration_ms"]
-                else 0
-            ),
-            sample_rate=1.0,
-            unit="millisecond",
-            tags=metric_tags,
-        )
+        record_check_completion_metrics(result, metric_tags)

     def handle_result_for_project_auto_onboarding_mode(
         self,

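A note on the timing math in the new record_check_completion_metrics helper: it measures how long a result took to travel from the checker to this consumer as "now" minus (actual_check_time_ms + duration_ms), treating a missing duration as 0. Below is a minimal standalone sketch of that calculation using only the standard library; check_completion_time_ms is a hypothetical name used here for illustration and is not part of this commit.

    from datetime import datetime

    def check_completion_time_ms(actual_check_time_ms: int, duration_ms: int | None) -> float:
        # Mirrors the extracted helper: time elapsed between the check finishing
        # (start time + duration) and "now", i.e. delivery plus processing latency.
        duration = duration_ms if duration_ms else 0
        return datetime.now().timestamp() * 1000 - (actual_check_time_ms + duration)

    # Example: a check that started 5 seconds ago and ran for 200 ms reached us
    # roughly 4.8 seconds after it completed.
    now_ms = int(datetime.now().timestamp() * 1000)
    print(round(check_completion_time_ms(now_ms - 5000, 200)))  # ~4800

In the commit itself this value is reported via metrics.distribution("uptime.result_processor.check_completion_time", ...) with the check's metric tags.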