Commit f5e9c6d

fix(crons): Fix connection spikes during partition assignment (#70585)
`on_partitions_assigned` is called whenever partitions are assigned to a consumer. This calls `_create_strategy`, which calls `create_with_partitions`. Since we created the `ThreadPoolExecutor` in `create_parallel_worker`, a new thread pool was created every time the partition assignment changed. The reason this showed up as spikes rather than sustained growth is presumably that the previous `ProcessingStrategy` stops being used and is garbage collected, taking its `ThreadPoolExecutor` with it.
1 parent 0545798 commit f5e9c6d

File tree

1 file changed (+2 −2 lines)


src/sentry/monitors/consumers/monitor_consumer.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -1028,6 +1028,7 @@ def __init__(
     ) -> None:
         if mode == "parallel":
             self.parallel = True
+            self.parallel_executor = ThreadPoolExecutor(max_workers=self.max_workers)

         if max_batch_size is not None:
             self.max_batch_size = max_batch_size
@@ -1041,8 +1042,7 @@ def shutdown(self) -> None:
         self.parallel_executor.shutdown()

     def create_parallel_worker(self, commit: Commit) -> ProcessingStrategy[KafkaPayload]:
-        self.parallel_executor = ThreadPoolExecutor(max_workers=self.max_workers)
-
+        assert self.parallel_executor is not None
         batch_processor = RunTask(
             function=partial(process_batch, self.parallel_executor),
             next_step=CommitOffsets(commit),
```
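The pattern behind the fix can be sketched in isolation. Below is a minimal, hypothetical factory (not Sentry's actual consumer code; `MonitorStrategyFactory` and `process` are invented for illustration): the pool is created once in `__init__`, so the method that runs on every partition assignment only reuses it instead of allocating a fresh `ThreadPoolExecutor` each rebalance.

```python
# Hypothetical sketch of the fix: create the thread pool once, at factory
# construction, rather than inside the per-rebalance strategy creation.
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable


def process(payload: str) -> str:
    # Stand-in for the real batch-processing work.
    return payload.upper()


class MonitorStrategyFactory:
    def __init__(self, max_workers: int = 4) -> None:
        # Created exactly once for the factory's lifetime. Rebalances that
        # rebuild the processing strategy all share this same pool, so no
        # connection/thread spike occurs on partition assignment.
        self.parallel_executor = ThreadPoolExecutor(max_workers=max_workers)

    def create_with_partitions(self, partitions: list[int]) -> Callable[[str], Future]:
        # Called on every partition assignment. It must NOT create a new
        # ThreadPoolExecutor here -- only reuse the one from __init__.
        assert self.parallel_executor is not None
        return lambda payload: self.parallel_executor.submit(process, payload)
```

With the pool created in `__init__`, repeated calls to `create_with_partitions` leave `self.parallel_executor` untouched, which is the behavioral property the commit restores.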
