Skip to content

YQ fixed kqprun grpc endpoints #19790

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions ydb/core/testlib/test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -641,11 +641,7 @@ namespace Tests {
}

void TServer::EnableGRpc(const NYdbGrpc::TServerOptions& options, ui32 grpcServiceNodeId, const std::optional<TString>& tenant) {
auto* grpcInfo = &RootGRpc;
if (tenant) {
grpcInfo = &TenantsGRpc[*tenant];
}

auto* grpcInfo = &TenantsGRpc[tenant ? *tenant : Settings->DomainName][grpcServiceNodeId];
grpcInfo->GRpcServerRootCounters = MakeIntrusive<::NMonitoring::TDynamicCounters>();
auto& counters = grpcInfo->GRpcServerRootCounters;

Expand Down Expand Up @@ -1685,15 +1681,16 @@ namespace Tests {
}

const NYdbGrpc::TGRpcServer& TServer::GetGRpcServer() const {
Y_ABORT_UNLESS(RootGRpc.GRpcServer);
return *RootGRpc.GRpcServer;
return GetTenantGRpcServer(Settings->DomainName);
}

const NYdbGrpc::TGRpcServer& TServer::GetTenantGRpcServer(const TString& tenant) const {
const auto it = TenantsGRpc.find(tenant);
Y_ABORT_UNLESS(it != TenantsGRpc.end());
Y_ABORT_UNLESS(it->second.GRpcServer);
return *it->second.GRpcServer;
const auto tenantIt = TenantsGRpc.find(tenant);
Y_ABORT_UNLESS(tenantIt != TenantsGRpc.end());
const auto& nodesGRpc = tenantIt->second;
Y_ABORT_UNLESS(!nodesGRpc.empty());
Y_ABORT_UNLESS(nodesGRpc.begin()->second.GRpcServer);
return *nodesGRpc.begin()->second.GRpcServer;
}

void TServer::WaitFinalization() {
Expand Down
13 changes: 8 additions & 5 deletions ydb/core/testlib/test_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,17 @@ namespace Tests {
void SetupDefaultProfiles();

TIntrusivePtr<::NMonitoring::TDynamicCounters> GetGRpcServerRootCounters() const {
return RootGRpc.GRpcServerRootCounters;
const auto tenantIt = TenantsGRpc.find(Settings->DomainName);
Y_ABORT_UNLESS(tenantIt != TenantsGRpc.end());
Y_ABORT_UNLESS(!tenantIt->second.empty());
return tenantIt->second.begin()->second.GRpcServerRootCounters;
}

void ShutdownGRpc() {
RootGRpc.Shutdown();
for (auto& [_, tenantGRpc] : TenantsGRpc) {
tenantGRpc.Shutdown();
for (auto& [_, nodeGRpc] : tenantGRpc) {
nodeGRpc.Shutdown();
}
}
}

Expand Down Expand Up @@ -407,8 +411,7 @@ namespace Tests {
}
};

TGRpcInfo RootGRpc;
std::unordered_map<TString, TGRpcInfo> TenantsGRpc;
std::unordered_map<TString, std::unordered_map<ui32, TGRpcInfo>> TenantsGRpc; // tenant -> nodeIdx -> GRpcInfo
};

class TClient {
Expand Down
56 changes: 30 additions & 26 deletions ydb/tests/tools/kqprun/src/ydb_setup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class TYdbSetup::TImpl : public TKikimrSetupBase {
return serverSettings;
}

void CreateTenant(Ydb::Cms::CreateDatabaseRequest&& request, const TString& relativePath, const TString& type, TStorageMeta::TTenant tenantInfo, ui32 grpcPort) {
void CreateTenant(Ydb::Cms::CreateDatabaseRequest&& request, const TString& relativePath, const TString& type, TStorageMeta::TTenant tenantInfo) {
const auto absolutePath = request.path();
const auto [it, inserted] = StorageMeta_.MutableTenants()->emplace(relativePath, tenantInfo);
if (inserted || it->second.GetCreationInProgress()) {
Expand Down Expand Up @@ -231,24 +231,14 @@ class TYdbSetup::TImpl : public TKikimrSetupBase {
Tenants_->Run(absolutePath, tenantInfo.GetNodesCount());
}
}

if (tenantInfo.GetType() != TStorageMeta::TTenant::SERVERLESS) {
if (Settings_.GrpcEnabled) {
Server_->EnableGRpc(grpcPort, GetNodeIndexForDatabase(absolutePath), absolutePath);
} else if (Settings_.MonitoringEnabled) {
ui32 nodeIndex = GetNodeIndexForDatabase(absolutePath);
NActors::TActorId edgeActor = GetRuntime()->AllocateEdgeActor(nodeIndex);
GetRuntime()->Register(NKikimr::CreateBoardPublishActor(NKikimr::MakeEndpointsBoardPath(absolutePath), "", edgeActor, 0, true), nodeIndex, GetRuntime()->GetAppData(nodeIndex).UserPoolId);
}
}
}

static void AddTenantStoragePool(Ydb::Cms::StorageUnits* storage, const TString& name) {
storage->set_unit_kind(name);
storage->set_count(1);
}

void CreateTenants(TPortGenerator& grpcPortGen) {
void CreateTenants() {
std::set<TString> sharedTenants;
std::map<TString, TStorageMeta::TTenant> serverlessTenants;
for (const auto& [tenantPath, tenantInfo] : Settings_.Tenants) {
Expand All @@ -258,13 +248,13 @@ class TYdbSetup::TImpl : public TKikimrSetupBase {
switch (tenantInfo.GetType()) {
case TStorageMeta::TTenant::DEDICATED:
AddTenantStoragePool(request.mutable_resources()->add_storage_units(), tenantPath);
CreateTenant(std::move(request), tenantPath, "dedicated", tenantInfo, grpcPortGen.GetPort());
CreateTenant(std::move(request), tenantPath, "dedicated", tenantInfo);
break;

case TStorageMeta::TTenant::SHARED:
sharedTenants.emplace(tenantPath);
AddTenantStoragePool(request.mutable_shared_resources()->add_storage_units(), tenantPath);
CreateTenant(std::move(request), tenantPath, "shared", tenantInfo, grpcPortGen.GetPort());
CreateTenant(std::move(request), tenantPath, "shared", tenantInfo);
break;

case TStorageMeta::TTenant::SERVERLESS:
Expand Down Expand Up @@ -292,12 +282,11 @@ class TYdbSetup::TImpl : public TKikimrSetupBase {
request.set_path(GetTenantPath(tenantPath));
request.mutable_serverless_resources()->set_shared_database_path(GetTenantPath(tenantInfo.GetSharedTenant()));
ServerlessToShared_[request.path()] = request.serverless_resources().shared_database_path();
CreateTenant(std::move(request), tenantPath, "serverless", tenantInfo, 0);
CreateTenant(std::move(request), tenantPath, "serverless", tenantInfo);
}
}

void InitializeServer() {
TPortGenerator grpcPortGen(PortManager, Settings_.FirstGrpcPort);
void InitializeServer(TPortGenerator& grpcPortGen) {
const ui32 domainGrpcPort = grpcPortGen.GetPort();
NKikimr::Tests::TServerSettings serverSettings = GetServerSettings(domainGrpcPort);

Expand All @@ -316,7 +305,7 @@ class TYdbSetup::TImpl : public TKikimrSetupBase {
Client_->InitRootScheme();

Tenants_ = MakeHolder<NKikimr::Tests::TTenants>(Server_);
CreateTenants(grpcPortGen);
CreateTenants();
}

void InitializeYqlLogger() {
Expand All @@ -328,7 +317,7 @@ class TYdbSetup::TImpl : public TKikimrSetupBase {
NYql::NLog::InitLogger(NActors::CreateNullBackend());
}

NThreading::TFuture<void> RunHealthCheck(const TString& database) const {
NThreading::TFuture<void> InitializeTenantNodes(const TString& database, TPortGenerator& grpcPortGen) const {
EHealthCheck level = Settings_.HealthCheckLevel;
i32 nodesCount = Settings_.NodeCount;
TVector<ui32> tenantNodesIdx;
Expand All @@ -337,17 +326,29 @@ class TYdbSetup::TImpl : public TKikimrSetupBase {
nodesCount = tenantNodesIdx.size();
}

const auto& absolutePath = NKikimr::CanonizePath(database);
const TWaitResourcesSettings settings = {
.ExpectedNodeCount = nodesCount,
.HealthCheckLevel = level,
.HealthCheckTimeout = Settings_.HealthCheckTimeout,
.VerboseLevel = Settings_.VerboseLevel,
.Database = NKikimr::CanonizePath(database)
.Database = absolutePath
};

std::vector<NThreading::TFuture<void>> futures;
futures.reserve(nodesCount);
for (i32 nodeIdx = 0; nodeIdx < nodesCount; ++nodeIdx) {
const auto node = tenantNodesIdx ? tenantNodesIdx[nodeIdx] : nodeIdx;
if (Settings_.GrpcEnabled) {
if (node > 0) {
// Port for first static node also used in cluster initialization
Server_->EnableGRpc(grpcPortGen.GetPort(), node, absolutePath);
}
} else if (Settings_.MonitoringEnabled) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А тут точно должен быть else if, а не if? (так было и в оригинальном коде выше, если это ошибка -- то не новая, но чуть странно)

NActors::TActorId edgeActor = GetRuntime()->AllocateEdgeActor(node);
GetRuntime()->Register(NKikimr::CreateBoardPublishActor(NKikimr::MakeEndpointsBoardPath(absolutePath), "", edgeActor, 0, true), node, GetRuntime()->GetAppData(node).UserPoolId);
}

const auto promise = NThreading::NewPromise();
GetRuntime()->Register(CreateResourcesWaiterActor(promise, settings), tenantNodesIdx ? tenantNodesIdx[nodeIdx] : nodeIdx, GetRuntime()->GetAppData().SystemPoolId);
futures.emplace_back(promise.GetFuture());
Expand All @@ -356,11 +357,13 @@ class TYdbSetup::TImpl : public TKikimrSetupBase {
return NThreading::WaitAll(futures);
}

void WaitResourcesPublishing() const {
std::vector<NThreading::TFuture<void>> futures(1, RunHealthCheck(Settings_.DomainName));
void InitializeTenants(TPortGenerator& grpcPortGen) const {
std::vector<NThreading::TFuture<void>> futures(1, InitializeTenantNodes(Settings_.DomainName, grpcPortGen));
futures.reserve(StorageMeta_.GetTenants().size() + 1);
for (const auto& [tenantName, _] : StorageMeta_.GetTenants()) {
futures.emplace_back(RunHealthCheck(GetTenantPath(tenantName)));
for (const auto& [tenantName, tenantInfo] : StorageMeta_.GetTenants()) {
if (tenantInfo.GetType() != TStorageMeta::TTenant::SERVERLESS) {
futures.emplace_back(InitializeTenantNodes(GetTenantPath(tenantName), grpcPortGen));
}
}

try {
Expand All @@ -375,9 +378,10 @@ class TYdbSetup::TImpl : public TKikimrSetupBase {
: Settings_(settings)
, CoutColors_(NColorizer::AutoColors(Cout))
{
TPortGenerator grpcPortGen(PortManager, Settings_.FirstGrpcPort);
InitializeYqlLogger();
InitializeServer();
WaitResourcesPublishing();
InitializeServer(grpcPortGen);
InitializeTenants(grpcPortGen);

if (Settings_.MonitoringEnabled && Settings_.VerboseLevel >= EVerbose::Info) {
for (ui32 nodeIndex = 0; nodeIndex < Settings_.NodeCount; ++nodeIndex) {
Expand Down
Loading