Skip to content

feat(integrations): add endpoint for rotating client secret of Internal/Public integrations #69015

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .organization_sentry_apps import OrganizationSentryAppsEndpoint
from .publish_request import SentryAppPublishRequestEndpoint
from .requests import SentryAppRequestsEndpoint
from .rotate_secret import SentryAppRotateSecretEndpoint
from .stats.details import SentryAppStatsEndpoint
from .stats.index import SentryAppsStatsEndpoint

Expand All @@ -34,6 +35,7 @@
"SentryAppInteractionEndpoint",
"SentryAppPublishRequestEndpoint",
"SentryAppRequestsEndpoint",
"SentryAppRotateSecretEndpoint",
"SentryAppsEndpoint",
"SentryAppsStatsEndpoint",
"SentryAppStatsEndpoint",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import logging

from django.http import Http404
from rest_framework.request import Request
from rest_framework.response import Response

from sentry.api.api_owners import ApiOwner
from sentry.api.api_publish_status import ApiPublishStatus
from sentry.api.base import control_silo_endpoint
from sentry.api.bases.sentryapps import SentryAppBaseEndpoint
from sentry.api.permissions import SentryPermission
from sentry.api.serializers import serialize
from sentry.auth.superuser import superuser_has_permission
from sentry.models.apiapplication import generate_token
from sentry.models.integrations.sentry_app import SentryApp
from sentry.services.hybrid_cloud.organization import organization_service
from sentry.services.hybrid_cloud.user.service import user_service

logger = logging.getLogger(__name__)


class SentryAppRotateSecretPermission(SentryPermission):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cathteng should superusers be allowed to do this for other orgs on their behalf for support cases?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair. maybe we only let superusers with write privileges do this?

scope_map = {
"POST": ["org:write", "org:admin"],
}

def has_object_permission(self, request: Request, view: object, sentry_app: SentryApp):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something to help us understand our decisions and debug in the future: should we log when we make a decision in this function?

i.e.

if org_context is None:
   log.info("org does not exist for request", extra={"app_id": app_id, "user_id": user_id, ...etc})
   return False

Or, maybe the exception we raise has a better error message to help both the user and us (for showing in the UI and debugging when users come to us for support)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added logging for now.

log_info = {
"user_id": request.user.id,
"sentry_app_name": sentry_app.name,
"organization_id": sentry_app.owner_id,
}

# organization that owns an integration
org_context = organization_service.get_organization_by_id(
id=sentry_app.owner_id, user_id=request.user.id if request.user else None
)
if org_context is None:
logger.warning("owner organization for a sentry app was not found", extra=log_info)
raise Http404

self.determine_access(request, org_context)

if superuser_has_permission(request):
return True

# if user is not a member of an organization owning an integration,
# return 404 to avoid leaking integration slug
organizations = (
user_service.get_organizations(user_id=request.user.id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here regarding the RPC try/catch

if request.user.id is not None
else ()
)
if not any(sentry_app.owner_id == org.id for org in organizations):
logger.info(
"user does not belong to the integration owner organization", extra=log_info
)
raise Http404

# permission check inside an organization
allowed_scopes = set(self.scope_map.get(request.method or "", []))
return any(request.access.has_scope(s) for s in allowed_scopes)


@control_silo_endpoint
class SentryAppRotateSecretEndpoint(SentryAppBaseEndpoint):
publish_status = {
"POST": ApiPublishStatus.PRIVATE,
}
owner = ApiOwner.ENTERPRISE
permission_classes = (SentryAppRotateSecretPermission,)

def post(self, request: Request, sentry_app: SentryApp) -> Response:
if sentry_app.application is None:
return Response({"detail": "Corresponding application was not found."}, status=404)

new_token = generate_token()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mdtro I know we're making some changes to this method in another PR, is this still safe/intended to be used in this flow?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ykamo001 Yes, this is okay for now. :) When we come back through to improve the app API tokens we can adjust these calls then.

sentry_app.application.update(client_secret=new_token)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add a try/catch here and respond accordingly? And also log the error? Because this is a DB transaction, there could be errors

return Response(serialize({"clientSecret": new_token}))
6 changes: 6 additions & 0 deletions src/sentry/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@
SentryAppInteractionEndpoint,
SentryAppPublishRequestEndpoint,
SentryAppRequestsEndpoint,
SentryAppRotateSecretEndpoint,
SentryAppsEndpoint,
SentryAppsStatsEndpoint,
SentryAppStatsEndpoint,
Expand Down Expand Up @@ -2838,6 +2839,11 @@ def create_group_urls(name_prefix: str) -> list[URLPattern | URLResolver]:
SentryInternalAppTokenDetailsEndpoint.as_view(),
name="sentry-api-0-sentry-internal-app-token-details",
),
re_path(
r"^(?P<sentry_app_slug>[^\/]+)/rotate-secret/$",
SentryAppRotateSecretEndpoint.as_view(),
name="sentry-api-0-sentry-app-rotate-secret",
),
re_path(
r"^(?P<sentry_app_slug>[^\/]+)/stats/$",
SentryAppStatsEndpoint.as_view(),
Expand Down
1 change: 1 addition & 0 deletions static/app/data/controlsiloUrlPatterns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ const patterns: RegExp[] = [
new RegExp('^api/0/sentry-apps/[^/]+/avatar/$'),
new RegExp('^api/0/sentry-apps/[^/]+/api-tokens/$'),
new RegExp('^api/0/sentry-apps/[^/]+/api-tokens/[^/]+/$'),
new RegExp('^api/0/sentry-apps/[^/]+/rotate-secret/$'),
new RegExp('^api/0/sentry-apps/[^/]+/stats/$'),
new RegExp('^api/0/sentry-apps/[^/]+/publish-request/$'),
new RegExp('^api/0/sentry-app-installations/[^/]+/$'),
Expand Down
82 changes: 82 additions & 0 deletions tests/sentry/api/endpoints/test_sentry_app_rotate_secret.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from django.urls import reverse

from sentry.models.apiapplication import ApiApplication
from sentry.models.integrations.sentry_app import SentryApp
from sentry.testutils.cases import APITestCase
from sentry.testutils.silo import control_silo_test


@control_silo_test
class SentryAppRotateSecretTest(APITestCase):
def setUp(self):
self.application = ApiApplication.objects.create(owner=self.user)
self.sentry_app = SentryApp.objects.create(
application=self.application, owner_id=self.organization.id, name="a", slug="a"
)
self.url = reverse("sentry-api-0-sentry-app-rotate-secret", args=[self.sentry_app.slug])

def test_unauthenticated_call(self):
response = self.client.post(self.url)
assert response.status_code == 401

def test_member_call(self):
"""
Tests that a low privileged user from the same org cannot rotate a secret.
"""
other_user = self.create_user()
other_member = self.create_member(
user=other_user, organization=self.organization, role="member"
)
self.login_as(other_member)
response = self.client.post(self.url)
assert response.status_code == 403

def test_non_owner_call(self):
"""
Tests that an authenticated user cannot rotate the secret for an app from other org.
"""
self.login_as(self.user)
other_user = self.create_user()
other_org = self.create_organization(owner=other_user)
other_app = ApiApplication.objects.create(owner=other_user, name="b")
other_sentry_app = SentryApp.objects.create(
application=other_app, owner_id=other_org.id, name="b", slug="b"
)
response = self.client.post(
reverse("sentry-api-0-sentry-app-rotate-secret", args=[other_sentry_app.slug])
)
assert response.status_code == 404

def test_invalid_app_id(self):
self.login_as(self.user)
path_with_invalid_id = reverse("sentry-api-0-sentry-app-rotate-secret", args=["invalid"])
response = self.client.post(path_with_invalid_id)
assert response.status_code == 404

def test_valid_call(self):
self.login_as(self.user)
old_secret = self.sentry_app.application.client_secret
response = self.client.post(self.url)
new_secret = response.data["clientSecret"]
assert len(new_secret) == len(old_secret)
assert new_secret != old_secret

def test_superuser_has_access(self):
superuser = self.create_user(is_superuser=True)
self.login_as(user=superuser, superuser=True)
old_secret = self.sentry_app.application.client_secret
response = self.client.post(self.url)
new_secret = response.data["clientSecret"]
assert len(new_secret) == len(old_secret)
assert new_secret != old_secret

def test_no_corresponding_application_found(self):
self.login_as(self.user)
other_sentry_app = SentryApp.objects.create(
application=None, owner_id=self.organization.id, name="c", slug="c"
)
response = self.client.post(
reverse("sentry-api-0-sentry-app-rotate-secret", args=[other_sentry_app.slug])
)
assert response.status_code == 404
assert "Corresponding application was not found." in response.data["detail"]
Loading