1
1
import logging
2
2
import multiprocessing
3
+ import multiprocessing .context
3
4
import threading
4
5
import time
5
6
from collections .abc import Callable
7
+ from functools import partial
6
8
7
9
import orjson
8
10
import sentry_sdk
15
17
from sentry .conf .types .kafka_definition import Topic
16
18
from sentry .spans .buffer import SpansBuffer
17
19
from sentry .utils import metrics
20
+ from sentry .utils .arroyo import run_with_initialized_sentry
18
21
from sentry .utils .kafka_config import get_kafka_producer_cluster_options , get_topic_definition
19
22
20
23
MAX_PROCESS_RESTARTS = 10
@@ -44,41 +47,45 @@ def __init__(
44
47
self .buffer = buffer
45
48
self .next_step = next_step
46
49
47
- self .stopped = multiprocessing .Value ("i" , 0 )
50
+ self .mp_context = mp_context = multiprocessing .get_context ("spawn" )
51
+ self .stopped = mp_context .Value ("i" , 0 )
48
52
self .redis_was_full = False
49
- self .current_drift = multiprocessing .Value ("i" , 0 )
50
- self .backpressure_since = multiprocessing .Value ("i" , 0 )
51
- self .healthy_since = multiprocessing .Value ("i" , 0 )
53
+ self .current_drift = mp_context .Value ("i" , 0 )
54
+ self .backpressure_since = mp_context .Value ("i" , 0 )
55
+ self .healthy_since = mp_context .Value ("i" , 0 )
52
56
self .process_restarts = 0
53
57
self .produce_to_pipe = produce_to_pipe
54
58
55
59
self ._create_process ()
56
60
57
61
def _create_process (self ):
58
- from sentry .utils .arroyo import _get_arroyo_subprocess_initializer
59
-
60
62
# Optimistically reset healthy_since to avoid a race between the
61
63
# starting process and the next flush cycle. Keep back pressure across
62
64
# the restart, however.
63
65
self .healthy_since .value = int (time .time ())
64
66
65
- make_process : Callable [..., multiprocessing .Process | threading .Thread ]
67
+ make_process : Callable [..., multiprocessing .context . SpawnProcess | threading .Thread ]
66
68
if self .produce_to_pipe is None :
67
- initializer = _get_arroyo_subprocess_initializer (None )
68
- make_process = multiprocessing .Process
69
+ target = run_with_initialized_sentry (
70
+ SpanFlusher .main ,
71
+ # unpickling buffer will import sentry, so it needs to be
72
+ # pickled separately. at the same time, pickling
73
+ # synchronization primitives like multiprocessing.Value can
74
+ # only be done by the Process
75
+ self .buffer ,
76
+ )
77
+ make_process = self .mp_context .Process
69
78
else :
70
- initializer = None
79
+ target = partial ( SpanFlusher . main , self . buffer )
71
80
make_process = threading .Thread
72
81
73
82
self .process = make_process (
74
- target = SpanFlusher . main ,
83
+ target = target ,
75
84
args = (
76
- initializer ,
77
85
self .stopped ,
78
86
self .current_drift ,
79
87
self .backpressure_since ,
80
88
self .healthy_since ,
81
- self .buffer ,
82
89
self .produce_to_pipe ,
83
90
),
84
91
daemon = True ,
@@ -88,17 +95,13 @@ def _create_process(self):
88
95
89
96
@staticmethod
90
97
def main (
91
- initializer : Callable | None ,
98
+ buffer : SpansBuffer ,
92
99
stopped ,
93
100
current_drift ,
94
101
backpressure_since ,
95
102
healthy_since ,
96
- buffer : SpansBuffer ,
97
103
produce_to_pipe : Callable [[KafkaPayload ], None ] | None ,
98
104
) -> None :
99
- if initializer :
100
- initializer ()
101
-
102
105
sentry_sdk .set_tag ("sentry_spans_buffer_component" , "flusher" )
103
106
104
107
try :
0 commit comments