Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement: NUMA-aware threading on CPU #3053

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion cpp/daal/include/services/daal_defines.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ struct IsSameType<U, U>
static const bool value = true;
};

const size_t DAAL_MALLOC_DEFAULT_ALIGNMENT = 64;
constexpr size_t DAAL_MALLOC_DEFAULT_ALIGNMENT = 64;
constexpr size_t DAAL_MAX_NUMA_COUNT = 8;

const int SERIALIZATION_HOMOGEN_NT_ID = 1000;
const int SERIALIZATION_AOS_NT_ID = 3000;
Expand Down
18 changes: 14 additions & 4 deletions cpp/daal/src/algorithms/covariance/covariance_impl.i
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "src/algorithms/service_error_handling.h"
#include "src/threading/threading.h"
#include "src/externals/service_profiler.h"
#include "src/services/service_environment.h" // getL2CacheSize()

using namespace daal::internal;
using namespace daal::services::internal;
Expand Down Expand Up @@ -161,15 +162,25 @@ services::Status updateDenseCrossProductAndSums(bool isNormalized, size_t nFeatu
if (((isNormalized) || ((!isNormalized) && ((method == defaultDense) || (method == sumDense)))))
{
/* Inverse number of rows (for normalization) */
algorithmFPType nVectorsInv = 1.0 / (double)(nVectors);

const algorithmFPType nVectorsInv = 1.0 / (double)(nVectors);
/* Split rows by blocks */
DAAL_INT64 numRowsInBlock = getBlockSize<cpu>(nVectors);
if (hyperparameter)
{
services::Status status = hyperparameter->find(denseUpdateStepBlockSize, numRowsInBlock);
DAAL_CHECK_STATUS_VAR(status);
}

/* TODO: expose cacheCoeff as a hyperparameter instead of hard-coding it */
constexpr double cacheCoeff = 0.8;

const size_t l2Size = (getL2CacheSize() > 256 * 1024 ? getL2CacheSize() : 256 * 1024);
const size_t nValuesPerThread = l2Size * cacheCoeff / sizeof(algorithmFPType);
const size_t nValues = nVectors * nFeatures;

/* Maximum number of threads to use in the parallel region */
const size_t maxNThreads = (nValues > nValuesPerThread ? nValues / nValuesPerThread : 1);

size_t numBlocks = nVectors / numRowsInBlock;
if (numBlocks * numRowsInBlock < nVectors)
{
Expand All @@ -187,10 +198,9 @@ services::Status updateDenseCrossProductAndSums(bool isNormalized, size_t nFeatu
}
return tlsData;
});
DAAL_CHECK_SAFE_STATUS();

/* Threaded loop with syrk seq calls */
daal::static_threader_for(numBlocks, [&](int iBlock, size_t tid) {
daal::static_numa_threader_for(numBlocks, maxNThreads, [&](int iBlock, size_t tid) {
struct tls_data_t<algorithmFPType, cpu> * tls_data_local = tls_data.local(tid);
if (!tls_data_local)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ Status collectCounters(const Parameter * nbPar, NumericTable * ntData, NumericTa
daal::tls<algorithmFPType *> tls_n_ci([=]() -> algorithmFPType * { return _CALLOC_<algorithmFPType, cpu>(p * c); });

SafeStatus safeStat;
daal::threader_for_blocked(n, n, [=, &tls_n_ci, &safeStat](algorithmFPType j0, algorithmFPType jn) {
daal::threader_for_blocked(n, 1, [=, &tls_n_ci, &safeStat](algorithmFPType j0, algorithmFPType jn) {
algorithmFPType * local_n_ci = tls_n_ci.local();
DAAL_CHECK_THR(local_n_ci, ErrorMemoryAllocationFailed);

Expand Down
Loading
Loading