Skip to content

Commit 8bf424f

Browse files
authored
fix(taskworker) Rebalance when a broker has no tasks (#92127)
In multi-broker deployments that are lower volume it is possible for a worker to select a broker that is empty. When we get a notfound response, we should switch brokers.
1 parent 266fd8d commit 8bf424f

File tree

2 files changed

+84
-0
lines changed

2 files changed

+84
-0
lines changed

src/sentry/taskworker/client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ def get_task(self, namespace: str | None = None) -> TaskActivation | None:
166166
"taskworker.client.rpc_error", tags={"method": "GetTask", "status": err.code().name}
167167
)
168168
if err.code() == grpc.StatusCode.NOT_FOUND:
169+
# Because our current broker doesn't have any tasks, try rebalancing.
170+
self._num_tasks_before_rebalance = 0
169171
return None
170172
raise
171173
if response.HasField("task"):
@@ -207,6 +209,8 @@ def update_task(
207209
tags={"method": "SetTaskStatus", "status": err.code().name},
208210
)
209211
if err.code() == grpc.StatusCode.NOT_FOUND:
212+
# The current broker is empty, switch.
213+
self._num_tasks_before_rebalance = 0
210214
return None
211215
raise
212216
if response.HasField("task"):

tests/sentry/taskworker/test_client.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,3 +429,83 @@ def test_client_loadbalance():
429429

430430
client.update_task(task_3.id, TASK_ACTIVATION_STATUS_COMPLETE, None)
431431
assert client._task_id_to_host == {}
432+
433+
434+
@django_db_all
435+
def test_client_loadbalance_on_notfound():
436+
channel_0 = MockChannel()
437+
channel_0.add_response(
438+
"/sentry_protos.taskbroker.v1.ConsumerService/GetTask",
439+
MockGrpcError(grpc.StatusCode.NOT_FOUND, "no pending task found"),
440+
)
441+
442+
channel_1 = MockChannel()
443+
channel_1.add_response(
444+
"/sentry_protos.taskbroker.v1.ConsumerService/GetTask",
445+
GetTaskResponse(
446+
task=TaskActivation(
447+
id="1",
448+
namespace="testing",
449+
taskname="do_thing",
450+
parameters="",
451+
headers={},
452+
processing_deadline_duration=10,
453+
)
454+
),
455+
)
456+
channel_1.add_response(
457+
"/sentry_protos.taskbroker.v1.ConsumerService/SetTaskStatus",
458+
MockGrpcError(grpc.StatusCode.NOT_FOUND, "no pending task found"),
459+
)
460+
461+
channel_2 = MockChannel()
462+
channel_2.add_response(
463+
"/sentry_protos.taskbroker.v1.ConsumerService/GetTask",
464+
GetTaskResponse(
465+
task=TaskActivation(
466+
id="2",
467+
namespace="testing",
468+
taskname="do_thing",
469+
parameters="",
470+
headers={},
471+
processing_deadline_duration=10,
472+
)
473+
),
474+
)
475+
476+
with patch("sentry.taskworker.client.grpc.insecure_channel") as mock_channel:
477+
mock_channel.side_effect = [channel_0, channel_1, channel_2]
478+
with patch("sentry.taskworker.client.random.choice") as mock_randchoice:
479+
mock_randchoice.side_effect = [
480+
"localhost-0:50051",
481+
"localhost-1:50051",
482+
"localhost-2:50051",
483+
]
484+
client = TaskworkerClient(
485+
"localhost:50051", num_brokers=3, max_tasks_before_rebalance=30
486+
)
487+
488+
# Fetch from the first channel, it should return notfound
489+
task_0 = client.get_task()
490+
assert task_0 is None
491+
492+
# Fetch again, this time from channel_1
493+
task_1 = client.get_task()
494+
assert task_1 and task_1.id == "1"
495+
496+
assert client._task_id_to_host == {
497+
"1": "localhost-1:50051",
498+
}
499+
500+
res = client.update_task(task_1.id, TASK_ACTIVATION_STATUS_COMPLETE, None)
501+
assert res is None
502+
assert client._task_id_to_host == {}
503+
504+
# Because SetStatus on channel_1 returned notfound the client
505+
# should switch brokers.
506+
task_2 = client.get_task()
507+
assert task_2 and task_2.id == "2"
508+
509+
assert client._task_id_to_host == {
510+
"2": "localhost-2:50051",
511+
}

0 commit comments

Comments
 (0)