Skip to content

Commit 419aae8

Browse files
chore(hybrid-cloud): Adds outbox maximum shard depth metric (#63551)
1 parent 05fae2b commit 419aae8

File tree

3 files changed

+119
-2
lines changed

3 files changed

+119
-2
lines changed

src/sentry/models/outbox.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import sentry_sdk
2626
from django import db
2727
from django.db import OperationalError, connections, models, router, transaction
28-
from django.db.models import Max, Min
28+
from django.db.models import Count, Max, Min
2929
from django.db.transaction import Atomic
3030
from django.dispatch import Signal
3131
from django.http import HttpRequest
@@ -675,6 +675,41 @@ def drain_shard(
675675
if _test_processing_barrier:
676676
_test_processing_barrier.wait()
677677

678+
@classmethod
679+
def get_shard_depths_descending(cls, limit: Optional[int] = 10) -> list[dict[str, int | str]]:
680+
"""
681+
Queries all outbox shards for their total depth, aggregated by their
682+
sharding columns as specified by the outbox class implementation.
683+
684+
:param limit: Limits the query to the top N rows with the greatest shard
685+
depth. If limit is None, the entire set of rows will be returned.
686+
:return: A list of dictionaries, containing shard depths and shard
687+
relevant column values.
688+
"""
689+
if limit is not None:
690+
assert limit > 0, "Limit must be a positive integer if specified"
691+
692+
base_depth_query = (
693+
cls.objects.values(*cls.sharding_columns).annotate(depth=Count("*")).order_by("-depth")
694+
)
695+
696+
if limit is not None:
697+
base_depth_query = base_depth_query[0:limit]
698+
699+
aggregated_shard_information = list()
700+
for shard_row in base_depth_query:
701+
shard_information = {
702+
shard_column: shard_row[shard_column] for shard_column in cls.sharding_columns
703+
}
704+
shard_information["depth"] = shard_row["depth"]
705+
aggregated_shard_information.append(shard_information)
706+
707+
return aggregated_shard_information
708+
709+
@classmethod
def get_total_outbox_count(cls) -> int:
    """Return the number of rows reported by this outbox model's default manager."""
    total_rows: int = cls.objects.count()
    return total_rows
712+
678713

679714
# Outboxes bound from region silo -> control silo
680715
class RegionOutboxBase(OutboxBase):

src/sentry/tasks/deliver_from_outbox.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,11 @@ def schedule_batch(
7777
scheduled_count += hi - lo + 1
7878
batch_size = math.ceil((hi - lo + 1) / concurrency)
7979

80+
metrics_tags = dict(silo_mode=silo_mode.name, outbox_name=outbox_name)
8081
metrics.gauge(
8182
"deliver_from_outbox.queued_batch_size",
8283
value=batch_size,
83-
tags=dict(silo_mode=silo_mode.name),
84+
tags=metrics_tags,
8485
sample_rate=1.0,
8586
)
8687

@@ -92,8 +93,28 @@ def schedule_batch(
9293
outbox_identifier_low=lo + i * batch_size,
9394
outbox_identifier_hi=lo + (i + 1) * batch_size,
9495
)
96+
97+
deepest_shard_information = outbox_model.get_shard_depths_descending(limit=1)
98+
max_shard_depth = (
99+
deepest_shard_information[0]["depth"] if deepest_shard_information else 0
100+
)
101+
metrics.gauge(
102+
"deliver_from_outbox.maximum_shard_depth",
103+
value=max_shard_depth,
104+
tags=metrics_tags,
105+
sample_rate=1.0,
106+
)
107+
108+
outbox_count = outbox_model.get_total_outbox_count()
109+
metrics.gauge(
110+
"deliver_from_outbox.total_outbox_count",
111+
value=outbox_count,
112+
tags=metrics_tags,
113+
sample_rate=1.0,
114+
)
95115
if process_outbox_backfills:
96116
backfill_outboxes_for(silo_mode, scheduled_count)
117+
97118
except Exception:
98119
sentry_sdk.capture_exception()
99120
raise

tests/sentry/models/test_outbox.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -649,3 +649,64 @@ def test_bulk_operations(self):
649649

650650
assert RegionOutbox.objects.count() == 0
651651
assert OrganizationMemberTeamReplica.objects.count() == 1
652+
653+
654+
class OutboxAggregationTest(TestCase):
    """Exercises the shard-depth and total-count aggregation helpers on ControlOutbox."""

    def setUp(self):
        # NOTE(review): presumably this flushes any outboxes queued by earlier
        # fixtures so only the rows created below are counted -- confirm
        # against outbox_runner.
        with outbox_runner():
            pass

        # Each entry: (shard_identifier, number of outboxes to create, region_name)
        shard_layout = [(1, 4, "eu"), (2, 7, "us"), (3, 1, "us")]
        for shard_identifier, outbox_total, region in shard_layout:
            for offset in range(outbox_total):
                outbox = ControlOutbox(
                    region_name=region,
                    shard_scope=OutboxScope.WEBHOOK_SCOPE,
                    shard_identifier=shard_identifier,
                    category=OutboxCategory.WEBHOOK_PROXY,
                    object_identifier=shard_identifier * 10000 + offset,
                    payload='{"foo": "bar"}',
                )
                outbox.save()

    def test_calculate_sharding_depths(self):
        webhook_scope = OutboxScope.WEBHOOK_SCOPE.value
        deepest = {
            "shard_identifier": 2,
            "region_name": "us",
            "shard_scope": webhook_scope,
            "depth": 7,
        }
        middle = {
            "shard_identifier": 1,
            "region_name": "eu",
            "shard_scope": webhook_scope,
            "depth": 4,
        }
        shallowest = {
            "shard_identifier": 3,
            "region_name": "us",
            "shard_scope": webhook_scope,
            "depth": 1,
        }

        # Default limit is large enough to return every shard, deepest first.
        assert ControlOutbox.get_shard_depths_descending() == [deepest, middle, shallowest]

        # Limiting the query to a single entry keeps only the deepest shard.
        assert ControlOutbox.get_shard_depths_descending(limit=1) == [deepest]

    def test_calculate_sharding_depths_empty(self):
        ControlOutbox.objects.all().delete()
        assert ControlOutbox.objects.count() == 0
        assert ControlOutbox.get_shard_depths_descending() == []

    def test_total_count(self):
        # 7 + 4 + 1 outboxes are created across the three shards in setUp.
        assert ControlOutbox.get_total_outbox_count() == 12

0 commit comments

Comments
 (0)