From 45aa762b3411b19c44bac06c746641236a7e983d Mon Sep 17 00:00:00 2001 From: Mark Story Date: Fri, 23 May 2025 14:58:52 -0400 Subject: [PATCH] fix(taskworker) Add shim for __start_time Our Celery wrappers inject `__start_time` into the keyword args of tasks. Under Celery, this parameter is popped before the task function is invoked. This change emulates that behavior, which will make partial rollouts of task namespaces less noisy. --- src/sentry/taskworker/workerchild.py | 7 +++++++ tests/sentry/taskworker/test_worker.py | 29 ++++++++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/src/sentry/taskworker/workerchild.py b/src/sentry/taskworker/workerchild.py index f5e66a0ac24099..ccf5391a610bf8 100644 --- a/src/sentry/taskworker/workerchild.py +++ b/src/sentry/taskworker/workerchild.py @@ -310,6 +310,13 @@ def _execute_activation(task_func: Task[Any, Any], activation: TaskActivation) - ) span.set_data(SPANDATA.MESSAGING_SYSTEM, "taskworker") + # TODO(taskworker) remove this when doing cleanup + # The `__start_time` parameter is spliced into task parameters by + # sentry.celery.SentryTask._add_metadata and needs to be removed + # from kwargs like sentry.tasks.base.instrumented_task does. 
+ if "__start_time" in kwargs: + kwargs.pop("__start_time") + try: task_func(*args, **kwargs) transaction.set_status(SPANSTATUS.OK) diff --git a/tests/sentry/taskworker/test_worker.py b/tests/sentry/taskworker/test_worker.py index bc8bb93a19cc8e..eff5b7c833b1f0 100644 --- a/tests/sentry/taskworker/test_worker.py +++ b/tests/sentry/taskworker/test_worker.py @@ -299,6 +299,35 @@ def test_child_process_complete(mock_capture_checkin) -> None: assert mock_capture_checkin.call_count == 0 +@pytest.mark.django_db +def test_child_process_remove_start_time_kwargs() -> None: + activation = TaskActivation( + id="6789", + taskname="examples.will_retry", + namespace="examples", + parameters='{"args": ["stuff"], "kwargs": {"__start_time": 123}}', + processing_deadline_duration=100000, + ) + todo: queue.Queue[TaskActivation] = queue.Queue() + processed: queue.Queue[ProcessingResult] = queue.Queue() + shutdown = Event() + + todo.put(activation) + child_process( + todo, + processed, + shutdown, + max_task_count=1, + processing_pool_name="test", + process_type="fork", + ) + + assert todo.empty() + result = processed.get() + assert result.task_id == activation.id + assert result.status == TASK_ACTIVATION_STATUS_COMPLETE + + @pytest.mark.django_db @mock.patch("sentry.usage_accountant.record") def test_child_process_complete_record_usage(mock_record: mock.Mock) -> None: