From c533c4fdffe7a4c352f468ed10f7a0ee4f96e5d5 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Tue, 17 Jun 2025 15:38:09 +0300 Subject: [PATCH] fix --- ydb/core/kqp/runtime/kqp_write_actor.cpp | 78 ++++++++++++++++-------- ydb/core/kqp/runtime/kqp_write_table.cpp | 1 - 2 files changed, 53 insertions(+), 26 deletions(-) diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 98327ec4e11f..acab72097790 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -382,6 +382,9 @@ class TKqpTableWriteActor : public TActorBootstrapped { std::move(columnsMetadata), std::move(writeIndexes), priority); + + // At current time only insert operation can fail. + NeedToFlushBeforeCommit |= (operationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT); CA_LOG_D("Open: token=" << token); } @@ -413,6 +416,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { void CleanupClosedTokens() { YQL_ENSURE(ShardedWriteController); ShardedWriteController->CleanupClosedTokens(); + NeedToFlushBeforeCommit = false; } void SetParentTraceId(NWilson::TTraceId traceId) { @@ -1297,6 +1301,10 @@ class TKqpTableWriteActor : public TActorBootstrapped { Stats.FillStats(stats, TablePath); } + bool FlushBeforeCommit() const { + return NeedToFlushBeforeCommit; + } + private: NActors::TActorId PipeCacheId = NKikimr::MakePipePerNodeCacheID(false); bool LinkedPipeCache = false; @@ -1326,6 +1334,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { IKqpTransactionManagerPtr TxManager; bool Closed = false; + bool NeedToFlushBeforeCommit = false; EMode Mode = EMode::WRITE; THashMap SendTime; @@ -1662,7 +1671,6 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu TKqpTableWriteActor::TWriteToken WriteToken = 0; bool Closed = false; - bool WaitingForTableActor = false; NWilson::TSpan DirectWriteActorSpan; @@ -1909,9 +1917,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub settings.Priority); } - // At current time only insert operation can fail. - NeedToFlushBeforeCommit |= (settings.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT); - writeInfo.Actors.at(settings.TableId.PathId).WriteActor->Open( token.Cookie, settings.OperationType, @@ -1935,6 +1940,23 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub Process(); } + bool NeedToFlush() { + const bool outOfMemory = GetTotalFreeSpace() <= 0; + const bool needToFlush = outOfMemory + || State == EState::FLUSHING + || State == EState::PREPARING + || State == EState::COMMITTING + || State == EState::ROLLINGBACK; + return needToFlush; + } + + bool NeedToFlushActor(const TKqpTableWriteActor* actor) { + return NeedToFlush() + && (State != EState::FLUSHING + || !TxId // Flush between queries + || actor->FlushBeforeCommit()); // Flush before commit + } + bool Process() { ProcessRequestQueue(); if (!ProcessFlush()) { @@ -1945,7 +1967,9 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub if (State == EState::FLUSHING) { bool isEmpty = true; ForEachWriteActor([&](const TKqpTableWriteActor* actor, const TActorId) { - isEmpty &= actor->IsReady() && actor->IsEmpty(); + if (NeedToFlushActor(actor)) { + isEmpty &= actor->IsReady() && actor->IsEmpty(); + } }); if (isEmpty) { OnFlushed(); @@ -2014,14 +2038,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub } bool ProcessFlush() { - const bool outOfMemory = GetTotalFreeSpace() <= 0; - const bool needToFlush = outOfMemory - || State == EState::FLUSHING - || State == EState::PREPARING - || State == EState::COMMITTING - || State == EState::ROLLINGBACK; - - if (!EnableStreamWrite && outOfMemory) { + if (!EnableStreamWrite && GetTotalFreeSpace() <= 0) { ReplyErrorAndDie( NYql::NDqProto::StatusIds::PRECONDITION_FAILED, NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED, @@ -2031,12 +2048,12 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub return false; } - if (needToFlush) { + if (NeedToFlush()) { CA_LOG_D("Flush data"); bool flushFailed = false; ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) { - if (!flushFailed && actor->IsReady()) { + if (!flushFailed && actor->IsReady() && NeedToFlushActor(actor)) { actor->FlushBuffers(); if (!actor->FlushToShards()) { flushFailed = true; @@ -2069,11 +2086,11 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub CA_LOG_D("Start prepare for distributed commit"); YQL_ENSURE(State == EState::WRITING); - YQL_ENSURE(!NeedToFlushBeforeCommit); State = EState::PREPARING; CheckQueuesEmpty(); AFL_ENSURE(TxId); ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) { + AFL_ENSURE(!actor->FlushBeforeCommit()); actor->SetPrepare(*TxId); }); Close(); @@ -2517,7 +2534,13 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub } else { AFL_ENSURE(ev->Get()->TxId); TxId = ev->Get()->TxId; - if (NeedToFlushBeforeCommit) { + + bool needToFlushBeforeCommit = false; + ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) { + needToFlushBeforeCommit |= actor->FlushBeforeCommit(); + }); + + if (needToFlushBeforeCommit) { Flush(std::move(ev->TraceId)); } else { TxManager->StartPrepare(); @@ -2902,8 +2925,19 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub UpdateTracingState("Write", BufferWriteActorSpan.GetTraceId()); OnOperationFinished(Counters->BufferActorFlushLatencyHistogram); State = EState::WRITING; - AFL_ENSURE(!TxId || NeedToFlushBeforeCommit); // TxId => NeedToFlushBeforeCommit - NeedToFlushBeforeCommit = false; + + ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) { + AFL_ENSURE(TxId || actor->IsEmpty()); + if (actor->IsEmpty()) { + actor->CleanupClosedTokens(); + } + if (!TxId) { + actor->Unlink(); + } + + AFL_ENSURE(!actor->FlushBeforeCommit()); + }); + if (TxId) { TxManager->StartPrepare(); Prepare(std::nullopt); @@ -2915,11 +2949,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub }); ExecuterActorId = {}; Y_ABORT_UNLESS(GetTotalMemory() == 0); - - ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) { - actor->CleanupClosedTokens(); - actor->Unlink(); - }); } void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, NYql::EYqlIssueCode id, const TString& message, const NYql::TIssues& subIssues) override { @@ -3059,7 +3088,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub EState State; bool HasError = false; - bool NeedToFlushBeforeCommit = false; THashMap> RequestQueues; struct TAckMessage { diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp index c7824518a1a8..38c069eb3f35 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.cpp +++ b/ydb/core/kqp/runtime/kqp_write_table.cpp @@ -1450,7 +1450,6 @@ class TShardedWriteController : public IShardedWriteController { } void CleanupClosedTokens() override { - AFL_ENSURE(IsEmpty()); for (auto it = WriteInfos.begin(); it != WriteInfos.end();) { if (it->second.Closed) { AFL_ENSURE(it->second.Serializer->IsFinished());