diff --git a/src/sentry/hybridcloud/models/outbox.py b/src/sentry/hybridcloud/models/outbox.py index 7555a871014333..55d481d66a8b1e 100644 --- a/src/sentry/hybridcloud/models/outbox.py +++ b/src/sentry/hybridcloud/models/outbox.py @@ -9,7 +9,7 @@ import sentry_sdk from django import db -from django.db import OperationalError, connections, models, router, transaction +from django.db import DatabaseError, OperationalError, connections, models, router, transaction from django.db.models import Count, Max, Min from django.db.models.functions import Now from django.db.transaction import Atomic @@ -48,6 +48,10 @@ def __init__(self, message: str, outbox: OutboxBase) -> None: self.outbox = outbox +class OutboxDatabaseError(DatabaseError): + pass + + class InvalidOutboxError(Exception): pass @@ -325,32 +329,38 @@ def drain_shard( in_test_assert_no_transaction( "drain_shard should only be called outside of any active transaction!" ) - # When we are flushing in a local context, we don't care about outboxes created concurrently -- - # at best our logic depends on previously created outboxes. - latest_shard_row: OutboxBase | None = None - if not flush_all: - latest_shard_row = self.selected_messages_in_shard().last() - # If we're not flushing all possible shards, and we don't see any immediate values, - # drop. - if latest_shard_row is None: - return - - shard_row: OutboxBase | None - while True: - with self.process_shard(latest_shard_row) as shard_row: - if shard_row is None: - break - if _test_processing_barrier: - _test_processing_barrier.wait() + try: + # When we are flushing in a local context, we don't care about outboxes created concurrently -- + # at best our logic depends on previously created outboxes. + latest_shard_row: OutboxBase | None = None + if not flush_all: + latest_shard_row = self.selected_messages_in_shard().last() + # If we're not flushing all possible shards, and we don't see any immediate values, + # drop. + if latest_shard_row is None: + return + + shard_row: OutboxBase | None + while True: + with self.process_shard(latest_shard_row) as shard_row: + if shard_row is None: + break - processed = shard_row.process(is_synchronous_flush=not flush_all) + if _test_processing_barrier: + _test_processing_barrier.wait() - if _test_processing_barrier: - _test_processing_barrier.wait() + processed = shard_row.process(is_synchronous_flush=not flush_all) - if not processed: - break + if _test_processing_barrier: + _test_processing_barrier.wait() + + if not processed: + break + except DatabaseError as e: + raise OutboxDatabaseError( + f"Failed to process Outbox, {OutboxCategory(self.category).name} due to database error", + ) from e @classmethod def get_shard_depths_descending(cls, limit: int | None = 10) -> list[dict[str, int | str]]: diff --git a/src/sentry/sentry_apps/token_exchange/refresher.py b/src/sentry/sentry_apps/token_exchange/refresher.py index 3c9d5e2a457735..287b50d0530b23 100644 --- a/src/sentry/sentry_apps/token_exchange/refresher.py +++ b/src/sentry/sentry_apps/token_exchange/refresher.py @@ -5,6 +5,7 @@ from django.utils.functional import cached_property from sentry import analytics +from sentry.hybridcloud.models.outbox import OutboxDatabaseError from sentry.models.apiapplication import ApiApplication from sentry.models.apitoken import ApiToken from sentry.sentry_apps.models.sentry_app import SentryApp @@ -30,22 +31,35 @@ class Refresher: user: User def run(self) -> ApiToken: - with transaction.atomic(router.db_for_write(ApiToken)): - try: + try: + token = None + with transaction.atomic(router.db_for_write(ApiToken)): self._validate() self.token.delete() self._record_analytics() - return self._create_new_token() - except (SentryAppIntegratorError, SentryAppSentryError): - logger.info( - "refresher.context", - extra={ - "application_id": self.application.id, - "refresh_token": self.refresh_token[-4:], - }, + token = self._create_new_token() + return token + except OutboxDatabaseError as e: + context = { + "installation_uuid": self.install.uuid, + "client_id": self.application.client_id[:SENSITIVE_CHARACTER_LIMIT], + "sentry_app_id": self.install.sentry_app.id, + } + + if token is not None: + logger.warning( + "refresher.outbox-failure", + extra=context, + exc_info=e, ) - raise + return token + + raise SentryAppSentryError( + message="Failed to refresh given token", + status_code=500, + webhook_context=context, + ) from e def _record_analytics(self) -> None: analytics.record( @@ -106,7 +120,7 @@ def application(self) -> ApiApplication: try: return ApiApplication.objects.get(client_id=self.client_id) except ApiApplication.DoesNotExist: - raise SentryAppIntegratorError( + raise SentryAppSentryError( message="Could not find matching Application for given client_id", status_code=401, webhook_context={ diff --git a/tests/sentry/hybridcloud/models/test_outbox.py b/tests/sentry/hybridcloud/models/test_outbox.py index 152100ef7a3179..b26ad5823c5c90 100644 --- a/tests/sentry/hybridcloud/models/test_outbox.py +++ b/tests/sentry/hybridcloud/models/test_outbox.py @@ -6,11 +6,12 @@ from unittest.mock import Mock, call, patch import pytest -from django.db import connections +from django.db import OperationalError, connections from pytest import raises from sentry.hybridcloud.models.outbox import ( ControlOutbox, + OutboxDatabaseError, OutboxFlushError, RegionOutbox, outbox_context, @@ -518,6 +519,26 @@ def test_scheduling_with_past_and_future_outbox_times(self) -> None: # message is in the past. assert RegionOutbox.objects.count() == 0 + @patch("sentry.hybridcloud.models.outbox.OutboxBase.process_coalesced") + def test_catches_random_database_errors(self, mock_process: Mock) -> None: + mock_process.side_effect = OperationalError("ruh roh") + + with pytest.raises(OutboxDatabaseError) as e: + with outbox_context(flush=False): + Organization(id=10).outbox_for_update().save() + assert RegionOutbox.objects.count() == 1 + + with outbox_runner(): + # drain outboxes + pass + + assert ( + str(e.value) + == f"Failed to process Outbox, {Organization(id=10).outbox_for_update().category.name} due to database error" + ) + + assert RegionOutbox.objects.count() == 1 + class TestOutboxesManager(TestCase): def test_bulk_operations(self) -> None: diff --git a/tests/sentry/sentry_apps/token_exchange/test_refresher.py b/tests/sentry/sentry_apps/token_exchange/test_refresher.py index 6a10e18ec05b37..281264d1af3392 100644 --- a/tests/sentry/sentry_apps/token_exchange/test_refresher.py +++ b/tests/sentry/sentry_apps/token_exchange/test_refresher.py @@ -1,6 +1,7 @@ from unittest.mock import PropertyMock, patch import pytest +from django.db.utils import OperationalError from sentry.models.apiapplication import ApiApplication from sentry.models.apitoken import ApiToken @@ -92,9 +93,10 @@ def test_token_must_exist(self, _): } assert e.value.public_context == {} + @patch("sentry.sentry_apps.token_exchange.refresher.Refresher._validate") @patch("sentry.models.ApiApplication.objects.get", side_effect=ApiApplication.DoesNotExist) - def test_api_application_must_exist(self, _): - with pytest.raises(SentryAppIntegratorError) as e: + def test_api_application_must_exist(self, _, mock_validate): + with pytest.raises(SentryAppSentryError) as e: self.refresher.run() assert e.value.message == "Could not find matching Application for given client_id" @@ -133,3 +135,12 @@ def test_records_analytics(self, record): sentry_app_installation_id=self.install.id, exchange_type="refresh", ) + + def test_returns_token_on_outbox_error(self): + # Mock the transaction to raise OperationalError after token creation + with patch("sentry.hybridcloud.models.outbox.OutboxBase.process_coalesced") as mock_process: + mock_process.side_effect = OperationalError("Outbox issue") + + # The refresher should return the token even though there was an error + token = self.refresher.run() + assert SentryAppInstallation.objects.get(id=self.install.id).api_token == token