Skip to content

Commit 4c8dfa6

Browse files
Fix seqNo conflict on concurrent TXs (#15281)
1 parent b725eb8 commit 4c8dfa6

File tree

5 files changed

+201 -37 lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,6 +1277,16 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx)
12771277
}
12781278
txSourceIds.insert(s.first);
12791279
}
1280+
auto inFlightIter = TxInflightMaxSeqNoPerSourceId.find(s.first);
1281+
1282+
if (!inFlightIter.IsEnd()) {
1283+
if (s.second.MinSeqNo <= inFlightIter->second) {
1284+
tx.Predicate = false;
1285+
tx.Message = TStringBuilder() << "MinSeqNo violation failure on " << s.first;
1286+
tx.WriteInfoApplied = true;
1287+
break;
1288+
}
1289+
}
12801290

12811291
auto existing = knownSourceIds.find(s.first);
12821292
if (existing.IsEnd())
@@ -1291,9 +1301,6 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx)
12911301
if (ret == EProcessResult::Continue && tx.Predicate.GetOrElse(true)) {
12921302
TxAffectedSourcesIds.insert(txSourceIds.begin(), txSourceIds.end());
12931303

1294-
// A temporary solution. This line should be deleted when we fix the error with the SeqNo promotion.
1295-
WriteAffectedSourcesIds.insert(txSourceIds.begin(), txSourceIds.end());
1296-
12971304
tx.WriteInfoApplied = true;
12981305
WriteKeysSizeEstimate += tx.WriteInfo->BodyKeys.size();
12991306
WriteKeysSizeEstimate += tx.WriteInfo->SrcIdInfo.size();
@@ -2391,6 +2398,13 @@ void TPartition::CommitWriteOperations(TTransaction& t)
23912398
if (!t.WriteInfo) {
23922399
return;
23932400
}
2401+
for (const auto& s : t.WriteInfo->SrcIdInfo) {
2402+
auto [iter, ins] = TxInflightMaxSeqNoPerSourceId.emplace(s.first, s.second.SeqNo);
2403+
if (!ins) {
2404+
Y_ABORT_UNLESS(iter->second < s.second.SeqNo);
2405+
iter->second = s.second.SeqNo;
2406+
}
2407+
}
23942408
const auto& ctx = ActorContext();
23952409

23962410
if (!HaveWriteMsg) {
@@ -2406,6 +2420,8 @@ void TPartition::CommitWriteOperations(TTransaction& t)
24062420
", t.WriteInfo->BlobsFromHead.size=" << t.WriteInfo->BlobsFromHead.size());
24072421
PQ_LOG_D("Head=" << Head << ", NewHead=" << NewHead);
24082422

2423+
auto oldHeadOffset = NewHead.Offset;
2424+
24092425
if (!t.WriteInfo->BodyKeys.empty()) {
24102426
bool needCompactHead =
24112427
(Parameters->FirstCommitWriteOperations ? Head : NewHead).PackedSize != 0;
@@ -2494,12 +2510,16 @@ void TPartition::CommitWriteOperations(TTransaction& t)
24942510

24952511
WriteInflightSize += msg.Msg.Data.size();
24962512
ExecRequest(msg, *Parameters, PersistRequest.Get());
2497-
2498-
auto& info = TxSourceIdForPostPersist[blob.SourceId];
2499-
info.SeqNo = blob.SeqNo;
2500-
info.Offset = NewHead.Offset;
25012513
}
25022514
}
2515+
for (const auto& [srcId, info] : t.WriteInfo->SrcIdInfo) {
2516+
auto& sourceIdBatch = Parameters->SourceIdBatch;
2517+
auto sourceId = sourceIdBatch.GetSource(srcId);
2518+
sourceId.Update(info.SeqNo, info.Offset + oldHeadOffset, CurrentTimestamp);
2519+
auto& persistInfo = TxSourceIdForPostPersist[srcId];
2520+
persistInfo.SeqNo = info.SeqNo;
2521+
persistInfo.Offset = info.Offset + oldHeadOffset;
2522+
}
25032523

25042524
Parameters->FirstCommitWriteOperations = false;
25052525

ydb/core/persqueue/partition.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
665665
THashSet<TString> TxAffectedConsumers;
666666
THashSet<TString> SetOffsetAffectedConsumers;
667667
THashMap<TString, TSourceIdPostPersistInfo> TxSourceIdForPostPersist;
668+
THashMap<TString, ui64> TxInflightMaxSeqNoPerSourceId;
669+
668670

669671
ui32 MaxBlobSize;
670672
const ui32 TotalLevels = 4;

ydb/core/persqueue/partition_write.cpp

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
315315
already ? maxOffset : offset, CurrentTimestamp, already, maxSeqNo,
316316
PartitionQuotaWaitTimeForCurrentBlob, TopicQuotaWaitTimeForCurrentBlob, queueTime, writeTime, response.Span
317317
);
318+
318319
PQ_LOG_D("Answering for message sourceid: '" << EscapeC(s)
319320
<< "', Topic: '" << TopicName()
320321
<< "', Partition: " << Partition
@@ -521,6 +522,7 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
521522
SourceIdCounter.Use(sourceId, now);
522523
}
523524
TxSourceIdForPostPersist.clear();
525+
TxInflightMaxSeqNoPerSourceId.clear();
524526

525527
TxAffectedSourcesIds.clear();
526528
WriteAffectedSourcesIds.clear();
@@ -1042,6 +1044,7 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) {
10421044
TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId);
10431045
return EProcessResult::ContinueDrop;
10441046
}
1047+
10451048
if (DiskIsFull) {
10461049
ScheduleReplyError(p.Cookie,
10471050
NPersQueue::NErrorCode::WRITE_ERROR_DISK_IS_FULL,
@@ -1051,6 +1054,13 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) {
10511054
if (TxAffectedSourcesIds.contains(p.Msg.SourceId)) {
10521055
return EProcessResult::Blocked;
10531056
}
1057+
auto inflightMaxSeqNo = TxInflightMaxSeqNoPerSourceId.find(p.Msg.SourceId);
1058+
1059+
if (!inflightMaxSeqNo.IsEnd()) {
1060+
if (p.Msg.SeqNo <= inflightMaxSeqNo->second) {
1061+
return EProcessResult::Blocked;
1062+
}
1063+
}
10541064
WriteAffectedSourcesIds.insert(p.Msg.SourceId);
10551065
return EProcessResult::Continue;
10561066
}
@@ -1173,12 +1183,11 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
11731183
<< ". Writing seqNo: " << sourceId.UpdatedSeqNo()
11741184
<< ". EndOffset: " << EndOffset << ". CurOffset: " << curOffset << ". Offset: " << poffset
11751185
);
1176-
if (!p.Internal) {
1177-
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1);
1178-
MsgsDiscarded.Inc();
1179-
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size());
1180-
BytesDiscarded.Inc(p.Msg.Data.size());
1181-
}
1186+
Y_ENSURE(!p.Internal); // No Already for transactions;
1187+
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1);
1188+
MsgsDiscarded.Inc();
1189+
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size());
1190+
BytesDiscarded.Inc(p.Msg.Data.size());
11821191
} else {
11831192
if (!p.Internal) {
11841193
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1);
@@ -1225,6 +1234,7 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
12251234
", must be at least " << curOffset,
12261235
p,
12271236
NPersQueue::NErrorCode::EErrorCode::WRITE_ERROR_BAD_OFFSET);
1237+
12281238
return false;
12291239
}
12301240

@@ -1279,6 +1289,7 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
12791289
//this must not happen - client sends gaps, fail this client till the end
12801290
//now no changes will leak
12811291
ctx.Send(Tablet, new TEvents::TEvPoisonPill());
1292+
12821293
return false;
12831294
}
12841295
WriteNewSizeFull += p.Msg.SourceId.size() + p.Msg.Data.size();
@@ -1380,7 +1391,6 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
13801391
++curOffset;
13811392
PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize);
13821393
}
1383-
13841394
return true;
13851395
}
13861396

ydb/core/persqueue/ut/common/pq_ut_common.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ struct TTestContext {
127127

128128
static bool RequestTimeoutFilter(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration duration, TInstant& deadline) {
129129
if (event->GetTypeRewrite() == TEvents::TSystem::Wakeup) {
130-
Cerr << "Captured TEvents::TSystem::Wakeup to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl;
131130
if (runtime.FindActorName(event->GetRecipientRewrite()) == "PERSQUEUE_ANS_ACTOR") {
132131
return true;
133132
}

0 commit comments

Comments
 (0)