Skip to content

Commit 402c71d

Browse files
committed
Allow to send duplicates through TDedicatedPipePool
1 parent 7fd0265 commit 402c71d

File tree

3 files changed

+38
-30
lines changed

3 files changed

+38
-30
lines changed

ydb/core/tx/schemeshard/dedicated_pipe_pool.h

+14-11
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,21 @@ class TDedicatedPipePool {
1515
TMap<TActorId, std::pair<TEntityId, TTabletId>> Owners;
1616

1717
public:
18-
void Create(const TEntityId& entityId, TTabletId dst, THolder<IEventBase> message, const TActorContext& ctx) {
19-
Y_ABORT_UNLESS(!Pipes[entityId].contains(dst));
18+
void Send(const TEntityId& entityId, TTabletId dst, THolder<IEventBase> message, const TActorContext& ctx) {
2019
using namespace NTabletPipe;
2120

22-
const auto clientId = ctx.Register(CreateClient(ctx.SelfID, ui64(dst), TClientRetryPolicy {
23-
.MinRetryTime = TDuration::MilliSeconds(100),
24-
.MaxRetryTime = TDuration::Seconds(30),
25-
}));
21+
if (!Pipes[entityId].contains(dst)) {
22+
const auto clientId = ctx.Register(CreateClient(ctx.SelfID, ui64(dst), TClientRetryPolicy {
23+
.MinRetryTime = TDuration::MilliSeconds(100),
24+
.MaxRetryTime = TDuration::Seconds(30),
25+
}));
2626

27-
Pipes[entityId][dst] = clientId;
28-
Owners[clientId] = std::make_pair(entityId, dst);
27+
Pipes[entityId][dst] = clientId;
28+
Owners[clientId] = std::make_pair(entityId, dst);
29+
}
2930

31+
const auto clientId = Pipes[entityId][dst];
32+
Y_ABORT_UNLESS(Owners[clientId] == std::make_pair(entityId, dst));
3033
SendData(ctx.SelfID, clientId, message.Release(), 0);
3134
}
3235

@@ -53,10 +56,10 @@ class TDedicatedPipePool {
5356
}
5457
}
5558

56-
ui64 CloseAll(const TEntityId& entityId, const TActorContext& ctx) {
59+
void CloseAll(const TEntityId& entityId, const TActorContext& ctx) {
5760
auto entityIt = Pipes.find(entityId);
5861
if (entityIt == Pipes.end()) {
59-
return 0;
62+
return;
6063
}
6164

6265
const auto& entityPipes = entityIt->second;
@@ -70,7 +73,7 @@ class TDedicatedPipePool {
7073
Close(entityId, tabletId, ctx);
7174
}
7275

73-
return tablets.size();
76+
return;
7477
}
7578

7679
void Shutdown(const TActorContext& ctx) {

ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp

+23-18
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
497497
private:
498498
TIndexBuildId BuildId;
499499

500-
TDeque<std::tuple<TTabletId, ui64, THolder<IEventBase>>> ToTabletSend;
500+
TMap<TTabletId, THolder<IEventBase>> ToTabletSend;
501501

502502
template <bool WithSnapshot = true, typename Record>
503503
TTabletId CommonFillRecord(Record& record, TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
@@ -551,9 +551,9 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
551551

552552
auto shardId = CommonFillRecord(ev->Record, shardIdx, buildInfo);
553553
ev->Record.SetSeed(ui64(shardId));
554-
LOG_D("TTxBuildProgress: TEvSampleKRequest: " << ev->Record.ShortDebugString());
554+
LOG_N("TTxBuildProgress: TEvSampleKRequest: " << ev->Record.ShortDebugString());
555555

556-
ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
556+
ToTabletSend.emplace(shardId, std::move(ev));
557557
}
558558

559559
void SendKMeansReshuffleRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
@@ -595,9 +595,9 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
595595
r.ClearClusters();
596596
return r.ShortDebugString();
597597
};
598-
LOG_D("TTxBuildProgress: TEvReshuffleKMeansRequest: " << toDebugStr(ev->Record));
598+
LOG_N("TTxBuildProgress: TEvReshuffleKMeansRequest: " << toDebugStr(ev->Record));
599599

600-
ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
600+
ToTabletSend.emplace(shardId, std::move(ev));
601601
}
602602

603603
void SendKMeansLocalRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
@@ -644,9 +644,9 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
644644

645645
auto shardId = CommonFillRecord(ev->Record, shardIdx, buildInfo);
646646
ev->Record.SetSeed(ui64(shardId));
647-
LOG_D("TTxBuildProgress: TEvLocalKMeansRequest: " << ev->Record.ShortDebugString());
647+
LOG_N("TTxBuildProgress: TEvLocalKMeansRequest: " << ev->Record.ShortDebugString());
648648

649-
ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
649+
ToTabletSend.emplace(shardId, std::move(ev));
650650
}
651651

652652
void SendPrefixKMeansRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
@@ -685,9 +685,9 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
685685

686686
auto shardId = CommonFillRecord<false>(ev->Record, shardIdx, buildInfo);
687687
ev->Record.SetSeed(ui64(shardId));
688-
LOG_D("TTxBuildProgress: TEvPrefixKMeansRequest: " << ev->Record.ShortDebugString());
688+
LOG_N("TTxBuildProgress: TEvPrefixKMeansRequest: " << ev->Record.ShortDebugString());
689689

690-
ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
690+
ToTabletSend.emplace(shardId, std::move(ev));
691691
}
692692

693693
void SendBuildSecondaryIndexRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
@@ -736,9 +736,9 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
736736

737737
auto shardId = CommonFillRecord(ev->Record, shardIdx, buildInfo);
738738

739-
LOG_D("TTxBuildProgress: TEvBuildIndexCreateRequest: " << ev->Record.ShortDebugString());
739+
LOG_N("TTxBuildProgress: TEvBuildIndexCreateRequest: " << ev->Record.ShortDebugString());
740740

741-
ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
741+
ToTabletSend.emplace(shardId, std::move(ev));
742742
}
743743

744744
void SendUploadSampleKRequest(TIndexBuildInfo& buildInfo) {
@@ -757,7 +757,8 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
757757
buildInfo.DoneShards = {};
758758
buildInfo.InProgressShards = {};
759759
buildInfo.ToUploadShards = {};
760-
760+
761+
ToTabletSend.clear();
761762
Self->IndexBuildPipes.CloseAll(BuildId, ctx);
762763
}
763764

@@ -791,6 +792,9 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
791792
}
792793

793794
void AddAllShards(TIndexBuildInfo& buildInfo) {
795+
ToTabletSend.clear();
796+
Self->IndexBuildPipes.CloseAll(BuildId, Self->ActorContext());
797+
794798
for (const auto& [idx, status] : buildInfo.Shards) {
795799
AddShard(buildInfo, idx, status);
796800
}
@@ -1102,7 +1106,8 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
11021106
Y_ABORT_UNLESS(buildInfoPtr);
11031107
auto& buildInfo = *buildInfoPtr->Get();
11041108

1105-
LOG_I("TTxBuildProgress: Execute: " << BuildId << " " << buildInfo.State << " " << buildInfo);
1109+
LOG_N("TTxBuildProgress: Execute: " << BuildId << " " << buildInfo.State);
1110+
LOG_D("TTxBuildProgress: Execute: " << BuildId << " " << buildInfo.State << " " << buildInfo);
11061111

11071112
switch (buildInfo.State) {
11081113
case TIndexBuildInfo::EState::Invalid:
@@ -1390,8 +1395,8 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
13901395
}
13911396

13921397
void DoComplete(const TActorContext& ctx) override {
1393-
for (auto& x: ToTabletSend) {
1394-
Self->IndexBuildPipes.Create(BuildId, std::get<0>(x), std::move(std::get<2>(x)), ctx);
1398+
for (auto& [shardId, ev]: ToTabletSend) {
1399+
Self->IndexBuildPipes.Send(BuildId, shardId, std::move(ev), ctx);
13951400
}
13961401
ToTabletSend.clear();
13971402
}
@@ -1466,7 +1471,7 @@ struct TSchemeShard::TIndexBuilder::TTxReplyRetry: public TSchemeShard::TIndexBu
14661471
const auto& tabletId = PipeRetry.TabletId;
14671472
const auto& shardIdx = Self->GetShardIdx(tabletId);
14681473

1469-
LOG_I("TTxReply : PipeRetry, id# " << buildId
1474+
LOG_N("TTxReply : PipeRetry, id# " << buildId
14701475
<< ", tabletId# " << tabletId
14711476
<< ", shardIdx# " << shardIdx);
14721477

@@ -2416,8 +2421,8 @@ struct TSchemeShard::TIndexBuilder::TTxReplyModify: public TSchemeShard::TIndexB
24162421
<< ", BuildIndexId: " << buildInfo.Id
24172422
<< ", status: " << Ydb::StatusIds::StatusCode_Name(status)
24182423
<< ", error: " << buildInfo.Issue
2419-
<< ", replyTo: " << buildInfo.CreateSender.ToString());
2420-
LOG_D("Message:\n" << responseEv->Record.ShortDebugString());
2424+
<< ", replyTo: " << buildInfo.CreateSender.ToString()
2425+
<< ", message: " << responseEv->Record.ShortDebugString());
24212426

24222427
Send(buildInfo.CreateSender, std::move(responseEv), 0, buildInfo.SenderCookie);
24232428
}

ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ struct TSchemeShard::TCdcStreamScan::TTxProgress: public TTransactionBase<TSchem
115115

116116
void Complete(const TActorContext& ctx) override {
117117
for (auto& [streamPathId, tabletId, ev] : ScanRequests) {
118-
Self->CdcStreamScanPipes.Create(streamPathId, tabletId, std::move(ev), ctx);
118+
Self->CdcStreamScanPipes.Send(streamPathId, tabletId, std::move(ev), ctx);
119119
}
120120

121121
if (StreamToProgress) {

0 commit comments

Comments
 (0)