Skip to content

Flush only INSERTs before commit #19744

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 53 additions & 25 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,9 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
std::move(columnsMetadata),
std::move(writeIndexes),
priority);

// Currently only INSERT operations can fail, so only they require a flush before commit.
NeedToFlushBeforeCommit |= (operationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT);
CA_LOG_D("Open: token=" << token);
}

Expand Down Expand Up @@ -413,6 +416,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
// Delegates closed-token cleanup to the sharded write controller and then
// clears this table actor's flush-before-commit flag.
void CleanupClosedTokens() {
// The controller must exist before it can be asked to clean up.
YQL_ENSURE(ShardedWriteController);
ShardedWriteController->CleanupClosedTokens();
// Reset the flag; it is raised again when an INSERT operation is registered.
NeedToFlushBeforeCommit = false;
}

void SetParentTraceId(NWilson::TTraceId traceId) {
Expand Down Expand Up @@ -1297,6 +1301,10 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
Stats.FillStats(stats, TablePath);
}

// Returns the per-table-actor flush-before-commit flag.
// The flag is set when an INSERT operation is registered on this actor and
// cleared by CleanupClosedTokens().
bool FlushBeforeCommit() const {
return NeedToFlushBeforeCommit;
}

private:
NActors::TActorId PipeCacheId = NKikimr::MakePipePerNodeCacheID(false);
bool LinkedPipeCache = false;
Expand Down Expand Up @@ -1326,6 +1334,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {

IKqpTransactionManagerPtr TxManager;
bool Closed = false;
bool NeedToFlushBeforeCommit = false;
EMode Mode = EMode::WRITE;
THashMap<ui64, TInstant> SendTime;

Expand Down Expand Up @@ -1662,7 +1671,6 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
TKqpTableWriteActor::TWriteToken WriteToken = 0;

bool Closed = false;

bool WaitingForTableActor = false;

NWilson::TSpan DirectWriteActorSpan;
Expand Down Expand Up @@ -1909,9 +1917,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
settings.Priority);
}

// At current time only insert operation can fail.
NeedToFlushBeforeCommit |= (settings.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT);

writeInfo.Actors.at(settings.TableId.PathId).WriteActor->Open(
token.Cookie,
settings.OperationType,
Expand All @@ -1935,6 +1940,23 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
Process();
}

// Tells whether the buffer actor should currently push data to the write actors.
// A flush is needed when there is no free space left, or when the actor is in
// one of the FLUSHING / PREPARING / COMMITTING / ROLLINGBACK states.
bool NeedToFlush() {
    // Memory pressure alone is enough to require a flush.
    if (GetTotalFreeSpace() <= 0) {
        return true;
    }

    // Otherwise the decision depends only on the current state.
    switch (State) {
        case EState::FLUSHING:
        case EState::PREPARING:
        case EState::COMMITTING:
        case EState::ROLLINGBACK:
            return true;
        default:
            return false;
    }
}

// Decides whether the given table write actor takes part in the current flush.
// Returns false when no flush is needed at all. Outside the FLUSHING state every
// actor is flushed; in the FLUSHING state an actor is flushed either when no
// TxId is set (flush between queries) or when the actor itself asks for a
// flush before commit.
bool NeedToFlushActor(const TKqpTableWriteActor* actor) {
    if (!NeedToFlush()) {
        return false;
    }
    if (State != EState::FLUSHING) {
        return true;
    }
    if (!TxId) {
        // Flush between queries.
        return true;
    }
    // Flush before commit.
    return actor->FlushBeforeCommit();
}

bool Process() {
ProcessRequestQueue();
if (!ProcessFlush()) {
Expand All @@ -1945,7 +1967,9 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
if (State == EState::FLUSHING) {
bool isEmpty = true;
ForEachWriteActor([&](const TKqpTableWriteActor* actor, const TActorId) {
isEmpty &= actor->IsReady() && actor->IsEmpty();
if (NeedToFlushActor(actor)) {
isEmpty &= actor->IsReady() && actor->IsEmpty();
}
});
if (isEmpty) {
OnFlushed();
Expand Down Expand Up @@ -2014,14 +2038,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
}

bool ProcessFlush() {
const bool outOfMemory = GetTotalFreeSpace() <= 0;
const bool needToFlush = outOfMemory
|| State == EState::FLUSHING
|| State == EState::PREPARING
|| State == EState::COMMITTING
|| State == EState::ROLLINGBACK;

if (!EnableStreamWrite && outOfMemory) {
if (!EnableStreamWrite && GetTotalFreeSpace() <= 0) {
ReplyErrorAndDie(
NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED,
Expand All @@ -2031,12 +2048,12 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
return false;
}

if (needToFlush) {
if (NeedToFlush()) {
CA_LOG_D("Flush data");

bool flushFailed = false;
ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) {
if (!flushFailed && actor->IsReady()) {
if (!flushFailed && actor->IsReady() && NeedToFlushActor(actor)) {
actor->FlushBuffers();
if (!actor->FlushToShards()) {
flushFailed = true;
Expand Down Expand Up @@ -2069,11 +2086,11 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub

CA_LOG_D("Start prepare for distributed commit");
YQL_ENSURE(State == EState::WRITING);
YQL_ENSURE(!NeedToFlushBeforeCommit);
State = EState::PREPARING;
CheckQueuesEmpty();
AFL_ENSURE(TxId);
ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) {
AFL_ENSURE(!actor->FlushBeforeCommit());
actor->SetPrepare(*TxId);
});
Close();
Expand Down Expand Up @@ -2517,7 +2534,13 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
} else {
AFL_ENSURE(ev->Get()->TxId);
TxId = ev->Get()->TxId;
if (NeedToFlushBeforeCommit) {

bool needToFlushBeforeCommit = false;
ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) {
needToFlushBeforeCommit |= actor->FlushBeforeCommit();
});

if (needToFlushBeforeCommit) {
Flush(std::move(ev->TraceId));
} else {
TxManager->StartPrepare();
Expand Down Expand Up @@ -2902,8 +2925,19 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
UpdateTracingState("Write", BufferWriteActorSpan.GetTraceId());
OnOperationFinished(Counters->BufferActorFlushLatencyHistogram);
State = EState::WRITING;
AFL_ENSURE(!TxId || NeedToFlushBeforeCommit); // TxId => NeedToFlushBeforeCommit
NeedToFlushBeforeCommit = false;

ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) {
AFL_ENSURE(TxId || actor->IsEmpty());
if (actor->IsEmpty()) {
actor->CleanupClosedTokens();
}
if (!TxId) {
actor->Unlink();
}

AFL_ENSURE(!actor->FlushBeforeCommit());
});

if (TxId) {
TxManager->StartPrepare();
Prepare(std::nullopt);
Expand All @@ -2915,11 +2949,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
});
ExecuterActorId = {};
Y_ABORT_UNLESS(GetTotalMemory() == 0);

ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) {
actor->CleanupClosedTokens();
actor->Unlink();
});
}

void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, NYql::EYqlIssueCode id, const TString& message, const NYql::TIssues& subIssues) override {
Expand Down Expand Up @@ -3059,7 +3088,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub

EState State;
bool HasError = false;
bool NeedToFlushBeforeCommit = false;
THashMap<TPathId, std::queue<TBufferWriteMessage>> RequestQueues;

struct TAckMessage {
Expand Down
1 change: 0 additions & 1 deletion ydb/core/kqp/runtime/kqp_write_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1450,7 +1450,6 @@ class TShardedWriteController : public IShardedWriteController {
}

void CleanupClosedTokens() override {
AFL_ENSURE(IsEmpty());
for (auto it = WriteInfos.begin(); it != WriteInfos.end();) {
if (it->second.Closed) {
AFL_ENSURE(it->second.Serializer->IsFinished());
Expand Down
Loading