Skip to content

Commit c58582f

Browse files
authored
Merge pull request #17452 from dorooleg/stable-25-1-analytics-cs-main-nikvas0
merge main to stable-25-1-analytics
2 parents a709348 + c951e81 commit c58582f

File tree

542 files changed

+15213
-5852
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

542 files changed

+15213
-5852
lines changed

.github/config/muted_ya.txt

+11-19
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,6 @@ ydb/core/keyvalue/ut_trace TKeyValueTracingTest.WriteSmall
2020
ydb/core/kqp/ut/cost KqpCost.OlapWriteRow
2121
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestAggregation
2222
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestFilterCompare
23-
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_1
24-
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_1_clean
25-
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_1_clean_with_restarts
26-
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_3_1
27-
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_3_2_1_clean
28-
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit3_1
29-
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.ChangeSchemaAndSplit
30-
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.HugeSchemeHistory
31-
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMerge
32-
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMergesWithRestartsAfterWait
33-
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMergesWithRestartsWhenWait
34-
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSchemaVersions
35-
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplits
36-
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsThenMerges
37-
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsWithRestartsAfterWait
38-
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsWithRestartsWhenWait
39-
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingConsistency64
40-
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingModuloN
41-
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.UpsertWhileSplitTest
4223
ydb/core/kqp/ut/olap KqpOlapJson.BloomIndexesVariants
4324
ydb/core/kqp/ut/olap KqpOlapSysView.StatsSysViewBytesDictActualization
4425
ydb/core/kqp/ut/olap KqpOlapSysView.StatsSysViewBytesDictStatActualization
@@ -47,6 +28,9 @@ ydb/core/kqp/ut/olap [*/*] chunk chunk
4728
ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable+ColumnStore
4829
ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable-ColumnStore
4930
ydb/core/kqp/ut/query KqpStats.SysViewClientLost
31+
ydb/core/kqp/ut/query KqpLimits.OutOfSpaceYQLUpsertFail+useSink
32+
ydb/core/kqp/ut/query KqpLimits.QSReplySizeEnsureMemoryLimits+useSink
33+
ydb/core/kqp/ut/query KqpStats.SysViewClientLost
5034
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
5135
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication
5236
ydb/core/kqp/ut/scheme [*/*] chunk chunk
@@ -163,6 +147,13 @@ ydb/tests/functional/tpc/large test_tpcds.py.TestTpcdsS1.test_tpcds[36]
163147
ydb/tests/functional/tpc/large test_tpcds.py.TestTpcdsS1.test_tpcds[67]
164148
ydb/tests/functional/tpc/large test_tpcds.py.TestTpcdsS1.test_tpcds[86]
165149
ydb/tests/functional/tpc/large test_tpcds.py.TestTpcdsS1.test_tpcds[9]
150+
ydb/tests/functional/tpc/large test_tpch_spilling.py.TestTpchSpillingS10.test_tpch[7]
151+
ydb/tests/olap sole chunk chunk
152+
ydb/tests/olap test_quota_exhaustion.py.TestYdbWorkload.test_delete
153+
ydb/tests/olap/data_quotas test_quota_exhaustion.py.TestYdbWorkload.test_duplicates
154+
ydb/tests/olap/column_family/compression alter_compression.py.TestAlterCompression.test_all_supported_compression
155+
ydb/tests/olap/column_family/compression sole chunk chunk
156+
ydb/tests/olap/oom overlapping_portions.py.TestOverlappingPortions.test
166157
ydb/tests/olap/scenario sole chunk chunk
167158
ydb/tests/olap/scenario test_alter_compression.py.TestAlterCompression.test[alter_compression]
168159
ydb/tests/olap/scenario test_alter_tiering.py.TestAlterTiering.test[many_tables]
@@ -172,6 +163,7 @@ ydb/tests/olap/ttl_tiering data_migration_when_alter_ttl.py.TestDataMigrationWhe
172163
ydb/tests/olap/ttl_tiering sole chunk chunk
173164
ydb/tests/olap/ttl_tiering ttl_delete_s3.py.TestDeleteS3Ttl.test_data_unchanged_after_ttl_change
174165
ydb/tests/olap/ttl_tiering ttl_delete_s3.py.TestDeleteS3Ttl.test_ttl_delete
166+
ydb/tests/olap/ttl_tiering ttl_delete_s3.py.TestDeleteS3Ttl.test_delete_s3_tiering
175167
ydb/tests/olap/ttl_tiering ttl_unavailable_s3.py.TestUnavailableS3.test
176168
ydb/tests/olap/ttl_tiering ttl_delete_s3.py.TestDeleteS3Ttl.test_delete_s3_tiering
177169
ydb/tests/olap/ttl_tiering unstable_connection.py.TestUnstableConnection.test

ydb/core/driver_lib/run/run.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@
149149
#include <ydb/library/actors/util/memory_track.h>
150150
#include <ydb/library/actors/prof/tag.h>
151151
#include <ydb/library/security/ydb_credentials_provider_factory.h>
152+
#include <ydb/library/signal_backtrace/signal_backtrace.h>
152153
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
153154

154155
#include <util/charset/wide.h>
@@ -1939,6 +1940,10 @@ void TKikimrRunner::SetSignalHandlers() {
19391940
signal(SIGINT, &TKikimrRunner::OnTerminate);
19401941
signal(SIGTERM, &TKikimrRunner::OnTerminate);
19411942

1943+
if (IsTrue(GetEnv("YDB_ENABLE_SIGNAL_BACKTRACE"))) {
1944+
Singleton<TTraceCollector>(TTraceCollector::DEFAULT_SIGNALS);
1945+
}
1946+
19421947
#if !defined(_win_)
19431948
SetAsyncSignalHandler(SIGHUP, [](int) {
19441949
TLogBackend::ReopenAllBackends();

ydb/core/driver_lib/run/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ PEERDIR(
138138
ydb/library/grpc/server/actors
139139
ydb/library/pdisk_io
140140
ydb/library/security
141+
ydb/library/signal_backtrace
141142
ydb/library/yql/providers/pq/cm_client
142143
ydb/library/yql/providers/s3/actors
143144
ydb/public/lib/base

ydb/core/formats/arrow/accessor/abstract/accessor.cpp

+25-27
Original file line numberDiff line numberDiff line change
@@ -24,26 +24,7 @@ std::shared_ptr<arrow::Array> IChunkedArray::TReader::CopyRecord(const ui64 reco
2424
}
2525

2626
std::shared_ptr<arrow::ChunkedArray> IChunkedArray::Slice(const ui32 offset, const ui32 count) const {
27-
AFL_VERIFY(offset + count <= (ui64)GetRecordsCount())("offset", offset)("count", count)("length", GetRecordsCount());
28-
ui32 currentOffset = offset;
29-
ui32 countLeast = count;
30-
std::vector<std::shared_ptr<arrow::Array>> chunks;
31-
auto address = GetChunkSlow(offset);
32-
while (countLeast) {
33-
address = GetChunk(address.GetAddress(), currentOffset);
34-
const ui64 internalPos = address.GetAddress().GetLocalIndex(currentOffset);
35-
if (internalPos + countLeast <= (ui64)address.GetArray()->length()) {
36-
chunks.emplace_back(address.GetArray()->Slice(internalPos, countLeast));
37-
break;
38-
} else {
39-
const ui32 deltaCount = address.GetArray()->length() - internalPos;
40-
chunks.emplace_back(address.GetArray()->Slice(internalPos, deltaCount));
41-
AFL_VERIFY(countLeast >= deltaCount);
42-
countLeast -= deltaCount;
43-
currentOffset += deltaCount;
44-
}
45-
}
46-
return std::make_shared<arrow::ChunkedArray>(chunks, DataType);
27+
return GetChunkedArray(TColumnConstructionContext().SetStartIndex(offset).SetRecordsCount(count));
4728
}
4829

4930
IChunkedArray::TFullDataAddress IChunkedArray::GetChunk(const std::optional<TAddressChain>& chunkCurrent, const ui64 position) const {
@@ -62,10 +43,10 @@ IChunkedArray::TFullDataAddress IChunkedArray::GetChunk(const std::optional<TAdd
6243
return TFullDataAddress(localAddress.GetArray(), std::move(addressChain));
6344
} else {
6445
auto chunkedArrayAddress = GetArray(chunkCurrent, position, nullptr);
65-
if (chunkCurrent) {
66-
AFL_VERIFY(chunkCurrent->GetSize() == 1 + chunkedArrayAddress.GetAddress().GetSize())("current", chunkCurrent->GetSize())(
67-
"chunked", chunkedArrayAddress.GetAddress().GetSize());
68-
}
46+
// if (chunkCurrent) {
47+
// AFL_VERIFY(chunkCurrent->GetSize() == chunkedArrayAddress.GetAddress().GetSize())("current", chunkCurrent->GetSize())(
48+
// "chunked", chunkedArrayAddress.GetAddress().GetSize());
49+
// }
6950
auto localAddress = chunkedArrayAddress.GetArray()->GetLocalData(address, chunkedArrayAddress.GetAddress().GetLocalIndex(position));
7051
auto fullAddress = std::move(chunkedArrayAddress.MutableAddress());
7152
fullAddress.Add(localAddress.GetAddress());
@@ -112,7 +93,7 @@ std::shared_ptr<IChunkedArray> IChunkedArray::DoApplyFilter(const TColumnFilter&
11293
auto schema = std::make_shared<arrow::Schema>(fields);
11394
auto table = arrow::Table::Make(schema, { arr }, GetRecordsCount());
11495
AFL_VERIFY(table->num_columns() == 1);
115-
AFL_VERIFY(filter.Apply(table));
96+
filter.Apply(table);
11697
if (table->column(0)->num_chunks() == 1) {
11798
return std::make_shared<TTrivialArray>(table->column(0)->chunk(0));
11899
} else {
@@ -121,8 +102,8 @@ std::shared_ptr<IChunkedArray> IChunkedArray::DoApplyFilter(const TColumnFilter&
121102
}
122103

123104
std::shared_ptr<IChunkedArray> IChunkedArray::ApplyFilter(const TColumnFilter& filter, const std::shared_ptr<IChunkedArray>& selfPtr) const {
124-
AFL_VERIFY(selfPtr);
125105
if (filter.IsTotalAllowFilter()) {
106+
AFL_VERIFY(selfPtr);
126107
return selfPtr;
127108
}
128109
if (filter.IsTotalDenyFilter()) {
@@ -134,7 +115,7 @@ std::shared_ptr<IChunkedArray> IChunkedArray::ApplyFilter(const TColumnFilter& f
134115
return result;
135116
}
136117

137-
std::shared_ptr<arrow::ChunkedArray> IChunkedArray::GetChunkedArray() const {
118+
std::shared_ptr<arrow::ChunkedArray> IChunkedArray::GetChunkedArrayTrivial() const {
138119
std::vector<std::shared_ptr<arrow::Array>> chunks;
139120
std::optional<TFullDataAddress> address;
140121
for (ui32 position = 0; position < GetRecordsCount();) {
@@ -145,6 +126,23 @@ std::shared_ptr<arrow::ChunkedArray> IChunkedArray::GetChunkedArray() const {
145126
return std::make_shared<arrow::ChunkedArray>(chunks, GetDataType());
146127
}
147128

129+
std::shared_ptr<arrow::ChunkedArray> IChunkedArray::GetChunkedArray(const TColumnConstructionContext& context) const {
130+
if (context.GetStartIndex() || context.GetRecordsCount()) {
131+
const ui32 start = context.GetStartIndex().value_or(0);
132+
const ui32 count = context.GetRecordsCount().value_or(GetRecordsCount() - start);
133+
auto slice = ISlice(start, count);
134+
if (context.GetFilter() && !context.GetFilter()->IsTotalAllowFilter()) {
135+
return slice->ApplyFilter(context.GetFilter()->Slice(start, count), slice)->GetChunkedArrayTrivial();
136+
} else {
137+
return slice->GetChunkedArrayTrivial();
138+
}
139+
} else if (context.GetFilter() && !context.GetFilter()->IsTotalAllowFilter()) {
140+
return ApplyFilter(*context.GetFilter(), nullptr)->GetChunkedArrayTrivial();
141+
} else {
142+
return GetChunkedArrayTrivial();
143+
}
144+
}
145+
148146
TString IChunkedArray::TReader::DebugString(const ui32 position) const {
149147
auto address = GetReadChunk(position);
150148
return NArrow::DebugString(address.GetArray(), address.GetPosition());

ydb/core/formats/arrow/accessor/abstract/accessor.h

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#pragma once
22

3+
#include "common.h"
4+
35
#include <ydb/library/accessor/accessor.h>
46
#include <ydb/library/accessor/validator.h>
57
#include <ydb/library/formats/arrow/splitter/similar_packer.h>
@@ -264,6 +266,8 @@ class IChunkedArray {
264266
virtual void DoVisitValues(const TValuesSimpleVisitor& visitor) const = 0;
265267

266268
protected:
269+
virtual std::shared_ptr<arrow::ChunkedArray> GetChunkedArrayTrivial() const;
270+
267271
std::shared_ptr<arrow::Schema> GetArraySchema() const {
268272
const arrow::FieldVector fields = { std::make_shared<arrow::Field>("val", GetDataType()) };
269273
return std::make_shared<arrow::Schema>(fields);
@@ -344,7 +348,6 @@ class IChunkedArray {
344348
}
345349

346350
virtual void Reallocate() {
347-
348351
}
349352

350353
void VisitValues(const TValuesSimpleVisitor& visitor) const {
@@ -444,7 +447,8 @@ class IChunkedArray {
444447
return *result;
445448
}
446449

447-
virtual std::shared_ptr<arrow::ChunkedArray> GetChunkedArray() const;
450+
virtual std::shared_ptr<arrow::ChunkedArray> GetChunkedArray(
451+
const TColumnConstructionContext& context = Default<TColumnConstructionContext>()) const;
448452
virtual ~IChunkedArray() = default;
449453

450454
std::shared_ptr<arrow::ChunkedArray> Slice(const ui32 offset, const ui32 count) const;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#include "common.h"
2+
3+
#include <ydb/core/formats/arrow/arrow_filter.h>
4+
5+
namespace NKikimr::NArrow::NAccessor {
6+
7+
TColumnConstructionContext& TColumnConstructionContext::SetFilter(const std::shared_ptr<TColumnFilter>& val) {
8+
if (!val || val->IsTotalAllowFilter()) {
9+
Filter = nullptr;
10+
} else {
11+
Filter = val;
12+
}
13+
return *this;
14+
}
15+
16+
std::optional<TColumnConstructionContext> TColumnConstructionContext::Slice(const ui32 offset, const ui32 count) const {
17+
std::optional<TColumnConstructionContext> result;
18+
const ui32 start = std::max<ui32>(offset, StartIndex.value_or(0));
19+
const ui32 finish = std::min<ui32>(offset + count, StartIndex.value_or(0) + RecordsCount.value_or(offset + count));
20+
if (finish <= start) {
21+
result = std::nullopt;
22+
} else {
23+
result = TColumnConstructionContext().SetStartIndex(start - offset).SetRecordsCount(finish - start, count);
24+
}
25+
if (result && Filter) {
26+
result->SetFilter(std::make_shared<TColumnFilter>(Filter->Slice(offset, count)));
27+
}
28+
return result;
29+
}
30+
31+
} // namespace NKikimr::NArrow::NAccessor
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#pragma once
2+
#include <ydb/library/accessor/accessor.h>
3+
4+
#include <util/system/types.h>
5+
6+
#include <memory>
7+
#include <optional>
8+
9+
namespace NKikimr::NArrow {
10+
class TColumnFilter;
11+
}
12+
13+
namespace NKikimr::NArrow::NAccessor {
14+
15+
class TColumnConstructionContext {
16+
private:
17+
YDB_READONLY_DEF(std::optional<ui32>, StartIndex);
18+
YDB_ACCESSOR_DEF(std::optional<ui32>, RecordsCount);
19+
YDB_READONLY_DEF(std::shared_ptr<TColumnFilter>, Filter);
20+
21+
public:
22+
TColumnConstructionContext& SetRecordsCount(const ui32 recordsCount, const ui32 defValue) {
23+
if (recordsCount == defValue) {
24+
RecordsCount.reset();
25+
} else {
26+
RecordsCount = recordsCount;
27+
}
28+
return *this;
29+
}
30+
31+
TColumnConstructionContext& SetStartIndex(const ui32 startIndex) {
32+
if (startIndex) {
33+
StartIndex = startIndex;
34+
} else {
35+
StartIndex.reset();
36+
}
37+
return *this;
38+
}
39+
40+
TColumnConstructionContext& SetFilter(const std::shared_ptr<TColumnFilter>& val);
41+
42+
std::optional<TColumnConstructionContext> Slice(const ui32 offset, const ui32 count) const;
43+
44+
TColumnConstructionContext() = default;
45+
};
46+
47+
} // namespace NKikimr::NArrow::NAccessor

ydb/core/formats/arrow/accessor/abstract/constructor.h

+4
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ class IConstructor {
3333
const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& originalArray, const TChunkConstructionData& externalInfo) const = 0;
3434

3535
public:
36+
virtual bool HasInternalConversion() const {
37+
return false;
38+
}
39+
3640
IConstructor(const IChunkedArray::EType type)
3741
: Type(type) {
3842
}

ydb/core/formats/arrow/accessor/abstract/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ PEERDIR(
1010
)
1111

1212
SRCS(
13+
common.cpp
1314
constructor.cpp
1415
request.cpp
1516
accessor.cpp

ydb/core/formats/arrow/accessor/composite/accessor.cpp

+20
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,24 @@ std::optional<bool> TCompositeChunkedArray::DoCheckOneValueAccessor(std::shared_
110110
return true;
111111
}
112112

113+
std::shared_ptr<arrow::ChunkedArray> TCompositeChunkedArray::GetChunkedArray(const TColumnConstructionContext& context) const {
114+
ui32 pos = 0;
115+
std::vector<std::shared_ptr<arrow::Array>> chunks;
116+
for (auto&& i : Chunks) {
117+
auto sliceCtx = context.Slice(pos, i->GetRecordsCount());
118+
if (!sliceCtx) {
119+
if (chunks.size()) {
120+
break;
121+
} else {
122+
pos += i->GetRecordsCount();
123+
continue;
124+
}
125+
}
126+
std::shared_ptr<arrow::ChunkedArray> arr = i->GetChunkedArray(*sliceCtx);
127+
chunks.insert(chunks.end(), arr->chunks().begin(), arr->chunks().end());
128+
pos += i->GetRecordsCount();
129+
}
130+
return std::make_shared<arrow::ChunkedArray>(std::move(chunks));
131+
}
132+
113133
} // namespace NKikimr::NArrow::NAccessor

ydb/core/formats/arrow/accessor/composite/accessor.h

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ class TCompositeChunkedArray: public ICompositeChunkedArray {
2222
private:
2323
YDB_READONLY_DEF(std::vector<std::shared_ptr<IChunkedArray>>, Chunks);
2424

25+
virtual std::shared_ptr<arrow::ChunkedArray> GetChunkedArray(const TColumnConstructionContext& context) const override;
26+
2527
virtual void DoVisitValues(const TValuesSimpleVisitor& visitor) const override {
2628
for (auto&& i : Chunks) {
2729
i->VisitValues(visitor);

0 commit comments

Comments
 (0)