From 116a1dafa9288bbbb486b3a96df3f0daf1e53c8d Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Wed, 18 Jun 2025 11:15:06 +0300 Subject: [PATCH 1/5] Supported override nodes placement --- .../kqp/executer_actor/kqp_data_executer.cpp | 5 +- .../kqp/executer_actor/kqp_executer_impl.h | 51 +++++-- .../kqp/executer_actor/kqp_scan_executer.cpp | 2 +- .../kqp/query_compiler/kqp_query_compiler.cpp | 141 +++++++++++++++--- ydb/core/protos/kqp_physical.proto | 1 + 5 files changed, 166 insertions(+), 34 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 9003a2b5d5d2..b9dd6763bce6 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1942,6 +1942,9 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetStages()) { + if (!stage.GetTasksNodes().empty()) { + ResourceSnapshotRequired = true; + } if (stage.SourcesSize() > 0 && stage.GetSources(0).GetTypeCase() == NKqpProto::TKqpSource::kExternalSource) { ResourceSnapshotRequired = true; HasExternalSources = true; @@ -2059,7 +2062,7 @@ class TKqpDataExecuter : public TKqpExecuterBase 0) { - BuildComputeTasks(stageInfo, std::max(ShardsOnNode.size(), ResourcesSnapshot.size())); + BuildComputeTasks(stageInfo, std::max(ShardsOnNode.size(), ResourcesSnapshot.size()), ResourcesSnapshot); } else { BuildDatashardTasks(stageInfo); } diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index a36a7fd0a43e..0e17e0015209 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -1023,6 +1023,27 @@ class TKqpExecuterBase : public TActor { } } + std::function GetStageNodesHint(const NKqpProto::TKqpPhyStage& stage, const TVector& resourceSnapshot) { + const auto& nodesHint = stage.GetTasksNodes(); + if (nodesHint.empty()) { + return nullptr; + } + + std::unordered_set availableNodes; + availableNodes.reserve(resourceSnapshot.size()); + for (const auto& nodeResource : resourceSnapshot) { + availableNodes.emplace(nodeResource.GetNodeId()); + } + + for (const auto nodeId : nodesHint) { + YQL_ENSURE(availableNodes.contains(nodeId), "Invalid node id " << nodeId << " in stage override, available nodes: " << JoinSeq(", ", availableNodes)); + } + + return [nodesHint](ui32 taskIdx) { + return nodesHint[taskIdx % nodesHint.size()]; + }; + } + void BuildReadTasksFromSource(TStageInfo& stageInfo, const TVector& resourceSnapshot, ui32 scheduledTaskCount) { const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); @@ -1056,12 +1077,18 @@ class TKqpExecuterBase : public TActor { structuredToken = NYql::CreateStructuredTokenParser(externalSource.GetAuthInfo()).ToBuilder().ReplaceReferences(SecureParams).ToJson(); } - ui64 selfNodeIdx = 0; - for (size_t i = 0; i < resourceSnapshot.size(); ++i) { - if (resourceSnapshot[i].GetNodeId() == SelfId().NodeId()) { - selfNodeIdx = i; - break; + auto nodesPlaner = GetStageNodesHint(stage, resourceSnapshot); + if (!nodesPlaner && !resourceSnapshot.empty()) { + ui64 selfNodeIdx = 0; + for (size_t i = 0; i < resourceSnapshot.size(); ++i) { + if (resourceSnapshot[i].GetNodeId() == SelfId().NodeId()) { + selfNodeIdx = i; + break; + } } + nodesPlaner = [&resourceSnapshot, selfNodeIdx](ui32 taskIdx) { + return resourceSnapshot[(selfNodeIdx + taskIdx) % resourceSnapshot.size()].GetNodeId(); + }; } TVector tasksIds; @@ -1082,10 +1109,10 @@ class TKqpExecuterBase : public TActor { } FillSecureParamsFromStage(task.Meta.SecureParams, stage); - if (resourceSnapshot.empty()) { + if (!nodesPlaner) { task.Meta.Type = TTaskMeta::TTaskType::Compute; } else { - task.Meta.NodeId = resourceSnapshot[(selfNodeIdx + i) % resourceSnapshot.size()].GetNodeId(); + task.Meta.NodeId = nodesPlaner(i); task.Meta.Type = TTaskMeta::TTaskType::Scan; } @@ -1413,7 +1440,7 @@ class TKqpExecuterBase : public TActor { } } - void BuildComputeTasks(TStageInfo& stageInfo, const ui32 nodesCount) { + void BuildComputeTasks(TStageInfo& stageInfo, const ui32 nodesCount, const TVector& resourceSnapshot) { auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); ui32 partitionsCount = 1; @@ -1478,9 +1505,15 @@ class TKqpExecuterBase : public TActor { } } + auto nodesPlaner = GetStageNodesHint(stage, resourceSnapshot); for (ui32 i = 0; i < partitionsCount; ++i) { auto& task = TasksGraph.AddTask(stageInfo); - task.Meta.Type = TTaskMeta::TTaskType::Compute; + if (!nodesPlaner) { + task.Meta.Type = TTaskMeta::TTaskType::Compute; + } else { + task.Meta.NodeId = nodesPlaner(i); + task.Meta.Type = TTaskMeta::TTaskType::Scan; + } task.Meta.ExecuterId = SelfId(); FillSecureParamsFromStage(task.Meta.SecureParams, stage); BuildSinks(stage, stageInfo, task); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 75ddddab5808..b3064069010b 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -191,7 +191,7 @@ class TKqpScanExecuter : public TKqpExecuterBase> FindOneSecureParam(const TExprNode::T return *secureParams.begin(); } +TIssues ApplyOverridePlannerSettings(const TString& overridePlannerJson, NKqpProto::TKqpPhyQuery& queryProto) { + TIssues issues; + NJson::TJsonValue jsonNode; + try { + NJson::TJsonReaderConfig jsonConfig; + NJson::ReadJsonTree(overridePlannerJson, &jsonConfig, &jsonNode, true); + if (!jsonNode.IsArray()) { + issues.AddIssue("Expected array json value"); + return issues; + } + } catch (const std::exception& e) { + issues.AddIssue(TStringBuilder() << "Failed to parse json: " << e.what()); + return issues; + } + + const auto extractUint = [](const NJson::TJsonValue& node, ui32* result) -> TString { + if (!node.IsInteger()) { + return "Expected integer json value"; + } + + const auto value = node.GetIntegerSafe(); + if (value < 0) { + return "Expected non negative integer json value"; + } + + *result = value; + return ""; + }; + + THashSet> updatedStages; + for (ui64 overrideIdx = 0; const auto& stageOverride : jsonNode.GetArray()) { + overrideIdx++; + if (!stageOverride.IsMap()) { + issues.AddIssue(TStringBuilder() << "Expected map json value for stage override " << overrideIdx); + continue; + } + + ui32 txId = 0; + ui32 stageId = 0; + std::optional tasks; + std::vector nodes; + for (const auto& [key, value] : stageOverride.GetMap()) { + if (key == "nodes") { + if (!value.IsArray()) { + issues.AddIssue(TStringBuilder() << "Expected array json value for key 'nodes' in stage override " << overrideIdx); + continue; + } + nodes.reserve(value.GetArray().size()); + for (const auto& node : value.GetArray()) { + nodes.emplace_back(); + if (const auto& error = extractUint(node, &nodes.back())) { + issues.AddIssue(TStringBuilder() << error << " for key 'nodes' on position " << nodes.size() - 1 << " in stage override " << overrideIdx); + continue; + } + } + continue; + } + + ui32* result = nullptr; + if (key == "tx") { + result = &txId; + } else if (key == "stage") { + result = &stageId; + } else if (key == "tasks") { + tasks = 0; + result = &(*tasks); + } else { + issues.AddIssue(TStringBuilder() << "Unknown key '" << key << "' in stage override " << overrideIdx); + continue; + } + + if (const auto& error = extractUint(value, result)) { + issues.AddIssue(TStringBuilder() << error << " for key '" << key << "' in stage override " << overrideIdx); + continue; + } + } + + if (!updatedStages.emplace(txId, stageId).second) { + issues.AddIssue(TStringBuilder() << "Duplicate stage override " << overrideIdx << " for tx " << txId << " and stage " << stageId); + continue; + } + + if (!tasks && nodes.empty()) { + issues.AddIssue(TStringBuilder() << "Missing stage settings for tx " << txId << " and stage " << stageId << " in stage override " << overrideIdx); + continue; + } + + auto& txs = *queryProto.MutableTransactions(); + if (txId >= static_cast(txs.size())) { + issues.AddIssue(TStringBuilder() << "Invalid tx id: " << txId << " in stage override " << overrideIdx << ", number of transactions in query: " << txs.size()); + continue; + } + + auto& stages = *txs[txId].MutableStages(); + if (stageId >= static_cast(stages.size())) { + issues.AddIssue(TStringBuilder() << "Invalid stage id: " << stageId << " in stage override " << overrideIdx << ", number of stages in transaction " << txId << ": " << stages.size()); + continue; + } + + auto& stage = stages[stageId]; + if (tasks) { + stage.SetTaskCount(*tasks); + } + if (!nodes.empty()) { + stage.MutableTasksNodes()->Assign(nodes.begin(), nodes.end()); + } + } + + return issues; +} + class TKqpQueryCompiler : public IKqpQueryCompiler { public: TKqpQueryCompiler(const TString& cluster, const TIntrusivePtr tablesData, @@ -545,30 +656,14 @@ class TKqpQueryCompiler : public IKqpQueryCompiler { CompileTransaction(tx, *queryProto.AddTransactions(), ctx); } - auto overridePlanner = Config->OverridePlanner.Get(); - if (overridePlanner) { - NJson::TJsonReaderConfig jsonConfig; - NJson::TJsonValue jsonNode; - if (NJson::ReadJsonTree(*overridePlanner, &jsonConfig, &jsonNode)) { - for (auto& stageOverride : jsonNode.GetArray()) { - ui32 txId = 0; - if (auto* txNode = stageOverride.GetValueByPath("tx")) { - txId = txNode->GetIntegerSafe(); - } - if (txId < static_cast(queryProto.GetTransactions().size())) { - auto& tx = *queryProto.MutableTransactions(txId); - ui32 stageId = 0; - if (auto* stageNode = stageOverride.GetValueByPath("stage")) { - stageId = stageNode->GetIntegerSafe(); - } - if (stageId < static_cast(tx.GetStages().size())) { - auto& stage = *tx.MutableStages(stageId); - if (auto* tasksNode = stageOverride.GetValueByPath("tasks")) { - stage.SetTaskCount(tasksNode->GetIntegerSafe()); - } - } - } + if (const auto overridePlanner = Config->OverridePlanner.Get()) { + if (const auto& issues = ApplyOverridePlannerSettings(*overridePlanner, queryProto)) { + NYql::TIssue rootIssue("Invalid override planner settings"); + rootIssue.SetCode(NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO); + for (auto issue : issues) { + rootIssue.AddSubIssue(MakeIntrusive(issue.SetCode(NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO))); } + ctx.AddError(rootIssue); } } diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index aceb46845842..f7425aac17fd 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -416,6 +416,7 @@ message TKqpPhyStage { uint32 TaskCount = 14; double StageCost = 15; bool IsShuffleEliminated = 16; + repeated uint32 TasksNodes = 17; } message TKqpPhyResult { From 6a8ae6e077f6d84afbc0bba27d39b9e1a90eabb9 Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Wed, 18 Jun 2025 11:17:44 +0300 Subject: [PATCH 2/5] Removed continue --- ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index eaa6aa077344..cc0f1eeaf56b 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -545,7 +545,6 @@ TIssues ApplyOverridePlannerSettings(const TString& overridePlannerJson, NKqpPro nodes.emplace_back(); if (const auto& error = extractUint(node, &nodes.back())) { issues.AddIssue(TStringBuilder() << error << " for key 'nodes' on position " << nodes.size() - 1 << " in stage override " << overrideIdx); - continue; } } continue; From df383cc3ce92702b65c46bf0efc3478da1f4753a Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Wed, 18 Jun 2025 11:19:41 +0300 Subject: [PATCH 3/5] Fixed integer extraction --- ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index cc0f1eeaf56b..90749690682b 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -509,16 +509,11 @@ TIssues ApplyOverridePlannerSettings(const TString& overridePlannerJson, NKqpPro } const auto extractUint = [](const NJson::TJsonValue& node, ui32* result) -> TString { - if (!node.IsInteger()) { - return "Expected integer json value"; - } - - const auto value = node.GetIntegerSafe(); - if (value < 0) { + if (!node.IsUInteger()) { return "Expected non negative integer json value"; } - *result = value; + *result = node.GetIntegerSafe(); return ""; }; From 4faadce709f41fb1cf1a1011e808557668016b69 Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Wed, 18 Jun 2025 11:24:38 +0300 Subject: [PATCH 4/5] Fixed integer extraction 2 --- ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 90749690682b..c323738db8d0 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -513,7 +513,7 @@ TIssues ApplyOverridePlannerSettings(const TString& overridePlannerJson, NKqpPro return "Expected non negative integer json value"; } - *result = node.GetIntegerSafe(); + *result = node.GetUIntegerSafe(); return ""; }; From 07a634bdfaa6ed53ef2ecf1af06a47d6fac86fa6 Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Wed, 18 Jun 2025 11:30:52 +0300 Subject: [PATCH 5/5] Fixed typos --- ydb/core/kqp/executer_actor/kqp_executer_impl.h | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 0e17e0015209..604297f4bc25 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -1077,8 +1077,8 @@ class TKqpExecuterBase : public TActor { structuredToken = NYql::CreateStructuredTokenParser(externalSource.GetAuthInfo()).ToBuilder().ReplaceReferences(SecureParams).ToJson(); } - auto nodesPlaner = GetStageNodesHint(stage, resourceSnapshot); - if (!nodesPlaner && !resourceSnapshot.empty()) { + auto nodesPlanner = GetStageNodesHint(stage, resourceSnapshot); + if (!nodesPlanner && !resourceSnapshot.empty()) { ui64 selfNodeIdx = 0; for (size_t i = 0; i < resourceSnapshot.size(); ++i) { if (resourceSnapshot[i].GetNodeId() == SelfId().NodeId()) { @@ -1086,7 +1086,7 @@ class TKqpExecuterBase : public TActor { break; } } - nodesPlaner = [&resourceSnapshot, selfNodeIdx](ui32 taskIdx) { + nodesPlanner = [&resourceSnapshot, selfNodeIdx](ui32 taskIdx) { return resourceSnapshot[(selfNodeIdx + taskIdx) % resourceSnapshot.size()].GetNodeId(); }; } @@ -1109,10 +1109,10 @@ class TKqpExecuterBase : public TActor { } FillSecureParamsFromStage(task.Meta.SecureParams, stage); - if (!nodesPlaner) { + if (!nodesPlanner) { task.Meta.Type = TTaskMeta::TTaskType::Compute; } else { - task.Meta.NodeId = nodesPlaner(i); + task.Meta.NodeId = nodesPlanner(i); task.Meta.Type = TTaskMeta::TTaskType::Scan; } @@ -1505,13 +1505,13 @@ class TKqpExecuterBase : public TActor { } } - auto nodesPlaner = GetStageNodesHint(stage, resourceSnapshot); + auto nodesPlanner = GetStageNodesHint(stage, resourceSnapshot); for (ui32 i = 0; i < partitionsCount; ++i) { auto& task = TasksGraph.AddTask(stageInfo); - if (!nodesPlaner) { + if (!nodesPlanner) { task.Meta.Type = TTaskMeta::TTaskType::Compute; } else { - task.Meta.NodeId = nodesPlaner(i); + task.Meta.NodeId = nodesPlanner(i); task.Meta.Type = TTaskMeta::TTaskType::Scan; } task.Meta.ExecuterId = SelfId();