
Commit 9c77ec1

ggml : synchronize threads using barriers (ggml-org#7993)
1 parent: a04a953

2 files changed: +81, -150 lines

.github/workflows/server.yml

Lines changed: 14 additions & 0 deletions
@@ -87,8 +87,22 @@ jobs:
           exit 1
         fi
 
+      - name: Build (no OpenMP)
+        id: cmake_build_no_openmp
+        if: ${{ matrix.sanitizer == 'THREAD' }}
+        run: |
+          cmake -B build \
+            -DLLAMA_NATIVE=OFF \
+            -DLLAMA_BUILD_SERVER=ON \
+            -DLLAMA_CURL=ON \
+            -DCMAKE_BUILD_TYPE=${{ matrix.build_type }} \
+            -DLLAMA_SANITIZE_${{ matrix.sanitizer }}=ON \
+            -DLLAMA_OPENMP=OFF ;
+          cmake --build build --config ${{ matrix.build_type }} -j $(nproc) --target llama-server
+
       - name: Build
         id: cmake_build
+        if: ${{ matrix.sanitizer != 'THREAD' }}
         run: |
           cmake -B build \
             -DLLAMA_NATIVE=OFF \

ggml.c

Lines changed: 67 additions & 150 deletions
@@ -1753,9 +1753,8 @@ struct ggml_compute_state_shared {
     int n_threads;
 
     // synchronization primitives
-    atomic_int n_active;  // num active threads
-    atomic_int node_n;    // active graph node
-    atomic_int node_task; // active graph node task phase
+    atomic_int n_barrier;
+    atomic_int n_barrier_passed;
 
     ggml_abort_callback abort_callback; // abort ggml_graph_compute when true
     void * abort_callback_data;
@@ -18972,184 +18971,104 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads, int n_cur_
     return n_tasks;
 }
 
-static void ggml_graph_compute_thread_sync_node(int * node_n, struct ggml_compute_state * state, const bool do_yield) {
-    // wait for other threads to finish
-    const int last_node_n = * node_n;
-
-    while (true) {
-        if (do_yield) {
-            sched_yield();
-        }
-
-        *node_n = atomic_load(&state->shared->node_n);
-        if (*node_n != last_node_n) {
-            break;
-        }
-
-#if defined(__SSE3__)
-        // Tell the processor we're spinning. It's a processor hint for spinlocks.
-        _mm_pause();
-#endif
+#ifdef GGML_USE_OPENMP
+static void ggml_barrier(struct ggml_compute_state * state) {
+    if (state->shared->n_threads == 1) {
+        return;
     }
+
+    #pragma omp barrier
 }
+#else
+static void ggml_barrier(struct ggml_compute_state * state) {
+    if (state->shared->n_threads == 1) {
+        return;
+    }
 
-static void ggml_graph_compute_thread_sync_task(int * task_phase, struct ggml_compute_state * state, const bool do_yield) {
-    // wait for other threads to finish
-    const int last_task_phase = *task_phase;
+    atomic_int * n_barrier = &state->shared->n_barrier;
+    atomic_int * n_barrier_passed = &state->shared->n_barrier_passed;
 
-    while (true) {
-        if (do_yield) {
-            sched_yield();
-        }
+    int n_threads = state->shared->n_threads;
+    int passed_old = atomic_load(n_barrier_passed);
 
-        *task_phase = atomic_load(&state->shared->node_task);
-        if (*task_phase != last_task_phase) {
-            break;
+    if (atomic_fetch_add(n_barrier, 1) == n_threads - 1) {
+        // last thread
+        atomic_store(n_barrier, 0);
+        atomic_fetch_add(n_barrier_passed, 1);
+    } else {
+        // wait for other threads
+        //while (atomic_load(n_barrier_passed) == passed_old) {
+        //}
+        const int n_spin_before_sleep = 100000;
+        while (true) {
+            for (int i = 0; i < n_spin_before_sleep; i++) {
+                if (atomic_load(n_barrier_passed) != passed_old) {
+                    return;
+                }
+            #if defined(__SSE3__)
+                _mm_pause();
+            #endif
+            }
+            sched_yield();
         }
-
-#if defined(__SSE3__)
-        // Tell the processor we're spinning. It's a processor hint for spinlocks.
-        _mm_pause();
-#endif
     }
 }
+#endif
 
 static thread_ret_t ggml_graph_compute_thread(void * data) {
     struct ggml_compute_state * state = (struct ggml_compute_state *) data;
 
     const struct ggml_cgraph * cgraph = state->shared->cgraph;
     const struct ggml_cplan * cplan = state->shared->cplan;
 
-    const int n_threads = state->shared->n_threads;
+    const int ith = state->ith;
+    const int n_threads = state->shared->n_threads;
 
-    set_numa_thread_affinity(state->ith);
+    set_numa_thread_affinity(ith);
 
-    int node_n     = -1;
-    int task_phase = GGML_TASK_TYPE_FINALIZE;
+    struct ggml_compute_params params = {
+        /*.type  =*/ GGML_TASK_TYPE_INIT,
+        /*.ith   =*/ ith,
+        /*.nth   =*/ state->shared->n_threads,
+        /*.wsize =*/ cplan->work_size,
+        /*.wdata =*/ cplan->work_data,
+    };
 
-    while (true) {
+    for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
         if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
-            state->shared->node_n += 1;
             state->ec = GGML_STATUS_ABORTED;
             return 0;
         }
 
-        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
-            // all other threads are finished and spinning
-            // do finalize and init here so we don't have synchronize again
-            struct ggml_compute_params params = {
-                /*.type  =*/ GGML_TASK_TYPE_FINALIZE,
-                /*.ith   =*/ 0,
-                /*.nth   =*/ 0,
-                /*.wsize =*/ cplan->work_size,
-                /*.wdata =*/ cplan->work_data,
-            };
-
-            if (node_n != -1) {
-                /* FINALIZE */
-                struct ggml_tensor * node = cgraph->nodes[node_n];
-                if (GGML_OP_HAS_FINALIZE[node->op]) {
-                    params.nth = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
-                    ggml_compute_forward(&params, node, state);
-                }
-                ggml_graph_compute_perf_stats_node(node, state->shared);
-            }
-
-            // distribute new work or execute it direct if 1T
-            while (++node_n < cgraph->n_nodes) {
-                GGML_PRINT_DEBUG_5("%s: %d/%d\n", __func__, node_n, cgraph->n_nodes);
-                struct ggml_tensor * node = cgraph->nodes[node_n];
-                const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
-
-                state->shared->perf_node_start_cycles  = ggml_perf_cycles();
-                state->shared->perf_node_start_time_us = ggml_perf_time_us();
-
-                params.nth = n_tasks;
-
-                if (n_tasks == 1) {
-                    /* INIT */
-                    if (GGML_OP_HAS_INIT[node->op]) {
-                        params.type = GGML_TASK_TYPE_INIT;
-                        ggml_compute_forward(&params, node, state);
-                    }
-
-                    // TODO: maybe push node_n to the atomic but if other threads see n_tasks is 1,
-                    // they do something more efficient than spinning (?)
-                    params.type = GGML_TASK_TYPE_COMPUTE;
-                    ggml_compute_forward(&params, node, state);
-
-                    if (GGML_OP_HAS_FINALIZE[node->op]) {
-                        params.type = GGML_TASK_TYPE_FINALIZE;
-                        ggml_compute_forward(&params, node, state);
-                    }
-
-                    ggml_graph_compute_perf_stats_node(node, state->shared);
-                } else {
-                    break;
-                }
-
-                if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
-                    break;
-                }
-            }
-
-            task_phase = GGML_TASK_TYPE_INIT;
-            atomic_store(&state->shared->n_active,  n_threads);
-            atomic_store(&state->shared->node_n,    node_n);
-            atomic_store(&state->shared->node_task, task_phase);
-        } else {
-            ggml_graph_compute_thread_sync_node(&node_n, state, false);
-            ggml_graph_compute_thread_sync_task(&task_phase, state, false);
-        }
-
-        // check if we should stop
-        if (node_n >= cgraph->n_nodes) break;
-
-        /* INIT & COMPUTE */
         struct ggml_tensor * node = cgraph->nodes[node_n];
         const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
 
-        struct ggml_compute_params params = {
-            /*.type  =*/ GGML_TASK_TYPE_INIT,
-            /*.ith   =*/ state->ith,
-            /*.nth   =*/ n_tasks,
-            /*.wsize =*/ cplan->work_size,
-            /*.wdata =*/ cplan->work_data,
-        };
+        params.nth = n_tasks;
 
-        if (state->ith < n_tasks) {
-            if (GGML_OP_HAS_INIT[node->op]) {
+        /* INIT */
+        if (GGML_OP_HAS_INIT[node->op]) {
+            if (ith < n_tasks) {
+                params.type = GGML_TASK_TYPE_INIT;
                 ggml_compute_forward(&params, node, state);
             }
+            ggml_barrier(state);
         }
 
-        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
-            task_phase = GGML_TASK_TYPE_COMPUTE;
-            atomic_store(&state->shared->n_active,  n_threads);
-            atomic_store(&state->shared->node_task, task_phase);
-        }
-        else {
-            // TODO: this sched_yield can have significant impact on the performance - either positive or negative
-            //       depending on the workload and the operating system.
-            //       since it is not clear what is the best approach, it should potentially become user-configurable
-            //       ref: https://github.com/ggerganov/ggml/issues/291
-            // UPD:  adding the do_yield flag seems to resolve the issue universally
-            const bool do_yield = node_n < 0 || cgraph->nodes[node_n]->op == GGML_OP_MUL_MAT;
-            ggml_graph_compute_thread_sync_task(&task_phase, state, do_yield);
-        }
-
-        if (state->ith < n_tasks) {
+        /* COMPUTE */
+        if (ith < n_tasks) {
             params.type = GGML_TASK_TYPE_COMPUTE;
             ggml_compute_forward(&params, node, state);
         }
 
-        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
-            task_phase = GGML_TASK_TYPE_FINALIZE;
-            atomic_store(&state->shared->n_active,  n_threads);
-            atomic_store(&state->shared->node_task, task_phase);
-        }
-        else {
-            ggml_graph_compute_thread_sync_task(&task_phase, state, false);
+        ggml_barrier(state);
+
+        /* FINALIZE */
+        if (GGML_OP_HAS_FINALIZE[node->op]) {
+            if (params.ith == 0) {
+                params.type = GGML_TASK_TYPE_FINALIZE;
+                ggml_compute_forward(&params, node, state);
+            }
+            ggml_barrier(state);
         }
     }
 

@@ -19336,7 +19255,6 @@ static enum ggml_status ggml_graph_compute_parallel(struct ggml_compute_state *
                 // update the number of threads from the actual number of threads that we got from OpenMP
                 n_threads = omp_get_num_threads();
                 workers[0].shared->n_threads = n_threads;
-                workers[0].shared->n_active  = n_threads;
             }
             ggml_graph_compute_thread(&workers[omp_get_thread_num()]);
         }
@@ -19399,9 +19317,8 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
         /*.perf_node_start_cycles  =*/ 0,
         /*.perf_node_start_time_us =*/ 0,
         /*.n_threads               =*/ n_threads,
-        /*.n_active                =*/ n_threads,
-        /*.node_n                  =*/ -1,
-        /*.node_task               =*/ GGML_TASK_TYPE_FINALIZE,
+        /*.n_barrier               =*/ 0,
+        /*.n_barrier_passed        =*/ 0,
         /*.abort_callback          =*/ NULL,
         /*.abort_callback_data     =*/ NULL,
         /*.current_chunk; =*/ 0,

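For readers who want to exercise the core idea of the non-OpenMP ggml_barrier in isolation, the sketch below is an illustrative reconstruction rather than part of ggml: the names shared_state, barrier_wait, worker, partial, and N_THREADS are hypothetical, chosen for the example, and the _mm_pause() hint the patch uses on SSE3 targets is omitted for portability. The pattern is the same as in the diff: the last thread to arrive resets the waiter counter and bumps a generation counter, while every other thread spins on the generation for a bounded number of iterations and then falls back to sched_yield().

// barrier_demo.c -- illustrative sketch of the counter-based barrier pattern
// used by the non-OpenMP ggml_barrier above (names here are hypothetical).
// Build (assumption): cc -std=c11 -pthread barrier_demo.c -o barrier_demo
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define N_THREADS 4

struct shared_state {
    int        n_threads;
    atomic_int n_barrier;        // number of threads currently waiting
    atomic_int n_barrier_passed; // generation counter, bumped by the last arrival
};

// Counter-based barrier: the last thread to arrive resets n_barrier and bumps
// the generation; the others spin on the generation and eventually yield.
static void barrier_wait(struct shared_state * s) {
    if (s->n_threads == 1) {
        return;
    }

    int passed_old = atomic_load(&s->n_barrier_passed);

    if (atomic_fetch_add(&s->n_barrier, 1) == s->n_threads - 1) {
        // last thread: release everyone waiting on the generation counter
        atomic_store(&s->n_barrier, 0);
        atomic_fetch_add(&s->n_barrier_passed, 1);
    } else {
        // spin for a while before falling back to sched_yield()
        const int n_spin_before_sleep = 100000;
        while (true) {
            for (int i = 0; i < n_spin_before_sleep; i++) {
                if (atomic_load(&s->n_barrier_passed) != passed_old) {
                    return;
                }
            }
            sched_yield();
        }
    }
}

static struct shared_state shared = { N_THREADS, 0, 0 };
static int partial[N_THREADS];

struct worker_arg { int ith; };

static void * worker(void * p) {
    const int ith = ((struct worker_arg *) p)->ith;

    // "INIT"-like phase: every thread fills its own slot
    partial[ith] = ith + 1;
    barrier_wait(&shared); // all slots are written before anyone reads them

    // "FINALIZE"-like phase: a single thread reduces the results
    if (ith == 0) {
        int sum = 0;
        for (int i = 0; i < N_THREADS; i++) {
            sum += partial[i];
        }
        printf("sum = %d\n", sum); // expected: 1 + 2 + 3 + 4 = 10
    }
    barrier_wait(&shared); // keep threads in lockstep, as between graph nodes
    return NULL;
}

int main(void) {
    pthread_t threads[N_THREADS];
    struct worker_arg args[N_THREADS];

    for (int i = 0; i < N_THREADS; i++) {
        args[i].ith = i;
        pthread_create(&threads[i], NULL, worker, &args[i]);
    }
    for (int i = 0; i < N_THREADS; i++) {
        pthread_join(threads[i], NULL);
    }
    return 0;
}

The spin-then-yield wait keeps the common case cheap (threads typically reach each barrier almost simultaneously, once per task phase of a node), while the sched_yield() fallback avoids burning a full core if a thread has been descheduled.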