-
-
Notifications
You must be signed in to change notification settings - Fork 4.4k
feat(integrations): add endpoint for rotating client secret of Internal/Public integrations #69015
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
29d60fb
5ec75fb
2e4aa9c
a8c75ed
c429586
6588418
4310cc2
6c64156
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
import logging | ||
|
||
from django.http import Http404 | ||
from rest_framework.request import Request | ||
from rest_framework.response import Response | ||
|
||
from sentry.api.api_owners import ApiOwner | ||
from sentry.api.api_publish_status import ApiPublishStatus | ||
from sentry.api.base import control_silo_endpoint | ||
from sentry.api.bases.sentryapps import SentryAppBaseEndpoint | ||
from sentry.api.permissions import SentryPermission | ||
from sentry.api.serializers import serialize | ||
from sentry.auth.superuser import superuser_has_permission | ||
from sentry.models.apiapplication import generate_token | ||
from sentry.models.integrations.sentry_app import SentryApp | ||
from sentry.services.hybrid_cloud.organization import organization_service | ||
from sentry.services.hybrid_cloud.user.service import user_service | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class SentryAppRotateSecretPermission(SentryPermission): | ||
scope_map = { | ||
"POST": ["org:write", "org:admin"], | ||
} | ||
|
||
def has_object_permission(self, request: Request, view: object, sentry_app: SentryApp): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Something to help us understand our decisions and debug in the future: should we log when we make a decision in this function? i.e.
Or, maybe the exception we raise has a better error message to help both the user and us (for showing in the UI and debugging when users come to us for support) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added logging for now. |
||
log_info = { | ||
"user_id": request.user.id, | ||
"sentry_app_name": sentry_app.name, | ||
"organization_id": sentry_app.owner_id, | ||
} | ||
|
||
# organization that owns an integration | ||
org_context = organization_service.get_organization_by_id( | ||
id=sentry_app.owner_id, user_id=request.user.id if request.user else None | ||
) | ||
if org_context is None: | ||
logger.warning("owner organization for a sentry app was not found", extra=log_info) | ||
raise Http404 | ||
|
||
self.determine_access(request, org_context) | ||
|
||
if superuser_has_permission(request): | ||
return True | ||
|
||
# if user is not a member of an organization owning an integration, | ||
# return 404 to avoid leaking integration slug | ||
organizations = ( | ||
user_service.get_organizations(user_id=request.user.id) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same question here regarding the RPC try/catch |
||
if request.user.id is not None | ||
else () | ||
) | ||
if not any(sentry_app.owner_id == org.id for org in organizations): | ||
logger.info( | ||
"user does not belong to the integration owner organization", extra=log_info | ||
) | ||
raise Http404 | ||
|
||
# permission check inside an organization | ||
allowed_scopes = set(self.scope_map.get(request.method or "", [])) | ||
return any(request.access.has_scope(s) for s in allowed_scopes) | ||
|
||
|
||
@control_silo_endpoint | ||
class SentryAppRotateSecretEndpoint(SentryAppBaseEndpoint): | ||
publish_status = { | ||
"POST": ApiPublishStatus.PRIVATE, | ||
} | ||
owner = ApiOwner.ENTERPRISE | ||
permission_classes = (SentryAppRotateSecretPermission,) | ||
|
||
def post(self, request: Request, sentry_app: SentryApp) -> Response: | ||
if sentry_app.application is None: | ||
return Response({"detail": "Corresponding application was not found."}, status=404) | ||
|
||
new_token = generate_token() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mdtro I know we're making some changes to this method in another PR, is this still safe/intended to be used in this flow? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ykamo001 Yes, this is okay for now. :) When we come back through to improve the app API tokens we can adjust these calls then. |
||
sentry_app.application.update(client_secret=new_token) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we add a try/catch here and respond accordingly? And also log the error? Because this is a DB transaction, there could be errors |
||
return Response(serialize({"clientSecret": new_token})) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
from django.urls import reverse | ||
|
||
from sentry.models.apiapplication import ApiApplication | ||
from sentry.models.integrations.sentry_app import SentryApp | ||
from sentry.testutils.cases import APITestCase | ||
from sentry.testutils.silo import control_silo_test | ||
|
||
|
||
@control_silo_test | ||
class SentryAppRotateSecretTest(APITestCase): | ||
def setUp(self): | ||
self.application = ApiApplication.objects.create(owner=self.user) | ||
self.sentry_app = SentryApp.objects.create( | ||
application=self.application, owner_id=self.organization.id, name="a", slug="a" | ||
) | ||
self.url = reverse("sentry-api-0-sentry-app-rotate-secret", args=[self.sentry_app.slug]) | ||
|
||
def test_unauthenticated_call(self): | ||
response = self.client.post(self.url) | ||
assert response.status_code == 401 | ||
|
||
def test_member_call(self): | ||
""" | ||
Tests that a low privileged user from the same org cannot rotate a secret. | ||
""" | ||
other_user = self.create_user() | ||
other_member = self.create_member( | ||
user=other_user, organization=self.organization, role="member" | ||
) | ||
self.login_as(other_member) | ||
response = self.client.post(self.url) | ||
assert response.status_code == 403 | ||
|
||
def test_non_owner_call(self): | ||
""" | ||
Tests that an authenticated user cannot rotate the secret for an app from other org. | ||
""" | ||
self.login_as(self.user) | ||
other_user = self.create_user() | ||
other_org = self.create_organization(owner=other_user) | ||
other_app = ApiApplication.objects.create(owner=other_user, name="b") | ||
other_sentry_app = SentryApp.objects.create( | ||
application=other_app, owner_id=other_org.id, name="b", slug="b" | ||
) | ||
response = self.client.post( | ||
reverse("sentry-api-0-sentry-app-rotate-secret", args=[other_sentry_app.slug]) | ||
) | ||
assert response.status_code == 404 | ||
|
||
def test_invalid_app_id(self): | ||
self.login_as(self.user) | ||
path_with_invalid_id = reverse("sentry-api-0-sentry-app-rotate-secret", args=["invalid"]) | ||
response = self.client.post(path_with_invalid_id) | ||
assert response.status_code == 404 | ||
|
||
def test_valid_call(self): | ||
self.login_as(self.user) | ||
old_secret = self.sentry_app.application.client_secret | ||
response = self.client.post(self.url) | ||
new_secret = response.data["clientSecret"] | ||
assert len(new_secret) == len(old_secret) | ||
assert new_secret != old_secret | ||
|
||
def test_superuser_has_access(self): | ||
superuser = self.create_user(is_superuser=True) | ||
self.login_as(user=superuser, superuser=True) | ||
old_secret = self.sentry_app.application.client_secret | ||
response = self.client.post(self.url) | ||
new_secret = response.data["clientSecret"] | ||
assert len(new_secret) == len(old_secret) | ||
assert new_secret != old_secret | ||
|
||
def test_no_corresponding_application_found(self): | ||
self.login_as(self.user) | ||
other_sentry_app = SentryApp.objects.create( | ||
application=None, owner_id=self.organization.id, name="c", slug="c" | ||
) | ||
response = self.client.post( | ||
reverse("sentry-api-0-sentry-app-rotate-secret", args=[other_sentry_app.slug]) | ||
) | ||
assert response.status_code == 404 | ||
assert "Corresponding application was not found." in response.data["detail"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cathteng should superusers be allowed to do this for other orgs on their behalf for support cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fair. maybe we only let superusers with write privileges do this?