Skip to content

Commit b89c554

Browse files
authored
chore(hybridcloud) Cleanup logging and option usage in webhooks (#69192)
With multi-org webhook handling fixed, I don't need all this logging. The parallel delivery can be used more frequently to help deliver small backlogs faster. I've also simplified the parser interface as we had two very similar methods and no way to fully remove usage of the non-integration focused one. I think having fewer methods makes the design simpler.
1 parent 0277437 commit b89c554

File tree

10 files changed

+19
-68
lines changed

10 files changed

+19
-68
lines changed

src/sentry/hybridcloud/tasks/deliver_webhooks.py

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ def schedule_webhook_delivery(**kwargs) -> None:
9494
metrics.distribution(
9595
"hybridcloud.schedule_webhook_delivery.mailbox_count", scheduled_mailboxes.count()
9696
)
97-
use_parallel = options.get("hybridcloud.webhookpayload.use_parallel")
9897
for record in scheduled_mailboxes[:BATCH_SIZE]:
9998
# Reschedule the records that we will attempt to deliver next.
10099
# We update schedule_for in an attempt to minimize races for potentially in-flight batches.
@@ -106,8 +105,8 @@ def schedule_webhook_delivery(**kwargs) -> None:
106105
updated_count = WebhookPayload.objects.filter(id__in=Subquery(mailbox_batch)).update(
107106
schedule_for=timezone.now() + BATCH_SCHEDULE_OFFSET
108107
)
109-
# If we have a half-full batch we should process in parallel as we're likely behind.
110-
if use_parallel and updated_count >= int(MAX_MAILBOX_DRAIN / 2):
108+
# If we have 1/3 or more in a mailbox we should process in parallel as we're likely behind.
109+
if updated_count >= int(MAX_MAILBOX_DRAIN / 3):
111110
drain_mailbox_parallel.delay(record["id"])
112111
else:
113112
drain_mailbox.delay(record["id"])
@@ -216,20 +215,11 @@ def drain_mailbox_parallel(payload_id: int) -> None:
216215
},
217216
)
218217
return
219-
logger.info(
220-
"drain_mailbox_parallel.start",
221-
extra={"mailbox_name": payload.mailbox_name, "id": payload_id},
222-
)
223218

224219
# Remove batches payloads that have been backlogged for MAX_DELIVERY_AGE.
225220
# Once payloads are this old they are low value, and we're better off prioritizing new work.
226221
max_age = timezone.now() - MAX_DELIVERY_AGE
227222
if payload.date_added < max_age:
228-
logger.info(
229-
"drain_mailbox_parallel.max_age_start",
230-
extra={"mailbox_name": payload.mailbox_name, "id": payload_id},
231-
)
232-
233223
# We delete chunks of stale messages using a subquery
234224
# because postgres cannot do delete with limit
235225
stale_query = WebhookPayload.objects.filter(
@@ -283,14 +273,6 @@ def drain_mailbox_parallel(payload_id: int) -> None:
283273
threadpool.submit(deliver_message_parallel, record)
284274
for record in query[:worker_threads]
285275
}
286-
logger.info(
287-
"drain_mailbox_parallel.send_batch",
288-
extra={
289-
"mailbox_name": payload.mailbox_name,
290-
"count": len(futures),
291-
"threads": worker_threads,
292-
},
293-
)
294276
for future in as_completed(futures):
295277
payload_record, err = future.result()
296278

src/sentry/middleware/integrations/parsers/base.py

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -187,22 +187,14 @@ def get_mailbox_identifier(self, integration: RpcIntegration, data: Mapping[str,
187187
that can be delivered in parallel. Requires the integration to implement
188188
`mailbox_bucket_id`
189189
"""
190-
# One integration is misbehaving in saas, and only need logs from that instance
191-
extra_logging = integration.id == 122177
192-
193190
# If we get fewer than 3000 in 1 hour we don't need to split into buckets
194191
ratelimit_key = f"webhookpayload:{self.provider}:{integration.id}"
195192
use_buckets_key = f"{ratelimit_key}:use_buckets"
196193

197-
is_limited = False
198-
ratelimit_val = None
199-
reset = None
200194
use_buckets = cache.get(use_buckets_key)
201-
if not use_buckets:
202-
is_limited, ratelimit_val, reset = ratelimiter.is_limited_with_value(
203-
key=ratelimit_key, window=60 * 60, limit=3000
204-
)
205-
if not use_buckets and is_limited:
195+
if not use_buckets and ratelimiter.is_limited(
196+
key=ratelimit_key, window=60 * 60, limit=3000
197+
):
206198
# Once we have gone over the rate limit in a day, we use smaller
207199
# buckets for the next day.
208200
cache.set(use_buckets_key, 1, timeout=ONE_DAY)
@@ -211,19 +203,6 @@ def get_mailbox_identifier(self, integration: RpcIntegration, data: Mapping[str,
211203
"integrations.parser.activate_buckets",
212204
extra={"provider": self.provider, "integration_id": integration.id},
213205
)
214-
215-
if extra_logging:
216-
logger.info(
217-
"integrations.parser.use_buckets",
218-
extra={
219-
"provider": self.provider,
220-
"result": use_buckets or False,
221-
"ratelimit_key": ratelimit_key,
222-
"is_limited": is_limited,
223-
"ratelimit_val": ratelimit_val,
224-
"reset": reset,
225-
},
226-
)
227206
if not use_buckets:
228207
return str(integration.id)
229208

src/sentry/middleware/integrations/parsers/github.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,6 @@ def get_response(self):
6565
except (Integration.DoesNotExist, OrganizationIntegration.DoesNotExist):
6666
return self.get_default_missing_integration_response()
6767

68-
return self.get_response_from_webhookpayload_for_integration(
69-
regions=regions, integration=integration
68+
return self.get_response_from_webhookpayload(
69+
regions=regions, identifier=integration.id, integration_id=integration.id
7070
)

src/sentry/middleware/integrations/parsers/gitlab.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,17 +84,9 @@ def get_response_from_gitlab_webhook(self):
8484
except ValueError:
8585
data = {}
8686

87-
identifier = self.get_mailbox_identifier(integration, data)
88-
logger.info(
89-
"gitlab.webhookpayload.save",
90-
extra={
91-
"identifier": identifier,
92-
"integration_id": integration.id,
93-
},
94-
)
9587
return self.get_response_from_webhookpayload(
9688
regions=regions,
97-
identifier=identifier,
89+
identifier=self.get_mailbox_identifier(integration, data),
9890
integration_id=integration.id,
9991
)
10092

src/sentry/middleware/integrations/parsers/jira.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ def get_response(self):
8181
return self.get_response_from_control_silo()
8282

8383
if self.view_class in self.outbox_response_region_classes:
84-
return self.get_response_from_webhookpayload_for_integration(
85-
regions=regions, integration=integration
84+
return self.get_response_from_webhookpayload(
85+
regions=regions, identifier=integration.id, integration_id=integration.id
8686
)
8787

8888
return self.get_response_from_control_silo()

src/sentry/middleware/integrations/parsers/msteams.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,6 @@ def get_response(self) -> HttpResponseBase:
132132
"Scheduling event for request",
133133
extra={"request_data": self.request_data},
134134
)
135-
return self.get_response_from_webhookpayload_for_integration(
136-
regions=regions, integration=integration
135+
return self.get_response_from_webhookpayload(
136+
regions=regions, identifier=integration.id, integration_id=integration.id
137137
)

src/sentry/middleware/integrations/parsers/vsts.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,6 @@ def get_response(self) -> HttpResponseBase:
4545
except (Integration.DoesNotExist, OrganizationIntegration.DoesNotExist):
4646
return self.get_default_missing_integration_response()
4747

48-
return self.get_response_from_webhookpayload_for_integration(
49-
regions=regions, integration=integration
48+
return self.get_response_from_webhookpayload(
49+
regions=regions, identifier=integration.id, integration_id=integration.id
5050
)

tests/sentry/hybridcloud/tasks/test_deliver_webhooks.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
)
1818
from sentry.testutils.cases import TestCase
1919
from sentry.testutils.factories import Factories
20-
from sentry.testutils.helpers.options import override_options
2120
from sentry.testutils.region import override_regions
2221
from sentry.testutils.silo import control_silo_test
2322
from sentry.types.region import Region, RegionCategory, RegionResolutionError
@@ -135,10 +134,9 @@ def test_schedule_mailbox_with_more_than_batch_size_records(self):
135134
# No messages delivered
136135
assert WebhookPayload.objects.count() == num_records
137136

138-
@override_options({"hybridcloud.webhookpayload.use_parallel": True})
139137
@patch("sentry.hybridcloud.tasks.deliver_webhooks.drain_mailbox_parallel")
140138
def test_schedule_mailbox_parallel_task(self, mock_deliver):
141-
for _ in range(0, int(MAX_MAILBOX_DRAIN / 2 + 1)):
139+
for _ in range(0, int(MAX_MAILBOX_DRAIN / 3 + 1)):
142140
self.create_webhook_payload(
143141
mailbox_name="github:123",
144142
region_name="us",

tests/sentry/middleware/integrations/parsers/test_gitlab.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,9 +175,9 @@ def test_routing_webhook_with_mailbox_buckets(self):
175175
HTTP_X_GITLAB_EVENT="Push Hook",
176176
)
177177
with mock.patch(
178-
"sentry.middleware.integrations.parsers.base.ratelimiter.is_limited_with_value"
178+
"sentry.middleware.integrations.parsers.base.ratelimiter.is_limited"
179179
) as mock_is_limited:
180-
mock_is_limited.return_value = (True, 3500, 360)
180+
mock_is_limited.return_value = True
181181
parser = GitlabRequestParser(request=request, response_handler=self.get_response)
182182
response = parser.get_response()
183183

tests/sentry/middleware/integrations/parsers/test_jira_server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,11 @@ def test_routing_webhook_with_mailbox_buckets_high_volume(self):
120120
parser = JiraServerRequestParser(request=request, response_handler=self.get_response)
121121

122122
with mock.patch(
123-
"sentry.middleware.integrations.parsers.base.ratelimiter.is_limited_with_value"
123+
"sentry.middleware.integrations.parsers.base.ratelimiter.is_limited"
124124
) as mock_is_limited, mock.patch(
125125
"sentry.middleware.integrations.parsers.jira_server.get_integration_from_token"
126126
) as mock_get_token:
127-
mock_is_limited.return_value = (True, 3500, 360)
127+
mock_is_limited.return_value = True
128128
mock_get_token.return_value = self.integration
129129
response = parser.get_response()
130130
assert isinstance(response, HttpResponse)

0 commit comments

Comments
 (0)