Skip to content

Commit 7ddf958

Browse files
committed
Add read iterator cancellation to ReadRows RPC (#19376)
(cherry picked from commit 4ee8a57)
1 parent 8e0bc89 commit 7ddf958

File tree

5 files changed

+179
-22
lines changed

5 files changed

+179
-22
lines changed

ydb/core/grpc_services/rpc_read_rows.cpp

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ struct RequestedKeyColumn {
4040
// Per-shard bookkeeping for one ReadRows request; instances live in
// ShardIdToReadState, keyed by shard id.
struct TShardReadState {
4141
// presumably the keys of this request that are routed to this shard — TODO confirm against the code that fills it
std::vector<TOwnedCellVec> Keys;
4242
// Updated from the continuation token (GetFirstUnprocessedQuery()) when a
// read result arrives that is not marked as finished.
ui32 FirstUnprocessedQuery = 0;
43+
// Stays STATUS_CODE_UNSPECIFIED until the read-result handler records a
// status for this shard (on a reported error or on a finished read).
// CancelReads() skips shards whose status is no longer UNSPECIFIED.
Ydb::StatusIds::StatusCode Status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
4344
};
4445

4546
}
@@ -338,6 +339,14 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
338339
Become(&TThis::MainState);
339340

340341
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC bootstraped ");
342+
343+
auto selfId = ctx.SelfID;
344+
auto* actorSystem = ctx.ActorSystem();
345+
auto clientLostCb = [selfId, actorSystem]() {
346+
actorSystem->Send(selfId, new TRpcServices::TEvForgetOperation());
347+
};
348+
349+
Request->SetFinishAction(std::move(clientLostCb));
341350
}
342351

343352
bool ResolveTable() {
@@ -454,7 +463,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
454463
}
455464
}
456465

457-
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr &ev) {
466+
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
458467
TEvTxProxySchemeCache::TEvResolveKeySetResult *msg = ev->Get();
459468
auto& resolvePartitionsResult = msg->Request;
460469

@@ -519,8 +528,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
519528
if (it == ShardIdToReadState.end()) {
520529
TStringStream ss;
521530
ss << "Got unknown shardId from TEvReadResult# " << shardId << ", status# " << statusCode;
522-
ReplyWithError(statusCode, ss.Str(), &issues);
523-
return;
531+
return ReplyWithError(statusCode, ss.Str(), &issues);
524532
}
525533

526534
switch (statusCode) {
@@ -531,6 +539,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
531539
if (Retries < MaxTotalRetries) {
532540
TStringStream ss;
533541
ss << "Reached MaxRetries count for DataShard# " << shardId << ", status# " << statusCode;
542+
it->second.Status = statusCode;
534543
ReplyWithError(statusCode, ss.Str(), &issues);
535544
} else {
536545
SendRead(shardId, it->second);
@@ -545,8 +554,8 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
545554
if (statusCode != Ydb::StatusIds::OVERLOADED) {
546555
statusCode = Ydb::StatusIds::ABORTED;
547556
}
548-
ReplyWithError(statusCode, ss.Str(), &issues);
549-
return;
557+
it->second.Status = statusCode;
558+
return ReplyWithError(statusCode, ss.Str(), &issues);
550559
}
551560
}
552561
if (!msg->Record.HasFinished() || !msg->Record.GetFinished()) {
@@ -562,6 +571,9 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
562571
// we just wait for the next batch of results.
563572
it->second.FirstUnprocessedQuery = token.GetFirstUnprocessedQuery();
564573
ReadsInFlight++;
574+
} else {
575+
// Read for this shard has finished
576+
it->second.Status = statusCode;
565577
}
566578
}
567579
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC TEvReadResult RowsCount: " << msg->GetRowsCount());
@@ -720,14 +732,48 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
720732
PassAway();
721733
}
722734

735+
void CancelReads() {
736+
TStringStream ss;
737+
ss << "TReadRowsRPC CancelReads, shardIds# [";
738+
739+
bool hasActiveReads = false;
740+
741+
for (const auto& [shardId, state] : ShardIdToReadState) {
742+
if (state.Status != Ydb::StatusIds::STATUS_CODE_UNSPECIFIED) {
743+
// Read has already finished for this shard
744+
continue;
745+
}
746+
auto request = std::make_unique<TEvDataShard::TEvReadCancel>();
747+
auto& record = request->Record;
748+
record.SetReadId(shardId); // shardId is also a readId
749+
Send(PipeCache, new TEvPipeCache::TEvForward(request.release(), shardId, true), IEventHandle::FlagTrackDelivery, 0, Span.GetTraceId());
750+
ss << shardId << ", ";
751+
hasActiveReads = true;
752+
}
753+
754+
ss << "]";
755+
756+
if (hasActiveReads) {
757+
LOG_WARN_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, ss.Str());
758+
}
759+
}
760+
723761
// TEvWakeup handler: fails the request with TIMEOUT. The message carries the
// table name and the whole seconds elapsed since StartTime.
// The diff below only drops the redundant `return` in front of the void call.
void HandleTimeout(TEvents::TEvWakeup::TPtr&) {
724-
return ReplyWithError(Ydb::StatusIds::TIMEOUT, TStringBuilder() << "ReadRows from table " << GetTable()
762+
ReplyWithError(Ydb::StatusIds::TIMEOUT, TStringBuilder() << "ReadRows from table " << GetTable()
725763
<< " timed out, duration: " << (TAppData::TimeProvider->Now() - StartTime).Seconds() << " sec");
726764
}
727765

766+
// TEvForgetOperation handler: fails the request with CANCELLED.
// Bootstrap registers a finish action on Request that sends this event to
// the actor itself; the error text attributes it to a client disconnect.
void HandleForget(TRpcServices::TEvForgetOperation::TPtr& ev) {
767+
// The event carries nothing this handler needs.
Y_UNUSED(ev);
768+
769+
ReplyWithError(Ydb::StatusIds::CANCELLED, TStringBuilder() << "ReadRows from table " << GetTable()
770+
<< " cancelled, because client disconnected");
771+
}
772+
728773
// Common failure path: cancels reads that are still outstanding on the
// shards, logs the error, then sends the result to the client.
//   status   - status code passed on to SendResult
//   errorMsg - text that is logged and passed on to SendResult
//   issues   - optional issue list passed on to SendResult; may be nullptr
void ReplyWithError(const Ydb::StatusIds::StatusCode& status, const TString& errorMsg,
729774
const ::google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues = nullptr)
730775
{
776+
// Cancel first: CancelReads() consults the per-shard Status, so callers
// that know a shard's final status record it before calling here.
CancelReads();
731777
LOG_ERROR_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC ReplyWithError: " << errorMsg);
732778
SendResult(status, errorMsg, issues);
733779
}
@@ -736,7 +782,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
736782
return ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, "Internal error: pipe cache is not available, the cluster might not be configured properly");
737783
}
738784

739-
// TEvDeliveryProblem handler: fails the request with UNAVAILABLE, naming the
// tablet id taken from the event. The -/+ pair below only moves the `&` in
// the parameter declaration.
// NOTE(review): this handler does not record a Status for the failed shard,
// so the CancelReads() call inside ReplyWithError will also send a cancel to
// that shard if its Status is still UNSPECIFIED — confirm this is intended.
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev) {
785+
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
740786
return ReplyWithError(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Failed to connect to shard " << ev->Get()->TabletId);
741787
}
742788

@@ -760,6 +806,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
760806
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
761807

762808
hFunc(TEvents::TEvWakeup, HandleTimeout);
809+
hFunc(TRpcServices::TEvForgetOperation, HandleForget);
763810
}
764811
}
765812

ydb/core/kqp/ut/common/kqp_ut_common.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,15 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) {
155155
}
156156

157157
Server.Reset(MakeHolder<Tests::TServer>(*ServerSettings));
158-
Server->EnableGRpc(grpcPort);
158+
159+
if (settings.GrpcServerOptions) {
160+
auto options = settings.GrpcServerOptions;
161+
options->SetPort(grpcPort);
162+
options->SetHost("localhost");
163+
Server->EnableGRpc(*options);
164+
} else {
165+
Server->EnableGRpc(grpcPort);
166+
}
159167

160168
RunCall([this, domain = settings.DomainRoot] {
161169
this->Server->SetupDefaultProfiles();

ydb/core/kqp/ut/common/kqp_ut_common.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
9090
NMonitoring::TDynamicCounterPtr CountersRoot = MakeIntrusive<NMonitoring::TDynamicCounters>();
9191
std::shared_ptr<NYql::NDq::IS3ActorsFactory> S3ActorsFactory = NYql::NDq::CreateDefaultS3ActorsFactory();
9292
NKikimrConfig::TImmediateControlsConfig Controls;
93+
TMaybe<NYdbGrpc::TServerOptions> GrpcServerOptions;
9394

9495
TKikimrSettings()
9596
{
@@ -128,6 +129,7 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
128129
return *this;
129130
}
130131
TKikimrSettings& SetControls(const NKikimrConfig::TImmediateControlsConfig& value) { Controls = value; return *this; }
132+
// Stores gRPC server options for the test server. The TKikimrRunner
// constructor copies them, sets port and host itself, and passes them to
// EnableGRpc. Returns *this for chaining.
TKikimrSettings& SetGrpcServerOptions(const NYdbGrpc::TServerOptions& grpcServerOptions) { GrpcServerOptions = grpcServerOptions; return *this; }
131133
};
132134

133135
class TKikimrRunner {

ydb/core/kqp/ut/opt/kqp_kv_ut.cpp

Lines changed: 112 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
22

3+
#include <ydb/core/base/blobstorage.h>
34
#include <ydb/core/tx/datashard/datashard.h>
45
#include <yql/essentials/parser/pg_catalog/catalog.h>
56
#include <yql/essentials/parser/pg_wrapper/interface/codec.h>
@@ -367,7 +368,12 @@ Y_UNIT_TEST_SUITE(KqpKv) {
367368
CompareYson(Sprintf("[[[%du];[%du]]]", valueToReturn_1, valueToReturn_2), TString{res});
368369
}
369370

370-
Y_UNIT_TEST_TWIN(ReadRows_ExternalBlobs, NewPrecharge) {
371+
// Test helper: waits for `future` through the test runtime's WaitFuture()
// and returns whatever WaitFuture() returns. The tests in this file use it
// where they previously called GetValueSync() on the future.
// NOTE(review): presumably needed because these tests run with
// SetUseRealThreads(false) — confirm.
template <typename FutureT>
372+
auto GetValue(TKikimrRunner& kikimr, FutureT&& future) {
373+
return kikimr.GetTestServer().GetRuntime()->WaitFuture(future);
374+
}
375+
376+
Y_UNIT_TEST_TWIN(ReadRows_ExternalBlobs, UseExtBlobsPrecharge) {
371377
NKikimrConfig::TImmediateControlsConfig controls;
372378

373379
if (NewPrecharge) {
@@ -379,11 +385,16 @@ Y_UNIT_TEST_SUITE(KqpKv) {
379385
auto settings = TKikimrSettings()
380386
.SetFeatureFlags(flags)
381387
.SetWithSampleTables(false)
382-
.SetControls(controls);
388+
.SetUseRealThreads(false)
389+
.SetControls(controls)
390+
.SetGrpcServerOptions(NYdbGrpc::TServerOptions()
391+
.SetGRpcShutdownDeadline(TDuration::MilliSeconds(0))
392+
);
393+
383394
auto kikimr = TKikimrRunner{settings};
384395

385396
auto db = kikimr.GetTableClient();
386-
auto session = db.CreateSession().GetValueSync().GetSession();
397+
auto session = GetValue(kikimr, db.CreateSession()).GetSession();
387398
const auto tableName = "/Root/TestTable";
388399
const auto keyColumnName_1 = "blob_id";
389400
const auto keyColumnName_2 = "chunk_num";
@@ -400,7 +411,7 @@ Y_UNIT_TEST_SUITE(KqpKv) {
400411
builder.SetPrimaryKeyColumns({keyColumnName_1, keyColumnName_2});
401412
builder.AddNullableColumn(dataColumnName, EPrimitiveType::String);
402413

403-
auto result = session.CreateTable(tableName, builder.Build()).GetValueSync();
414+
auto result = GetValue(kikimr, session.CreateTable(tableName, builder.Build()));
404415
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
405416

406417
TString largeValue(1_MB, 'L');
@@ -417,9 +428,27 @@ Y_UNIT_TEST_SUITE(KqpKv) {
417428
}
418429
rows.EndList();
419430

420-
auto upsertResult = db.BulkUpsert(tableName, rows.Build()).GetValueSync();
431+
auto upsertResult = GetValue(kikimr, db.BulkUpsert(tableName, rows.Build()));
421432
UNIT_ASSERT_C(upsertResult.IsSuccess(), upsertResult.GetIssues().ToString());
422433

434+
auto server = &kikimr.GetTestServer();
435+
436+
WaitForCompaction(server, tableName);
437+
438+
ui32 blobRequestCount = 0;
439+
440+
auto holder = server->GetRuntime()->AddObserver<TEvBlobStorage::TEvGet>([&blobRequestCount](TEvBlobStorage::TEvGet::TPtr& ev) {
441+
const auto& msg = ev->Get();
442+
for (size_t i = 0; i < msg->QuerySize; i++) {
443+
const auto& id = msg->Queries[i].Id;
444+
445+
if (id.BlobSize() == 1_MB + 8 /** ext blob meta */) {
446+
blobRequestCount++;
447+
break;
448+
}
449+
}
450+
});
451+
423452
NYdb::TValueBuilder keys;
424453
keys.BeginList();
425454
for (int i = 0; i < 10; i++) {
@@ -431,11 +460,7 @@ Y_UNIT_TEST_SUITE(KqpKv) {
431460
}
432461
keys.EndList();
433462

434-
auto server = &kikimr.GetTestServer();
435-
436-
WaitForCompaction(server, tableName);
437-
438-
auto selectResult = db.ReadRows(tableName, keys.Build()).GetValueSync();
463+
auto selectResult = GetValue(kikimr, db.ReadRows(tableName, keys.Build()));
439464

440465
UNIT_ASSERT_C(selectResult.IsSuccess(), selectResult.GetIssues().ToString());
441466

@@ -444,10 +469,83 @@ Y_UNIT_TEST_SUITE(KqpKv) {
444469

445470
UNIT_ASSERT(parser.TryNextRow());
446471

447-
auto val = parser.GetValue(0);
448-
TValueParser valParser(val);
449-
TUuidValue v = valParser.GetUuid();
450-
Cout << v.ToString() << Endl;
472+
UNIT_ASSERT_VALUES_EQUAL(UseExtBlobsPrecharge ? 2 : 10, blobRequestCount);
473+
}
474+
475+
// Creates a table split at key 5, writes keys 0..9, then issues ReadRows for
// all ten keys while an observer drops exactly one TEvReadResult. The test
// expects the ReadRows call to fail and exactly one TEvReadCancel to be
// observed (per the test name, the failure is expected to be a timeout).
Y_UNIT_TEST(ReadRows_TimeoutCancelsReads) {
    // Observer state. Declared before `kikimr` on purpose: the observer
    // lambda captures these by reference and is installed with
    // SetObserverFunc (no RAII holder), so they must outlive the runner.
    ui32 cancelCount = 0;
    bool droppedOneResult = false;

    auto settings = TKikimrSettings()
        .SetWithSampleTables(false)
        .SetUseRealThreads(false)
        .SetGrpcServerOptions(NYdbGrpc::TServerOptions()
            .SetGRpcShutdownDeadline(TDuration::MilliSeconds(0))
        );

    auto kikimr = TKikimrRunner{settings};

    auto db = kikimr.GetTableClient();
    auto session = GetValue(kikimr, db.CreateSession()).GetSession();
    const auto tableName = "/Root/TestTable";
    const auto keyColumnName = "blob_id";
    const auto dataColumnName = "data";

    TTableBuilder builder;
    builder.AddNonNullableColumn(keyColumnName, EPrimitiveType::Int32);
    builder.SetPrimaryKeyColumn(keyColumnName);
    builder.AddNullableColumn(dataColumnName, EPrimitiveType::String);
    // NOTE(review): the explicit split point set below presumably overrides
    // this uniform-partitions setting — confirm and drop one of the two.
    builder.SetUniformPartitions(2);
    TExplicitPartitions partitions;
    partitions.AppendSplitPoints(TValueBuilder().BeginTuple().AddElement().OptionalInt32(5).EndTuple().Build());
    builder.SetPartitionAtKeys(partitions);

    auto result = GetValue(kikimr, session.CreateTable(tableName, builder.Build()));
    UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

    NYdb::TValueBuilder rows;
    rows.BeginList();
    for (int i = 0; i < 10; i++) {
        rows.AddListItem()
            .BeginStruct()
                .AddMember(keyColumnName).Int32(i)
                .AddMember(dataColumnName).String("foo")
            .EndStruct();
    }
    rows.EndList();

    auto upsertResult = GetValue(kikimr, db.BulkUpsert(tableName, rows.Build()));
    UNIT_ASSERT_C(upsertResult.IsSuccess(), upsertResult.GetIssues().ToString());

    kikimr.GetTestServer().GetRuntime()->SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
        if (ev->CastAsLocal<TEvDataShard::TEvReadResult>()) {
            if (!droppedOneResult) {
                // Drop one of two results, so only one cancel would be sent to the DataShard
                droppedOneResult = true;
                return TTestActorRuntime::EEventAction::DROP;
            }
            return TTestActorRuntime::EEventAction::PROCESS;
        }
        if (ev->CastAsLocal<TEvDataShard::TEvReadCancel>()) {
            cancelCount++;
        }
        return TTestActorRuntime::EEventAction::PROCESS;
    });

    NYdb::TValueBuilder keys;
    keys.BeginList();
    for (int i = 0; i < 10; i++) {
        keys.AddListItem()
            .BeginStruct()
                .AddMember(keyColumnName).Int32(i)
            .EndStruct();
    }
    keys.EndList();

    auto selectResult = GetValue(kikimr, db.ReadRows(tableName, keys.Build()));

    UNIT_ASSERT(!selectResult.IsSuccess());
    UNIT_ASSERT_VALUES_EQUAL(cancelCount, 1);
}
452550

453551
TVector<::ReadRowsPgParam> readRowsPgParams

ydb/core/tx/datashard/datashard__read_iterator.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3365,6 +3365,8 @@ void TDataShard::Handle(TEvDataShard::TEvReadCancel::TPtr& ev, const TActorConte
33653365
state.Request->ReadSpan.EndError("Cancelled");
33663366
}
33673367
DeleteReadIterator(it);
3368+
3369+
LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " Cancelled read: " << readId);
33683370
}
33693371

33703372
void TDataShard::Handle(TEvDataShard::TEvReadScanStarted::TPtr& ev) {

0 commit comments

Comments
 (0)