From 07a8b549ba78b1e6f8f8e65a978425492f5c848a Mon Sep 17 00:00:00 2001 From: Victoriya Fedotova Date: Thu, 30 Jan 2025 02:01:30 -0800 Subject: [PATCH 1/9] Initial implementation of NUMA-aware threading --- cpp/daal/include/services/daal_defines.h | 3 +- .../algorithms/covariance/covariance_impl.i | 42 ++++----- .../naivebayes/naivebayes_train_impl.i | 2 +- cpp/daal/src/threading/threading.cpp | 86 +++++++++++++++++-- cpp/daal/src/threading/threading.h | 48 +++++++++-- 5 files changed, 148 insertions(+), 33 deletions(-) diff --git a/cpp/daal/include/services/daal_defines.h b/cpp/daal/include/services/daal_defines.h index 3f2636da158..64d686d0b20 100644 --- a/cpp/daal/include/services/daal_defines.h +++ b/cpp/daal/include/services/daal_defines.h @@ -202,7 +202,8 @@ struct IsSameType static const bool value = true; }; -const size_t DAAL_MALLOC_DEFAULT_ALIGNMENT = 64; +constexpr size_t DAAL_MALLOC_DEFAULT_ALIGNMENT = 64; +constexpr size_t DAAL_MAX_NUMA_COUNT = 8; const int SERIALIZATION_HOMOGEN_NT_ID = 1000; const int SERIALIZATION_AOS_NT_ID = 3000; diff --git a/cpp/daal/src/algorithms/covariance/covariance_impl.i b/cpp/daal/src/algorithms/covariance/covariance_impl.i index 775f26fbef1..bc1fed54c98 100644 --- a/cpp/daal/src/algorithms/covariance/covariance_impl.i +++ b/cpp/daal/src/algorithms/covariance/covariance_impl.i @@ -37,6 +37,8 @@ #include "src/threading/threading.h" #include "src/externals/service_profiler.h" +#include + using namespace daal::internal; using namespace daal::services::internal; @@ -170,16 +172,10 @@ services::Status updateDenseCrossProductAndSums(bool isNormalized, size_t nFeatu services::Status status = hyperparameter->find(denseUpdateStepBlockSize, numRowsInBlock); DAAL_CHECK_STATUS_VAR(status); } - size_t numBlocks = nVectors / numRowsInBlock; - if (numBlocks * numRowsInBlock < nVectors) - { - numBlocks++; - } - size_t numRowsInLastBlock = numRowsInBlock + (nVectors - numBlocks * numRowsInBlock); /* TLS data initialization */ SafeStatus safeStat; - daal::static_tls *> tls_data([=, &safeStat]() { + daal::tls *> tls_data([=, &safeStat]() { auto tlsData = tls_data_t::create(isNormalized, nFeatures); if (!tlsData) { @@ -187,11 +183,15 @@ services::Status updateDenseCrossProductAndSums(bool isNormalized, size_t nFeatu } return tlsData; }); - DAAL_CHECK_SAFE_STATUS(); /* Threaded loop with syrk seq calls */ - daal::static_threader_for(numBlocks, [&](int iBlock, size_t tid) { - struct tls_data_t * tls_data_local = tls_data.local(tid); + daal::numa_threader_for(nVectors, numRowsInBlock, [&](size_t startRow, size_t endRow) { + size_t nRows = endRow - startRow; + if (startRow < 0 || endRow < 0 || endRow <= startRow || endRow > nVectors) + { + return; + } + struct tls_data_t * tls_data_local = tls_data.local(); if (!tls_data_local) { return; @@ -202,21 +202,23 @@ services::Status updateDenseCrossProductAndSums(bool isNormalized, size_t nFeatu algorithmFPType alpha = 1.0; algorithmFPType beta = 1.0; - size_t nRows = (iBlock < (numBlocks - 1)) ? 
numRowsInBlock : numRowsInLastBlock; - size_t startRow = iBlock * numRowsInBlock; - - ReadRows dataTableBD(dataTable, startRow, nRows); + ReadRows dataTableBD(dataTable, size_t(startRow), size_t(nRows)); DAAL_CHECK_BLOCK_STATUS_THR(dataTableBD); - algorithmFPType * dataBlock_local = const_cast(dataTableBD.get()); + algorithmFPType * dataBlockLocal = const_cast(dataTableBD.get()); + if (!dataBlockLocal) + { + safeStat.add(services::ErrorMemoryAllocationFailed); + } - DAAL_INT nFeatures_local = nFeatures; + DAAL_INT nFeaturesLocal = nFeatures; + DAAL_INT nRowsLocal = nRows; algorithmFPType * crossProduct_local = tls_data_local->crossProduct; algorithmFPType * sums_local = tls_data_local->sums; { DAAL_ITTNOTIFY_SCOPED_TASK(gemmData); - BlasInst::xxsyrk(&uplo, &trans, (DAAL_INT *)&nFeatures_local, (DAAL_INT *)&nRows, &alpha, dataBlock_local, - (DAAL_INT *)&nFeatures_local, &beta, crossProduct_local, (DAAL_INT *)&nFeatures_local); + BlasInst::xxsyrk(&uplo, &trans, (DAAL_INT *)&nFeaturesLocal, (DAAL_INT *)&nRowsLocal, &alpha, dataBlockLocal, + (DAAL_INT *)&nFeaturesLocal, &beta, crossProduct_local, (DAAL_INT *)&nFeaturesLocal); } if (!isNormalized && (method == defaultDense) && !assumeCentered) @@ -227,9 +229,9 @@ services::Status updateDenseCrossProductAndSums(bool isNormalized, size_t nFeatu { PRAGMA_IVDEP PRAGMA_VECTOR_ALWAYS - for (DAAL_INT j = 0; j < nFeatures_local; j++) + for (DAAL_INT j = 0; j < nFeaturesLocal; j++) { - sums_local[j] += dataBlock_local[i * nFeatures_local + j]; + sums_local[j] += dataBlockLocal[i * nFeaturesLocal + j]; } } } diff --git a/cpp/daal/src/algorithms/naivebayes/naivebayes_train_impl.i b/cpp/daal/src/algorithms/naivebayes/naivebayes_train_impl.i index 74a6732140d..55765886330 100644 --- a/cpp/daal/src/algorithms/naivebayes/naivebayes_train_impl.i +++ b/cpp/daal/src/algorithms/naivebayes/naivebayes_train_impl.i @@ -174,7 +174,7 @@ Status collectCounters(const Parameter * nbPar, NumericTable * ntData, NumericTa daal::tls tls_n_ci([=]() -> algorithmFPType * { return _CALLOC_(p * c); }); SafeStatus safeStat; - daal::threader_for_blocked(n, n, [=, &tls_n_ci, &safeStat](algorithmFPType j0, algorithmFPType jn) { + daal::threader_for_blocked(n, 1, [=, &tls_n_ci, &safeStat](algorithmFPType j0, algorithmFPType jn) { algorithmFPType * local_n_ci = tls_n_ci.local(); DAAL_CHECK_THR(local_n_ci, ErrorMemoryAllocationFailed); diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp index 5f82f1b6a42..50492382076 100644 --- a/cpp/daal/src/threading/threading.cpp +++ b/cpp/daal/src/threading/threading.cpp @@ -25,10 +25,11 @@ #include "services/daal_memory.h" #include "src/algorithms/service_qsort.h" -#define TBB_PREVIEW_GLOBAL_CONTROL 1 -#define TBB_PREVIEW_TASK_ARENA 1 +/// #define TBB_PREVIEW_GLOBAL_CONTROL 1 +/// #define TBB_PREVIEW_TASK_ARENA 1 #include // std::min +#include // std::vector #include // malloc and free #include #include @@ -37,6 +38,8 @@ #include #include "services/daal_atomic_int.h" +#include + #if defined(TBB_INTERFACE_VERSION) && TBB_INTERFACE_VERSION >= 12002 #include #endif @@ -75,6 +78,35 @@ DAAL_EXPORT void _daal_tbb_task_scheduler_handle_free(void *& schedulerHandle) // #endif } +DAAL_EXPORT size_t _initArenas() +{ +#if defined(TARGET_X86_64) + size_t nNUMA = daal::threader_env()->getNumberOfNUMANodes(); + if (nNUMA > daal::DAAL_MAX_NUMA_COUNT) + { + return -1; + } + if (nNUMA > 1) + { + std::vector numa_indexes = tbb::info::numa_nodes(); + for (size_t i = 0; i < nNUMA; ++i) + { + tbb::task_arena * arena = new 
tbb::task_arena(); + arena->initialize(tbb::task_arena::constraints(numa_indexes[i])); + daal::threader_env()->setArena(i, arena); + } + } + else + { + tbb::task_arena * arena = new tbb::task_arena(tbb::task_arena::constraints {}.set_max_threads_per_core(1)); + arena->initialize(); + daal::threader_env()->setArena(0, arena); + } + +#endif + return 0; +} + DAAL_EXPORT size_t _setSchedulerHandle(void ** schedulerHandle) { #if defined(TARGET_X86_64) @@ -84,7 +116,7 @@ DAAL_EXPORT size_t _setSchedulerHandle(void ** schedulerHandle) *schedulerHandle = reinterpret_cast(new tbb::task_scheduler_handle(tbb::attach {})); #endif // It is necessary for initializing tbb in cases where DAAL does not use it. - tbb::task_arena {}.initialize(); + _initArenas(); #endif return 0; } @@ -161,6 +193,38 @@ DAAL_EXPORT void _daal_threader_for_blocked_size(size_t n, size_t block, const v } } +DAAL_EXPORT void _daal_threader_for_blocked_numa(size_t n, size_t block, const void * a, daal::functype_blocked_size func) +{ + if (daal::threader_env()->getNumberOfThreads() > 1) + { + const size_t nNUMA = daal::threader_env()->getNumberOfNUMANodes(); + if (nNUMA > 1 && n > nNUMA * block * 2) + { + const size_t nPerNUMA = (n + nNUMA - 1) / nNUMA; + for (size_t i = 0; i < nNUMA; ++i) + { + tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getArena(i)); + const size_t startIter = i * nPerNUMA; + const size_t endIter = std::min(startIter + nPerNUMA, n); + + arena->execute([&]() { + tbb::parallel_for(tbb::blocked_range(startIter, endIter, block * 2), + [=](tbb::blocked_range r) -> void { return func(r.begin(), r.end(), a); }); + }); + } + } + else + { + tbb::parallel_for(tbb::blocked_range(0ul, n, block * 2), + [=](tbb::blocked_range r) -> void { return func(r.begin(), r.end(), a); }); + } + } + else + { + func(0ul, n, a); + } +} + DAAL_EXPORT void _daal_threader_for_simple(int n, int reserved, const void * a, daal::functype func) { if (daal::threader_env()->getNumberOfThreads() > 1) @@ -319,11 +383,11 @@ DAAL_PARALLEL_SORT_IMPL(daal::IdxValType, pair_fp64_uint64) #undef DAAL_PARALLEL_SORT_IMPL -DAAL_EXPORT void _daal_threader_for_blocked(int n, int reserved, const void * a, daal::functype2 func) +DAAL_EXPORT void _daal_threader_for_blocked(int n, size_t grainsize, const void * a, daal::functype2 func) { if (daal::threader_env()->getNumberOfThreads() > 1) { - tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { func(r.begin(), r.end() - r.begin(), a); }); + tbb::parallel_for(tbb::blocked_range(0, n, grainsize * 2), [&](tbb::blocked_range r) { func(r.begin(), r.end() - r.begin(), a); }); } else { @@ -836,4 +900,14 @@ DAAL_EXPORT void _daal_wait_task_group(void * taskGroupPtr) } namespace daal -{} +{ +ThreaderEnvironment::ThreaderEnvironment() : _numberOfThreads(_daal_threader_get_max_threads()) +{ +#if defined(TARGET_X86_64) + _numberOfNUMANodes = tbb::info::numa_nodes().size(); +#else + _numberOfNUMANodes = 1; +#endif +} + +} // namespace daal diff --git a/cpp/daal/src/threading/threading.h b/cpp/daal/src/threading/threading.h index 46e23067fec..a167fd58467 100644 --- a/cpp/daal/src/threading/threading.h +++ b/cpp/daal/src/threading/threading.h @@ -45,6 +45,7 @@ typedef void (*functype_int32ptr)(const int * i, const void * a); typedef void (*functype_static)(size_t i, size_t tid, const void * a); typedef void (*functype2)(int i, int n, const void * a); typedef void (*functype_blocked_size)(size_t first, size_t last, const void * a); +typedef void (*functype)(int i, const void * a); typedef 
void * (*tls_functype)(const void * a); typedef void (*tls_reduce_functype)(void * p, const void * a); typedef void (*functype_break)(int i, bool & needBreak, const void * a); @@ -65,8 +66,9 @@ extern "C" DAAL_EXPORT void _daal_threader_for_simple(int n, int threads_request, const void * a, daal::functype func); DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end, const void * a, daal::functype_int32ptr func); DAAL_EXPORT void _daal_static_threader_for(size_t n, const void * a, daal::functype_static func); - DAAL_EXPORT void _daal_threader_for_blocked(int n, int threads_request, const void * a, daal::functype2 func); + DAAL_EXPORT void _daal_threader_for_blocked(int n, size_t grainsize, const void * a, daal::functype2 func); DAAL_EXPORT void _daal_threader_for_blocked_size(size_t n, size_t block, const void * a, daal::functype_blocked_size func); + DAAL_EXPORT void _daal_threader_for_blocked_numa(size_t n, size_t block, const void * a, daal::functype_blocked_size func); DAAL_EXPORT void _daal_threader_for_optional(int n, int threads_request, const void * a, daal::functype func); DAAL_EXPORT void _daal_threader_for_break(int n, int threads_request, const void * a, daal::functype_break func); @@ -105,6 +107,7 @@ extern "C" DAAL_EXPORT void _daal_tbb_task_scheduler_handle_free(void *& schedulerHandle); DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** globalControl); DAAL_EXPORT size_t _setSchedulerHandle(void ** schedulerHandle); + DAAL_EXPORT size_t _initArenas(); DAAL_EXPORT void * _daal_threader_env(); @@ -167,12 +170,25 @@ inline void threaded_scalable_free(void * ptr) class ThreaderEnvironment { public: - ThreaderEnvironment() : _numberOfThreads(_daal_threader_get_max_threads()) {} + ThreaderEnvironment(); size_t getNumberOfThreads() const { return _numberOfThreads; } void setNumberOfThreads(size_t value) { _numberOfThreads = value; } + size_t getNumberOfNUMANodes() const { return _numberOfNUMANodes; } + void * getArena(size_t i) const + { + if (i >= _numberOfNUMANodes) return nullptr; + return _arenas[i]; + } + + void setArena(size_t i, void * arena) + { + if (i < _numberOfNUMANodes) _arenas[i] = arena; + } private: size_t _numberOfThreads; + size_t _numberOfNUMANodes; + void * _arenas[DAAL_MAX_NUMA_COUNT]; }; inline ThreaderEnvironment * threader_env() @@ -185,9 +201,16 @@ inline size_t threader_get_threads_number() return threader_env()->getNumberOfThreads(); } +inline size_t threader_get_numa_number() +{ + return threader_env()->getNumberOfNUMANodes(); +} + inline size_t setSchedulerHandle(void ** schedulerHandle) { - return _setSchedulerHandle(schedulerHandle); + size_t status = _setSchedulerHandle(schedulerHandle); + if (!status) return status; + return _initArenas(); } inline size_t setNumberOfThreads(const size_t numThreads, void ** globalControl) @@ -216,6 +239,13 @@ inline void threader_func_b(int i0, int in, const void * a) func(i0, in); } +template +inline void threader_func_b_size_t(size_t i0, size_t in, const void * a) +{ + const F & func = *static_cast(a); + func(i0, in); +} + template inline void threader_func_break(int i, bool & needBreak, const void * a) { @@ -244,6 +274,14 @@ inline void threader_for(int n, int reserved, const F & func) _daal_threader_for(n, reserved, a, threader_func); } +template +inline void numa_threader_for(int n, int block, const F & func) +{ + const void * a = static_cast(&func); + + _daal_threader_for_blocked_numa(n, block, a, threader_func_b_size_t); +} + /// Pass a function to be executed in 
a for loop to the threading layer. /// The maximal number of iterations in the loop is `2^63 - 1 (INT64_MAX)`. /// The default scheduling of the threading layer is used to assign @@ -353,11 +391,11 @@ inline void static_threader_for(size_t n, const F & func) /// @param[in] func Callable object that processes the block of loop's iterations /// `[beginRange, endRange)`. template -inline void threader_for_blocked(int n, int reserved, const F & func) +inline void threader_for_blocked(int n, size_t grainsize, const F & func) { const void * a = static_cast(&func); - _daal_threader_for_blocked(n, reserved, a, threader_func_b); + _daal_threader_for_blocked(n, grainsize, a, threader_func_b); } template From f7754ced21c5d07c6b958da02fbc139c59e6c5a9 Mon Sep 17 00:00:00 2001 From: Victoriya Fedotova Date: Fri, 31 Jan 2025 07:03:24 -0800 Subject: [PATCH 2/9] Add static NUMA-aware parallel loop --- cpp/daal/src/threading/threading.cpp | 94 +++++++++++++++++++++++++--- cpp/daal/src/threading/threading.h | 10 +++ 2 files changed, 97 insertions(+), 7 deletions(-) diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp index 50492382076..6974e47b310 100644 --- a/cpp/daal/src/threading/threading.cpp +++ b/cpp/daal/src/threading/threading.cpp @@ -201,17 +201,28 @@ DAAL_EXPORT void _daal_threader_for_blocked_numa(size_t n, size_t block, const v if (nNUMA > 1 && n > nNUMA * block * 2) { const size_t nPerNUMA = (n + nNUMA - 1) / nNUMA; + + tbb::task_group tg[daal::DAAL_MAX_NUMA_COUNT]; + tbb::task_arena * arenas[daal::DAAL_MAX_NUMA_COUNT]; for (size_t i = 0; i < nNUMA; ++i) { - tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getArena(i)); - const size_t startIter = i * nPerNUMA; - const size_t endIter = std::min(startIter + nPerNUMA, n); - - arena->execute([&]() { - tbb::parallel_for(tbb::blocked_range(startIter, endIter, block * 2), - [=](tbb::blocked_range r) -> void { return func(r.begin(), r.end(), a); }); + arenas[i] = reinterpret_cast(daal::threader_env()->getArena(i)); + + arenas[i]->execute([&]() { // Run each arena on a dedicated NUMA node + const size_t startIter = i * nPerNUMA; + const size_t endIter = std::min(startIter + nPerNUMA, n); + tg[i].run([&startIter, &endIter, &block, &func, &a]{ // Run in task group + tbb::parallel_for(tbb::blocked_range(startIter, endIter, block * 2), + [=](tbb::blocked_range r) -> void { return func(r.begin(), r.end(), a); }); + }); }); } + + for (size_t i = 0; i < nNUMA; ++i) + { + // Wait for completion of the task group in the all the arenas. 
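+                // Waiting via arenas[i]->execute([&]{ tg[i].wait(); }) follows the
+                // oneTBB pattern for synchronizing with work submitted to another
+                // arena: the calling thread joins that arena and can help execute
+                // its remaining tasks instead of blocking outside of it.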
+ arenas[i]->execute([&]{ tg[i].wait(); }); + } } else { @@ -225,6 +236,75 @@ DAAL_EXPORT void _daal_threader_for_blocked_numa(size_t n, size_t block, const v } } + +DAAL_EXPORT void _daal_static_numa_threader_for(size_t n, const void * a, daal::functype_static func) +{ + const size_t nthreads = std::min(daal::threader_env()->getNumberOfThreads(), static_cast(_daal_threader_get_max_threads())); + if (nthreads > 1) + { + const size_t nblocks_per_thread = n / nthreads + !!(n % nthreads); + const size_t nNUMA = daal::threader_env()->getNumberOfNUMANodes(); + if (nNUMA > 1) + { + tbb::task_group tg[daal::DAAL_MAX_NUMA_COUNT]; + tbb::task_arena * arenas[daal::DAAL_MAX_NUMA_COUNT]; + int startThreadIndex = 0; + for (size_t i = 0; i < nNUMA; ++i) + { + arenas[i] = reinterpret_cast(daal::threader_env()->getArena(i)); + const int concurrency = arenas[i]->max_concurrency(); + arenas[i]->execute([&]() { // Run each arena on a dedicated NUMA node + tg[i].run([&n, &startThreadIndex, &concurrency, &nblocks_per_thread, &func, &a]{ // Run in task group + tbb::parallel_for( + tbb::blocked_range(startThreadIndex, startThreadIndex + concurrency, 1), + [&](tbb::blocked_range r) { + const size_t tid = r.begin(); + const size_t begin = tid * nblocks_per_thread; + const size_t end = n < begin + nblocks_per_thread ? n : begin + nblocks_per_thread; + + for (size_t i = begin; i < end; ++i) + { + func(i, tid, a); + } + }, + tbb::static_partitioner()); + }); + }); + startThreadIndex += concurrency; + } + + for (size_t i = 0; i < nNUMA; ++i) + { + // Wait for completion of the task group in the all the arenas. + arenas[i]->execute([&]{ tg[i].wait(); }); + } + } + else + { + tbb::parallel_for( + tbb::blocked_range(0, nthreads, 1), + [&](tbb::blocked_range r) { + const size_t tid = r.begin(); + const size_t begin = tid * nblocks_per_thread; + const size_t end = n < begin + nblocks_per_thread ? 
n : begin + nblocks_per_thread; + + for (size_t i = begin; i < end; ++i) + { + func(i, tid, a); + } + }, + tbb::static_partitioner()); + } + } + else + { + for (size_t i = 0; i < n; i++) + { + func(i, 0, a); + } + } +} + DAAL_EXPORT void _daal_threader_for_simple(int n, int reserved, const void * a, daal::functype func) { if (daal::threader_env()->getNumberOfThreads() > 1) diff --git a/cpp/daal/src/threading/threading.h b/cpp/daal/src/threading/threading.h index a167fd58467..86375187acc 100644 --- a/cpp/daal/src/threading/threading.h +++ b/cpp/daal/src/threading/threading.h @@ -66,6 +66,7 @@ extern "C" DAAL_EXPORT void _daal_threader_for_simple(int n, int threads_request, const void * a, daal::functype func); DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end, const void * a, daal::functype_int32ptr func); DAAL_EXPORT void _daal_static_threader_for(size_t n, const void * a, daal::functype_static func); + DAAL_EXPORT void _daal_static_numa_threader_for(size_t n, const void * a, daal::functype_static func); DAAL_EXPORT void _daal_threader_for_blocked(int n, size_t grainsize, const void * a, daal::functype2 func); DAAL_EXPORT void _daal_threader_for_blocked_size(size_t n, size_t block, const void * a, daal::functype_blocked_size func); DAAL_EXPORT void _daal_threader_for_blocked_numa(size_t n, size_t block, const void * a, daal::functype_blocked_size func); @@ -282,6 +283,15 @@ inline void numa_threader_for(int n, int block, const F & func) _daal_threader_for_blocked_numa(n, block, a, threader_func_b_size_t); } +template +inline void static_numa_threader_for(int n, const F & func) +{ + const void * a = static_cast(&func); + + _daal_static_numa_threader_for(n, a, static_threader_func); +} + + /// Pass a function to be executed in a for loop to the threading layer. /// The maximal number of iterations in the loop is `2^63 - 1 (INT64_MAX)`. 
/// The default scheduling of the threading layer is used to assign From 77688dddabb7f7cdb2ae7ffe854ad9877cb03735 Mon Sep 17 00:00:00 2001 From: Victoriya Fedotova Date: Fri, 31 Jan 2025 07:24:19 -0800 Subject: [PATCH 3/9] Static for in covariance --- .../algorithms/covariance/covariance_impl.i | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/cpp/daal/src/algorithms/covariance/covariance_impl.i b/cpp/daal/src/algorithms/covariance/covariance_impl.i index bc1fed54c98..4d2b6d77fcf 100644 --- a/cpp/daal/src/algorithms/covariance/covariance_impl.i +++ b/cpp/daal/src/algorithms/covariance/covariance_impl.i @@ -173,9 +173,16 @@ services::Status updateDenseCrossProductAndSums(bool isNormalized, size_t nFeatu DAAL_CHECK_STATUS_VAR(status); } + size_t numBlocks = nVectors / numRowsInBlock; + if (numBlocks * numRowsInBlock < nVectors) + { + numBlocks++; + } + size_t numRowsInLastBlock = numRowsInBlock + (nVectors - numBlocks * numRowsInBlock); + /* TLS data initialization */ SafeStatus safeStat; - daal::tls *> tls_data([=, &safeStat]() { + daal::static_tls *> tls_data([=, &safeStat]() { auto tlsData = tls_data_t::create(isNormalized, nFeatures); if (!tlsData) { @@ -185,18 +192,17 @@ services::Status updateDenseCrossProductAndSums(bool isNormalized, size_t nFeatu }); /* Threaded loop with syrk seq calls */ - daal::numa_threader_for(nVectors, numRowsInBlock, [&](size_t startRow, size_t endRow) { - size_t nRows = endRow - startRow; - if (startRow < 0 || endRow < 0 || endRow <= startRow || endRow > nVectors) - { - return; - } - struct tls_data_t * tls_data_local = tls_data.local(); + daal::static_numa_threader_for(numBlocks, [&](int iBlock, size_t tid) { + + struct tls_data_t * tls_data_local = tls_data.local(tid); if (!tls_data_local) { return; } + size_t nRows = (iBlock < (numBlocks - 1)) ? numRowsInBlock : numRowsInLastBlock; + size_t startRow = iBlock * numRowsInBlock; + char uplo = 'U'; char trans = 'N'; algorithmFPType alpha = 1.0; From 7741b9ca47000eb6d671ff2332316fcd384e563f Mon Sep 17 00:00:00 2001 From: Victoriya Fedotova Date: Fri, 31 Jan 2025 07:28:22 -0800 Subject: [PATCH 4/9] Static for in covariance --- .../algorithms/covariance/covariance_impl.i | 26 +++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/cpp/daal/src/algorithms/covariance/covariance_impl.i b/cpp/daal/src/algorithms/covariance/covariance_impl.i index 4d2b6d77fcf..de53d09d393 100644 --- a/cpp/daal/src/algorithms/covariance/covariance_impl.i +++ b/cpp/daal/src/algorithms/covariance/covariance_impl.i @@ -193,38 +193,32 @@ services::Status updateDenseCrossProductAndSums(bool isNormalized, size_t nFeatu /* Threaded loop with syrk seq calls */ daal::static_numa_threader_for(numBlocks, [&](int iBlock, size_t tid) { - struct tls_data_t * tls_data_local = tls_data.local(tid); if (!tls_data_local) { return; } - size_t nRows = (iBlock < (numBlocks - 1)) ? numRowsInBlock : numRowsInLastBlock; - size_t startRow = iBlock * numRowsInBlock; - char uplo = 'U'; char trans = 'N'; algorithmFPType alpha = 1.0; algorithmFPType beta = 1.0; - ReadRows dataTableBD(dataTable, size_t(startRow), size_t(nRows)); + size_t nRows = (iBlock < (numBlocks - 1)) ? 
numRowsInBlock : numRowsInLastBlock; + size_t startRow = iBlock * numRowsInBlock; + + ReadRows dataTableBD(dataTable, startRow, nRows); DAAL_CHECK_BLOCK_STATUS_THR(dataTableBD); - algorithmFPType * dataBlockLocal = const_cast(dataTableBD.get()); - if (!dataBlockLocal) - { - safeStat.add(services::ErrorMemoryAllocationFailed); - } + algorithmFPType * dataBlock_local = const_cast(dataTableBD.get()); - DAAL_INT nFeaturesLocal = nFeatures; - DAAL_INT nRowsLocal = nRows; + DAAL_INT nFeatures_local = nFeatures; algorithmFPType * crossProduct_local = tls_data_local->crossProduct; algorithmFPType * sums_local = tls_data_local->sums; { DAAL_ITTNOTIFY_SCOPED_TASK(gemmData); - BlasInst::xxsyrk(&uplo, &trans, (DAAL_INT *)&nFeaturesLocal, (DAAL_INT *)&nRowsLocal, &alpha, dataBlockLocal, - (DAAL_INT *)&nFeaturesLocal, &beta, crossProduct_local, (DAAL_INT *)&nFeaturesLocal); + BlasInst::xxsyrk(&uplo, &trans, (DAAL_INT *)&nFeatures_local, (DAAL_INT *)&nRows, &alpha, dataBlock_local, + (DAAL_INT *)&nFeatures_local, &beta, crossProduct_local, (DAAL_INT *)&nFeatures_local); } if (!isNormalized && (method == defaultDense) && !assumeCentered) @@ -235,9 +229,9 @@ services::Status updateDenseCrossProductAndSums(bool isNormalized, size_t nFeatu { PRAGMA_IVDEP PRAGMA_VECTOR_ALWAYS - for (DAAL_INT j = 0; j < nFeaturesLocal; j++) + for (DAAL_INT j = 0; j < nFeatures_local; j++) { - sums_local[j] += dataBlockLocal[i * nFeaturesLocal + j]; + sums_local[j] += dataBlock_local[i * nFeatures_local + j]; } } } From 79f62a15774c22c00c88cc181447694c9c1cc1aa Mon Sep 17 00:00:00 2001 From: Victoriya Fedotova Date: Tue, 11 Feb 2025 06:53:17 -0800 Subject: [PATCH 5/9] Add parallelism limiting in Covariance to reduce threading overheads --- .../algorithms/covariance/covariance_impl.i | 15 ++- cpp/daal/src/threading/threading.cpp | 98 +++++++++++-------- cpp/daal/src/threading/threading.h | 18 +++- 3 files changed, 82 insertions(+), 49 deletions(-) diff --git a/cpp/daal/src/algorithms/covariance/covariance_impl.i b/cpp/daal/src/algorithms/covariance/covariance_impl.i index de53d09d393..d808ff67e35 100644 --- a/cpp/daal/src/algorithms/covariance/covariance_impl.i +++ b/cpp/daal/src/algorithms/covariance/covariance_impl.i @@ -36,7 +36,8 @@ #include "src/algorithms/service_error_handling.h" #include "src/threading/threading.h" #include "src/externals/service_profiler.h" - +#include "src/services/service_environment.h" // getL2CacheSize() +#include #include using namespace daal::internal; @@ -164,7 +165,6 @@ services::Status updateDenseCrossProductAndSums(bool isNormalized, size_t nFeatu { /* Inverse number of rows (for normalization) */ algorithmFPType nVectorsInv = 1.0 / (double)(nVectors); - /* Split rows by blocks */ DAAL_INT64 numRowsInBlock = getBlockSize(nVectors); if (hyperparameter) @@ -173,6 +173,15 @@ services::Status updateDenseCrossProductAndSums(bool isNormalized, size_t nFeatu DAAL_CHECK_STATUS_VAR(status); } + /// TODO: make a hyperparameter + constexpr double cacheCoeff = 0.8; + + const size_t l2Size = std::max(getL2CacheSize(), 256ul * 1024ul); + const size_t nValuesPerThread = l2Size * cacheCoeff / sizeof(algorithmFPType); + const size_t nValues = nVectors * nFeatures; + + const size_t maxNThreads = (nValues > nValuesPerThread ? 
nValues / nValuesPerThread : 1); + size_t numBlocks = nVectors / numRowsInBlock; if (numBlocks * numRowsInBlock < nVectors) { @@ -192,7 +201,7 @@ services::Status updateDenseCrossProductAndSums(bool isNormalized, size_t nFeatu }); /* Threaded loop with syrk seq calls */ - daal::static_numa_threader_for(numBlocks, [&](int iBlock, size_t tid) { + daal::static_numa_threader_for(numBlocks, maxNThreads, [&](int iBlock, size_t tid) { struct tls_data_t * tls_data_local = tls_data.local(tid); if (!tls_data_local) { diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp index 6974e47b310..30bb1708762 100644 --- a/cpp/daal/src/threading/threading.cpp +++ b/cpp/daal/src/threading/threading.cpp @@ -193,57 +193,28 @@ DAAL_EXPORT void _daal_threader_for_blocked_size(size_t n, size_t block, const v } } -DAAL_EXPORT void _daal_threader_for_blocked_numa(size_t n, size_t block, const void * a, daal::functype_blocked_size func) +DAAL_EXPORT void _daal_static_numa_threader_for(size_t n, size_t maxConcurrency, const void * a, daal::functype_static func) { - if (daal::threader_env()->getNumberOfThreads() > 1) + const size_t nthreadsInEnv = daal::threader_env()->getNumberOfThreads(); + const size_t nthreads = std::min(nthreadsInEnv, maxConcurrency); + if (nthreads > 1) { - const size_t nNUMA = daal::threader_env()->getNumberOfNUMANodes(); - if (nNUMA > 1 && n > nNUMA * block * 2) + size_t nNUMA = 0; + if (maxConcurrency < nthreadsInEnv) { - const size_t nPerNUMA = (n + nNUMA - 1) / nNUMA; - - tbb::task_group tg[daal::DAAL_MAX_NUMA_COUNT]; - tbb::task_arena * arenas[daal::DAAL_MAX_NUMA_COUNT]; - for (size_t i = 0; i < nNUMA; ++i) + nNUMA = 1; + size_t nthreadsInNUMA = daal::threader_env()->getArenaConcurrency(0); + while (nthreadsInNUMA < nthreads) { - arenas[i] = reinterpret_cast(daal::threader_env()->getArena(i)); - - arenas[i]->execute([&]() { // Run each arena on a dedicated NUMA node - const size_t startIter = i * nPerNUMA; - const size_t endIter = std::min(startIter + nPerNUMA, n); - tg[i].run([&startIter, &endIter, &block, &func, &a]{ // Run in task group - tbb::parallel_for(tbb::blocked_range(startIter, endIter, block * 2), - [=](tbb::blocked_range r) -> void { return func(r.begin(), r.end(), a); }); - }); - }); - } - - for (size_t i = 0; i < nNUMA; ++i) - { - // Wait for completion of the task group in the all the arenas. 
- arenas[i]->execute([&]{ tg[i].wait(); }); + nthreadsInNUMA += daal::threader_env()->getArenaConcurrency(nNUMA); + nNUMA++; } } else { - tbb::parallel_for(tbb::blocked_range(0ul, n, block * 2), - [=](tbb::blocked_range r) -> void { return func(r.begin(), r.end(), a); }); + nNUMA = daal::threader_env()->getNumberOfNUMANodes(); } - } - else - { - func(0ul, n, a); - } -} - - -DAAL_EXPORT void _daal_static_numa_threader_for(size_t n, const void * a, daal::functype_static func) -{ - const size_t nthreads = std::min(daal::threader_env()->getNumberOfThreads(), static_cast(_daal_threader_get_max_threads())); - if (nthreads > 1) - { const size_t nblocks_per_thread = n / nthreads + !!(n % nthreads); - const size_t nNUMA = daal::threader_env()->getNumberOfNUMANodes(); if (nNUMA > 1) { tbb::task_group tg[daal::DAAL_MAX_NUMA_COUNT]; @@ -252,7 +223,7 @@ DAAL_EXPORT void _daal_static_numa_threader_for(size_t n, const void * a, daal:: for (size_t i = 0; i < nNUMA; ++i) { arenas[i] = reinterpret_cast(daal::threader_env()->getArena(i)); - const int concurrency = arenas[i]->max_concurrency(); + const int concurrency = std::min(arenas[i]->max_concurrency(), int(maxConcurrency) - startThreadIndex); arenas[i]->execute([&]() { // Run each arena on a dedicated NUMA node tg[i].run([&n, &startThreadIndex, &concurrency, &nblocks_per_thread, &func, &a]{ // Run in task group tbb::parallel_for( @@ -436,6 +407,36 @@ DAAL_EXPORT void _daal_static_threader_for(size_t n, const void * a, daal::funct } } +DAAL_EXPORT void _daal_static_limited_threader_for(size_t n, size_t max_threads, const void * a, daal::functype_static func) +{ + const size_t nthreads = std::min(daal::threader_env()->getNumberOfThreads(), max_threads); + if (nthreads > 1) + { + const size_t nblocks_per_thread = n / nthreads + !!(n % nthreads); + + tbb::parallel_for( + tbb::blocked_range(0, nthreads, 1), + [&](tbb::blocked_range r) { + const size_t tid = r.begin(); + const size_t begin = tid * nblocks_per_thread; + const size_t end = n < begin + nblocks_per_thread ? 
n : begin + nblocks_per_thread;
+
+                for (size_t i = begin; i < end; ++i)
+                {
+                    func(i, tid, a);
+                }
+            },
+            tbb::static_partitioner());
+    }
+    else
+    {
+        for (size_t i = 0; i < n; i++)
+        {
+            func(i, 0, a);
+        }
+    }
+}
+
 template <typename F>
 DAAL_EXPORT void _daal_parallel_sort_template(F * begin_p, F * end_p)
 {
@@ -990,4 +991,17 @@ ThreaderEnvironment::ThreaderEnvironment() : _numberOfThreads(_daal_threader_get
 #endif
 }
 
+int ThreaderEnvironment::getArenaConcurrency(size_t i) const
+{
+#if defined(TARGET_X86_64)
+    if (i < _numberOfNUMANodes)
+    {
+        tbb::task_arena * arena = reinterpret_cast<tbb::task_arena *>(_arenas[i]);
+        return arena->max_concurrency();
+    }
+    return 0;
+#else
+    return 0;
+#endif
+}
 } // namespace daal
diff --git a/cpp/daal/src/threading/threading.h b/cpp/daal/src/threading/threading.h
index 86375187acc..ea6c9506151 100644
--- a/cpp/daal/src/threading/threading.h
+++ b/cpp/daal/src/threading/threading.h
@@ -66,7 +66,8 @@ extern "C"
     DAAL_EXPORT void _daal_threader_for_simple(int n, int threads_request, const void * a, daal::functype func);
     DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end, const void * a, daal::functype_int32ptr func);
     DAAL_EXPORT void _daal_static_threader_for(size_t n, const void * a, daal::functype_static func);
-    DAAL_EXPORT void _daal_static_numa_threader_for(size_t n, const void * a, daal::functype_static func);
+    DAAL_EXPORT void _daal_static_limited_threader_for(size_t n, size_t max_threads, const void * a, daal::functype_static func);
+    DAAL_EXPORT void _daal_static_numa_threader_for(size_t n, size_t max_threads, const void * a, daal::functype_static func);
     DAAL_EXPORT void _daal_threader_for_blocked(int n, size_t grainsize, const void * a, daal::functype2 func);
     DAAL_EXPORT void _daal_threader_for_blocked_size(size_t n, size_t block, const void * a, daal::functype_blocked_size func);
     DAAL_EXPORT void _daal_threader_for_blocked_numa(size_t n, size_t block, const void * a, daal::functype_blocked_size func);
     DAAL_EXPORT void _daal_threader_for_optional(int n, int threads_request, const void * a, daal::functype func);
     DAAL_EXPORT void _daal_threader_for_break(int n, int threads_request, const void * a, daal::functype_break func);
@@ -186,10 +187,11 @@ class ThreaderEnvironment
         if (i < _numberOfNUMANodes) _arenas[i] = arena;
     }
 
+    int getArenaConcurrency(size_t i) const;
+
 private:
     size_t _numberOfThreads;
     size_t _numberOfNUMANodes;
-    void * _arenas[DAAL_MAX_NUMA_COUNT];
+    void * _arenas[8];
 };
 
 inline ThreaderEnvironment * threader_env()
@@ -284,11 +286,11 @@ inline void numa_threader_for(int n, int block, const F & func)
 }
 
 template <typename F>
-inline void static_numa_threader_for(int n, const F & func)
+inline void static_numa_threader_for(int n, size_t max_threads, const F & func)
{
     const void * a = static_cast<const void *>(&func);
 
-    _daal_static_numa_threader_for(n, a, static_threader_func<F>);
+    _daal_static_numa_threader_for(n, max_threads, a, static_threader_func<F>);
 }
 
@@ -384,6 +386,14 @@ inline void static_threader_for(size_t n, const F & func)
     _daal_static_threader_for(n, a, static_threader_func<F>);
 }
 
+template <typename F>
+inline void static_limited_threader_for(size_t n, size_t max_threads, const F & func)
+{
+    const void * a = static_cast<const void *>(&func);
+
+    _daal_static_limited_threader_for(n, max_threads, a, static_threader_func<F>);
+}
+
 /// Pass a function to be executed in a for loop to the threading layer.
 /// The maximal number of iterations in the loop is `2^31 - 1 INT32_MAX`.
 /// The default scheduling of the threading layer is used to assign

From f402ecc7bc1178a0a92b60de5648fb63a240fd32 Mon Sep 17 00:00:00 2001
From: Victoriya Fedotova
Date: Wed, 12 Feb 2025 03:57:21 -0800
Subject: [PATCH 6/9] Default arena added; thread pinning handling implemented.
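
Every TBB-based primitive in the threading layer now submits its work through
a task arena: the thread pinner's arena when pinning is active, otherwise a
default "safe" arena that _initArenas() creates with at most one thread per
core. A minimal sketch of the submit-and-wait pattern applied to each
primitive below (all names are the ones introduced in this series):

    tbb::task_group tg;
    tbb::task_arena * arena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getSafeArena());
    // Submit the parallel loop into the arena as a task group task.
    arena->execute([&] { tg.run([&] { /* tbb::parallel_for(...) */ }); });
    // Wait from inside the same arena so the caller can help run its tasks.
    arena->execute([&] { tg.wait(); });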
--- .../algorithms/covariance/covariance_impl.i | 11 +- cpp/daal/src/threading/threading.cpp | 569 +++++++++++++----- cpp/daal/src/threading/threading.h | 20 +- 3 files changed, 426 insertions(+), 174 deletions(-) diff --git a/cpp/daal/src/algorithms/covariance/covariance_impl.i b/cpp/daal/src/algorithms/covariance/covariance_impl.i index d808ff67e35..1901f547acd 100644 --- a/cpp/daal/src/algorithms/covariance/covariance_impl.i +++ b/cpp/daal/src/algorithms/covariance/covariance_impl.i @@ -37,8 +37,6 @@ #include "src/threading/threading.h" #include "src/externals/service_profiler.h" #include "src/services/service_environment.h" // getL2CacheSize() -#include -#include using namespace daal::internal; using namespace daal::services::internal; @@ -164,7 +162,7 @@ services::Status updateDenseCrossProductAndSums(bool isNormalized, size_t nFeatu if (((isNormalized) || ((!isNormalized) && ((method == defaultDense) || (method == sumDense))))) { /* Inverse number of rows (for normalization) */ - algorithmFPType nVectorsInv = 1.0 / (double)(nVectors); + const algorithmFPType nVectorsInv = 1.0 / (double)(nVectors); /* Split rows by blocks */ DAAL_INT64 numRowsInBlock = getBlockSize(nVectors); if (hyperparameter) @@ -173,13 +171,14 @@ services::Status updateDenseCrossProductAndSums(bool isNormalized, size_t nFeatu DAAL_CHECK_STATUS_VAR(status); } - /// TODO: make a hyperparameter + /* TODO: make a hyperparameter */ constexpr double cacheCoeff = 0.8; - const size_t l2Size = std::max(getL2CacheSize(), 256ul * 1024ul); + const size_t l2Size = (getL2CacheSize() > 256 * 1024 ? getL2CacheSize() : 256 * 1024); const size_t nValuesPerThread = l2Size * cacheCoeff / sizeof(algorithmFPType); - const size_t nValues = nVectors * nFeatures; + const size_t nValues = nVectors * nFeatures; + /* Maximal number of threads to use in parallel region */ const size_t maxNThreads = (nValues > nValuesPerThread ? 
nValues / nValuesPerThread : 1); size_t numBlocks = nVectors / numRowsInBlock; diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp index 30bb1708762..750dbd6253f 100644 --- a/cpp/daal/src/threading/threading.cpp +++ b/cpp/daal/src/threading/threading.cpp @@ -24,6 +24,9 @@ #include "src/threading/threading.h" #include "services/daal_memory.h" #include "src/algorithms/service_qsort.h" +#include "src/services/service_defines.h" +#include "src/services/service_topo.h" +#include "src/threading/service_thread_pinner.h" /// #define TBB_PREVIEW_GLOBAL_CONTROL 1 /// #define TBB_PREVIEW_TASK_ARENA 1 @@ -78,10 +81,14 @@ DAAL_EXPORT void _daal_tbb_task_scheduler_handle_free(void *& schedulerHandle) // #endif } -DAAL_EXPORT size_t _initArenas() +size_t _initArenas() { -#if defined(TARGET_X86_64) - size_t nNUMA = daal::threader_env()->getNumberOfNUMANodes(); + tbb::task_arena * safeArena = new tbb::task_arena(tbb::task_arena::constraints {}.set_max_threads_per_core(1)); + safeArena->initialize(); + daal::threader_env()->setSafeArena(safeArena); + + size_t nNUMA = 1; + DAAL_SAFE_CPU_CALL((nNUMA = daal::threader_env()->getNumberOfNUMANodes()), (nNUMA = 1)); if (nNUMA > daal::DAAL_MAX_NUMA_COUNT) { return -1; @@ -91,19 +98,11 @@ DAAL_EXPORT size_t _initArenas() std::vector numa_indexes = tbb::info::numa_nodes(); for (size_t i = 0; i < nNUMA; ++i) { - tbb::task_arena * arena = new tbb::task_arena(); + tbb::task_arena * arena = new tbb::task_arena(tbb::task_arena::constraints {}.set_max_threads_per_core(1)); arena->initialize(tbb::task_arena::constraints(numa_indexes[i])); daal::threader_env()->setArena(i, arena); } } - else - { - tbb::task_arena * arena = new tbb::task_arena(tbb::task_arena::constraints {}.set_max_threads_per_core(1)); - arena->initialize(); - daal::threader_env()->setArena(0, arena); - } - -#endif return 0; } @@ -125,11 +124,27 @@ DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** globalCo { static tbb::spin_mutex mt; tbb::spin_mutex::scoped_lock lock(mt); + + tbb::task_arena * safeArena = reinterpret_cast(daal::threader_env()->getSafeArena()); + delete safeArena; + size_t nNUMA = 1; + DAAL_SAFE_CPU_CALL((nNUMA = daal::threader_env()->getNumberOfNUMANodes()), (nNUMA = 1)); + if (nNUMA) + { + std::vector numa_indexes = tbb::info::numa_nodes(); + for (size_t i = 0; i < nNUMA; ++i) + { + tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getArena(i)); + delete arena; + } + } if (numThreads != 0) { _daal_tbb_task_scheduler_free(*globalControl); *globalControl = reinterpret_cast(new tbb::global_control(tbb::global_control::max_allowed_parallelism, numThreads)); daal::threader_env()->setNumberOfThreads(numThreads); + daal::threader_env()->setNumberOfNUMANodes(tbb::info::numa_nodes().size()); + _initArenas(); return numThreads; } daal::threader_env()->setNumberOfThreads(1); @@ -140,13 +155,39 @@ DAAL_EXPORT void _daal_threader_for(int n, int reserved, const void * a, daal::f { if (daal::threader_env()->getNumberOfThreads() > 1) { - tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { - int i; - for (i = r.begin(); i < r.end(); i++) - { - func(i, a); - } - }); +#if !(defined DAAL_THREAD_PINNING_DISABLED) + daal::services::internal::thread_pinner_t * pinner = daal::services::internal::getThreadPinner(false, read_topology, delete_topology); + if (pinner != NULL && pinner->get_pinning()) + { + // The task is already runnig in thread pinner arena + tbb::parallel_for(tbb::blocked_range(0, n, 1), 
[&](tbb::blocked_range r) { + int i; + for (i = r.begin(); i < r.end(); i++) + { + func(i, a); + } + }); + } + else +#endif + { + // Run the task in the default arena + tbb::task_group tg; + tbb::task_arena * safeArena = reinterpret_cast(daal::threader_env()->getSafeArena()); + safeArena->execute([&]() { + tg.run([&n, &a, &func] { + tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { + int i; + for (i = r.begin(); i < r.end(); i++) + { + func(i, a); + } + }); + }); + }); + + safeArena->execute([&] { tg.wait(); }); + } } else { @@ -162,13 +203,37 @@ DAAL_EXPORT void _daal_threader_for_int64(int64_t n, const void * a, daal::funct { if (daal::threader_env()->getNumberOfThreads() > 1) { - tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { - int64_t i; - for (i = r.begin(); i < r.end(); i++) - { - func(i, a); - } - }); +#if !(defined DAAL_THREAD_PINNING_DISABLED) + daal::services::internal::thread_pinner_t * pinner = daal::services::internal::getThreadPinner(false, read_topology, delete_topology); + if (pinner != NULL && pinner->get_pinning()) + { + tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { + int64_t i; + for (i = r.begin(); i < r.end(); i++) + { + func(i, a); + } + }); + } + else +#endif + { + // Run the task in the default arena + tbb::task_group tg; + tbb::task_arena * safeArena = reinterpret_cast(daal::threader_env()->getSafeArena()); + safeArena->execute([&]() { + tg.run([&n, &a, &func] { + tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { + int64_t i; + for (i = r.begin(); i < r.end(); i++) + { + func(i, a); + } + }); + }); + }); + safeArena->execute([&] { tg.wait(); }); + } } else { @@ -184,8 +249,26 @@ DAAL_EXPORT void _daal_threader_for_blocked_size(size_t n, size_t block, const v { if (daal::threader_env()->getNumberOfThreads() > 1) { - tbb::parallel_for(tbb::blocked_range(0ul, n, block), - [=](tbb::blocked_range r) -> void { return func(r.begin(), r.end(), a); }); +#if !(defined DAAL_THREAD_PINNING_DISABLED) + daal::services::internal::thread_pinner_t * pinner = daal::services::internal::getThreadPinner(false, read_topology, delete_topology); + if (pinner != NULL && pinner->get_pinning()) + { + tbb::parallel_for(tbb::blocked_range(0ul, n, block), + [=](tbb::blocked_range r) -> void { return func(r.begin(), r.end(), a); }); + } + else +#endif + { + tbb::task_group tg; + tbb::task_arena * safeArena = reinterpret_cast(daal::threader_env()->getSafeArena()); + safeArena->execute([&]() { + tg.run([&n, &block, &a, &func] { + tbb::parallel_for(tbb::blocked_range(0ul, n, block), + [=](tbb::blocked_range r) -> void { return func(r.begin(), r.end(), a); }); + }); + }); + safeArena->execute([&] { tg.wait(); }); + } } else { @@ -193,16 +276,16 @@ DAAL_EXPORT void _daal_threader_for_blocked_size(size_t n, size_t block, const v } } -DAAL_EXPORT void _daal_static_numa_threader_for(size_t n, size_t maxConcurrency, const void * a, daal::functype_static func) +DAAL_EXPORT void _daal_static_numa_threader_for(size_t n, size_t max_threads, const void * a, daal::functype_static func) { const size_t nthreadsInEnv = daal::threader_env()->getNumberOfThreads(); - const size_t nthreads = std::min(nthreadsInEnv, maxConcurrency); + const size_t nthreads = std::min(nthreadsInEnv, max_threads); if (nthreads > 1) { size_t nNUMA = 0; - if (maxConcurrency < nthreadsInEnv) + if (max_threads < nthreadsInEnv) { - nNUMA = 1; + nNUMA = 1; size_t nthreadsInNUMA = daal::threader_env()->getArenaConcurrency(0); while 
(nthreadsInNUMA < nthreads) { @@ -215,19 +298,40 @@ DAAL_EXPORT void _daal_static_numa_threader_for(size_t n, size_t maxConcurrency, nNUMA = daal::threader_env()->getNumberOfNUMANodes(); } const size_t nblocks_per_thread = n / nthreads + !!(n % nthreads); - if (nNUMA > 1) +#if !(defined DAAL_THREAD_PINNING_DISABLED) + daal::services::internal::thread_pinner_t * pinner = daal::services::internal::getThreadPinner(false, read_topology, delete_topology); + if (pinner != NULL && pinner->get_pinning()) + { + tbb::parallel_for( + tbb::blocked_range(0, nthreads, 1), + [&](tbb::blocked_range r) { + const size_t tid = r.begin(); + const size_t begin = tid * nblocks_per_thread; + const size_t end = n < begin + nblocks_per_thread ? n : begin + nblocks_per_thread; + + for (size_t i = begin; i < end; ++i) + { + func(i, tid, a); + } + }, + tbb::static_partitioner()); + } + else +#endif { - tbb::task_group tg[daal::DAAL_MAX_NUMA_COUNT]; - tbb::task_arena * arenas[daal::DAAL_MAX_NUMA_COUNT]; - int startThreadIndex = 0; - for (size_t i = 0; i < nNUMA; ++i) + if (nNUMA > 1) { - arenas[i] = reinterpret_cast(daal::threader_env()->getArena(i)); - const int concurrency = std::min(arenas[i]->max_concurrency(), int(maxConcurrency) - startThreadIndex); - arenas[i]->execute([&]() { // Run each arena on a dedicated NUMA node - tg[i].run([&n, &startThreadIndex, &concurrency, &nblocks_per_thread, &func, &a]{ // Run in task group - tbb::parallel_for( - tbb::blocked_range(startThreadIndex, startThreadIndex + concurrency, 1), + tbb::task_group tg[daal::DAAL_MAX_NUMA_COUNT]; + tbb::task_arena * arenas[daal::DAAL_MAX_NUMA_COUNT]; + int startThreadIndex = 0; + for (size_t i = 0; i < nNUMA; ++i) + { + arenas[i] = reinterpret_cast(daal::threader_env()->getArena(i)); + const int concurrency = std::min(arenas[i]->max_concurrency(), int(max_threads) - startThreadIndex); + arenas[i]->execute([&]() { // Run each arena on a dedicated NUMA node + tg[i].run([&n, &startThreadIndex, &concurrency, &nblocks_per_thread, &func, &a] { // Run in task group + tbb::parallel_for( + tbb::blocked_range(startThreadIndex, startThreadIndex + concurrency, 1), [&](tbb::blocked_range r) { const size_t tid = r.begin(); const size_t begin = tid * nblocks_per_thread; @@ -239,33 +343,41 @@ DAAL_EXPORT void _daal_static_numa_threader_for(size_t n, size_t maxConcurrency, } }, tbb::static_partitioner()); - }); - }); - startThreadIndex += concurrency; - } + }); + }); + startThreadIndex += concurrency; + } - for (size_t i = 0; i < nNUMA; ++i) + for (size_t i = 0; i < nNUMA; ++i) + { + // Wait for completion of the task group in the all the arenas. + arenas[i]->execute([&] { tg[i].wait(); }); + } + } + else { - // Wait for completion of the task group in the all the arenas. - arenas[i]->execute([&]{ tg[i].wait(); }); + tbb::task_group tg; + tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getSafeArena()); + arena->execute([&]() { // Run parallel task in arena + tg.run([&n, &nthreads, &nblocks_per_thread, &func, &a] { + tbb::parallel_for( + tbb::blocked_range(0, nthreads, 1), + [&](tbb::blocked_range r) { + const size_t tid = r.begin(); + const size_t begin = tid * nblocks_per_thread; + const size_t end = n < begin + nblocks_per_thread ? 
n : begin + nblocks_per_thread; + + for (size_t i = begin; i < end; ++i) + { + func(i, tid, a); + } + }, + tbb::static_partitioner()); + }); + }); + arena->execute([&] { tg.wait(); }); } } - else - { - tbb::parallel_for( - tbb::blocked_range(0, nthreads, 1), - [&](tbb::blocked_range r) { - const size_t tid = r.begin(); - const size_t begin = tid * nblocks_per_thread; - const size_t end = n < begin + nblocks_per_thread ? n : begin + nblocks_per_thread; - - for (size_t i = begin; i < end; ++i) - { - func(i, tid, a); - } - }, - tbb::static_partitioner()); - } } else { @@ -280,16 +392,42 @@ DAAL_EXPORT void _daal_threader_for_simple(int n, int reserved, const void * a, { if (daal::threader_env()->getNumberOfThreads() > 1) { - tbb::parallel_for( - tbb::blocked_range(0, n, 1), - [&](tbb::blocked_range r) { - int i; - for (i = r.begin(); i < r.end(); i++) - { - func(i, a); - } - }, - tbb::simple_partitioner {}); +#if !(defined DAAL_THREAD_PINNING_DISABLED) + daal::services::internal::thread_pinner_t * pinner = daal::services::internal::getThreadPinner(false, read_topology, delete_topology); + if (pinner != NULL && pinner->get_pinning()) + { + tbb::parallel_for( + tbb::blocked_range(0, n, 1), + [&](tbb::blocked_range r) { + int i; + for (i = r.begin(); i < r.end(); i++) + { + func(i, a); + } + }, + tbb::simple_partitioner {}); + } + else +#endif + { + tbb::task_group tg; + tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getSafeArena()); + arena->execute([&]() { // Run parallel task in arena + tg.run([&n, &func, &a] { + tbb::parallel_for( + tbb::blocked_range(0, n, 1), + [&](tbb::blocked_range r) { + int i; + for (i = r.begin(); i < r.end(); i++) + { + func(i, a); + } + }, + tbb::simple_partitioner {}); + }); + }); + arena->execute([&] { tg.wait(); }); + } } else { @@ -305,13 +443,36 @@ DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end, { if (daal::threader_env()->getNumberOfThreads() > 1) { - tbb::parallel_for(tbb::blocked_range(begin, end, 1), [&](tbb::blocked_range r) { - const int * i; - for (i = r.begin(); i != r.end(); i++) - { - func(i, a); - } - }); +#if !(defined DAAL_THREAD_PINNING_DISABLED) + daal::services::internal::thread_pinner_t * pinner = daal::services::internal::getThreadPinner(false, read_topology, delete_topology); + if (pinner != NULL && pinner->get_pinning()) + { + tbb::parallel_for(tbb::blocked_range(begin, end, 1), [&](tbb::blocked_range r) { + const int * i; + for (i = r.begin(); i != r.end(); i++) + { + func(i, a); + } + }); + } + else +#endif + { + tbb::task_group tg; + tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getSafeArena()); + arena->execute([&]() { // Run parallel task in arena + tg.run([&begin, &end, &func, &a] { + tbb::parallel_for(tbb::blocked_range(begin, end, 1), [&](tbb::blocked_range r) { + const int * i; + for (i = r.begin(); i != r.end(); i++) + { + func(i, a); + } + }); + }); + }); + arena->execute([&] { tg.wait(); }); + } } else { @@ -328,10 +489,28 @@ DAAL_EXPORT int64_t _daal_parallel_reduce_int32_int64(int32_t n, int64_t init, c { if (daal::threader_env()->getNumberOfThreads() > 1) { - return tbb::parallel_reduce( - tbb::blocked_range(0, n), init, - [&](const tbb::blocked_range & r, int64_t value_for_reduce) { return loop_func(r.begin(), r.end(), value_for_reduce, a); }, - [&](int64_t x, int64_t y) { return reduction_func(x, y, b); }, tbb::auto_partitioner {}); +#if !(defined DAAL_THREAD_PINNING_DISABLED) + daal::services::internal::thread_pinner_t * pinner = 
daal::services::internal::getThreadPinner(false, read_topology, delete_topology); + if (pinner != NULL && pinner->get_pinning()) + { + return tbb::parallel_reduce( + tbb::blocked_range(0, n), init, + [&](const tbb::blocked_range & r, int64_t value_for_reduce) { return loop_func(r.begin(), r.end(), value_for_reduce, a); }, + [&](int64_t x, int64_t y) { return reduction_func(x, y, b); }, tbb::auto_partitioner {}); + } + else +#endif + { + tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getSafeArena()); + return arena->execute([&]() -> int64_t { // Run parallel task in arena + return tbb::parallel_reduce( + tbb::blocked_range(0, n), init, + [&](const tbb::blocked_range & r, int64_t value_for_reduce) { + return loop_func(r.begin(), r.end(), value_for_reduce, a); + }, + [&](int64_t x, int64_t y) { return reduction_func(x, y, b); }, tbb::auto_partitioner {}); + }); + } } else { @@ -345,10 +524,28 @@ DAAL_EXPORT int64_t _daal_parallel_reduce_int32_int64_simple(int32_t n, int64_t { if (daal::threader_env()->getNumberOfThreads() > 1) { - return tbb::parallel_reduce( - tbb::blocked_range(0, n), init, - [&](const tbb::blocked_range & r, int64_t value_for_reduce) { return loop_func(r.begin(), r.end(), value_for_reduce, a); }, - [&](int64_t x, int64_t y) { return reduction_func(x, y, b); }, tbb::simple_partitioner {}); +#if !(defined DAAL_THREAD_PINNING_DISABLED) + daal::services::internal::thread_pinner_t * pinner = daal::services::internal::getThreadPinner(false, read_topology, delete_topology); + if (pinner != NULL && pinner->get_pinning()) + { + return tbb::parallel_reduce( + tbb::blocked_range(0, n), init, + [&](const tbb::blocked_range & r, int64_t value_for_reduce) { return loop_func(r.begin(), r.end(), value_for_reduce, a); }, + [&](int64_t x, int64_t y) { return reduction_func(x, y, b); }, tbb::simple_partitioner {}); + } + else +#endif + { + tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getSafeArena()); + return arena->execute([&]() { // Run parallel task in arena + return tbb::parallel_reduce( + tbb::blocked_range(0, n), init, + [&](const tbb::blocked_range & r, int64_t value_for_reduce) { + return loop_func(r.begin(), r.end(), value_for_reduce, a); + }, + [&](int64_t x, int64_t y) { return reduction_func(x, y, b); }, tbb::simple_partitioner {}); + }); + } } else { @@ -363,12 +560,30 @@ DAAL_EXPORT int64_t _daal_parallel_reduce_int32ptr_int64_simple(const int32_t * { if (daal::threader_env()->getNumberOfThreads() > 1) { - return tbb::parallel_reduce( - tbb::blocked_range(begin, end), init, - [&](const tbb::blocked_range & r, int64_t value_for_reduce) { - return loop_func(r.begin(), r.end(), value_for_reduce, a); - }, - [&](int64_t x, int64_t y) { return reduction_func(x, y, b); }, tbb::simple_partitioner {}); +#if !(defined DAAL_THREAD_PINNING_DISABLED) + daal::services::internal::thread_pinner_t * pinner = daal::services::internal::getThreadPinner(false, read_topology, delete_topology); + if (pinner != NULL && pinner->get_pinning()) + { + return tbb::parallel_reduce( + tbb::blocked_range(begin, end), init, + [&](const tbb::blocked_range & r, int64_t value_for_reduce) { + return loop_func(r.begin(), r.end(), value_for_reduce, a); + }, + [&](int64_t x, int64_t y) { return reduction_func(x, y, b); }, tbb::simple_partitioner {}); + } + else +#endif + { + tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getSafeArena()); + return arena->execute([&]() { // Run parallel task in arena + return tbb::parallel_reduce( + tbb::blocked_range(begin, end), 
init, + [&](const tbb::blocked_range & r, int64_t value_for_reduce) { + return loop_func(r.begin(), r.end(), value_for_reduce, a); + }, + [&](int64_t x, int64_t y) { return reduction_func(x, y, b); }, tbb::simple_partitioner {}); + }); + } } else { @@ -384,49 +599,48 @@ DAAL_EXPORT void _daal_static_threader_for(size_t n, const void * a, daal::funct { const size_t nblocks_per_thread = n / nthreads + !!(n % nthreads); - tbb::parallel_for( - tbb::blocked_range(0, nthreads, 1), - [&](tbb::blocked_range r) { - const size_t tid = r.begin(); - const size_t begin = tid * nblocks_per_thread; - const size_t end = n < begin + nblocks_per_thread ? n : begin + nblocks_per_thread; - - for (size_t i = begin; i < end; ++i) - { - func(i, tid, a); - } - }, - tbb::static_partitioner()); - } - else - { - for (size_t i = 0; i < n; i++) +#if !(defined DAAL_THREAD_PINNING_DISABLED) + daal::services::internal::thread_pinner_t * pinner = daal::services::internal::getThreadPinner(false, read_topology, delete_topology); + if (pinner != NULL && pinner->get_pinning()) { - func(i, 0, a); + tbb::parallel_for( + tbb::blocked_range(0, nthreads, 1), + [&](tbb::blocked_range r) { + const size_t tid = r.begin(); + const size_t begin = tid * nblocks_per_thread; + const size_t end = n < begin + nblocks_per_thread ? n : begin + nblocks_per_thread; + + for (size_t i = begin; i < end; ++i) + { + func(i, tid, a); + } + }, + tbb::static_partitioner()); + } + else +#endif + { + tbb::task_group tg; + tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getSafeArena()); + arena->execute([&]() { // Run parallel task in arena + tg.run([&n, &nthreads, &nblocks_per_thread, &func, &a] { + tbb::parallel_for( + tbb::blocked_range(0, nthreads, 1), + [&](tbb::blocked_range r) { + const size_t tid = r.begin(); + const size_t begin = tid * nblocks_per_thread; + const size_t end = n < begin + nblocks_per_thread ? n : begin + nblocks_per_thread; + + for (size_t i = begin; i < end; ++i) + { + func(i, tid, a); + } + }, + tbb::static_partitioner()); + }); + }); + arena->execute([&] { tg.wait(); }); } - } -} - -DAAL_EXPORT void _daal_static_limited_threader_for(size_t n, size_t max_threads, const void * a, daal::functype_static func) -{ - const size_t nthreads = std::min(daal::threader_env()->getNumberOfThreads(), max_threads); - if (nthreads > 1) - { - const size_t nblocks_per_thread = n / nthreads + !!(n % nthreads); - - tbb::parallel_for( - tbb::blocked_range(0, nthreads, 1), - [&](tbb::blocked_range r) { - const size_t tid = r.begin(); - const size_t begin = tid * nblocks_per_thread; - const size_t end = n < begin + nblocks_per_thread ? 
n : begin + nblocks_per_thread; - - for (size_t i = begin; i < end; ++i) - { - func(i, tid, a); - } - }, - tbb::static_partitioner()); } else { @@ -442,7 +656,22 @@ DAAL_EXPORT void _daal_parallel_sort_template(F * begin_p, F * end_p) { if (daal::threader_env()->getNumberOfThreads() > 1) { - tbb::parallel_sort(begin_p, end_p); +#if !(defined DAAL_THREAD_PINNING_DISABLED) + daal::services::internal::thread_pinner_t * pinner = daal::services::internal::getThreadPinner(false, read_topology, delete_topology); + if (pinner != NULL && pinner->get_pinning()) + { + tbb::parallel_sort(begin_p, end_p); + } + else +#endif + { + tbb::task_group tg; + tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getSafeArena()); + arena->execute([&]() { // Run parallel task in arena + tg.run([&begin_p, &end_p] { tbb::parallel_sort(begin_p, end_p); }); + }); + arena->execute([&] { tg.wait(); }); + } } else { @@ -504,18 +733,46 @@ DAAL_EXPORT void _daal_threader_for_break(int n, int threads_request, const void if (daal::threader_env()->getNumberOfThreads() > 1) { tbb::task_group_context context; - tbb::parallel_for( - tbb::blocked_range(0, n, 1), - [&](tbb::blocked_range r) { - int i; - for (i = r.begin(); i < r.end(); ++i) - { - bool needBreak = false; - func(i, needBreak, a); - if (needBreak) context.cancel_group_execution(); - } - }, - context); +#if !(defined DAAL_THREAD_PINNING_DISABLED) + daal::services::internal::thread_pinner_t * pinner = daal::services::internal::getThreadPinner(false, read_topology, delete_topology); + if (pinner != NULL && pinner->get_pinning()) + { + tbb::parallel_for( + tbb::blocked_range(0, n, 1), + [&](tbb::blocked_range r) { + int i; + for (i = r.begin(); i < r.end(); ++i) + { + bool needBreak = false; + func(i, needBreak, a); + if (needBreak) context.cancel_group_execution(); + } + }, + context); + } + else +#endif + { + tbb::task_group tg; + tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getSafeArena()); + arena->execute([&]() { // Run parallel task in arena + tg.run([&n, &threads_request, &a, &func, &context] { + tbb::parallel_for( + tbb::blocked_range(0, n, 1), + [&](tbb::blocked_range r) { + int i; + for (i = r.begin(); i < r.end(); ++i) + { + bool needBreak = false; + func(i, needBreak, a); + if (needBreak) context.cancel_group_execution(); + } + }, + context); + }); + }); + arena->execute([&] { tg.wait(); }); + } } else { @@ -654,18 +911,18 @@ class SimpleAllocator }; template -class Collection +class StorageCollection { public: /** * Default constructor. Sets the size and capacity to 0. 
*/ - Collection() : _array(NULL), _size(0), _capacity(0) {} + StorageCollection() : _array(NULL), _size(0), _capacity(0) {} /** * Destructor */ - virtual ~Collection() + virtual ~StorageCollection() { for (size_t i = 0; i < _capacity; i++) _array[i].~T(); Allocator::free(_array); @@ -900,8 +1157,8 @@ class LocalStorage private: void * _a; daal::tls_functype _func; - Collection _free; //sorted by tid - Collection _used; //sorted by value + StorageCollection _free; //sorted by tid + StorageCollection _used; //sorted by value tbb::spin_mutex _mt; }; diff --git a/cpp/daal/src/threading/threading.h b/cpp/daal/src/threading/threading.h index ea6c9506151..3aa314c3319 100644 --- a/cpp/daal/src/threading/threading.h +++ b/cpp/daal/src/threading/threading.h @@ -66,11 +66,9 @@ extern "C" DAAL_EXPORT void _daal_threader_for_simple(int n, int threads_request, const void * a, daal::functype func); DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end, const void * a, daal::functype_int32ptr func); DAAL_EXPORT void _daal_static_threader_for(size_t n, const void * a, daal::functype_static func); - DAAL_EXPORT void _daal_static_limited_threader_for(size_t n, size_t max_threads, const void * a, daal::functype_static func); DAAL_EXPORT void _daal_static_numa_threader_for(size_t n, size_t max_threads, const void * a, daal::functype_static func); DAAL_EXPORT void _daal_threader_for_blocked(int n, size_t grainsize, const void * a, daal::functype2 func); DAAL_EXPORT void _daal_threader_for_blocked_size(size_t n, size_t block, const void * a, daal::functype_blocked_size func); - DAAL_EXPORT void _daal_threader_for_blocked_numa(size_t n, size_t block, const void * a, daal::functype_blocked_size func); DAAL_EXPORT void _daal_threader_for_optional(int n, int threads_request, const void * a, daal::functype func); DAAL_EXPORT void _daal_threader_for_break(int n, int threads_request, const void * a, daal::functype_break func); @@ -176,22 +174,29 @@ class ThreaderEnvironment size_t getNumberOfThreads() const { return _numberOfThreads; } void setNumberOfThreads(size_t value) { _numberOfThreads = value; } size_t getNumberOfNUMANodes() const { return _numberOfNUMANodes; } + void setNumberOfNUMANodes(size_t value) { _numberOfNUMANodes = value; } void * getArena(size_t i) const { if (i >= _numberOfNUMANodes) return nullptr; return _arenas[i]; } + void * getSafeArena() const { return _safeArena; } + void setArena(size_t i, void * arena) { if (i < _numberOfNUMANodes) _arenas[i] = arena; } + void setSafeArena(void * arena) { _safeArena = arena; } + int getArenaConcurrency(size_t i) const; + private: size_t _numberOfThreads; size_t _numberOfNUMANodes; - void * _arenas[8]; + void * _arenas[daal::DAAL_MAX_NUMA_COUNT]; // NUMA-aware arenas + void * _safeArena; // default arena }; inline ThreaderEnvironment * threader_env() @@ -293,7 +298,6 @@ inline void static_numa_threader_for(int n, size_t max_threads, const F & func) _daal_static_numa_threader_for(n, max_threads, a, static_threader_func); } - /// Pass a function to be executed in a for loop to the threading layer. /// The maximal number of iterations in the loop is `2^63 - 1 (INT64_MAX)`. 
/// The default scheduling of the threading layer is used to assign @@ -386,14 +390,6 @@ inline void static_threader_for(size_t n, const F & func) _daal_static_threader_for(n, a, static_threader_func); } -template -inline void static_limited_threader_for(size_t n, size_t max_threads, const F & func) -{ - const void * a = static_cast(&func); - - _daal_static_limited_threader_for(n, max_threads, a, static_threader_func); -} - /// Pass a function to be executed in a for loop to the threading layer. /// The maximal number of iterations in the loop is `2^31 - 1 INT32_MAX`. /// The default scheduling of the threading layer is used to assign From d7dafc19d3a00fb4ea036442873ed317cc8bdc25 Mon Sep 17 00:00:00 2001 From: Victoriya Fedotova Date: Thu, 13 Feb 2025 01:36:16 -0800 Subject: [PATCH 7/9] Fix failures in graph algorithms --- cpp/daal/src/threading/threading.cpp | 32 ++++++++++++++++++++++------ cpp/daal/src/threading/threading.h | 31 ++++++--------------------- 2 files changed, 32 insertions(+), 31 deletions(-) diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp index 750dbd6253f..ef998c0cc65 100644 --- a/cpp/daal/src/threading/threading.cpp +++ b/cpp/daal/src/threading/threading.cpp @@ -28,11 +28,7 @@ #include "src/services/service_topo.h" #include "src/threading/service_thread_pinner.h" -/// #define TBB_PREVIEW_GLOBAL_CONTROL 1 -/// #define TBB_PREVIEW_TASK_ARENA 1 - #include // std::min -#include // std::vector #include // malloc and free #include #include @@ -41,8 +37,6 @@ #include #include "services/daal_atomic_int.h" -#include - #if defined(TBB_INTERFACE_VERSION) && TBB_INTERFACE_VERSION >= 12002 #include #endif @@ -103,6 +97,18 @@ size_t _initArenas() daal::threader_env()->setArena(i, arena); } } + daal::threader_env()->setInitialized(true); + return 0; +} + +size_t _initArenasThreadsafe() +{ + if (!daal::threader_env()->isInitialized()) + { + static tbb::spin_mutex mt; + tbb::spin_mutex::scoped_lock lock(mt); + return _initArenas(); + } return 0; } @@ -171,6 +177,7 @@ DAAL_EXPORT void _daal_threader_for(int n, int reserved, const void * a, daal::f else #endif { + _initArenasThreadsafe(); // Run the task in the default arena tbb::task_group tg; tbb::task_arena * safeArena = reinterpret_cast(daal::threader_env()->getSafeArena()); @@ -218,6 +225,7 @@ DAAL_EXPORT void _daal_threader_for_int64(int64_t n, const void * a, daal::funct else #endif { + _initArenasThreadsafe(); // Run the task in the default arena tbb::task_group tg; tbb::task_arena * safeArena = reinterpret_cast(daal::threader_env()->getSafeArena()); @@ -259,6 +267,7 @@ DAAL_EXPORT void _daal_threader_for_blocked_size(size_t n, size_t block, const v else #endif { + _initArenasThreadsafe(); tbb::task_group tg; tbb::task_arena * safeArena = reinterpret_cast(daal::threader_env()->getSafeArena()); safeArena->execute([&]() { @@ -319,6 +328,7 @@ DAAL_EXPORT void _daal_static_numa_threader_for(size_t n, size_t max_threads, co else #endif { + _initArenasThreadsafe(); if (nNUMA > 1) { tbb::task_group tg[daal::DAAL_MAX_NUMA_COUNT]; @@ -410,6 +420,7 @@ DAAL_EXPORT void _daal_threader_for_simple(int n, int reserved, const void * a, else #endif { + _initArenasThreadsafe(); tbb::task_group tg; tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getSafeArena()); arena->execute([&]() { // Run parallel task in arena @@ -458,6 +469,7 @@ DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end, else #endif { + _initArenasThreadsafe(); tbb::task_group tg; tbb::task_arena * 
arena = reinterpret_cast(daal::threader_env()->getSafeArena()); arena->execute([&]() { // Run parallel task in arena @@ -501,6 +513,7 @@ DAAL_EXPORT int64_t _daal_parallel_reduce_int32_int64(int32_t n, int64_t init, c else #endif { + _initArenasThreadsafe(); tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getSafeArena()); return arena->execute([&]() -> int64_t { // Run parallel task in arena return tbb::parallel_reduce( @@ -536,6 +549,7 @@ DAAL_EXPORT int64_t _daal_parallel_reduce_int32_int64_simple(int32_t n, int64_t else #endif { + _initArenasThreadsafe(); tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getSafeArena()); return arena->execute([&]() { // Run parallel task in arena return tbb::parallel_reduce( @@ -574,6 +588,7 @@ DAAL_EXPORT int64_t _daal_parallel_reduce_int32ptr_int64_simple(const int32_t * else #endif { + _initArenasThreadsafe(); tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getSafeArena()); return arena->execute([&]() { // Run parallel task in arena return tbb::parallel_reduce( @@ -620,6 +635,7 @@ DAAL_EXPORT void _daal_static_threader_for(size_t n, const void * a, daal::funct else #endif { + _initArenasThreadsafe(); tbb::task_group tg; tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getSafeArena()); arena->execute([&]() { // Run parallel task in arena @@ -665,6 +681,7 @@ DAAL_EXPORT void _daal_parallel_sort_template(F * begin_p, F * end_p) else #endif { + _initArenasThreadsafe(); tbb::task_group tg; tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getSafeArena()); arena->execute([&]() { // Run parallel task in arena @@ -753,6 +770,7 @@ DAAL_EXPORT void _daal_threader_for_break(int n, int threads_request, const void else #endif { + _initArenasThreadsafe(); tbb::task_group tg; tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getSafeArena()); arena->execute([&]() { // Run parallel task in arena @@ -1239,7 +1257,7 @@ DAAL_EXPORT void _daal_wait_task_group(void * taskGroupPtr) namespace daal { -ThreaderEnvironment::ThreaderEnvironment() : _numberOfThreads(_daal_threader_get_max_threads()) +ThreaderEnvironment::ThreaderEnvironment() : _numberOfThreads(_daal_threader_get_max_threads()), _isInitialized(false) { #if defined(TARGET_X86_64) _numberOfNUMANodes = tbb::info::numa_nodes().size(); diff --git a/cpp/daal/src/threading/threading.h b/cpp/daal/src/threading/threading.h index 3aa314c3319..289460ba99e 100644 --- a/cpp/daal/src/threading/threading.h +++ b/cpp/daal/src/threading/threading.h @@ -45,7 +45,6 @@ typedef void (*functype_int32ptr)(const int * i, const void * a); typedef void (*functype_static)(size_t i, size_t tid, const void * a); typedef void (*functype2)(int i, int n, const void * a); typedef void (*functype_blocked_size)(size_t first, size_t last, const void * a); -typedef void (*functype)(int i, const void * a); typedef void * (*tls_functype)(const void * a); typedef void (*tls_reduce_functype)(void * p, const void * a); typedef void (*functype_break)(int i, bool & needBreak, const void * a); @@ -192,11 +191,15 @@ class ThreaderEnvironment int getArenaConcurrency(size_t i) const; + bool isInitialized() const { return _isInitialized; } + void setInitialized(bool value) { _isInitialized = value; } + private: size_t _numberOfThreads; size_t _numberOfNUMANodes; void * _arenas[daal::DAAL_MAX_NUMA_COUNT]; // NUMA-aware arenas void * _safeArena; // default arena + bool _isInitialized; }; inline ThreaderEnvironment * threader_env() @@ -209,11 +212,6 @@ inline size_t 
threader_get_threads_number()
 {
     return threader_env()->getNumberOfThreads();
 }
 
-inline size_t threader_get_numa_number()
-{
-    return threader_env()->getNumberOfNUMANodes();
-}
-
 inline size_t setSchedulerHandle(void ** schedulerHandle)
 {
     size_t status = _setSchedulerHandle(schedulerHandle);
@@ -247,13 +245,6 @@ inline void threader_func_b(int i0, int in, const void * a)
     func(i0, in);
 }
 
-template <typename F>
-inline void threader_func_b_size_t(size_t i0, size_t in, const void * a)
-{
-    const F & func = *static_cast<const F *>(a);
-    func(i0, in);
-}
-
 template <typename F>
 inline void threader_func_break(int i, bool & needBreak, const void * a)
 {
@@ -282,14 +273,6 @@ inline void threader_for(int n, int reserved, const F & func)
     _daal_threader_for(n, reserved, a, threader_func<F>);
 }
 
-template <typename F>
-inline void numa_threader_for(int n, int block, const F & func)
-{
-    const void * a = static_cast<const void *>(&func);
-
-    _daal_threader_for_blocked_numa(n, block, a, threader_func_b_size_t<F>);
-}
-
 template <typename F>
 inline void static_numa_threader_for(int n, size_t max_threads, const F & func)
 {
@@ -402,9 +385,9 @@ inline void static_threader_for(size_t n, const F & func)
 /// `endRange` is the index after the end of the loop's iterations block to be
 /// processed by a thread, `beginRange < endRange <= n`;
 ///
-/// @param[in] n        Number of iterations in the for loop.
-/// @param[in] reserved Parameter reserved for the future. Currently unused.
-/// @param[in] func     Callable object that processes the block of loop's iterations
+/// @param[in] n         Number of iterations in the for loop.
+/// @param[in] grainsize Size of the block of consecutive loop's iterations to be processed by a thread.
+/// @param[in] func      Callable object that processes the block of loop's iterations
 ///                      `[beginRange, endRange)`.
 template <typename F>
 inline void threader_for_blocked(int n, size_t grainsize, const F & func)

From 441dab9b37d0495b951599532d832f520a496eb0 Mon Sep 17 00:00:00 2001
From: Victoriya Fedotova
Date: Mon, 24 Feb 2025 05:37:19 -0800
Subject: [PATCH 8/9] Add adaptive change in the number of oneTBB thread slots
 available to oneDAL

---
 cpp/daal/src/threading/threading.cpp | 184 ++++++++++++++++++---
 cpp/daal/src/threading/threading.h   |  84 +++++++++++-
 2 files changed, 204 insertions(+), 64 deletions(-)

diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp
index ef998c0cc65..df7e96254f5 100644
--- a/cpp/daal/src/threading/threading.cpp
+++ b/cpp/daal/src/threading/threading.cpp
@@ -77,10 +77,6 @@ DAAL_EXPORT void _daal_tbb_task_scheduler_handle_free(void *& schedulerHandle)
 
 size_t _initArenas()
 {
-    tbb::task_arena * safeArena = new tbb::task_arena(tbb::task_arena::constraints {}.set_max_threads_per_core(1));
-    safeArena->initialize();
-    daal::threader_env()->setSafeArena(safeArena);
-
     size_t nNUMA = 1;
     DAAL_SAFE_CPU_CALL((nNUMA = daal::threader_env()->getNumberOfNUMANodes()), (nNUMA = 1));
     if (nNUMA > daal::DAAL_MAX_NUMA_COUNT)
@@ -90,17 +86,91 @@ size_t _initArenas()
     if (nNUMA > 1)
     {
         std::vector<tbb::numa_node_id> numa_indexes = tbb::info::numa_nodes();
-        for (size_t i = 0; i < nNUMA; ++i)
+
+        tbb::task_arena * arena = new tbb::task_arena(tbb::task_arena::constraints {}.set_max_threads_per_core(1));
+
+        // Get maximal concurrency for the default arena (first NUMA node)
+        arena->initialize(tbb::task_arena::constraints(numa_indexes[0]));
+        arena->execute([] {
+            daal::threader_env()->setMaxConcurrency(0, tbb::this_task_arena::max_concurrency());
+        });
+        arena->terminate();
+        arena->initialize(tbb::task_arena::constraints(numa_indexes[0],
daal::threader_env()->getNumberOfThreadsUsed())); + daal::threader_env()->setArena(0, arena); + + daal::threader_env()->setDefaultArena(arena); + + // Get maximal concurrency for each arena (other NUMA nodes except the first one) + for (size_t i = 1; i < nNUMA; ++i) { tbb::task_arena * arena = new tbb::task_arena(tbb::task_arena::constraints {}.set_max_threads_per_core(1)); arena->initialize(tbb::task_arena::constraints(numa_indexes[i])); - daal::threader_env()->setArena(i, arena); + arena->execute([&i]{ + daal::threader_env()->setMaxConcurrency(i, tbb::this_task_arena::max_concurrency()); + }); + arena->terminate(); + delete arena; + daal::threader_env()->setArena(i, nullptr); } } + else + { + tbb::task_arena * defaultArena = new tbb::task_arena(tbb::task_arena::constraints {}.set_max_threads_per_core(1)); + defaultArena->initialize(); + daal::threader_env()->setDefaultArena(defaultArena); + } daal::threader_env()->setInitialized(true); return 0; } +void _updateArenas(size_t nThreadsRequested) +{ + size_t nThreadsUsed = daal::threader_env()->getNumberOfThreadsUsed(); + if (nThreadsRequested <= nThreadsUsed) + return; + const size_t nThreads = daal::threader_env()->getNumberOfThreads(); + if (nThreadsRequested >= nThreads) + nThreadsRequested = nThreads; + + std::vector numa_indexes = tbb::info::numa_nodes(); + const size_t nNUMA = daal::threader_env()->getNumberOfNUMANodes(); + size_t NUMAIndex = 0; + size_t totalMaxConcurrency = 0; + while (totalMaxConcurrency < nThreadsRequested && NUMAIndex < nNUMA) + { + totalMaxConcurrency += daal::threader_env()->getMaxConcurrency(NUMAIndex); + if (nThreadsRequested <= totalMaxConcurrency) + { + tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getArena(numa_indexes[NUMAIndex])); + arena->terminate(); + arena->initialize(tbb::task_arena::constraints(numa_indexes[NUMAIndex], nThreadsRequested)); + break; + } + else + { + if (totalMaxConcurrency < nThreadsRequested) + { + tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getArena(numa_indexes[NUMAIndex])); + arena->terminate(); + arena->initialize(tbb::task_arena::constraints(numa_indexes[NUMAIndex], daal::threader_env()->getMaxConcurrency(NUMAIndex))); + } + NUMAIndex++; + if (daal::threader_env()->getArena(numa_indexes[NUMAIndex]) == nullptr) + { + tbb::task_arena * arena = new tbb::task_arena(tbb::task_arena::constraints {}.set_max_threads_per_core(1)); + arena->initialize(tbb::task_arena::constraints( + numa_indexes[NUMAIndex], + std::min(nThreadsRequested - totalMaxConcurrency, daal::threader_env()->getMaxConcurrency(NUMAIndex))) + ); + daal::threader_env()->setArena(NUMAIndex, arena); + daal::threader_env()->incrementNumberOfNUMAArenas(); + } + } + } + daal::threader_env()->increaseNumberOfThreadsUsed(nThreadsRequested - nThreadsUsed); +} + + size_t _initArenasThreadsafe() { if (!daal::threader_env()->isInitialized()) @@ -112,6 +182,27 @@ size_t _initArenasThreadsafe() return 0; } +void _releaseArenas() +{ + size_t nArenas = 0; + DAAL_SAFE_CPU_CALL((nArenas = daal::threader_env()->getNumberOfNUMAArenas()), (nArenas = 0)); + if (nArenas) + { + std::vector numa_indexes = tbb::info::numa_nodes(); + for (size_t i = 0; i < nArenas; ++i) + { + tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getArena(i)); + delete arena; + } + } + if (daal::threader_env()->getNumberOfNUMANodes() == 1) + { + tbb::task_arena * defaultArena = reinterpret_cast(daal::threader_env()->getDefaultArena()); + delete defaultArena; + } + 
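+    // Note: when more than one NUMA node is present, the default arena aliases _arenas[0],
+    // so it must not be deleted again here.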
daal::threader_env()->resetNumberOfNUMAArenas(); +} + DAAL_EXPORT size_t _setSchedulerHandle(void ** schedulerHandle) { #if defined(TARGET_X86_64) @@ -131,19 +222,7 @@ DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** globalCo static tbb::spin_mutex mt; tbb::spin_mutex::scoped_lock lock(mt); - tbb::task_arena * safeArena = reinterpret_cast(daal::threader_env()->getSafeArena()); - delete safeArena; - size_t nNUMA = 1; - DAAL_SAFE_CPU_CALL((nNUMA = daal::threader_env()->getNumberOfNUMANodes()), (nNUMA = 1)); - if (nNUMA) - { - std::vector numa_indexes = tbb::info::numa_nodes(); - for (size_t i = 0; i < nNUMA; ++i) - { - tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getArena(i)); - delete arena; - } - } + _releaseArenas(); if (numThreads != 0) { _daal_tbb_task_scheduler_free(*globalControl); @@ -180,8 +259,8 @@ DAAL_EXPORT void _daal_threader_for(int n, int reserved, const void * a, daal::f _initArenasThreadsafe(); // Run the task in the default arena tbb::task_group tg; - tbb::task_arena * safeArena = reinterpret_cast(daal::threader_env()->getSafeArena()); - safeArena->execute([&]() { + tbb::task_arena * defaultArena = reinterpret_cast(daal::threader_env()->getDefaultArena()); + defaultArena->execute([&]() { tg.run([&n, &a, &func] { tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { int i; @@ -193,7 +272,7 @@ DAAL_EXPORT void _daal_threader_for(int n, int reserved, const void * a, daal::f }); }); - safeArena->execute([&] { tg.wait(); }); + defaultArena->execute([&] { tg.wait(); }); } } else @@ -228,8 +307,8 @@ DAAL_EXPORT void _daal_threader_for_int64(int64_t n, const void * a, daal::funct _initArenasThreadsafe(); // Run the task in the default arena tbb::task_group tg; - tbb::task_arena * safeArena = reinterpret_cast(daal::threader_env()->getSafeArena()); - safeArena->execute([&]() { + tbb::task_arena * defaultArena = reinterpret_cast(daal::threader_env()->getDefaultArena()); + defaultArena->execute([&]() { tg.run([&n, &a, &func] { tbb::parallel_for(tbb::blocked_range(0, n, 1), [&](tbb::blocked_range r) { int64_t i; @@ -240,7 +319,7 @@ DAAL_EXPORT void _daal_threader_for_int64(int64_t n, const void * a, daal::funct }); }); }); - safeArena->execute([&] { tg.wait(); }); + defaultArena->execute([&] { tg.wait(); }); } } else @@ -269,14 +348,14 @@ DAAL_EXPORT void _daal_threader_for_blocked_size(size_t n, size_t block, const v { _initArenasThreadsafe(); tbb::task_group tg; - tbb::task_arena * safeArena = reinterpret_cast(daal::threader_env()->getSafeArena()); - safeArena->execute([&]() { + tbb::task_arena * defaultArena = reinterpret_cast(daal::threader_env()->getDefaultArena()); + defaultArena->execute([&]() { tg.run([&n, &block, &a, &func] { tbb::parallel_for(tbb::blocked_range(0ul, n, block), [=](tbb::blocked_range r) -> void { return func(r.begin(), r.end(), a); }); }); }); - safeArena->execute([&] { tg.wait(); }); + defaultArena->execute([&] { tg.wait(); }); } } else @@ -291,21 +370,8 @@ DAAL_EXPORT void _daal_static_numa_threader_for(size_t n, size_t max_threads, co const size_t nthreads = std::min(nthreadsInEnv, max_threads); if (nthreads > 1) { - size_t nNUMA = 0; - if (max_threads < nthreadsInEnv) - { - nNUMA = 1; - size_t nthreadsInNUMA = daal::threader_env()->getArenaConcurrency(0); - while (nthreadsInNUMA < nthreads) - { - nthreadsInNUMA += daal::threader_env()->getArenaConcurrency(nNUMA); - nNUMA++; - } - } - else - { - nNUMA = daal::threader_env()->getNumberOfNUMANodes(); - } + _updateArenas(max_threads); + const 
size_t nArenas = daal::threader_env()->getNumberOfNUMAArenas();
         const size_t nblocks_per_thread = n / nthreads + !!(n % nthreads);
 #if !(defined DAAL_THREAD_PINNING_DISABLED)
     daal::services::internal::thread_pinner_t * pinner = daal::services::internal::getThreadPinner(false, read_topology, delete_topology);
@@ -328,13 +394,12 @@ DAAL_EXPORT void _daal_static_numa_threader_for(size_t n, size_t max_threads, co
     else
 #endif
     {
-        _initArenasThreadsafe();
-        if (nNUMA > 1)
+        if (nArenas > 1)
         {
             tbb::task_group tg[daal::DAAL_MAX_NUMA_COUNT];
             tbb::task_arena * arenas[daal::DAAL_MAX_NUMA_COUNT];
             int startThreadIndex = 0;
-            for (size_t i = 0; i < nNUMA; ++i)
+            for (size_t i = 0; i < nArenas; ++i)
             {
                 arenas[i] = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getArena(i));
                 const int concurrency = std::min(arenas[i]->max_concurrency(), int(max_threads) - startThreadIndex);
                 arenas[i]->execute([&]() { // Run each arena on a dedicated NUMA node
                     tg[i].run([&n, &startThreadIndex, &concurrency, &nblocks_per_thread, &func, &a] { // Run in task group
                         tbb::parallel_for(
@@ -358,7 +423,7 @@ DAAL_EXPORT void _daal_static_numa_threader_for(size_t n, size_t max_threads, co
                 startThreadIndex += concurrency;
             }
 
-            for (size_t i = 0; i < nNUMA; ++i)
+            for (size_t i = 0; i < nArenas; ++i)
             {
                 // Wait for completion of the task group in all the arenas.
                 arenas[i]->execute([&] { tg[i].wait(); });
@@ -367,7 +432,7 @@ DAAL_EXPORT void _daal_static_numa_threader_for(size_t n, size_t max_threads, co
         else
         {
             tbb::task_group tg;
-            tbb::task_arena * arena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getSafeArena());
+            tbb::task_arena * arena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getDefaultArena());
             arena->execute([&]() { // Run parallel task in arena
                 tg.run([&n, &nthreads, &nblocks_per_thread, &func, &a] {
                     tbb::parallel_for(
@@ -422,7 +487,7 @@ DAAL_EXPORT void _daal_threader_for_simple(int n, int reserved, const void * a,
     {
         _initArenasThreadsafe();
         tbb::task_group tg;
-        tbb::task_arena * arena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getSafeArena());
+        tbb::task_arena * arena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getDefaultArena());
         arena->execute([&]() { // Run parallel task in arena
             tg.run([&n, &func, &a] {
                 tbb::parallel_for(
@@ -471,7 +536,7 @@ DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end,
     {
         _initArenasThreadsafe();
         tbb::task_group tg;
-        tbb::task_arena * arena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getSafeArena());
+        tbb::task_arena * arena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getDefaultArena());
         arena->execute([&]() { // Run parallel task in arena
             tg.run([&begin, &end, &func, &a] {
                 tbb::parallel_for(tbb::blocked_range<const int *>(begin, end, 1), [&](tbb::blocked_range<const int *> r) {
@@ -514,7 +579,7 @@ DAAL_EXPORT int64_t _daal_parallel_reduce_int32_int64(int32_t n, int64_t init, c
 #endif
     {
         _initArenasThreadsafe();
-        tbb::task_arena * arena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getSafeArena());
+        tbb::task_arena * arena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getDefaultArena());
         return arena->execute([&]() -> int64_t { // Run parallel task in arena
             return tbb::parallel_reduce(
@@ -536,7 +615,7 @@ DAAL_EXPORT int64_t _daal_parallel_reduce_int32_int64_simple(int32_t n, int64_t
 #endif
     {
         _initArenasThreadsafe();
-        tbb::task_arena * arena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getSafeArena());
+        tbb::task_arena * arena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getDefaultArena());
         return arena->execute([&]() { // Run parallel task in arena
             return tbb::parallel_reduce(
@@ -574,7 +654,7 @@ DAAL_EXPORT int64_t _daal_parallel_reduce_int32ptr_int64_simple(const int32_t *
 #endif
     {
         _initArenasThreadsafe();
-        tbb::task_arena * arena =
reinterpret_cast(daal::threader_env()->getSafeArena()); + tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getDefaultArena()); return arena->execute([&]() { // Run parallel task in arena return tbb::parallel_reduce( tbb::blocked_range(begin, end), init, @@ -637,7 +702,7 @@ DAAL_EXPORT void _daal_static_threader_for(size_t n, const void * a, daal::funct { _initArenasThreadsafe(); tbb::task_group tg; - tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getSafeArena()); + tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getDefaultArena()); arena->execute([&]() { // Run parallel task in arena tg.run([&n, &nthreads, &nblocks_per_thread, &func, &a] { tbb::parallel_for( @@ -683,7 +748,7 @@ DAAL_EXPORT void _daal_parallel_sort_template(F * begin_p, F * end_p) { _initArenasThreadsafe(); tbb::task_group tg; - tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getSafeArena()); + tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getDefaultArena()); arena->execute([&]() { // Run parallel task in arena tg.run([&begin_p, &end_p] { tbb::parallel_sort(begin_p, end_p); }); }); @@ -772,7 +837,7 @@ DAAL_EXPORT void _daal_threader_for_break(int n, int threads_request, const void { _initArenasThreadsafe(); tbb::task_group tg; - tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getSafeArena()); + tbb::task_arena * arena = reinterpret_cast(daal::threader_env()->getDefaultArena()); arena->execute([&]() { // Run parallel task in arena tg.run([&n, &threads_request, &a, &func, &context] { tbb::parallel_for( @@ -1257,12 +1322,17 @@ DAAL_EXPORT void _daal_wait_task_group(void * taskGroupPtr) namespace daal { -ThreaderEnvironment::ThreaderEnvironment() : _numberOfThreads(_daal_threader_get_max_threads()), _isInitialized(false) +ThreaderEnvironment::ThreaderEnvironment() : + _numberOfThreads(_daal_threader_get_max_threads()), + _numberOfNUMAArenas(0), + _isInitialized(false) { #if defined(TARGET_X86_64) _numberOfNUMANodes = tbb::info::numa_nodes().size(); + _numberOfThreadsUsed = std::min(_numberOfThreads, 16ul); #else _numberOfNUMANodes = 1; + _numberOfThreadsUsed = _numberOfThreads; #endif } diff --git a/cpp/daal/src/threading/threading.h b/cpp/daal/src/threading/threading.h index 289460ba99e..b253ff6097b 100644 --- a/cpp/daal/src/threading/threading.h +++ b/cpp/daal/src/threading/threading.h @@ -166,39 +166,109 @@ inline void threaded_scalable_free(void * ptr) _threaded_scalable_free(ptr); } +/// Global oneDAL parallel execution environment class ThreaderEnvironment { public: ThreaderEnvironment(); + + /// Total number of oneTBB thread slots available to oneDAL size_t getNumberOfThreads() const { return _numberOfThreads; } void setNumberOfThreads(size_t value) { _numberOfThreads = value; } + + /// Get number of oneTBB thread slots used by oneDAL + size_t getNumberOfThreadsUsed() const { return _numberOfThreadsUsed; } + + /// Increase the number of oneTBB thread slots used by oneDAL + /// @param[in] value The number of thread slots to increase + void increaseNumberOfThreadsUsed(size_t value) { + size_t newValue = _numberOfThreadsUsed + value; + _numberOfThreadsUsed = newValue > _numberOfThreads ? 
_numberOfThreads : newValue;
+    }
+
+    /// Total number of NUMA nodes available to oneDAL
     size_t getNumberOfNUMANodes() const { return _numberOfNUMANodes; }
     void setNumberOfNUMANodes(size_t value) { _numberOfNUMANodes = value; }
+
+    /// Default oneTBB arena used by oneDAL for parallel computations
+    void * getDefaultArena() const { return _defaultArena; }
+    void setDefaultArena(void * arena) { _defaultArena = arena; }
+
+    /// Get the arena that is attached to the specified NUMA node
+    ///
+    /// @param i Index of the NUMA node
+    /// @return The arena attached to the NUMA node
     void * getArena(size_t i) const
     {
         if (i >= _numberOfNUMANodes) return nullptr;
         return _arenas[i];
     }
 
-    void * getSafeArena() const { return _safeArena; }
-
+    /// Set the arena that is attached to the specified NUMA node
+    /// @param i Index of the NUMA node
+    /// @param arena The arena that is attached to the i-th NUMA node
     void setArena(size_t i, void * arena)
     {
         if (i < _numberOfNUMANodes) _arenas[i] = arena;
     }
 
-    void setSafeArena(void * arena) { _safeArena = arena; }
+    /// Get the maximal concurrency of the arena that is attached to the specified NUMA node
+    ///
+    /// @param i Index of the NUMA node
+    /// @return The maximal concurrency of the arena attached to the NUMA node
+    /// @remark The actual number of thread slots in the arena may be less than the maximal concurrency
+    size_t getMaxConcurrency(size_t i) const
+    {
+        if (i < _numberOfNUMANodes) return _maxConcurrency[i];
+        return 0;
+    }
 
+    /// Set the maximal concurrency of the arena that is attached to the specified NUMA node
+    ///
+    /// @param i Index of the NUMA node
+    /// @param value The maximal concurrency of the arena attached to the NUMA node
+    /// @remark The actual number of thread slots in the arena may be less than the maximal concurrency
+    void setMaxConcurrency(size_t i, size_t value)
+    {
+        if (i < _numberOfNUMANodes) _maxConcurrency[i] = value;
+    }
+
+    /// Get the number of thread slots in the arena attached to the specified NUMA node
+    ///
+    /// @param i Index of the NUMA node
+    /// @return The number of thread slots in the arena attached to the NUMA node
     int getArenaConcurrency(size_t i) const;
 
+    /// Check if the oneDAL threading layer is initialized
     bool isInitialized() const { return _isInitialized; }
+
+    /// Set the initialization status of the oneDAL threading layer
     void setInitialized(bool value) { _isInitialized = value; }
 
+    /// Reset the number of initialized NUMA-aware arenas
+    void resetNumberOfNUMAArenas() { _numberOfNUMAArenas = 0; }
+
+    /// Increment the number of initialized NUMA-aware arenas
+    void incrementNumberOfNUMAArenas()
+    {
+        _numberOfNUMAArenas++;
+        if (_numberOfNUMAArenas > _numberOfNUMANodes) _numberOfNUMAArenas = _numberOfNUMANodes;
+    }
+
+    /// Get the number of initialized NUMA-aware arenas
+    size_t getNumberOfNUMAArenas() const { return _numberOfNUMAArenas; }
+
 private:
-    size_t _numberOfThreads;
-    size_t _numberOfNUMANodes;
-    void * _arenas[daal::DAAL_MAX_NUMA_COUNT]; // NUMA-aware arenas
-    void * _safeArena;                         // default arena
+    size_t _numberOfThreads;     // maximal number of threads available
+    size_t _numberOfThreadsUsed; // number of threads currently used;
+                                 // equal to the sum of maximal concurrencies of all the initialized arenas
+
+    size_t _numberOfNUMANodes;  // number of NUMA nodes available
+    size_t _numberOfNUMAArenas; // number of initialized NUMA-aware arenas
+
+    void * _defaultArena;                      // default arena
+    void * _arenas[daal::DAAL_MAX_NUMA_COUNT]; // NUMA-aware arenas
+    size_t _maxConcurrency[daal::DAAL_MAX_NUMA_COUNT]; // maximal concurrency of each arena
     bool _isInitialized;
 };

From 337068d5b144c12a5de30134fa265c68493a08e4 Mon Sep 17 00:00:00 2001
From: Victoriya Fedotova
Date: Thu, 27 Feb 2025 03:59:18 -0800
Subject: [PATCH 9/9] Fixes, comments

---
 cpp/daal/src/threading/threading.cpp | 145 ++++++++++++++++++---------
 1 file changed, 95 insertions(+), 50 deletions(-)

diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp
index df7e96254f5..aa8825c20e7 100644
--- a/cpp/daal/src/threading/threading.cpp
+++ b/cpp/daal/src/threading/threading.cpp
@@ -21,6 +21,9 @@
 //--
 */
 
+// #include <iostream>
+// #define DO_PRINT
+
 #include "src/threading/threading.h"
 #include "services/daal_memory.h"
 #include "src/algorithms/service_qsort.h"
@@ -75,10 +78,22 @@ DAAL_EXPORT void _daal_tbb_task_scheduler_handle_free(void *& schedulerHandle)
     // #endif
 }
 
+/// Initialize task arenas in the threader environment.
+///
+/// If the information about NUMA nodes is available, the default arena is created on the first NUMA node
+/// and its maximal concurrency is limited to the default initial number of threads used by oneDAL.
+///
+/// If the information about NUMA nodes is not available, the default arena is created on the whole system
+/// and its maximal concurrency is aligned with the maximal oneTBB concurrency.
+///
+/// \return 0 on success, -1 on failure
 size_t _initArenas()
 {
     size_t nNUMA = 1;
     DAAL_SAFE_CPU_CALL((nNUMA = daal::threader_env()->getNumberOfNUMANodes()), (nNUMA = 1));
+#ifdef DO_PRINT
+    std::cout << std::endl << "_initArenas: nNUMA = " << nNUMA << std::endl;
+#endif
     if (nNUMA > daal::DAAL_MAX_NUMA_COUNT)
     {
         return -1;
@@ -96,22 +111,15 @@ size_t _initArenas()
         });
         arena->terminate();
         arena->initialize(tbb::task_arena::constraints(numa_indexes[0], daal::threader_env()->getNumberOfThreadsUsed()));
+
+    #ifdef DO_PRINT
+        std::cout << "_initArenas: NUMA 0 max_concurrency = " << daal::threader_env()->getMaxConcurrency(0) << std::endl;
+        std::cout << "_initArenas: NUMA 0 number of threads used = " << daal::threader_env()->getNumberOfThreadsUsed() << std::endl;
+    #endif
         daal::threader_env()->setArena(0, arena);
 
         daal::threader_env()->setDefaultArena(arena);
 
-        // Get maximal concurrency for each arena (other NUMA nodes except the first one)
-        for (size_t i = 1; i < nNUMA; ++i)
-        {
-            tbb::task_arena * arena = new tbb::task_arena(tbb::task_arena::constraints {}.set_max_threads_per_core(1));
-            arena->initialize(tbb::task_arena::constraints(numa_indexes[i]));
-            arena->execute([&i] {
-                daal::threader_env()->setMaxConcurrency(i, tbb::this_task_arena::max_concurrency());
-            });
-            arena->terminate();
-            delete arena;
-            daal::threader_env()->setArena(i, nullptr);
-        }
     }
     else
     {
@@ -123,46 +131,80 @@ size_t _initArenas()
     return 0;
 }
 
+/// Update the number of threads in the task arenas of the threader environment to reach the requested
+/// number of threads if possible.
+///
+/// The function re-initializes the already created arenas to reach the requested number of threads.
+/// If the number of threads requested is greater than the number of threads available in the already
+/// created arenas, the function creates new arenas attached to the next NUMA nodes and initializes them
+/// with the number of threads necessary to reach the requested number of threads.
+///
+/// @param nThreadsRequested The number of threads requested by a threading primitive.
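+///
+/// Illustrative example (an assumed machine configuration, not taken from the patch): on a system
+/// with two 32-thread NUMA nodes where only arena 0 exists, a request for 48 threads re-initializes
+/// arena 0 with its full 32-thread concurrency and creates a second arena on the next NUMA node
+/// with the remaining 16 threads.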
 void _updateArenas(size_t nThreadsRequested)
 {
+#ifdef DO_PRINT
+    std::cout << std::endl << "_updateArenas: nThreadsRequested = " << nThreadsRequested << std::endl;
+#endif
     size_t nThreadsUsed = daal::threader_env()->getNumberOfThreadsUsed();
-    if (nThreadsRequested <= nThreadsUsed)
-        return;
     const size_t nThreads = daal::threader_env()->getNumberOfThreads();
+    if (nThreadsRequested <= nThreadsUsed || nThreads <= nThreadsUsed)
+        // Do nothing if the number of threads requested is less than the number of threads already used by oneDAL
+        // or if the number of threads used by oneDAL has already reached the total number of available threads.
+        return;
     if (nThreadsRequested >= nThreads)
+        // If the number of threads requested is greater than the total number of threads available to oneDAL,
+        // limit the number of requested threads with the total number of threads.
        nThreadsRequested = nThreads;
 
     std::vector<tbb::numa_node_id> numa_indexes = tbb::info::numa_nodes();
     const size_t nNUMA = daal::threader_env()->getNumberOfNUMANodes();
+#ifdef DO_PRINT
+    std::cout << "_updateArenas: nNUMA = " << nNUMA << std::endl;
+#endif
+
+    size_t iArena              = 0; // Index of the arena to be re-initialized
+    size_t totalMaxConcurrency = 0; // Total number of threads available in arenas visited so far
+    size_t prevMaxConcurrency  = 0; // Total number of threads available in arenas visited before the current one
+    while (totalMaxConcurrency < nThreadsRequested && iArena < nNUMA)
     {
-        totalMaxConcurrency += daal::threader_env()->getMaxConcurrency(NUMAIndex);
+        // Update the number of threads available in arenas until the requested number of threads is reached
+        prevMaxConcurrency = totalMaxConcurrency;
+        totalMaxConcurrency += daal::threader_env()->getMaxConcurrency(iArena);
         if (nThreadsRequested <= totalMaxConcurrency)
         {
-            tbb::task_arena * arena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getArena(numa_indexes[NUMAIndex]));
+            // If the requested number of threads can be reached using the current arena (iArena),
+            // re-initialize the current arena with the requested number of threads
+            tbb::task_arena * arena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getArena(numa_indexes[iArena]));
             arena->terminate();
-            arena->initialize(tbb::task_arena::constraints(numa_indexes[NUMAIndex], nThreadsRequested));
+            arena->initialize(tbb::task_arena::constraints(numa_indexes[iArena], nThreadsRequested - prevMaxConcurrency));
    #ifdef DO_PRINT
+            std::cout << "_updateArenas: Arena " << iArena << " reinitialized with " << nThreadsRequested - prevMaxConcurrency << " threads" << std::endl;
    #endif
             break;
         }
         else
         {
-            if (totalMaxConcurrency < nThreadsRequested)
-            {
-                tbb::task_arena * arena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getArena(numa_indexes[NUMAIndex]));
-                arena->terminate();
-                arena->initialize(tbb::task_arena::constraints(numa_indexes[NUMAIndex], daal::threader_env()->getMaxConcurrency(NUMAIndex)));
-            }
-            NUMAIndex++;
-            if (daal::threader_env()->getArena(numa_indexes[NUMAIndex]) == nullptr)
+            // If the requested number of threads cannot be reached using the current arena (iArena),
+            // re-initialize the current arena with the maximal number of threads available in the arena
+            tbb::task_arena * arena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getArena(numa_indexes[iArena]));
+            arena->terminate();
+            arena->initialize(tbb::task_arena::constraints(numa_indexes[iArena], daal::threader_env()->getMaxConcurrency(iArena)));
    #ifdef DO_PRINT
+            std::cout << "_updateArenas: Arena " << iArena << " reinitialized with max concurrency: " << daal::threader_env()->getMaxConcurrency(iArena) << " threads" << std::endl;
    #endif
+            // Move to the next arena
+            iArena++;
+            if (iArena < nNUMA && daal::threader_env()->getArena(numa_indexes[iArena]) == nullptr)
             {
+                // If the next arena does not exist, create a new arena attached to the next NUMA node
                 tbb::task_arena * arena = new tbb::task_arena(tbb::task_arena::constraints {}.set_max_threads_per_core(1));
-                arena->initialize(tbb::task_arena::constraints(
-                    numa_indexes[NUMAIndex],
-                    std::min(nThreadsRequested - totalMaxConcurrency, daal::threader_env()->getMaxConcurrency(NUMAIndex)))
-                );
-                daal::threader_env()->setArena(NUMAIndex, arena);
+                arena->initialize(tbb::task_arena::constraints(numa_indexes[iArena]));
+                // Get the maximal concurrency for the new arena
+                arena->execute([&iArena] {
+                    daal::threader_env()->setMaxConcurrency(iArena, tbb::this_task_arena::max_concurrency());
+                });
+
+                daal::threader_env()->setArena(iArena, arena);
                 daal::threader_env()->incrementNumberOfNUMAArenas();
             }
         }
     }
@@ -184,21 +226,19 @@ size_t _initArenasThreadsafe()
 
 void _releaseArenas()
 {
+    static tbb::spin_mutex mt;
+    tbb::spin_mutex::scoped_lock lock(mt);
+    tbb::task_arena * defaultArena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getDefaultArena());
+    delete defaultArena;
+    daal::threader_env()->setDefaultArena(nullptr);
     size_t nArenas = 0;
     DAAL_SAFE_CPU_CALL((nArenas = daal::threader_env()->getNumberOfNUMAArenas()), (nArenas = 0));
-    if (nArenas)
-    {
-        std::vector<tbb::numa_node_id> numa_indexes = tbb::info::numa_nodes();
-        for (size_t i = 0; i < nArenas; ++i)
-        {
-            tbb::task_arena * arena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getArena(i));
-            delete arena;
-        }
-    }
-    if (daal::threader_env()->getNumberOfNUMANodes() == 1)
+
+    for (size_t i = 1; i < nArenas; ++i)
     {
-        tbb::task_arena * defaultArena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getDefaultArena());
-        delete defaultArena;
+        tbb::task_arena * arena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getArena(i));
+        delete arena;
+        daal::threader_env()->setArena(i, nullptr);
     }
     daal::threader_env()->resetNumberOfNUMAArenas();
 }
@@ -370,7 +410,7 @@ DAAL_EXPORT void _daal_static_numa_threader_for(size_t n, size_t max_threads, co
     const size_t nthreads = std::min(nthreadsInEnv, max_threads);
     if (nthreads > 1)
     {
-        _updateArenas(max_threads);
+        _updateArenas(nthreads);
         const size_t nArenas = daal::threader_env()->getNumberOfNUMAArenas();
         const size_t nblocks_per_thread = n / nthreads + !!(n % nthreads);
 #if !(defined DAAL_THREAD_PINNING_DISABLED)
@@ -402,7 +442,7 @@ DAAL_EXPORT void _daal_static_numa_threader_for(size_t n, size_t max_threads, co
             for (size_t i = 0; i < nArenas; ++i)
             {
                 arenas[i] = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getArena(i));
-                const int concurrency = std::min(arenas[i]->max_concurrency(), int(max_threads) - startThreadIndex);
+                const int concurrency = std::min(arenas[i]->max_concurrency(), int(nthreads) - startThreadIndex);
@@ -1328,12 +1368,17 @@ ThreaderEnvironment::ThreaderEnvironment() :
     _isInitialized(false)
 {
 #if defined(TARGET_X86_64)
-    _numberOfNUMANodes   = tbb::info::numa_nodes().size();
-    _numberOfThreadsUsed = std::min(_numberOfThreads, 16ul);
+    DAAL_SAFE_CPU_CALL((_numberOfNUMANodes = tbb::info::numa_nodes().size()), (_numberOfNUMANodes = 1));
+    // Start with a small initial thread budget (16 slots); _updateArenas() grows it on demand.
+    DAAL_SAFE_CPU_CALL((_numberOfThreadsUsed = 16ul), (_numberOfThreadsUsed = _numberOfThreads));
+    _numberOfThreadsUsed = std::min(_numberOfThreadsUsed, _numberOfThreads);
 #else
     _numberOfNUMANodes   = 1;
     _numberOfThreadsUsed = _numberOfThreads;
 #endif
+#ifdef DO_PRINT
+    std::cout << std::endl << "ThreaderEnvironment(): Number of threads: " << _numberOfThreads << std::endl;
+    std::cout << std::endl << "ThreaderEnvironment(): Number of threads used: " << _numberOfThreadsUsed << std::endl;
+#endif
 }
 
 int ThreaderEnvironment::getArenaConcurrency(size_t i) const
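// Usage sketch (illustrative only, not part of the patch): how a caller could drive the
// NUMA-aware static loop introduced above. `processBlock` is a hypothetical callback, and
// the iteration count and thread request are arbitrary values:
//
//     const size_t n = 1024; // number of loop iterations
//     daal::static_numa_threader_for(n, /* max_threads = */ 48, [&](size_t i, size_t tid) {
//         processBlock(i, tid); // hypothetical per-iteration work, partitioned statically by thread id
//     });
//
// On the first such call the threading layer re-initializes arena 0 and, if 48 thread slots
// are not available there, creates further arenas on the next NUMA nodes via _updateArenas().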