Skip to content

Commit d63f67e

Browse files
committed
Add read iterator cancellation to ReadRows RPC
(cherry picked from commit 4ee8a57)
1 parent d9cb943 commit d63f67e

File tree

5 files changed

+285
-10
lines changed

5 files changed

+285
-10
lines changed

ydb/core/grpc_services/rpc_read_rows.cpp

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ struct RequestedKeyColumn {
4040
// Per-shard read bookkeeping; these objects are the mapped values of ShardIdToReadState.
struct TShardReadState {
4141
// NOTE(review): presumably the keys requested from this shard — confirm against the code that fills it.
std::vector<TOwnedCellVec> Keys;
4242
// Overwritten with token.GetFirstUnprocessedQuery() by the TEvReadResult handler when more
// results are still expected for the shard.
ui32 FirstUnprocessedQuery = 0;
43+
// STATUS_CODE_UNSPECIFIED until the TEvReadResult handler records a status for this shard
// (read finished or failed). CancelReads() only cancels shards still at STATUS_CODE_UNSPECIFIED.
Ydb::StatusIds::StatusCode Status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
4344
};
4445

4546
}
@@ -338,6 +339,14 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
338339
Become(&TThis::MainState);
339340

340341
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC bootstraped ");
342+
343+
auto selfId = ctx.SelfID;
344+
auto* actorSystem = ctx.ActorSystem();
345+
auto clientLostCb = [selfId, actorSystem]() {
346+
actorSystem->Send(selfId, new TRpcServices::TEvForgetOperation());
347+
};
348+
349+
Request->SetFinishAction(std::move(clientLostCb));
341350
}
342351

343352
bool ResolveTable() {
@@ -454,7 +463,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
454463
}
455464
}
456465

457-
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr &ev) {
466+
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
458467
TEvTxProxySchemeCache::TEvResolveKeySetResult *msg = ev->Get();
459468
auto& resolvePartitionsResult = msg->Request;
460469

@@ -519,8 +528,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
519528
if (it == ShardIdToReadState.end()) {
520529
TStringStream ss;
521530
ss << "Got unknown shardId from TEvReadResult# " << shardId << ", status# " << statusCode;
522-
ReplyWithError(statusCode, ss.Str(), &issues);
523-
return;
531+
return ReplyWithError(statusCode, ss.Str(), &issues);
524532
}
525533

526534
switch (statusCode) {
@@ -531,6 +539,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
531539
if (Retries < MaxTotalRetries) {
532540
TStringStream ss;
533541
ss << "Reached MaxRetries count for DataShard# " << shardId << ", status# " << statusCode;
542+
it->second.Status = statusCode;
534543
ReplyWithError(statusCode, ss.Str(), &issues);
535544
} else {
536545
SendRead(shardId, it->second);
@@ -545,8 +554,8 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
545554
if (statusCode != Ydb::StatusIds::OVERLOADED) {
546555
statusCode = Ydb::StatusIds::ABORTED;
547556
}
548-
ReplyWithError(statusCode, ss.Str(), &issues);
549-
return;
557+
it->second.Status = statusCode;
558+
return ReplyWithError(statusCode, ss.Str(), &issues);
550559
}
551560
}
552561
if (!msg->Record.HasFinished() || !msg->Record.GetFinished()) {
@@ -562,6 +571,9 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
562571
// we just wait for the next batch of results.
563572
it->second.FirstUnprocessedQuery = token.GetFirstUnprocessedQuery();
564573
ReadsInFlight++;
574+
} else {
575+
// Read for this shard has finished
576+
it->second.Status = statusCode;
565577
}
566578
}
567579
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC TEvReadResult RowsCount: " << msg->GetRowsCount());
@@ -720,14 +732,48 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
720732
PassAway();
721733
}
722734

735+
void CancelReads() {
736+
TStringStream ss;
737+
ss << "TReadRowsRPC CancelReads, shardIds# [";
738+
739+
bool hasActiveReads = false;
740+
741+
for (const auto& [shardId, state] : ShardIdToReadState) {
742+
if (state.Status != Ydb::StatusIds::STATUS_CODE_UNSPECIFIED) {
743+
// Read has already finished for this shard
744+
continue;
745+
}
746+
auto request = std::make_unique<TEvDataShard::TEvReadCancel>();
747+
auto& record = request->Record;
748+
record.SetReadId(shardId); // shardId is also a readId
749+
Send(PipeCache, new TEvPipeCache::TEvForward(request.release(), shardId, true), IEventHandle::FlagTrackDelivery, 0, Span.GetTraceId());
750+
ss << shardId << ", ";
751+
hasActiveReads = true;
752+
}
753+
754+
ss << "]";
755+
756+
if (hasActiveReads) {
757+
LOG_WARN_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, ss.Str());
758+
}
759+
}
760+
723761
// TEvents::TEvWakeup handler (registered in the state function as the timeout path):
// fails the request with TIMEOUT, reporting the table name and the whole seconds
// elapsed since StartTime.
void HandleTimeout(TEvents::TEvWakeup::TPtr&) {
    ReplyWithError(Ydb::StatusIds::TIMEOUT, TStringBuilder() << "ReadRows from table " << GetTable()
        << " timed out, duration: " << (TAppData::TimeProvider->Now() - StartTime).Seconds() << " sec");
}
727765

766+
void HandleForget(TRpcServices::TEvForgetOperation::TPtr& ev) {
767+
Y_UNUSED(ev);
768+
769+
ReplyWithError(Ydb::StatusIds::CANCELLED, TStringBuilder() << "ReadRows from table " << GetTable()
770+
<< " cancelled, because client disconnected");
771+
}
772+
728773
// Common failure path: cancels shard reads that have no recorded status yet, logs the
// error text at ERROR level, then hands status, message and optional issues to SendResult().
//   status   - status code passed on to SendResult().
//   errorMsg - text that is both logged and passed on to SendResult().
//   issues   - optional issue list forwarded as-is; may be nullptr (the default).
void ReplyWithError(const Ydb::StatusIds::StatusCode& status, const TString& errorMsg,
729774
const ::google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues = nullptr)
730775
{
776+
// Cancel before replying. NOTE(review): SendResult() presumably ends the actor, so the
// cancels have to be sent first — confirm against SendResult().
CancelReads();
731777
LOG_ERROR_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC ReplyWithError: " << errorMsg);
732778
SendResult(status, errorMsg, issues);
733779
}
@@ -736,7 +782,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
736782
return ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, "Internal error: pipe cache is not available, the cluster might not be configured properly");
737783
}
738784

739-
// TEvPipeCache::TEvDeliveryProblem handler: fails the request with UNAVAILABLE and
// names the tablet id carried by the event.
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
    return ReplyWithError(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Failed to connect to shard " << ev->Get()->TabletId);
}
742788

@@ -760,6 +806,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
760806
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
761807

762808
hFunc(TEvents::TEvWakeup, HandleTimeout);
809+
hFunc(TRpcServices::TEvForgetOperation, HandleForget);
763810
}
764811
}
765812

ydb/core/kqp/ut/common/kqp_ut_common.cpp

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "kqp_ut_common.h"
22

33
#include <ydb/core/base/backtrace.h>
4+
#include <ydb/core/tx/datashard/datashard.h>
45
#include <ydb/core/tx/schemeshard/schemeshard.h>
56
#include <ydb/core/kqp/counters/kqp_counters.h>
67
#include <ydb/core/kqp/provider/yql_kikimr_results.h>
@@ -139,6 +140,7 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) {
139140
if (!settings.FeatureFlags.HasEnableOlapCompression()) {
140141
ServerSettings->SetEnableOlapCompression(true);
141142
}
143+
ServerSettings->Controls = settings.Controls;
142144

143145
if (settings.Storage) {
144146
ServerSettings->SetCustomDiskParams(*settings.Storage);
@@ -153,7 +155,15 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) {
153155
}
154156

155157
Server.Reset(MakeHolder<Tests::TServer>(*ServerSettings));
156-
Server->EnableGRpc(grpcPort);
158+
159+
if (settings.GrpcServerOptions) {
160+
auto options = settings.GrpcServerOptions;
161+
options->SetPort(grpcPort);
162+
options->SetHost("localhost");
163+
Server->EnableGRpc(*options);
164+
} else {
165+
Server->EnableGRpc(grpcPort);
166+
}
157167

158168
RunCall([this, domain = settings.DomainRoot] {
159169
this->Server->SetupDefaultProfiles();
@@ -1470,6 +1480,34 @@ void WaitForZeroReadIterators(Tests::TServer& server, const TString& path) {
14701480
UNIT_ASSERT_C(iterators == 0, "Unable to wait for proper read iterator count, it looks like cancelation doesn`t work (" << iterators << ")");
14711481
}
14721482

1483+
// Navigates to `path` via Navigate() on behalf of `sender` and returns the TableId of
// the first entry in the result set. ResultSet.at(0) is used, so an empty result set
// is reported by at() rather than silently read out of bounds.
TTableId ResolveTableId(Tests::TServer* server, TActorId sender, const TString& path) {
    auto navigateResult = Navigate(
        *server->GetRuntime(),
        sender,
        path,
        NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown);
    const auto& firstEntry = navigateResult->ResultSet.at(0);
    return firstEntry.TableId;
}
1487+
1488+
// Sends TEvDataShard::TEvCompactTable for tableId.PathId to the tablet `shardId` from a
// freshly allocated edge actor, blocks until the matching TEvCompactTableResult is
// grabbed on that edge actor, and returns a copy of the result record.
//   compactBorrowed - forwarded into the request via SetCompactBorrowed().
NKikimrTxDataShard::TEvCompactTableResult CompactTable(
    Tests::TServer* server, ui64 shardId, const TTableId& tableId, bool compactBorrowed)
{
    TTestActorRuntime* runtime = server->GetRuntime();
    auto edge = runtime->AllocateEdgeActor();

    auto compactRequest = MakeHolder<TEvDataShard::TEvCompactTable>(tableId.PathId);
    compactRequest->Record.SetCompactBorrowed(compactBorrowed);
    runtime->SendToPipe(shardId, edge, compactRequest.Release(), 0, GetPipeConfigWithRetries());

    auto reply = runtime->GrabEdgeEventRethrow<TEvDataShard::TEvCompactTableResult>(edge);
    return reply->Get()->Record;
}
1500+
1501+
// Runs CompactTable() once for every shard of the table at `path`, one shard after
// another. Each call returns only after that shard's compaction result event has been
// received; the returned records are discarded.
void WaitForCompaction(Tests::TServer* server, const TString& path, bool compactBorrowed) {
    auto edge = server->GetRuntime()->AllocateEdgeActor();
    auto shardIds = GetTableShards(server, edge, path);
    auto tableId = ResolveTableId(server, edge, path);

    for (auto shardId : shardIds) {
        CompactTable(server, shardId, tableId, compactBorrowed);
    }
}
1510+
14731511
NJson::TJsonValue SimplifyPlan(NJson::TJsonValue& opt, const TGetPlanParams& params) {
14741512
if (auto ops = opt.GetMapSafe().find("Operators"); ops != opt.GetMapSafe().end()) {
14751513
auto opName = ops->second.GetArraySafe()[0].GetMapSafe().at("Name").GetStringSafe();

ydb/core/kqp/ut/common/kqp_ut_common.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
8989
NKqp::IKqpFederatedQuerySetupFactory::TPtr FederatedQuerySetupFactory = std::make_shared<NKqp::TKqpFederatedQuerySetupFactoryNoop>();
9090
NMonitoring::TDynamicCounterPtr CountersRoot = MakeIntrusive<NMonitoring::TDynamicCounters>();
9191
std::shared_ptr<NYql::NDq::IS3ActorsFactory> S3ActorsFactory = NYql::NDq::CreateDefaultS3ActorsFactory();
92+
NKikimrConfig::TImmediateControlsConfig Controls;
93+
TMaybe<NYdbGrpc::TServerOptions> GrpcServerOptions;
9294

9395
TKikimrSettings()
9496
{
@@ -126,6 +128,8 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
126128
AppConfig.MutableColumnShardConfig()->SetAlterObjectEnabled(enable);
127129
return *this;
128130
}
131+
// Stores a copy of the immediate-controls config (TKikimrRunner copies it into the
// server settings); returns *this so setters can be chained.
TKikimrSettings& SetControls(const NKikimrConfig::TImmediateControlsConfig& value) {
    Controls = value;
    return *this;
}
132+
// Stores custom gRPC server options. When set, TKikimrRunner enables gRPC with a copy
// of these options after overriding the port and host; otherwise it enables gRPC with
// the port only. Returns *this so setters can be chained.
TKikimrSettings& SetGrpcServerOptions(const NYdbGrpc::TServerOptions& grpcServerOptions) {
    GrpcServerOptions = grpcServerOptions;
    return *this;
}
129133
};
130134

131135
class TKikimrRunner {
@@ -383,6 +387,8 @@ TVector<ui64> GetColumnTableShards(Tests::TServer* server, TActorId sender, cons
383387
void WaitForZeroSessions(const NKqp::TKqpCounters& counters);
384388
void WaitForZeroReadIterators(Tests::TServer& server, const TString& path);
385389

390+
void WaitForCompaction(Tests::TServer* server, const TString& path, bool compactBorrowed = false);
391+
386392
bool JoinOrderAndAlgosMatch(const TString& optimized, const TString& reference);
387393

388394
struct TGetPlanParams {

0 commit comments

Comments
 (0)