Skip to content

Commit 61d4edd

Browse files
gridnevvvitdorooleg
authored andcommitted
parameters pushdown support prototype (#16251)
1 parent ab4964d commit 61d4edd

File tree

5 files changed

+124
-15
lines changed

5 files changed

+124
-15
lines changed

ydb/core/kqp/expr_nodes/kqp_expr_nodes.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -679,7 +679,8 @@
679679
"Children": [
680680
{"Index": 0, "Name": "Type", "Type": "TExprBase"},
681681
{"Index": 1, "Name": "Columns", "Type": "TCoAtomList"},
682-
{"Index": 2, "Name": "Lambda", "Type": "TCoLambda"}
682+
{"Index": 2, "Name": "Parameters", "Type": "TExprList"},
683+
{"Index": 3, "Name": "Lambda", "Type": "TCoLambda"}
683684
]
684685
},
685686
{

ydb/core/kqp/host/kqp_type_ann.cpp

+19-2
Original file line numberDiff line numberDiff line change
@@ -1077,7 +1077,7 @@ TStatus AnnotateOlapFilter(const TExprNode::TPtr& node, TExprContext& ctx) {
10771077
}
10781078

10791079
TStatus AnnotateOlapApply(const TExprNode::TPtr& node, TExprContext& ctx) {
1080-
if (!EnsureArgsCount(*node, 3U, ctx)) {
1080+
if (!EnsureArgsCount(*node, 4U, ctx)) {
10811081
return TStatus::Error;
10821082
}
10831083

@@ -1097,7 +1097,8 @@ TStatus AnnotateOlapApply(const TExprNode::TPtr& node, TExprContext& ctx) {
10971097
}
10981098

10991099
const auto structType = argsType->Cast<TStructExprType>();
1100-
TTypeAnnotationNode::TListType argsTypes(columns->ChildrenSize());
1100+
std::vector<const NYql::TTypeAnnotationNode*> argsTypes(columns->ChildrenSize());
1101+
11011102
for (auto i = 0U; i < argsTypes.size(); ++i) {
11021103
if (const auto argType = structType->FindItemType(columns->Child(i)->Content()))
11031104
argsTypes[i] = argType;
@@ -1109,6 +1110,22 @@ TStatus AnnotateOlapApply(const TExprNode::TPtr& node, TExprContext& ctx) {
11091110
}
11101111
}
11111112

1113+
TExprList parameters = TExprList(node->Child(TKqpOlapApply::idx_Parameters));
1114+
1115+
for(auto expr: parameters) {
1116+
if (!EnsureArgsCount(*expr.Ptr(), 2U, ctx)) {
1117+
return TStatus::Error;
1118+
}
1119+
1120+
TCoParameter param = TMaybeNode<TCoParameter>(expr.Ptr()).Cast();
1121+
const auto& paramType = expr.Ptr()->Child(TCoParameter::idx_Type);
1122+
if (!EnsureType(*paramType, ctx)) {
1123+
return TStatus::Error;
1124+
}
1125+
1126+
argsTypes.push_back(paramType->GetTypeAnn()->Cast<TTypeExprType>()->GetType());
1127+
}
1128+
11121129
if (!EnsureLambda(node->Tail(), ctx)) {
11131130
return TStatus::Error;
11141131
}

ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp

+13-9
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,6 @@ TMaybeNode<TExprBase> YqlApplyPushdown(const TExprBase& apply, const TExprNode&
214214
return false;
215215
});
216216

217-
// Temporary fix for https://st.yandex-team.ru/KIKIMR-22216
218-
if (parameters.size()!=0) {
219-
return nullptr;
220-
}
221-
222217
const auto members = FindNodes(apply.Ptr(), [&argument] (const TExprNode::TPtr& node) {
223218
if (const auto maybeMember = TMaybeNode<TCoMember>(node))
224219
return maybeMember.Cast().Struct().Raw() == &argument;
@@ -231,10 +226,18 @@ TMaybeNode<TExprBase> YqlApplyPushdown(const TExprBase& apply, const TExprNode&
231226
arguments.reserve(members.size());
232227
for (const auto& member : members) {
233228
columns.emplace_back(member->TailPtr());
234-
arguments.emplace_back(ctx.NewArgument(member->Pos(), columns.back()->Content()));
229+
TString argumentName = "members_" + TString(columns.back()->Content());
230+
arguments.emplace_back(ctx.NewArgument(member->Pos(), TStringBuf(argumentName)));
235231
replacements.emplace(member.Get(), arguments.back());
236232
}
237233

234+
for(const auto& pptr : parameters) {
235+
TCoParameter parameter = TMaybeNode<TCoParameter>(pptr).Cast();
236+
TString argumentName = "parameter_" + TString(parameter.Name().StringValue());
237+
arguments.emplace_back(ctx.NewArgument(pptr->Pos(), TStringBuf(argumentName)));
238+
replacements.emplace(pptr.Get(), arguments.back());
239+
}
240+
238241
// Temporary fix for https://st.yandex-team.ru/KIKIMR-22560
239242
if (!columns.size()) {
240243
return nullptr;
@@ -243,6 +246,7 @@ TMaybeNode<TExprBase> YqlApplyPushdown(const TExprBase& apply, const TExprNode&
243246
return Build<TKqpOlapApply>(ctx, apply.Pos())
244247
.Type(ExpandType(argument.Pos(), *argument.GetTypeAnn(), ctx))
245248
.Columns().Add(std::move(columns)).Build()
249+
.Parameters().Add(std::move(parameters)).Build()
246250
.Lambda(ctx.NewLambda(apply.Pos(), ctx.NewArguments(argument.Pos(), std::move(arguments)), ctx.ReplaceNodes(apply.Ptr(), replacements)))
247251
.Done();
248252
}
@@ -401,7 +405,7 @@ std::vector<TExprBase> ConvertComparisonNode(const TExprBase& nodeIn, const TExp
401405
return SafeCastPredicatePushdown(maybeFlatmap.Cast(), argument, ctx, pos);
402406
} else if (auto maybePredicate = node.Maybe<TCoCompare>()) {
403407
return SimplePredicatePushdown(maybePredicate.Cast(), argument, ctx, pos);
404-
}
408+
}
405409

406410
if constexpr (NKikimr::NSsa::RuntimeVersion >= 5U) {
407411
return YqlApplyPushdown(node, argument, ctx);
@@ -800,7 +804,7 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
800804
}
801805
remaining = std::move(remainingAfterApply);
802806
}
803-
807+
804808
if (pushedPredicates.empty()) {
805809
return node;
806810
}
@@ -810,7 +814,7 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
810814
const auto remainingFilter = CombinePredicatesWithAnd(remaining, ctx, node.Pos(), false, true);
811815

812816
TMaybeNode<TExprBase> olapFilter;
813-
if (pushedFilter.FirstLevelOps.IsValid()) {
817+
if (pushedFilter.FirstLevelOps.IsValid()) {
814818
olapFilter = Build<TKqpOlapFilter>(ctx, node.Pos())
815819
.Input(read.Process().Body())
816820
.Condition(pushedFilter.FirstLevelOps.Cast())

ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,12 @@ TTypedColumn CompileYqlKernelScalarApply(const TKqpOlapApply& apply, TKqpOlapCom
600600
argTypes.emplace_back(arg.Type);
601601
}
602602

603+
for(const auto& param: apply.Parameters()) {
604+
const auto& arg = GetOrCreateColumnIdAndType(param, ctx);
605+
ids.emplace_back(arg.Id);
606+
argTypes.emplace_back(arg.Type);
607+
}
608+
603609
auto *const command = ctx.CreateAssignCmd();
604610
auto *const function = command->MutableFunction();
605611
const auto idx = ctx.GetKernelRequestBuilder().AddScalarApply(apply.Lambda().Ref(), argTypes, ctx.ExprCtx());

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

+84-3
Original file line numberDiff line numberDiff line change
@@ -1510,7 +1510,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
15101510
//This check is based on an assumpltion, that for pushed down predicates column names are preserved in AST
15111511
//But for non-pushed down predicates column names are (usually) replaced with a label, started with $. It' not a rule, but a heuristic
15121512
//So this check may require a correction when some ast optimization rules are changed
1513-
UNIT_ASSERT_C(ast.find(R"((Unwrap (/ $)") != std::string::npos,
1513+
UNIT_ASSERT_C(ast.find(R"((Unwrap (/ $)") != std::string::npos,
15141514
TStringBuilder() << "Unsafe subpredicate is pushed down. Query: " << query);
15151515

15161516
UNIT_ASSERT_C(ast.find("NarrowMap") != std::string::npos,
@@ -1828,6 +1828,87 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
18281828
UNIT_ASSERT_VALUES_EQUAL(result, insertRows);
18291829
}
18301830

1831+
Y_UNIT_TEST(PredicatePushdownWithParametersILike) {
1832+
constexpr bool logQueries = true;
1833+
auto settings = TKikimrSettings()
1834+
.SetWithSampleTables(false);
1835+
TKikimrRunner kikimr(settings);
1836+
1837+
TStreamExecScanQuerySettings scanSettings;
1838+
scanSettings.Explain(true);
1839+
1840+
TLocalHelper(kikimr.GetTestServer()).CreateTestOlapTable();
1841+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000);
1842+
1843+
auto tableClient = kikimr.GetTableClient();
1844+
1845+
auto buildQuery = [](bool pushEnabled) {
1846+
TStringBuilder builder;
1847+
1848+
builder << "--!syntax_v1" << Endl;
1849+
1850+
if (!pushEnabled) {
1851+
builder << "PRAGMA Kikimr.OptEnableOlapPushdown = \"false\";" << Endl;
1852+
}
1853+
1854+
builder << R"(
1855+
DECLARE $in_uid AS Utf8;
1856+
DECLARE $in_level AS Int32;
1857+
1858+
SELECT `timestamp` FROM `/Root/olapStore/olapTable` WHERE
1859+
uid ILIKE "uid_%" || $in_uid || "%" AND level > $in_level
1860+
ORDER BY `timestamp`;
1861+
)" << Endl;
1862+
1863+
return builder;
1864+
};
1865+
1866+
auto normalQuery = buildQuery(false);
1867+
auto pushQuery = buildQuery(true);
1868+
1869+
auto params = tableClient.GetParamsBuilder()
1870+
.AddParam("$in_uid")
1871+
.Utf8("3000")
1872+
.Build()
1873+
.AddParam("$in_level")
1874+
.Int32(2)
1875+
.Build()
1876+
.Build();
1877+
1878+
auto it = tableClient.StreamExecuteScanQuery(normalQuery, params).GetValueSync();
1879+
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
1880+
auto goodResult = CollectStreamResult(it);
1881+
1882+
it = tableClient.StreamExecuteScanQuery(pushQuery, params).GetValueSync();
1883+
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
1884+
auto pushResult = CollectStreamResult(it);
1885+
1886+
if (logQueries) {
1887+
Cerr << "Query: " << normalQuery << Endl;
1888+
Cerr << "Expected: " << goodResult.ResultSetYson << Endl;
1889+
Cerr << "Received: " << pushResult.ResultSetYson << Endl;
1890+
}
1891+
1892+
CompareYson(goodResult.ResultSetYson, pushResult.ResultSetYson);
1893+
1894+
it = tableClient.StreamExecuteScanQuery(pushQuery, scanSettings).GetValueSync();
1895+
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
1896+
1897+
auto result = CollectStreamResult(it);
1898+
auto ast = result.QueryStats->Getquery_ast();
1899+
1900+
UNIT_ASSERT_C(ast.find("KqpOlapFilter") != std::string::npos,
1901+
TStringBuilder() << "Predicate not pushed down. Query: " << pushQuery);
1902+
1903+
NJson::TJsonValue plan, readRange;
1904+
NJson::ReadJsonTree(*result.PlanJson, &plan, true);
1905+
1906+
Cerr << result.PlanJson << Endl;
1907+
1908+
readRange = FindPlanNodeByKv(plan, "Name", "TableFullScan");
1909+
UNIT_ASSERT(readRange.IsDefined());
1910+
}
1911+
18311912
Y_UNIT_TEST(PredicatePushdownWithParameters) {
18321913
constexpr bool logQueries = true;
18331914
auto settings = TKikimrSettings()
@@ -3332,9 +3413,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
33323413
33333414
)", noTx).GetValueSync();
33343415
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
3335-
3416+
33363417
result = queryClient.ExecuteQuery(R"(
3337-
UPSERT INTO Test (Id, Name, Comment) VALUES
3418+
UPSERT INTO Test (Id, Name, Comment) VALUES
33383419
(10, "n1", "aa"),
33393420
(20, "n2", "bb"),
33403421
(30, "n3", "cc"),

0 commit comments

Comments
 (0)