Skip to content

Thread wakeup: replace serial wakeup with tree wakeup #57649

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/julia_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ typedef struct _jl_tls_states_t {
// some hidden state (usually just because we don't have the type's size declaration)
#ifdef JL_LIBRARY_EXPORTS
uv_mutex_t sleep_lock;
int wake_next;
uv_cond_t wake_signal;
#endif
} jl_tls_states_t;
Expand Down
9 changes: 8 additions & 1 deletion src/safepoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ void jl_safepoint_wait_gc(jl_task_t *ct) JL_NOTSAFEPOINT
}
}

int wake_next_threads(int16_t) JL_NOTSAFEPOINT;

// equivalent to jl_set_gc_and_wait, but waiting on resume-thread lock instead
void jl_safepoint_wait_thread_resume(jl_task_t *ct)
{
Expand All @@ -288,8 +290,13 @@ void jl_safepoint_wait_thread_resume(jl_task_t *ct)
uv_cond_broadcast(&safepoint_cond_begin);
uv_mutex_unlock(&safepoint_lock);
uv_mutex_lock(&ct->ptls->sleep_lock);
while (jl_atomic_load_relaxed(&ct->ptls->suspend_count))
while (jl_atomic_load_relaxed(&ct->ptls->suspend_count)) {
uv_cond_wait(&ct->ptls->wake_signal, &ct->ptls->sleep_lock);
if (ct->ptls->wake_next) {
ct->ptls->wake_next = 0;
wake_next_threads(ct->ptls->tid);
}
}
}
// must exit gc while still holding the mutex_unlock, so we know other
// threads in jl_safepoint_suspend_thread will observe this thread in the
Expand Down
46 changes: 36 additions & 10 deletions src/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ void jl_threadfun(void *arg)
void jl_init_thread_scheduler(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
uv_mutex_init(&ptls->sleep_lock);
ptls->wake_next = 0;
uv_cond_init(&ptls->wake_signal);
// record that there is now another thread that may be used to schedule work
// we will decrement this again in scheduler_delete_thread, only slightly
Expand Down Expand Up @@ -213,25 +214,49 @@ static int set_not_sleeping(jl_ptls_t ptls) JL_NOTSAFEPOINT
return 0;
}

static int wake_thread(int16_t tid) JL_NOTSAFEPOINT
int wake_next_threads(int16_t) JL_NOTSAFEPOINT;

static int wake_thread(int16_t tid, int wake_next) JL_NOTSAFEPOINT
{
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];

// only do something if tid is sleeping
if (jl_atomic_load_relaxed(&ptls2->sleep_check_state) != not_sleeping) {
int8_t state = sleeping;
if (jl_atomic_cmpswap_relaxed(&ptls2->sleep_check_state, &state, not_sleeping)) {
int wasrunning = jl_atomic_fetch_add_relaxed(&n_threads_running, 1); // increment in-flight wakeup count
assert(wasrunning); (void)wasrunning;
JL_PROBE_RT_SLEEP_CHECK_WAKE(ptls2, state);
uv_mutex_lock(&ptls2->sleep_lock);
ptls2->wake_next = wake_next;
uv_cond_signal(&ptls2->wake_signal);
uv_mutex_unlock(&ptls2->sleep_lock);
return 1;
}
// TODO: tid was sleeping, but we failed to transition its state?!
}

// tid wasn't sleeping so we might have to wake its children for it
if (wake_next)
return wake_next_threads(tid);

return 0;
}

int wake_next_threads(int16_t tid) JL_NOTSAFEPOINT
{
int any_asleep = 0;
int nthreads = jl_atomic_load_acquire(&jl_n_threads) - jl_n_gcthreads;
int16_t tid1 = (2 * tid) + 1;
if (tid1 < nthreads) {
any_asleep |= wake_thread(tid1, 1);
int16_t tid2 = (2 * tid) + 2;
if (tid2 < nthreads) {
any_asleep |= wake_thread(tid2, 1);
}
}
return any_asleep;
}

static void wake_libuv(void) JL_NOTSAFEPOINT
{
Expand Down Expand Up @@ -262,7 +287,8 @@ void wakeup_thread(jl_task_t *ct, int16_t tid) JL_NOTSAFEPOINT { // Pass in ptls
}
else {
// something added to the sticky-queue: notify that thread
if (wake_thread(tid) && uvlock != ct) {
wake_thread(tid, 0);
if (uvlock != ct) {
// check if we need to notify uv_run too
jl_fence();
jl_ptls_t other = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
Expand All @@ -279,14 +305,9 @@ void wakeup_thread(jl_task_t *ct, int16_t tid) JL_NOTSAFEPOINT { // Pass in ptls
// something added to the multi-queue: notify all threads
// in the future, we might want to instead wake some fraction of threads,
// and let each of those wake additional threads if they find work
int anysleep = 0;
int nthreads = jl_atomic_load_acquire(&jl_n_threads);
for (tid = 0; tid < nthreads; tid++) {
if (tid != self)
anysleep |= wake_thread(tid);
}
// check if we need to notify uv_run too
if (uvlock != ct && anysleep) {
wake_thread(0, 1);
if (uvlock != ct) {
// check if we need to notify uv_run too
jl_fence();
if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) != NULL)
wake_libuv();
Expand Down Expand Up @@ -518,6 +539,11 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
}
// else should we warn the user of certain deadlock here if tid == 0 && n_threads_running == 0?
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
// we were woken up; should we wake others?
if (!may_sleep(ptls) && ptls->wake_next) {
wake_next_threads(ptls->tid);
ptls->wake_next = 0;
}
}
assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
assert(jl_atomic_load_relaxed(&n_threads_running));
Expand Down