Skip to content

Commit 21e33f7

Browse files
authored
Flush only INSERTs before commit (#19744)
1 parent 38bd366 commit 21e33f7

File tree

2 files changed

+53
-26
lines changed

2 files changed

+53
-26
lines changed

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,9 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
382382
std::move(columnsMetadata),
383383
std::move(writeIndexes),
384384
priority);
385+
386+
// At current time only insert operation can fail.
387+
NeedToFlushBeforeCommit |= (operationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT);
385388
CA_LOG_D("Open: token=" << token);
386389
}
387390

@@ -413,6 +416,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
413416
void CleanupClosedTokens() {
414417
YQL_ENSURE(ShardedWriteController);
415418
ShardedWriteController->CleanupClosedTokens();
419+
NeedToFlushBeforeCommit = false;
416420
}
417421

418422
void SetParentTraceId(NWilson::TTraceId traceId) {
@@ -1297,6 +1301,10 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
12971301
Stats.FillStats(stats, TablePath);
12981302
}
12991303

1304+
bool FlushBeforeCommit() const {
1305+
return NeedToFlushBeforeCommit;
1306+
}
1307+
13001308
private:
13011309
NActors::TActorId PipeCacheId = NKikimr::MakePipePerNodeCacheID(false);
13021310
bool LinkedPipeCache = false;
@@ -1326,6 +1334,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
13261334

13271335
IKqpTransactionManagerPtr TxManager;
13281336
bool Closed = false;
1337+
bool NeedToFlushBeforeCommit = false;
13291338
EMode Mode = EMode::WRITE;
13301339
THashMap<ui64, TInstant> SendTime;
13311340

@@ -1662,7 +1671,6 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
16621671
TKqpTableWriteActor::TWriteToken WriteToken = 0;
16631672

16641673
bool Closed = false;
1665-
16661674
bool WaitingForTableActor = false;
16671675

16681676
NWilson::TSpan DirectWriteActorSpan;
@@ -1909,9 +1917,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
19091917
settings.Priority);
19101918
}
19111919

1912-
// At current time only insert operation can fail.
1913-
NeedToFlushBeforeCommit |= (settings.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT);
1914-
19151920
writeInfo.Actors.at(settings.TableId.PathId).WriteActor->Open(
19161921
token.Cookie,
19171922
settings.OperationType,
@@ -1935,6 +1940,23 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
19351940
Process();
19361941
}
19371942

1943+
bool NeedToFlush() {
1944+
const bool outOfMemory = GetTotalFreeSpace() <= 0;
1945+
const bool needToFlush = outOfMemory
1946+
|| State == EState::FLUSHING
1947+
|| State == EState::PREPARING
1948+
|| State == EState::COMMITTING
1949+
|| State == EState::ROLLINGBACK;
1950+
return needToFlush;
1951+
}
1952+
1953+
bool NeedToFlushActor(const TKqpTableWriteActor* actor) {
1954+
return NeedToFlush()
1955+
&& (State != EState::FLUSHING
1956+
|| !TxId // Flush between queries
1957+
|| actor->FlushBeforeCommit()); // Flush before commit
1958+
}
1959+
19381960
bool Process() {
19391961
ProcessRequestQueue();
19401962
if (!ProcessFlush()) {
@@ -1945,7 +1967,9 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
19451967
if (State == EState::FLUSHING) {
19461968
bool isEmpty = true;
19471969
ForEachWriteActor([&](const TKqpTableWriteActor* actor, const TActorId) {
1948-
isEmpty &= actor->IsReady() && actor->IsEmpty();
1970+
if (NeedToFlushActor(actor)) {
1971+
isEmpty &= actor->IsReady() && actor->IsEmpty();
1972+
}
19491973
});
19501974
if (isEmpty) {
19511975
OnFlushed();
@@ -2014,14 +2038,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
20142038
}
20152039

20162040
bool ProcessFlush() {
2017-
const bool outOfMemory = GetTotalFreeSpace() <= 0;
2018-
const bool needToFlush = outOfMemory
2019-
|| State == EState::FLUSHING
2020-
|| State == EState::PREPARING
2021-
|| State == EState::COMMITTING
2022-
|| State == EState::ROLLINGBACK;
2023-
2024-
if (!EnableStreamWrite && outOfMemory) {
2041+
if (!EnableStreamWrite && GetTotalFreeSpace() <= 0) {
20252042
ReplyErrorAndDie(
20262043
NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
20272044
NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED,
@@ -2031,12 +2048,12 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
20312048
return false;
20322049
}
20332050

2034-
if (needToFlush) {
2051+
if (NeedToFlush()) {
20352052
CA_LOG_D("Flush data");
20362053

20372054
bool flushFailed = false;
20382055
ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) {
2039-
if (!flushFailed && actor->IsReady()) {
2056+
if (!flushFailed && actor->IsReady() && NeedToFlushActor(actor)) {
20402057
actor->FlushBuffers();
20412058
if (!actor->FlushToShards()) {
20422059
flushFailed = true;
@@ -2069,11 +2086,11 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
20692086

20702087
CA_LOG_D("Start prepare for distributed commit");
20712088
YQL_ENSURE(State == EState::WRITING);
2072-
YQL_ENSURE(!NeedToFlushBeforeCommit);
20732089
State = EState::PREPARING;
20742090
CheckQueuesEmpty();
20752091
AFL_ENSURE(TxId);
20762092
ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) {
2093+
AFL_ENSURE(!actor->FlushBeforeCommit());
20772094
actor->SetPrepare(*TxId);
20782095
});
20792096
Close();
@@ -2517,7 +2534,13 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
25172534
} else {
25182535
AFL_ENSURE(ev->Get()->TxId);
25192536
TxId = ev->Get()->TxId;
2520-
if (NeedToFlushBeforeCommit) {
2537+
2538+
bool needToFlushBeforeCommit = false;
2539+
ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) {
2540+
needToFlushBeforeCommit |= actor->FlushBeforeCommit();
2541+
});
2542+
2543+
if (needToFlushBeforeCommit) {
25212544
Flush(std::move(ev->TraceId));
25222545
} else {
25232546
TxManager->StartPrepare();
@@ -2902,8 +2925,19 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
29022925
UpdateTracingState("Write", BufferWriteActorSpan.GetTraceId());
29032926
OnOperationFinished(Counters->BufferActorFlushLatencyHistogram);
29042927
State = EState::WRITING;
2905-
AFL_ENSURE(!TxId || NeedToFlushBeforeCommit); // TxId => NeedToFlushBeforeCommit
2906-
NeedToFlushBeforeCommit = false;
2928+
2929+
ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) {
2930+
AFL_ENSURE(TxId || actor->IsEmpty());
2931+
if (actor->IsEmpty()) {
2932+
actor->CleanupClosedTokens();
2933+
}
2934+
if (!TxId) {
2935+
actor->Unlink();
2936+
}
2937+
2938+
AFL_ENSURE(!actor->FlushBeforeCommit());
2939+
});
2940+
29072941
if (TxId) {
29082942
TxManager->StartPrepare();
29092943
Prepare(std::nullopt);
@@ -2915,11 +2949,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
29152949
});
29162950
ExecuterActorId = {};
29172951
Y_ABORT_UNLESS(GetTotalMemory() == 0);
2918-
2919-
ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) {
2920-
actor->CleanupClosedTokens();
2921-
actor->Unlink();
2922-
});
29232952
}
29242953

29252954
void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, NYql::EYqlIssueCode id, const TString& message, const NYql::TIssues& subIssues) override {
@@ -3059,7 +3088,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
30593088

30603089
EState State;
30613090
bool HasError = false;
3062-
bool NeedToFlushBeforeCommit = false;
30633091
THashMap<TPathId, std::queue<TBufferWriteMessage>> RequestQueues;
30643092

30653093
struct TAckMessage {

ydb/core/kqp/runtime/kqp_write_table.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1450,7 +1450,6 @@ class TShardedWriteController : public IShardedWriteController {
14501450
}
14511451

14521452
void CleanupClosedTokens() override {
1453-
AFL_ENSURE(IsEmpty());
14541453
for (auto it = WriteInfos.begin(); it != WriteInfos.end();) {
14551454
if (it->second.Closed) {
14561455
AFL_ENSURE(it->second.Serializer->IsFinished());

0 commit comments

Comments
 (0)