Skip to content

Commit 0b821c5

Browse files
Solomon in Cloud reading support (#17324)
1 parent 8ada07d commit 0b821c5

File tree

43 files changed

+414
-500
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+414
-500
lines changed

ydb/core/external_sources/external_source_factory.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
135135
},
136136
{
137137
ToString(NYql::EDatabaseType::Solomon),
138-
CreateExternalDataSource(TString{NYql::SolomonProviderName}, {"NONE", "TOKEN"}, {"use_ssl", "grpc_port"}, hostnamePatternsRegEx)
138+
CreateExternalDataSource(TString{NYql::SolomonProviderName}, {"NONE", "TOKEN"}, {"use_ssl", "grpc_location", "project", "cluster"}, hostnamePatternsRegEx)
139139
},
140140
{
141141
ToString(NYql::EDatabaseType::Iceberg),

ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,12 @@ TString GetOrEmpty(const NYql::TCreateObjectSettings& container, const TString&
8888
"service_name", // oracle
8989
"folder_id", // logging
9090
"use_ssl", // solomon
91-
"grpc_port", // solomon
9291
"reading_mode", // mongodb
9392
"unexpected_type_display_mode", // mongodb
9493
"unsupported_type_display_mode", // mongodb
94+
"grpc_location", // solomon
95+
"project", // solomon
96+
"cluster" // solomon
9597
};
9698

9799
auto& featuresExtractor = settings.GetFeaturesExtractor();

ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,9 +274,10 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrapped<TDqSolomo
274274

275275
void Fetch() {
276276
NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem();
277+
std::map<TString, TString> selectors(ReadParams.Source.GetSelectors().begin(), ReadParams.Source.GetSelectors().end());
277278
ListingFuture =
278279
SolomonClient
279-
->ListMetrics(ReadParams.Source.GetSelectors(), PageSize, CurrentPage++)
280+
->ListMetrics(selectors, PageSize, CurrentPage++)
280281
.Subscribe([actorSystem, selfId = SelfId()](
281282
NThreading::TFuture<NSo::TListMetricsResponse> future) -> void {
282283
try {

ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.cpp

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
109109
SOURCE_LOG_D("Init");
110110
IngressStats.Level = statsLevel;
111111

112-
UseMetricsQueue = ReadParams.Source.HasSelectors();
112+
UseMetricsQueue = !ReadParams.Source.HasProgram();
113113

114114
auto stringType = ProgramBuilder.NewDataType(NYql::NUdf::TDataType<char*>::Id);
115115
DictType = ProgramBuilder.NewDictType(stringType, stringType, false);
@@ -219,11 +219,10 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
219219
return;
220220
}
221221

222-
auto& metrics = batch.Metrics;
222+
auto& metric = batch.Metric;
223223
auto& pointsCount = batch.Response.Result.PointsCount;
224-
ParsePointsCount(metrics, pointsCount);
224+
ParsePointsCount(metric, pointsCount);
225225

226-
SOURCE_LOG_D("HandlePointsCountBatch batch of size " << metrics.size());
227226
TryRequestData();
228227
}
229228

@@ -317,7 +316,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
317316
auto value = HolderFactory.CreateDirectArrayHolder(ReadParams.Source.GetSystemColumns().size() + ReadParams.Source.GetLabelNames().size(), items);
318317

319318
if (auto it = Index.find(SOLOMON_SCHEME_VALUE); it != Index.end()) {
320-
items[it->second] = NUdf::TUnboxedValuePod(values[i]);
319+
items[it->second] = isnan(values[i]) ? NUdf::TUnboxedValuePod() : NUdf::TUnboxedValuePod(values[i]).MakeOptional();
321320
}
322321

323322
if (auto it = Index.find(SOLOMON_SCHEME_TYPE); it != Index.end()) {
@@ -412,21 +411,17 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
412411
}
413412

414413
void RequestPointsCount() {
415-
std::vector<NSo::TMetric> requestMetrics;
416-
requestMetrics.reserve(std::min<ui64>(MetricsPerPointsCountQuery, ListedMetrics.size()));
417-
while (!ListedMetrics.empty() && requestMetrics.size() < MetricsPerPointsCountQuery) {
418-
requestMetrics.push_back(ListedMetrics.back());
419-
ListedMetrics.pop_back();
420-
}
414+
NSo::TMetric requestMetric = ListedMetrics.back();
415+
ListedMetrics.pop_back();
421416

422-
auto getPointsCountFuture = SolomonClient->GetPointsCount(std::move(requestMetrics));
417+
auto getPointsCountFuture = SolomonClient->GetPointsCount(requestMetric.Labels);
423418

424419
NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem();
425-
getPointsCountFuture.Subscribe([actorSystem, metrics = std::move(requestMetrics), selfId = SelfId()](
420+
getPointsCountFuture.Subscribe([actorSystem, metric = std::move(requestMetric), selfId = SelfId()](
426421
const NThreading::TFuture<NSo::TGetPointsCountResponse>& response) mutable -> void
427422
{
428423
actorSystem->Send(selfId, new TEvSolomonProvider::TEvPointsCountBatch(
429-
std::move(metrics),
424+
std::move(metric),
430425
response.GetValue())
431426
);
432427
});
@@ -454,7 +449,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
454449
from = request.From;
455450
to = request.To;
456451

457-
getDataFuture = SolomonClient->GetData(metric, from, to);
452+
getDataFuture = SolomonClient->GetData(metric.Labels, from, to);
458453
} else {
459454
getDataFuture = SolomonClient->GetData(
460455
ReadParams.Source.GetProgram(),
@@ -476,18 +471,19 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
476471
});
477472
}
478473

479-
void ParsePointsCount(const std::vector<NSo::TMetric>& metrics, const std::vector<ui64>& pointsCount) {
474+
void ParsePointsCount(const NSo::TMetric& metric, ui64 pointsCount) {
480475
TInstant from = TInstant::Seconds(ReadParams.Source.GetFrom());
481476
TInstant to = TInstant::Seconds(ReadParams.Source.GetTo());
482477

483-
for (size_t i = 0; i < metrics.size(); ++i) {
484-
auto& metric = metrics[i];
485-
auto& count = pointsCount[i];
478+
auto ranges = SplitTimeIntervalIntoRanges(from, to, pointsCount);
486479

487-
auto ranges = SplitTimeIntervalIntoRanges(from, to, count);
488-
for (const auto& [fromRange, toRange] : ranges) {
489-
MetricsWithTimeRange.emplace_back(metric, fromRange, toRange);
490-
}
480+
if (ranges.empty()) {
481+
CompletedMetricsCount++;
482+
return;
483+
}
484+
485+
for (const auto& [fromRange, toRange] : ranges) {
486+
MetricsWithTimeRange.emplace_back(metric, fromRange, toRange);
491487
}
492488
}
493489

@@ -535,7 +531,6 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
535531
std::deque<NSo::TTimeseries> MetricsData;
536532
size_t ListedMetricsCount = 0;
537533
size_t CompletedMetricsCount = 0;
538-
const ui64 MetricsPerPointsCountQuery = 50;
539534
const ui64 MaxPointsPerOneMetric = 1000000;
540535

541536
TString SourceId;
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
#include "util.h"
2+
3+
#include <yql/essentials/utils/yql_panic.h>
4+
5+
#include <util/string/cast.h>
6+
#include <util/string/split.h>
7+
#include <util/string/strip.h>
8+
9+
namespace NYql::NSo {
10+
11+
NSo::NProto::ESolomonClusterType MapClusterType(TSolomonClusterConfig::ESolomonClusterType clusterType) {
12+
switch (clusterType) {
13+
case TSolomonClusterConfig::SCT_SOLOMON:
14+
return NSo::NProto::ESolomonClusterType::CT_SOLOMON;
15+
case TSolomonClusterConfig::SCT_MONITORING:
16+
return NSo::NProto::ESolomonClusterType::CT_MONITORING;
17+
default:
18+
YQL_ENSURE(false, "Invalid cluster type " << ToString<ui32>(clusterType));
19+
}
20+
}
21+
22+
std::map<TString, TString> ExtractSelectorValues(const TString& selectors) {
23+
YQL_ENSURE(selectors.size() >= 2, "Selectors should be at least 2 characters long");
24+
std::map<TString, TString> result;
25+
26+
auto selectorValues = StringSplitter(selectors.substr(1, selectors.size() - 2)).Split(',').SkipEmpty().ToList<TString>();
27+
for (const auto& selectorValue : selectorValues) {
28+
size_t eqPos = selectorValue.find("=");
29+
YQL_ENSURE(eqPos <= selectorValue.size());
30+
31+
TString key = StripString(selectorValue.substr(0, eqPos));
32+
TString value = StripString(selectorValue.substr(eqPos + 1, selectorValue.size() - eqPos - 1));
33+
YQL_ENSURE(!key.empty());
34+
YQL_ENSURE(value.size() >= 2);
35+
36+
result[key] = value.substr(1, value.size() - 2);
37+
}
38+
39+
return result;
40+
}
41+
42+
NProto::TDqSolomonSource FillSolomonSource(const TSolomonClusterConfig* config, const TString& project) {
43+
NSo::NProto::TDqSolomonSource source;
44+
45+
source.SetClusterType(NSo::MapClusterType(config->GetClusterType()));
46+
source.SetUseSsl(config->GetUseSsl());
47+
48+
if (source.GetClusterType() == NSo::NProto::CT_MONITORING) {
49+
source.SetProject(config->GetPath().GetProject());
50+
source.SetCluster(config->GetPath().GetCluster());
51+
} else {
52+
source.SetProject(project);
53+
}
54+
55+
source.SetHttpEndpoint(config->GetCluster());
56+
for (const auto& attr : config->settings()) {
57+
if (attr.name() == "grpc_location"sv) {
58+
source.SetGrpcEndpoint(attr.value());
59+
}
60+
}
61+
62+
if (source.GetGrpcEndpoint().empty()) {
63+
source.SetGrpcEndpoint(config->GetCluster());
64+
}
65+
66+
return source;
67+
}
68+
69+
} // namespace NYql::NSo
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#pragma once
2+
3+
#include <yql/essentials/providers/common/proto/gateways_config.pb.h>
4+
#include <ydb/library/yql/providers/solomon/proto/dq_solomon_shard.pb.h>
5+
6+
namespace NYql::NSo {
7+
8+
NSo::NProto::ESolomonClusterType MapClusterType(TSolomonClusterConfig::ESolomonClusterType clusterType);
9+
10+
std::map<TString, TString> ExtractSelectorValues(const TString& selectors);
11+
12+
NProto::TDqSolomonSource FillSolomonSource(const TSolomonClusterConfig* config, const TString& project);
13+
14+
} // namespace NYql::NSo
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
util.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/library/yql/providers/solomon/proto
9+
yql/essentials/providers/common/proto
10+
yql/essentials/utils
11+
)
12+
13+
END()

ydb/library/yql/providers/solomon/events/events.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,10 @@ struct TEvSolomonProvider {
7171
};
7272

7373
struct TEvPointsCountBatch : public NActors::TEventLocal<TEvPointsCountBatch, EvPointsCountBatch> {
74-
std::vector<NSo::TMetric> Metrics;
74+
NSo::TMetric Metric;
7575
NSo::TGetPointsCountResponse Response;
76-
TEvPointsCountBatch(std::vector<NSo::TMetric>&& metrics, const NSo::TGetPointsCountResponse& response)
77-
: Metrics(std::move(metrics))
76+
TEvPointsCountBatch(NSo::TMetric&& metric, const NSo::TGetPointsCountResponse& response)
77+
: Metric(std::move(metric))
7878
, Response(response)
7979
{}
8080
};

ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,16 +49,14 @@ message TDownsampling {
4949

5050
message TDqSolomonSource {
5151
ESolomonClusterType ClusterType = 1;
52-
string Endpoint = 2;
52+
reserved 2;
5353
bool UseSsl = 3;
5454
string ServiceAccount = 4;
5555
TToken Token = 5;
5656

5757
string Project = 6;
58-
oneof p {
59-
string Selectors = 7;
60-
string Program = 8;
61-
}
58+
map<string, string> Selectors = 7;
59+
optional string Program = 8;
6260
// seconds since Epoch
6361
int64 From = 9;
6462
int64 To = 10;
@@ -67,4 +65,7 @@ message TDqSolomonSource {
6765
repeated string LabelNames = 13;
6866
map<string, string> Settings = 14;
6967
repeated string RequiredLabelNames = 15;
68+
string HttpEndpoint = 16;
69+
string GrpcEndpoint = 17;
70+
optional string Cluster = 18;
7071
}

ydb/library/yql/providers/solomon/provider/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ PEERDIR(
2323
ydb/library/yql/providers/common/token_accessor/client
2424
ydb/library/yql/providers/dq/expr_nodes
2525
ydb/library/yql/providers/solomon/actors
26+
ydb/library/yql/providers/solomon/common
2627
ydb/library/yql/providers/solomon/expr_nodes
2728
ydb/library/yql/providers/solomon/proto
2829
ydb/library/yql/providers/solomon/scheme

ydb/library/yql/providers/solomon/provider/yql_solomon_datasource.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,17 @@ class TSolomonDataSource : public TDataProviderBase {
4242
cluster.SetToken(token);
4343
cluster.SetUseSsl(properties.Value("use_ssl", "true") == "true"sv);
4444

45-
if (auto value = properties.Value("grpc_port", ""); !value.empty()) {
45+
if (properties.Value("project", "") && properties.Value("cluster", "")) {
46+
cluster.SetClusterType(TSolomonClusterConfig::SCT_MONITORING);
47+
cluster.MutablePath()->SetProject(properties.Value("project", ""));
48+
cluster.MutablePath()->SetCluster(properties.Value("cluster", ""));
49+
} else {
50+
cluster.SetClusterType(TSolomonClusterConfig::SCT_SOLOMON);
51+
}
52+
53+
if (auto value = properties.Value("grpc_location", "")) {
4654
auto grpcPort = cluster.MutableSettings()->Add();
47-
*grpcPort->MutableName() = "grpcPort";
55+
*grpcPort->MutableName() = "grpc_location";
4856
*grpcPort->MutableValue() = value;
4957
}
5058

ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
55
#include <yql/essentials/providers/common/provider/yql_provider.h>
66

7+
#include <util/string/strip.h>
8+
#include <util/string/split.h>
9+
710
namespace NYql {
811

912
using namespace NNodes;

0 commit comments

Comments
 (0)