Skip to content

Commit f3b5e93

Browse files
authored
[YQ-4213] Support streaming disposition (#17360)
1 parent 37c19de commit f3b5e93

File tree

11 files changed

+326
-25
lines changed

11 files changed

+326
-25
lines changed

ydb/core/fq/libs/actors/run_actor.cpp

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1548,8 +1548,8 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
15481548
dqConfiguration->FallbackPolicy = EFallbackPolicy::Never;
15491549

15501550
bool enableCheckpointCoordinator =
1551-
Params.QueryType == FederatedQuery::QueryContent::STREAMING &&
1552-
Params.Config.GetCheckpointCoordinator().GetEnabled() &&
1551+
Params.QueryType == FederatedQuery::QueryContent::STREAMING &&
1552+
Params.Config.GetCheckpointCoordinator().GetEnabled() &&
15531553
!dqConfiguration->DisableCheckpoints.Get().GetOrElse(false);
15541554

15551555
ExecuterId = Register(NYql::NDq::MakeDqExecuter(MakeNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), enableCheckpointCoordinator));
@@ -1995,7 +1995,27 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
19951995
{
19961996
auto pqGateway = Params.PqGatewayFactory->CreatePqGateway();
19971997
pqGateway->UpdateClusterConfigs(std::make_shared<NYql::TPqGatewayConfig>(gatewaysConfig.GetPq()));
1998-
dataProvidersInit.push_back(GetPqDataProviderInitializer(pqGateway, false, dbResolver));
1998+
NYql::NPq::NProto::StreamingDisposition disposition;
1999+
switch (Params.StreamingDisposition.GetDispositionCase()) {
2000+
case FederatedQuery::StreamingDisposition::kOldest:
2001+
*disposition.mutable_oldest() = Params.StreamingDisposition.oldest();
2002+
break;
2003+
case FederatedQuery::StreamingDisposition::kFresh:
2004+
*disposition.mutable_fresh() = Params.StreamingDisposition.fresh();
2005+
break;
2006+
case FederatedQuery::StreamingDisposition::kFromTime:
2007+
*disposition.mutable_from_time()->mutable_timestamp() = Params.StreamingDisposition.from_time().timestamp();
2008+
break;
2009+
case FederatedQuery::StreamingDisposition::kTimeAgo:
2010+
*disposition.mutable_time_ago()->mutable_duration() = Params.StreamingDisposition.time_ago().duration();
2011+
break;
2012+
case FederatedQuery::StreamingDisposition::kFromLastCheckpoint:
2013+
disposition.mutable_from_last_checkpoint()->set_force(Params.StreamingDisposition.from_last_checkpoint().force());
2014+
break;
2015+
case FederatedQuery::StreamingDisposition::DISPOSITION_NOT_SET:
2016+
break;
2017+
}
2018+
dataProvidersInit.push_back(GetPqDataProviderInitializer(pqGateway, false, dbResolver, std::move(disposition)));
19992019
}
20002020

20012021
{

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,71 @@
11
#include "dq_pq_read_actor.h"
22

3+
#include <library/cpp/protobuf/interop/cast.h>
4+
#include <yql/essentials/minikql/comp_nodes/mkql_saveload.h>
5+
#include <yql/essentials/utils/log/log.h>
6+
#include <ydb/library/actors/core/log.h>
7+
#include <ydb/library/yql/dq/actors/compute/dq_checkpoints_states.h>
38
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
49
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
510
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
611
#include <ydb/library/yql/dq/common/dq_common.h>
7-
#include <ydb/library/yql/dq/actors/compute/dq_checkpoints_states.h>
8-
9-
#include <yql/essentials/minikql/comp_nodes/mkql_saveload.h>
1012
#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.h>
13+
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
1114
#include <ydb/library/yql/providers/pq/proto/dq_io_state.pb.h>
12-
#include <yql/essentials/utils/log/log.h>
1315

14-
#include <ydb/library/actors/core/log.h>
16+
namespace NYql::NDq::NInternal {
1517

16-
using namespace NYql::NDq::NInternal;
18+
namespace {
19+
20+
TInstant TrimToMillis(TInstant instant) {
21+
return TInstant::MilliSeconds(instant.MilliSeconds());
22+
}
23+
24+
// StartingMessageTimestamp is serialized as milliseconds, so drop the microseconds part to stay consistent with storage
25+
TInstant InitStartingMessageTimestamp(const NPq::NProto::StreamingDisposition& disposition) {
26+
return TrimToMillis([&]() -> TInstant {
27+
switch (disposition.GetDispositionCase()) {
28+
case NPq::NProto::StreamingDisposition::kOldest:
29+
return TInstant::Zero();
30+
case NPq::NProto::StreamingDisposition::kFresh:
31+
return TInstant::Now();
32+
case NPq::NProto::StreamingDisposition::kFromTime:
33+
return NProtoInterop::CastFromProto(disposition.from_time().timestamp());
34+
case NPq::NProto::StreamingDisposition::kTimeAgo:
35+
return TInstant::Now() - NProtoInterop::CastFromProto(disposition.time_ago().duration());
36+
case NPq::NProto::StreamingDisposition::kFromLastCheckpoint:
37+
[[fallthrough]];
38+
case NPq::NProto::StreamingDisposition::DISPOSITION_NOT_SET:
39+
return TInstant::Now();
40+
}
41+
}());
42+
}
43+
44+
} // anonymous namespace
1745

1846
constexpr ui32 StateVersion = 1;
1947

2048
#define SRC_LOG_D(s) \
2149
LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
2250

51+
TDqPqReadActorBase::TDqPqReadActorBase(
52+
ui64 inputIndex,
53+
ui64 taskId,
54+
NActors::TActorId selfId,
55+
const TTxId& txId,
56+
NPq::NProto::TDqPqTopicSource&& sourceParams,
57+
NPq::NProto::TDqReadTaskParams&& readParams,
58+
const NActors::TActorId& computeActorId)
59+
: InputIndex(inputIndex)
60+
, TxId(txId)
61+
, SourceParams(std::move(sourceParams))
62+
, StartingMessageTimestamp(InitStartingMessageTimestamp(SourceParams.GetDisposition()))
63+
, LogPrefix(TStringBuilder() << "SelfId: " << selfId << ", TxId: " << txId << ", task: " << taskId << ". PQ source. ")
64+
, ReadParams(std::move(readParams))
65+
, ComputeActorId(computeActorId)
66+
, TaskId(taskId) {
67+
}
68+
2369
void TDqPqReadActorBase::SaveState(const NDqProto::TCheckpoint& /*checkpoint*/, TSourceState& state) {
2470
NPq::NProto::TDqPqTopicSourceState stateProto;
2571

@@ -91,3 +137,5 @@ ui64 TDqPqReadActorBase::GetInputIndex() const {
91137
const NYql::NDq::TDqAsyncStats& TDqPqReadActorBase::GetIngressStats() const {
92138
return IngressStats;
93139
}
140+
141+
} // namespace NYql::NDq::NInternal

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.h

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
namespace NYql::NDq::NInternal {
99

1010
class TDqPqReadActorBase : public IDqComputeActorAsyncInput {
11-
1211
public:
1312
using TPartitionKey = ::NPq::TPartitionKey;
1413

@@ -30,16 +29,7 @@ class TDqPqReadActorBase : public IDqComputeActorAsyncInput {
3029
const TTxId& txId,
3130
NPq::NProto::TDqPqTopicSource&& sourceParams,
3231
NPq::NProto::TDqReadTaskParams&& readParams,
33-
const NActors::TActorId& computeActorId)
34-
: InputIndex(inputIndex)
35-
, TxId(txId)
36-
, SourceParams(std::move(sourceParams))
37-
, StartingMessageTimestamp(TInstant::MilliSeconds(TInstant::Now().MilliSeconds())) // this field is serialized as milliseconds, so drop microseconds part to be consistent with storage
38-
, LogPrefix(TStringBuilder() << "SelfId: " << selfId << ", TxId: " << txId << ", task: " << taskId << ". PQ source. ")
39-
, ReadParams(std::move(readParams))
40-
, ComputeActorId(computeActorId)
41-
, TaskId(taskId) {
42-
}
32+
const NActors::TActorId& computeActorId);
4333

4434
public:
4535
void SaveState(const NDqProto::TCheckpoint& checkpoint, TSourceState& state) override;

ydb/library/yql/providers/pq/async_io/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ SRCS(
1010
)
1111

1212
PEERDIR(
13+
library/cpp/protobuf/interop
1314
ydb/core/fq/libs/graph_params/proto
1415
ydb/core/fq/libs/protos
1516
ydb/core/fq/libs/row_dispatcher

ydb/library/yql/providers/pq/proto/dq_io.proto

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ option cc_enable_arenas = true;
33

44
package NYql.NPq.NProto;
55

6+
import "google/protobuf/duration.proto";
7+
import "google/protobuf/timestamp.proto";
8+
import "google/protobuf/empty.proto";
9+
610
message TToken {
711
string Name = 1; // Key in secure params.
812
// TODO: token type (oauth, tvm, iam etc).
@@ -28,6 +32,33 @@ message TDqPqFederatedCluster {
2832
uint32 PartitionsCount = 4;
2933
}
3034

35+
// For streaming queries only
36+
message StreamingDisposition {
37+
message FromTime {
38+
google.protobuf.Timestamp timestamp = 1;
39+
}
40+
41+
message TimeAgo {
42+
google.protobuf.Duration duration = 1;
43+
}
44+
45+
message FromLastCheckpoint {
46+
// By default, if the new query's set of streams differs from the old query's set of streams,
47+
// an error occurs and the query is not allowed to load stream offsets from the last checkpoint.
48+
// If this flag is set, all offsets that can be matched with the previous query's checkpoint will be matched.
49+
// The others will use the "fresh" streaming disposition.
50+
bool force = 1;
51+
}
52+
53+
oneof disposition {
54+
google.protobuf.Empty oldest = 1; // Start processing with the oldest offset
55+
google.protobuf.Empty fresh = 2; // Start processing with the fresh offset
56+
FromTime from_time = 3; // Start processing with offset from the specified time
57+
TimeAgo time_ago = 4; // Start processing with offset some time ago
58+
FromLastCheckpoint from_last_checkpoint = 5; // Start processing with offset which corresponds to the last checkpoint
59+
}
60+
}
61+
3162
message TDqPqTopicSource {
3263
string TopicPath = 1;
3364
string ConsumerName = 2;
@@ -50,6 +81,7 @@ message TDqPqTopicSource {
5081
string Format = 19;
5182
string RowType = 20; // Final row type with metadata columns
5283
repeated TDqPqFederatedCluster FederatedClusters = 21;
84+
StreamingDisposition Disposition = 22;
5385
}
5486

5587
message TDqPqTopicSink {

ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
251251
srcDesc.SetPredicate(predicateSql);
252252
srcDesc.SetSharedReading(true);
253253
}
254+
*srcDesc.MutableDisposition() = State_->Disposition;
254255
protoSettings.PackFrom(srcDesc);
255256
if (sharedReading && !predicateSql.empty()) {
256257
ctx.AddWarning(TIssue(ctx.GetPosition(node.Pos()), "Row dispatcher will use the predicate: " + predicateSql));

ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ namespace NYql {
1212
TDataProviderInitializer GetPqDataProviderInitializer(
1313
IPqGateway::TPtr gateway,
1414
bool supportRtmrMode,
15-
std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver) {
16-
return [gateway, supportRtmrMode, dbResolver] (
15+
std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver,
16+
const NPq::NProto::StreamingDisposition& disposition) {
17+
return [gateway, supportRtmrMode, dbResolver, disposition] (
1718
const TString& userName,
1819
const TString& sessionId,
1920
const TGatewaysConfig* gatewaysConfig,
@@ -38,6 +39,7 @@ TDataProviderInitializer GetPqDataProviderInitializer(
3839
state->Types = typeCtx.Get();
3940
state->FunctionRegistry = functionRegistry;
4041
state->DbResolver = dbResolver;
42+
state->Disposition = disposition;
4143
if (gatewaysConfig) {
4244
state->Configuration->Init(gatewaysConfig->GetPq(), typeCtx, dbResolver, state->DatabaseIds);
4345
}

ydb/library/yql/providers/pq/provider/yql_pq_provider.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <yql/essentials/core/dq_integration/yql_dq_integration.h>
77
#include <ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.h>
88
#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
9+
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
910

1011
namespace NKikimr::NMiniKQL {
1112
class IFunctionRegistry;
@@ -54,12 +55,14 @@ struct TPqState : public TThrRefBase {
5455
THolder<IDqIntegration> DqIntegration;
5556
THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth> DatabaseIds;
5657
std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver;
58+
NPq::NProto::StreamingDisposition Disposition;
5759
};
5860

5961
TDataProviderInitializer GetPqDataProviderInitializer(
6062
IPqGateway::TPtr gateway,
6163
bool supportRtmrMode = false,
62-
std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver = nullptr
64+
std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver = nullptr,
65+
const NPq::NProto::StreamingDisposition& disposition = {}
6366
);
6467

6568
} // namespace NYql

0 commit comments

Comments
 (0)