Skip to content

Commit eded43a

Browse files
committed
Thread wakeup: replace serial wakeup with tree wakeup
1 parent f207224 commit eded43a

File tree

3 files changed

+36
-10
lines changed

3 files changed

+36
-10
lines changed

src/julia_threads.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ typedef struct _jl_tls_states_t {
219219
// some hidden state (usually just because we don't have the type's size declaration)
220220
#ifdef JL_LIBRARY_EXPORTS
221221
uv_mutex_t sleep_lock;
222+
int wake_all;
222223
uv_cond_t wake_signal;
223224
#endif
224225
} jl_tls_states_t;

src/safepoint.c

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,8 @@ void jl_safepoint_wait_gc(jl_task_t *ct) JL_NOTSAFEPOINT
270270
}
271271
}
272272

273+
int wake_child_threads(int16_t) JL_NOTSAFEPOINT;
274+
273275
// equivalent to jl_set_gc_and_wait, but waiting on resume-thread lock instead
274276
void jl_safepoint_wait_thread_resume(jl_task_t *ct)
275277
{
@@ -288,8 +290,13 @@ void jl_safepoint_wait_thread_resume(jl_task_t *ct)
288290
uv_cond_broadcast(&safepoint_cond_begin);
289291
uv_mutex_unlock(&safepoint_lock);
290292
uv_mutex_lock(&ct->ptls->sleep_lock);
291-
while (jl_atomic_load_relaxed(&ct->ptls->suspend_count))
293+
while (jl_atomic_load_relaxed(&ct->ptls->suspend_count)) {
292294
uv_cond_wait(&ct->ptls->wake_signal, &ct->ptls->sleep_lock);
295+
if (ct->ptls->wake_all) {
296+
ct->ptls->wake_all = 0;
297+
wake_child_threads(ct->ptls->tid);
298+
}
299+
}
293300
}
294301
// must exit gc while still holding the mutex_unlock, so we know other
295302
// threads in jl_safepoint_suspend_thread will observe this thread in the

src/scheduler.c

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ void jl_threadfun(void *arg)
127127
void jl_init_thread_scheduler(jl_ptls_t ptls) JL_NOTSAFEPOINT
128128
{
129129
uv_mutex_init(&ptls->sleep_lock);
130+
ptls->wake_all = 0;
130131
uv_cond_init(&ptls->wake_signal);
131132
// record that there is now another thread that may be used to schedule work
132133
// we will decrement this again in scheduler_delete_thread, only slightly
@@ -213,7 +214,7 @@ static int set_not_sleeping(jl_ptls_t ptls) JL_NOTSAFEPOINT
213214
return 0;
214215
}
215216

216-
static int wake_thread(int16_t tid) JL_NOTSAFEPOINT
217+
static int wake_thread(int16_t tid, int wake_all) JL_NOTSAFEPOINT
217218
{
218219
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
219220

@@ -224,6 +225,7 @@ static int wake_thread(int16_t tid) JL_NOTSAFEPOINT
224225
assert(wasrunning); (void)wasrunning;
225226
JL_PROBE_RT_SLEEP_CHECK_WAKE(ptls2, state);
226227
uv_mutex_lock(&ptls2->sleep_lock);
228+
ptls2->wake_all = 1;
227229
uv_cond_signal(&ptls2->wake_signal);
228230
uv_mutex_unlock(&ptls2->sleep_lock);
229231
return 1;
@@ -240,6 +242,21 @@ static void wake_libuv(void) JL_NOTSAFEPOINT
240242
JULIA_DEBUG_SLEEPWAKE( io_wakeup_leave = cycleclock() );
241243
}
242244

245+
int wake_child_threads(int16_t self) JL_NOTSAFEPOINT
246+
{
247+
int any_asleep = 0;
248+
int nthreads = jl_atomic_load_acquire(&jl_n_threads);
249+
int16_t tid1 = (2 * self) + 1;
250+
if (tid1 < nthreads) {
251+
any_asleep |= wake_thread(tid1, 1);
252+
}
253+
int16_t tid2 = (2 * self) + 2;
254+
if (tid2 < nthreads) {
255+
any_asleep |= wake_thread(tid2, 1);
256+
}
257+
return any_asleep;
258+
}
259+
243260
void wakeup_thread(jl_task_t *ct, int16_t tid) JL_NOTSAFEPOINT { // Pass in ptls when we have it already available to save a lookup
244261
int16_t self = jl_atomic_load_relaxed(&ct->tid);
245262
if (tid != self)
@@ -262,7 +279,7 @@ void wakeup_thread(jl_task_t *ct, int16_t tid) JL_NOTSAFEPOINT { // Pass in ptls
262279
}
263280
else {
264281
// something added to the sticky-queue: notify that thread
265-
if (wake_thread(tid) && uvlock != ct) {
282+
if (wake_thread(tid, 0) && uvlock != ct) {
266283
// check if we need to notify uv_run too
267284
jl_fence();
268285
jl_ptls_t other = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
@@ -279,14 +296,10 @@ void wakeup_thread(jl_task_t *ct, int16_t tid) JL_NOTSAFEPOINT { // Pass in ptls
279296
// something added to the multi-queue: notify all threads
280297
// in the future, we might want to instead wake some fraction of threads,
281298
// and let each of those wake additional threads if they find work
282-
int anysleep = 0;
283-
int nthreads = jl_atomic_load_acquire(&jl_n_threads);
284-
for (tid = 0; tid < nthreads; tid++) {
285-
if (tid != self)
286-
anysleep |= wake_thread(tid);
287-
}
299+
wake_child_threads(self);
300+
288301
// check if we need to notify uv_run too
289-
if (uvlock != ct && anysleep) {
302+
if (uvlock != ct) {
290303
jl_fence();
291304
if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) != NULL)
292305
wake_libuv();
@@ -518,6 +531,11 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
518531
}
519532
// else should we warn the user of certain deadlock here if tid == 0 && n_threads_running == 0?
520533
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
534+
// we were woken up; should we wake others?
535+
if (!may_sleep(ptls) && ptls->wake_all) {
536+
ptls->wake_all = 0;
537+
wake_child_threads(ptls->tid);
538+
}
521539
}
522540
assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
523541
assert(jl_atomic_load_relaxed(&n_threads_running));

0 commit comments

Comments
 (0)