Skip to content

Commit ef33a1e

Browse files
pzuevuniquelogin
authored andcommitted
multi-column hashmap values
1 parent 786b128 commit ef33a1e

File tree

7 files changed

+270
-176
lines changed

7 files changed

+270
-176
lines changed

ydb/core/kqp/tools/combiner_perf/converters.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,16 @@ namespace NKikimr {
1313
namespace NMiniKQL {
1414

1515
template<bool Embedded>
16-
void NativeToUnboxed(const ui64 value, NUdf::TUnboxedValue& result)
16+
void NativeToUnboxed(const ui64 value, NUdf::TUnboxedValuePod& result)
1717
{
1818
result = NUdf::TUnboxedValuePod(value);
1919
}
2020

2121
template<bool Embedded>
22-
void NativeToUnboxed(const std::string& value, NUdf::TUnboxedValue& result)
22+
void NativeToUnboxed(const std::string& value, NUdf::TUnboxedValuePod& result)
2323
{
2424
if constexpr (Embedded) {
25-
result = NUdf::TUnboxedValue::Embedded(value);
25+
result = NUdf::TUnboxedValuePod::Embedded(value);
2626
} else {
2727
result = NUdf::TUnboxedValuePod(NUdf::TStringValue(value));
2828
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
// Hash map implementation wrappers
2+
3+
#pragma once
4+
5+
#include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h>
6+
#include <yql/essentials/public/udf/udf_value.h>
7+
8+
#include <library/cpp/containers/absl_flat_hash/flat_hash_map.h>
9+
10+
#include <unordered_map>
11+
12+
namespace NKikimr {
13+
namespace NMiniKQL {
14+
15+
template<typename K, typename V>
16+
struct TUnorderedMapImpl
17+
{
18+
using TValueType = V;
19+
using TMapType = std::unordered_map<K, V>;
20+
constexpr static bool CustomOps = false;
21+
};
22+
23+
template<typename K, typename V>
24+
struct TAbslMapImpl
25+
{
26+
using TValueType = V;
27+
using TMapType = absl::flat_hash_map<K, V>;
28+
constexpr static bool CustomOps = false;
29+
};
30+
31+
template<typename K, typename V, typename TEqualTo = std::equal_to<K>, typename THash = std::hash<K>>
32+
struct TRobinHoodMapImplBase
33+
{
34+
// Warning: this implementation leaks long strings because it can't call destructors.
35+
// Also it moves keys and values by simply copying bytes so take care.
36+
using TValueType = V;
37+
using TMapType = TRobinHoodHashFixedMap<K, V, TEqualTo, THash>;
38+
constexpr static bool CustomOps = true;
39+
40+
static void AggregateByKey(TMapType& map, const K& key, const V& delta)
41+
{
42+
bool isNew = false;
43+
auto ptr = map.Insert(key, isNew);
44+
if (isNew) {
45+
*(V*)map.GetMutablePayload(ptr) = delta;
46+
map.CheckGrow();
47+
} else {
48+
*(V*)map.GetMutablePayload(ptr) += delta;
49+
}
50+
}
51+
52+
template<typename Callback>
53+
static void IteratePairs(const TMapType& map, Callback&& callback)
54+
{
55+
// TODO: GetPayload and IsValid should be const
56+
for (const char* iter = map.Begin(); iter != map.End(); map.Advance(iter)) {
57+
if (!const_cast<TMapType&>(map).IsValid(iter)) {
58+
continue;
59+
}
60+
const auto& key = map.GetKey(iter);
61+
const auto& value = *(V*)(const_cast<TMapType&>(map)).GetPayload(iter);
62+
callback(key, value);
63+
}
64+
}
65+
66+
static size_t Size(const TMapType& map)
67+
{
68+
return map.GetSize();
69+
}
70+
};
71+
72+
template<typename K, typename V>
73+
struct TRobinHoodMapImpl: public TRobinHoodMapImplBase<K, V>
74+
{
75+
};
76+
77+
template<typename V>
78+
struct TRobinHoodMapImpl<std::string, V>
79+
{
80+
using TValueType = V;
81+
82+
struct TEqualTo
83+
{
84+
bool operator()(const NYql::NUdf::TUnboxedValuePod& lhs, const NYql::NUdf::TUnboxedValuePod& rhs) {
85+
return lhs.AsStringRef() == rhs.AsStringRef();
86+
}
87+
};
88+
89+
struct THash
90+
{
91+
absl::Hash<std::string_view> AbslHash;
92+
93+
size_t operator()(const NYql::NUdf::TUnboxedValuePod& val) {
94+
auto result = AbslHash(val.AsStringRef());
95+
return result;
96+
}
97+
};
98+
99+
using TBase = TRobinHoodMapImplBase<std::string, V>;
100+
using TRealBase = TRobinHoodMapImplBase<NYql::NUdf::TUnboxedValuePod, V, TEqualTo, THash>;
101+
using TMapType = TRealBase::TMapType;
102+
103+
constexpr static bool CustomOps = true;
104+
105+
static void AggregateByKey(TMapType& map, const std::string& key, const V& delta)
106+
{
107+
NYql::NUdf::TUnboxedValuePod ub = NYql::NUdf::TUnboxedValuePod::Embedded(NYql::NUdf::TStringRef(key));
108+
TRealBase::AggregateByKey(map, ub, delta);
109+
}
110+
111+
template<typename Callback>
112+
static void IteratePairs(const TMapType& map, Callback&& callback)
113+
{
114+
TRealBase::IteratePairs(map, [callback](const NYql::NUdf::TUnboxedValuePod& k, const V& v) {
115+
callback(std::string(k.AsStringRef()), v);
116+
});
117+
}
118+
119+
static size_t Size(const TMapType& map)
120+
{
121+
return TRealBase::Size(map);
122+
}
123+
};
124+
125+
template<typename TMapImpl>
126+
bool MapEmpty(const typename TMapImpl::TMapType& map)
127+
{
128+
if constexpr (TMapImpl::CustomOps) {
129+
return TMapImpl::Size(map) == 0;
130+
} else {
131+
return map.empty();
132+
}
133+
}
134+
135+
}
136+
}

ydb/core/kqp/tools/combiner_perf/run_params.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ struct TRunParams {
3535
size_t NumKeys = 0; // for numeric keys, the range is [0..NumKeys-1]
3636
size_t BlockSize = 0;
3737
size_t WideCombinerMemLimit = 0;
38+
size_t NumAggregations = 1;
3839
bool LongStringKeys = false;
3940
bool MeasureReferenceMemory = false;
4041
bool AlwaysSubprocess = false;

ydb/core/kqp/tools/combiner_perf/simple_last.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,13 @@ namespace {
2626
TDuration MeasureGeneratorTime(IComputationGraph& graph, const IDataSampler& sampler)
2727
{
2828
const auto devnullStream = sampler.MakeStream(graph.GetHolderFactory());
29-
const auto devnullStart = TInstant::Now();
29+
const auto devnullStart = GetThreadCPUTime();
3030
{
3131
NUdf::TUnboxedValue columns[2];
3232
while (devnullStream->WideFetch(columns, 2) == NUdf::EFetchStatus::Ok) {
3333
}
3434
}
35-
const auto devnullTime = TInstant::Now() - devnullStart;
36-
return devnullTime;
35+
return GetThreadCPUTimeDelta(devnullStart);
3736
}
3837

3938
template<bool LLVM, bool Spilling>

ydb/core/kqp/tools/combiner_perf/streams.cpp

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,18 +78,32 @@ THolder<IDataSampler> CreateWideSamplerFromParams(const TRunParams& params)
7878
Y_ENSURE(params.RandomSeed.has_value());
7979

8080
switch (params.SamplerType) {
81-
case ESamplerType::StringKeysUI64Values:
82-
return DispatchByMap<std::string, ui64, IDataSampler>(params.ReferenceHashType, [&](auto&& impl) {
81+
case ESamplerType::StringKeysUI64Values: {
82+
auto next = [&](auto&& impl) {
8383
using MapImpl = std::decay_t<decltype(impl)>;
84-
using SamplerType = TString64DataSampler<MapImpl>;
84+
using SamplerType = TString64DataSampler<MapImpl, MapImpl::TValueType::ArrayWidth>;
8585
return MakeHolder<SamplerType>(*params.RandomSeed, params.RowsPerRun, params.NumKeys - 1, params.NumRuns, params.LongStringKeys);
86-
});
87-
case ESamplerType::UI64KeysUI64Values:
88-
return DispatchByMap<ui64, ui64, IDataSampler>(params.ReferenceHashType, [&](auto&& impl) {
86+
};
87+
if (params.NumAggregations == 1) {
88+
return DispatchByMap<std::string, TValueWrapper<ui64, 1>, IDataSampler>(params.ReferenceHashType, next);
89+
} else {
90+
// TODO: support other NumAggregations values?
91+
return DispatchByMap<std::string, TValueWrapper<ui64, 3>, IDataSampler>(params.ReferenceHashType, next);
92+
}
93+
}
94+
case ESamplerType::UI64KeysUI64Values: {
95+
auto next = [&](auto&& impl) {
8996
using MapImpl = std::decay_t<decltype(impl)>;
90-
using SamplerType = T6464DataSampler<MapImpl>;
97+
using SamplerType = T6464DataSampler<MapImpl, MapImpl::TValueType::ArrayWidth>;
9198
return MakeHolder<SamplerType>(*params.RandomSeed, params.RowsPerRun, params.NumKeys - 1, params.NumRuns);
92-
});
99+
};
100+
if (params.NumAggregations == 1) {
101+
return DispatchByMap<ui64, TValueWrapper<ui64, 1>, IDataSampler>(params.ReferenceHashType, next);
102+
} else {
103+
// TODO: support other NumAggregations values?
104+
return DispatchByMap<ui64, TValueWrapper<ui64, 3>, IDataSampler>(params.ReferenceHashType, next);
105+
}
106+
}
93107
}
94108
}
95109

@@ -104,7 +118,6 @@ struct TUpdateMapFromBlocks<TMapImpl, ui64>
104118
{
105119
static void Update(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedValue& value, typename TMapImpl::TMapType& result)
106120
{
107-
108121
auto datumKey = TArrowBlock::From(key).GetDatum();
109122
auto datumValue = TArrowBlock::From(value).GetDatum();
110123
UNIT_ASSERT(datumKey.is_array());

0 commit comments

Comments
 (0)