Skip to content

YQ supported override nodes placement #19787

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1942,6 +1942,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
SecretNames.push_back(secretName);
}
for (const auto& stage : transaction.Body->GetStages()) {
if (!stage.GetTasksNodes().empty()) {
ResourceSnapshotRequired = true;
}
if (stage.SourcesSize() > 0 && stage.GetSources(0).GetTypeCase() == NKqpProto::TKqpSource::kExternalSource) {
ResourceSnapshotRequired = true;
HasExternalSources = true;
Expand Down Expand Up @@ -2059,7 +2062,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
} else if (stageInfo.Meta.IsSysView()) {
BuildSysViewScanTasks(stageInfo);
} else if (stageInfo.Meta.ShardOperations.empty() || stage.SinksSize() > 0) {
BuildComputeTasks(stageInfo, std::max<ui32>(ShardsOnNode.size(), ResourcesSnapshot.size()));
BuildComputeTasks(stageInfo, std::max<ui32>(ShardsOnNode.size(), ResourcesSnapshot.size()), ResourcesSnapshot);
} else {
BuildDatashardTasks(stageInfo);
}
Expand Down
51 changes: 42 additions & 9 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,27 @@ class TKqpExecuterBase : public TActor<TDerived> {
}
}

std::function<ui32(ui32 taskIdx)> GetStageNodesHint(const NKqpProto::TKqpPhyStage& stage, const TVector<NKikimrKqp::TKqpNodeResources>& resourceSnapshot) {
Copy link
Member

@gridnevvvit gridnevvvit Jun 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

зачем аж std::function ? можно проще?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

я предлагаю подумать и отказаться от этой идеи как таковой, а решать задачу в какой то другой парадигме. например если задача резделения нагрузки то запрещать использовать какие-то ноды.

назначать сотни тасок по нодам не имеет примерно никакого юзкейса

const auto& nodesHint = stage.GetTasksNodes();
if (nodesHint.empty()) {
return nullptr;
}

std::unordered_set<ui32> availableNodes;
availableNodes.reserve(resourceSnapshot.size());
for (const auto& nodeResource : resourceSnapshot) {
availableNodes.emplace(nodeResource.GetNodeId());
}

for (const auto nodeId : nodesHint) {
YQL_ENSURE(availableNodes.contains(nodeId), "Invalid node id " << nodeId << " in stage override, available nodes: " << JoinSeq(", ", availableNodes));
}

return [nodesHint](ui32 taskIdx) {
return nodesHint[taskIdx % nodesHint.size()];
};
}

void BuildReadTasksFromSource(TStageInfo& stageInfo, const TVector<NKikimrKqp::TKqpNodeResources>& resourceSnapshot, ui32 scheduledTaskCount) {
const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);

Expand Down Expand Up @@ -1056,12 +1077,18 @@ class TKqpExecuterBase : public TActor<TDerived> {
structuredToken = NYql::CreateStructuredTokenParser(externalSource.GetAuthInfo()).ToBuilder().ReplaceReferences(SecureParams).ToJson();
}

ui64 selfNodeIdx = 0;
for (size_t i = 0; i < resourceSnapshot.size(); ++i) {
if (resourceSnapshot[i].GetNodeId() == SelfId().NodeId()) {
selfNodeIdx = i;
break;
auto nodesPlanner = GetStageNodesHint(stage, resourceSnapshot);
if (!nodesPlanner && !resourceSnapshot.empty()) {
ui64 selfNodeIdx = 0;
for (size_t i = 0; i < resourceSnapshot.size(); ++i) {
if (resourceSnapshot[i].GetNodeId() == SelfId().NodeId()) {
selfNodeIdx = i;
break;
}
}
nodesPlanner = [&resourceSnapshot, selfNodeIdx](ui32 taskIdx) {
return resourceSnapshot[(selfNodeIdx + taskIdx) % resourceSnapshot.size()].GetNodeId();
};
}

TVector<ui64> tasksIds;
Expand All @@ -1082,10 +1109,10 @@ class TKqpExecuterBase : public TActor<TDerived> {
}
FillSecureParamsFromStage(task.Meta.SecureParams, stage);

if (resourceSnapshot.empty()) {
if (!nodesPlanner) {
task.Meta.Type = TTaskMeta::TTaskType::Compute;
} else {
task.Meta.NodeId = resourceSnapshot[(selfNodeIdx + i) % resourceSnapshot.size()].GetNodeId();
task.Meta.NodeId = nodesPlanner(i);
task.Meta.Type = TTaskMeta::TTaskType::Scan;
}

Expand Down Expand Up @@ -1413,7 +1440,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
}
}

void BuildComputeTasks(TStageInfo& stageInfo, const ui32 nodesCount) {
void BuildComputeTasks(TStageInfo& stageInfo, const ui32 nodesCount, const TVector<NKikimrKqp::TKqpNodeResources>& resourceSnapshot) {
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);

ui32 partitionsCount = 1;
Expand Down Expand Up @@ -1478,9 +1505,15 @@ class TKqpExecuterBase : public TActor<TDerived> {
}
}

auto nodesPlanner = GetStageNodesHint(stage, resourceSnapshot);
for (ui32 i = 0; i < partitionsCount; ++i) {
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.Type = TTaskMeta::TTaskType::Compute;
if (!nodesPlanner) {
task.Meta.Type = TTaskMeta::TTaskType::Compute;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

что тут происходит, зачем это?

} else {
task.Meta.NodeId = nodesPlanner(i);
task.Meta.Type = TTaskMeta::TTaskType::Scan;
}
task.Meta.ExecuterId = SelfId();
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
BuildSinks(stage, stageInfo, task);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
YQL_ENSURE(false, "unknown source type");
}
} else if (stageInfo.Meta.ShardOperations.empty()) {
BuildComputeTasks(stageInfo, ShardsOnNode.size());
BuildComputeTasks(stageInfo, ShardsOnNode.size(), {});
} else if (stageInfo.Meta.IsSysView()) {
BuildSysViewScanTasks(stageInfo);
} else if (stageInfo.Meta.IsOlap() || stageInfo.Meta.IsDatashard()) {
Expand Down
135 changes: 112 additions & 23 deletions ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,111 @@ std::optional<std::pair<TString, TString>> FindOneSecureParam(const TExprNode::T
return *secureParams.begin();
}

TIssues ApplyOverridePlannerSettings(const TString& overridePlannerJson, NKqpProto::TKqpPhyQuery& queryProto) {
TIssues issues;
NJson::TJsonValue jsonNode;
try {
NJson::TJsonReaderConfig jsonConfig;
NJson::ReadJsonTree(overridePlannerJson, &jsonConfig, &jsonNode, true);
if (!jsonNode.IsArray()) {
issues.AddIssue("Expected array json value");
return issues;
}
} catch (const std::exception& e) {
issues.AddIssue(TStringBuilder() << "Failed to parse json: " << e.what());
return issues;
}

const auto extractUint = [](const NJson::TJsonValue& node, ui32* result) -> TString {
if (!node.IsUInteger()) {
return "Expected non negative integer json value";
}

*result = node.GetUIntegerSafe();
return "";
};

THashSet<std::pair<ui32, ui32>> updatedStages;
for (ui64 overrideIdx = 0; const auto& stageOverride : jsonNode.GetArray()) {
overrideIdx++;
if (!stageOverride.IsMap()) {
issues.AddIssue(TStringBuilder() << "Expected map json value for stage override " << overrideIdx);
continue;
}

ui32 txId = 0;
ui32 stageId = 0;
std::optional<ui32> tasks;
std::vector<ui32> nodes;
for (const auto& [key, value] : stageOverride.GetMap()) {
if (key == "nodes") {
if (!value.IsArray()) {
issues.AddIssue(TStringBuilder() << "Expected array json value for key 'nodes' in stage override " << overrideIdx);
continue;
}
nodes.reserve(value.GetArray().size());
for (const auto& node : value.GetArray()) {
nodes.emplace_back();
if (const auto& error = extractUint(node, &nodes.back())) {
issues.AddIssue(TStringBuilder() << error << " for key 'nodes' on position " << nodes.size() - 1 << " in stage override " << overrideIdx);
}
}
continue;
}

ui32* result = nullptr;
if (key == "tx") {
result = &txId;
} else if (key == "stage") {
result = &stageId;
} else if (key == "tasks") {
tasks = 0;
result = &(*tasks);
} else {
issues.AddIssue(TStringBuilder() << "Unknown key '" << key << "' in stage override " << overrideIdx);
continue;
}

if (const auto& error = extractUint(value, result)) {
issues.AddIssue(TStringBuilder() << error << " for key '" << key << "' in stage override " << overrideIdx);
continue;
}
}

if (!updatedStages.emplace(txId, stageId).second) {
issues.AddIssue(TStringBuilder() << "Duplicate stage override " << overrideIdx << " for tx " << txId << " and stage " << stageId);
continue;
}

if (!tasks && nodes.empty()) {
issues.AddIssue(TStringBuilder() << "Missing stage settings for tx " << txId << " and stage " << stageId << " in stage override " << overrideIdx);
continue;
}

auto& txs = *queryProto.MutableTransactions();
if (txId >= static_cast<ui32>(txs.size())) {
issues.AddIssue(TStringBuilder() << "Invalid tx id: " << txId << " in stage override " << overrideIdx << ", number of transactions in query: " << txs.size());
continue;
}

auto& stages = *txs[txId].MutableStages();
if (stageId >= static_cast<ui32>(stages.size())) {
issues.AddIssue(TStringBuilder() << "Invalid stage id: " << stageId << " in stage override " << overrideIdx << ", number of stages in transaction " << txId << ": " << stages.size());
continue;
}

auto& stage = stages[stageId];
if (tasks) {
stage.SetTaskCount(*tasks);
}
if (!nodes.empty()) {
stage.MutableTasksNodes()->Assign(nodes.begin(), nodes.end());
}
}

return issues;
}

class TKqpQueryCompiler : public IKqpQueryCompiler {
public:
TKqpQueryCompiler(const TString& cluster, const TIntrusivePtr<TKikimrTablesData> tablesData,
Expand Down Expand Up @@ -545,30 +650,14 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
CompileTransaction(tx, *queryProto.AddTransactions(), ctx);
}

auto overridePlanner = Config->OverridePlanner.Get();
if (overridePlanner) {
NJson::TJsonReaderConfig jsonConfig;
NJson::TJsonValue jsonNode;
if (NJson::ReadJsonTree(*overridePlanner, &jsonConfig, &jsonNode)) {
for (auto& stageOverride : jsonNode.GetArray()) {
ui32 txId = 0;
if (auto* txNode = stageOverride.GetValueByPath("tx")) {
txId = txNode->GetIntegerSafe();
}
if (txId < static_cast<ui32>(queryProto.GetTransactions().size())) {
auto& tx = *queryProto.MutableTransactions(txId);
ui32 stageId = 0;
if (auto* stageNode = stageOverride.GetValueByPath("stage")) {
stageId = stageNode->GetIntegerSafe();
}
if (stageId < static_cast<ui32>(tx.GetStages().size())) {
auto& stage = *tx.MutableStages(stageId);
if (auto* tasksNode = stageOverride.GetValueByPath("tasks")) {
stage.SetTaskCount(tasksNode->GetIntegerSafe());
}
}
}
if (const auto overridePlanner = Config->OverridePlanner.Get()) {
if (const auto& issues = ApplyOverridePlannerSettings(*overridePlanner, queryProto)) {
NYql::TIssue rootIssue("Invalid override planner settings");
rootIssue.SetCode(NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO);
for (auto issue : issues) {
rootIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(issue.SetCode(NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO)));
}
ctx.AddError(rootIssue);
}
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/kqp_physical.proto
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ message TKqpPhyStage {
uint32 TaskCount = 14;
double StageCost = 15;
bool IsShuffleEliminated = 16;
repeated uint32 TasksNodes = 17;
}

message TKqpPhyResult {
Expand Down
Loading