Skip to content

Commit c2c9b72

Browse files
committed
Add read iterator cancellation to ReadRows RPC
(cherry picked from commit 4ee8a57)
1 parent f3ea51b commit c2c9b72

File tree

5 files changed

+288
-13
lines changed

5 files changed

+288
-13
lines changed

ydb/core/grpc_services/rpc_read_rows.cpp

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ struct RequestedKeyColumn {
4040
struct TShardReadState {
4141
std::vector<TOwnedCellVec> Keys;
4242
ui32 FirstUnprocessedQuery = 0;
43+
Ydb::StatusIds::StatusCode Status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
4344
};
4445

4546
}
@@ -338,6 +339,14 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
338339
Become(&TThis::MainState);
339340

340341
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC bootstraped ");
342+
343+
auto selfId = ctx.SelfID;
344+
auto* actorSystem = ctx.ActorSystem();
345+
auto clientLostCb = [selfId, actorSystem]() {
346+
actorSystem->Send(selfId, new TRpcServices::TEvForgetOperation());
347+
};
348+
349+
Request->SetFinishAction(std::move(clientLostCb));
341350
}
342351

343352
bool ResolveTable() {
@@ -454,7 +463,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
454463
}
455464
}
456465

457-
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr &ev) {
466+
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
458467
TEvTxProxySchemeCache::TEvResolveKeySetResult *msg = ev->Get();
459468
auto& resolvePartitionsResult = msg->Request;
460469

@@ -519,8 +528,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
519528
if (it == ShardIdToReadState.end()) {
520529
TStringStream ss;
521530
ss << "Got unknown shardId from TEvReadResult# " << shardId << ", status# " << statusCode;
522-
ReplyWithError(statusCode, ss.Str(), &issues);
523-
return;
531+
return ReplyWithError(statusCode, ss.Str(), &issues);
524532
}
525533

526534
switch (statusCode) {
@@ -531,6 +539,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
531539
if (Retries < MaxTotalRetries) {
532540
TStringStream ss;
533541
ss << "Reached MaxRetries count for DataShard# " << shardId << ", status# " << statusCode;
542+
it->second.Status = statusCode;
534543
ReplyWithError(statusCode, ss.Str(), &issues);
535544
} else {
536545
SendRead(shardId, it->second);
@@ -545,8 +554,8 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
545554
if (statusCode != Ydb::StatusIds::OVERLOADED) {
546555
statusCode = Ydb::StatusIds::ABORTED;
547556
}
548-
ReplyWithError(statusCode, ss.Str(), &issues);
549-
return;
557+
it->second.Status = statusCode;
558+
return ReplyWithError(statusCode, ss.Str(), &issues);
550559
}
551560
}
552561
if (!msg->Record.HasFinished() || !msg->Record.GetFinished()) {
@@ -562,6 +571,9 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
562571
// we just wait for the next batch of results.
563572
it->second.FirstUnprocessedQuery = token.GetFirstUnprocessedQuery();
564573
ReadsInFlight++;
574+
} else {
575+
// Read for this shard has finished
576+
it->second.Status = statusCode;
565577
}
566578
}
567579
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC TEvReadResult RowsCount: " << msg->GetRowsCount());
@@ -720,14 +732,48 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
720732
PassAway();
721733
}
722734

735+
void CancelReads() {
736+
TStringStream ss;
737+
ss << "TReadRowsRPC CancelReads, shardIds# [";
738+
739+
bool hasActiveReads = false;
740+
741+
for (const auto& [shardId, state] : ShardIdToReadState) {
742+
if (state.Status != Ydb::StatusIds::STATUS_CODE_UNSPECIFIED) {
743+
// Read has already finished for this shard
744+
continue;
745+
}
746+
auto request = std::make_unique<TEvDataShard::TEvReadCancel>();
747+
auto& record = request->Record;
748+
record.SetReadId(shardId); // shardId is also a readId
749+
Send(PipeCache, new TEvPipeCache::TEvForward(request.release(), shardId, true), IEventHandle::FlagTrackDelivery, 0, Span.GetTraceId());
750+
ss << shardId << ", ";
751+
hasActiveReads = true;
752+
}
753+
754+
ss << "]";
755+
756+
if (hasActiveReads) {
757+
LOG_WARN_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, ss.Str());
758+
}
759+
}
760+
723761
void HandleTimeout(TEvents::TEvWakeup::TPtr&) {
724-
return ReplyWithError(Ydb::StatusIds::TIMEOUT, TStringBuilder() << "ReadRows from table " << GetTable()
762+
ReplyWithError(Ydb::StatusIds::TIMEOUT, TStringBuilder() << "ReadRows from table " << GetTable()
725763
<< " timed out, duration: " << (TAppData::TimeProvider->Now() - StartTime).Seconds() << " sec");
726764
}
727765

766+
void HandleForget(TRpcServices::TEvForgetOperation::TPtr& ev) {
767+
Y_UNUSED(ev);
768+
769+
ReplyWithError(Ydb::StatusIds::CANCELLED, TStringBuilder() << "ReadRows from table " << GetTable()
770+
<< " cancelled, because client disconnected");
771+
}
772+
728773
void ReplyWithError(const Ydb::StatusIds::StatusCode& status, const TString& errorMsg,
729774
const ::google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues = nullptr)
730775
{
776+
CancelReads();
731777
LOG_ERROR_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC ReplyWithError: " << errorMsg);
732778
SendResult(status, errorMsg, issues);
733779
}
@@ -736,7 +782,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
736782
return ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, "Internal error: pipe cache is not available, the cluster might not be configured properly");
737783
}
738784

739-
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev) {
785+
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
740786
return ReplyWithError(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Failed to connect to shard " << ev->Get()->TabletId);
741787
}
742788

@@ -760,6 +806,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
760806
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
761807

762808
hFunc(TEvents::TEvWakeup, HandleTimeout);
809+
hFunc(TRpcServices::TEvForgetOperation, HandleForget);
763810
}
764811
}
765812

ydb/core/kqp/ut/common/kqp_ut_common.cpp

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "kqp_ut_common.h"
22

33
#include <ydb/core/base/backtrace.h>
4+
#include <ydb/core/tx/datashard/datashard.h>
45
#include <ydb/core/tx/schemeshard/schemeshard.h>
56
#include <ydb/core/kqp/counters/kqp_counters.h>
67
#include <ydb/core/kqp/provider/yql_kikimr_results.h>
@@ -139,6 +140,7 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) {
139140
if (!settings.FeatureFlags.HasEnableOlapCompression()) {
140141
ServerSettings->SetEnableOlapCompression(true);
141142
}
143+
ServerSettings->Controls = settings.Controls;
142144

143145
if (settings.Storage) {
144146
ServerSettings->SetCustomDiskParams(*settings.Storage);
@@ -153,7 +155,15 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) {
153155
}
154156

155157
Server.Reset(MakeHolder<Tests::TServer>(*ServerSettings));
156-
Server->EnableGRpc(grpcPort);
158+
159+
if (settings.GrpcServerOptions) {
160+
auto options = settings.GrpcServerOptions;
161+
options->SetPort(grpcPort);
162+
options->SetHost("localhost");
163+
Server->EnableGRpc(*options);
164+
} else {
165+
Server->EnableGRpc(grpcPort);
166+
}
157167

158168
RunCall([this, domain = settings.DomainRoot] {
159169
this->Server->SetupDefaultProfiles();
@@ -1396,7 +1406,7 @@ void Grant(NYdb::NTable::TSession& adminSession, const char* permissions, const
13961406
);
13971407
auto result = adminSession.ExecuteSchemeQuery(grantQuery).ExtractValueSync();
13981408
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
1399-
};
1409+
};
14001410

14011411
THolder<NSchemeCache::TSchemeCacheNavigate> Navigate(TTestActorRuntime& runtime, const TActorId& sender,
14021412
const TString& path, NSchemeCache::TSchemeCacheNavigate::EOp op)
@@ -1556,7 +1566,7 @@ void CheckTableReads(NYdb::NTable::TSession& session, const TString& tableName,
15561566
const TString selectPartitionStats(Q_(Sprintf(R"(
15571567
SELECT *
15581568
FROM `/Root/.sys/partition_stats`
1559-
WHERE FollowerId %s 0 AND (RowReads != 0 OR RangeReadRows != 0) AND Path = '%s'
1569+
WHERE FollowerId %s 0 AND (RowReads != 0 OR RangeReadRows != 0) AND Path = '%s'
15601570
)", (checkFollower ? "!=" : "="), tableName.c_str())));
15611571

15621572
auto result = session.ExecuteDataQuery(selectPartitionStats, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
@@ -1574,7 +1584,35 @@ void CheckTableReads(NYdb::NTable::TSession& session, const TString& tableName,
15741584
Y_FAIL("!readsExpected, but there are read stats for %s", tableName.c_str());
15751585
}
15761586
}
1577-
Y_FAIL("readsExpected, but there is timeout waiting for read stats from %s", tableName.c_str());
1587+
Y_FAIL("readsExpected, but there is timeout waiting for read stats from %s", tableName.c_str());
1588+
}
1589+
1590+
TTableId ResolveTableId(Tests::TServer* server, TActorId sender, const TString& path) {
1591+
auto response = Navigate(*server->GetRuntime(), sender, path, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown);
1592+
return response->ResultSet.at(0).TableId;
1593+
}
1594+
1595+
NKikimrTxDataShard::TEvCompactTableResult CompactTable(
1596+
Tests::TServer* server, ui64 shardId, const TTableId& tableId, bool compactBorrowed)
1597+
{
1598+
TTestActorRuntime* runtime = server->GetRuntime();
1599+
auto sender = runtime->AllocateEdgeActor();
1600+
auto request = MakeHolder<TEvDataShard::TEvCompactTable>(tableId.PathId);
1601+
request->Record.SetCompactBorrowed(compactBorrowed);
1602+
runtime->SendToPipe(shardId, sender, request.Release(), 0, GetPipeConfigWithRetries());
1603+
1604+
auto ev = runtime->GrabEdgeEventRethrow<TEvDataShard::TEvCompactTableResult>(sender);
1605+
return ev->Get()->Record;
1606+
}
1607+
1608+
void WaitForCompaction(Tests::TServer* server, const TString& path, bool compactBorrowed) {
1609+
TTestActorRuntime* runtime = server->GetRuntime();
1610+
auto sender = runtime->AllocateEdgeActor();
1611+
auto shards = GetTableShards(server, sender, path);
1612+
auto tableId = ResolveTableId(server, sender, path);
1613+
for (auto shard : shards) {
1614+
CompactTable(server, shard, tableId, compactBorrowed);
1615+
}
15781616
}
15791617

15801618
NJson::TJsonValue SimplifyPlan(NJson::TJsonValue& opt, const TGetPlanParams& params) {

ydb/core/kqp/ut/common/kqp_ut_common.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
8989
NKqp::IKqpFederatedQuerySetupFactory::TPtr FederatedQuerySetupFactory = std::make_shared<NKqp::TKqpFederatedQuerySetupFactoryNoop>();
9090
NMonitoring::TDynamicCounterPtr CountersRoot = MakeIntrusive<NMonitoring::TDynamicCounters>();
9191
std::shared_ptr<NYql::NDq::IS3ActorsFactory> S3ActorsFactory = NYql::NDq::CreateDefaultS3ActorsFactory();
92+
NKikimrConfig::TImmediateControlsConfig Controls;
93+
TMaybe<NYdbGrpc::TServerOptions> GrpcServerOptions;
9294

9395
TKikimrSettings()
9496
{
@@ -126,6 +128,8 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
126128
AppConfig.MutableColumnShardConfig()->SetAlterObjectEnabled(enable);
127129
return *this;
128130
}
131+
TKikimrSettings& SetControls(const NKikimrConfig::TImmediateControlsConfig& value) { Controls = value; return *this; }
132+
TKikimrSettings& SetGrpcServerOptions(const NYdbGrpc::TServerOptions& grpcServerOptions) { GrpcServerOptions = grpcServerOptions; return *this; };
129133
};
130134

131135
class TKikimrRunner {
@@ -390,6 +394,8 @@ int GetCumulativeCounterValue(Tests::TServer& server, const TString& path, const
390394

391395
void CheckTableReads(NYdb::NTable::TSession& session, const TString& tableName, bool checkFollower, bool readsExpected);
392396

397+
void WaitForCompaction(Tests::TServer* server, const TString& path, bool compactBorrowed = false);
398+
393399
bool JoinOrderAndAlgosMatch(const TString& optimized, const TString& reference);
394400

395401
struct TGetPlanParams {

0 commit comments

Comments
 (0)