Skip to content

Commit f99ad69

Browse files
markstoryandrewshie-sentry
authored andcommitted
fix(taskworker) Retain task to host mapping on unavailable errors (#92426)
When a broker is unavailable we should retain the task : host mapping so that we can redeliver the SetStatus request in the future.
1 parent e2f5ebd commit f99ad69

File tree

2 files changed

+29
-4
lines changed

2 files changed

+29
-4
lines changed

src/sentry/taskworker/client.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -196,23 +196,29 @@ def update_task(
196196
status=status,
197197
fetch_next_task=fetch_next_task,
198198
)
199+
200+
host = self._task_id_to_host.get(task_id, None)
201+
if host is None:
202+
metrics.incr("taskworker.client.task_id_not_in_client")
203+
return None
199204
try:
200205
with metrics.timer("taskworker.update_task.rpc"):
201-
if task_id not in self._task_id_to_host:
202-
metrics.incr("taskworker.client.task_id_not_in_client")
203-
return None
204-
host = self._task_id_to_host.pop(task_id)
205206
response = self._host_to_stubs[host].SetTaskStatus(request)
207+
del self._task_id_to_host[task_id]
206208
except grpc.RpcError as err:
207209
metrics.incr(
208210
"taskworker.client.rpc_error",
209211
tags={"method": "SetTaskStatus", "status": err.code().name},
210212
)
211213
if err.code() == grpc.StatusCode.NOT_FOUND:
214+
del self._task_id_to_host[task_id]
215+
212216
# The current broker is empty, switch.
213217
self._num_tasks_before_rebalance = 0
218+
214219
return None
215220
raise
221+
216222
if response.HasField("task"):
217223
self._task_id_to_host[response.task.id] = host
218224
return response.task

tests/sentry/taskworker/test_client.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,25 @@ def test_update_task_not_found():
306306
assert result is None
307307

308308

309+
@django_db_all
310+
def test_update_task_unavailable_retain_task_to_host():
311+
channel = MockChannel()
312+
channel.add_response(
313+
"/sentry_protos.taskbroker.v1.ConsumerService/SetTaskStatus",
314+
MockGrpcError(grpc.StatusCode.UNAVAILABLE, "broker down"),
315+
)
316+
with patch("sentry.taskworker.client.grpc.insecure_channel") as mock_channel:
317+
mock_channel.return_value = channel
318+
client = TaskworkerClient("localhost:50051", 1)
319+
client._task_id_to_host = {"abc123": "localhost-0:50051"}
320+
with pytest.raises(MockGrpcError) as err:
321+
client.update_task(
322+
"abc123", TASK_ACTIVATION_STATUS_RETRY, FetchNextTask(namespace=None)
323+
)
324+
assert "broker down" in str(err.value)
325+
assert client._task_id_to_host == {"abc123": "localhost-0:50051"}
326+
327+
309328
@django_db_all
310329
def test_client_loadbalance():
311330
channel_0 = MockChannel()

0 commit comments

Comments
 (0)