Skip to content

Commit 37c19de

Browse files
The PQ tablet does not receive a TEvTxCalcPredicateResult (#17497)
1 parent 52cd0e1 commit 37c19de

File tree

2 files changed

+65
-1
lines changed

2 files changed

+65
-1
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2256,7 +2256,7 @@ void TPartition::AnswerCurrentReplies(const TActorContext& ctx)
22562256
TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& t)
22572257
{
22582258
auto result = EProcessResult::Continue;
2259-
if (t->SupportivePartitionActor && !t->WriteInfo) { // Pending for write info
2259+
if (t->SupportivePartitionActor && !t->WriteInfo && !t->WriteInfoApplied) { // Pending for write info
22602260
return EProcessResult::NotReady;
22612261
}
22622262
if (t->WriteInfo && !t->WriteInfoApplied) { //Recieved write info but not applied

ydb/core/persqueue/ut/partition_ut.cpp

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,15 @@ class TPartitionFixture : public NUnitTest::TBaseFixture {
315315
void WaitForWriteError(ui64 cookie, NPersQueue::NErrorCode::EErrorCode errorCode);
316316
void WaitForDeletePartitionDone();
317317

318+
void SendCalcPredicate(ui64 step,
319+
ui64 txId,
320+
const TActorId& suppPartitionId);
321+
void WaitForGetWriteInfoRequest();
322+
void SendGetWriteInfoError(ui32 internalPartitionId,
323+
TString message,
324+
const TActorId& suppPartitionId);
325+
void WaitForCalcPredicateResult(ui64 txId, bool predicate);
326+
318327
TMaybe<TTestContext> Ctx;
319328
TMaybe<TFinalizer> Finalizer;
320329

@@ -2512,6 +2521,47 @@ void TPartitionFixture::CmdChangeOwner(ui64 cookie, const TString& sourceId, TDu
25122521
ownerCookie = event->Response->GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie();
25132522
}
25142523

2524+
void TPartitionFixture::SendCalcPredicate(ui64 step,
2525+
ui64 txId,
2526+
const TActorId& suppPartitionId)
2527+
{
2528+
SendCalcPredicate(step, txId, "", 0, 0, suppPartitionId);
2529+
}
2530+
2531+
void TPartitionFixture::WaitForGetWriteInfoRequest()
2532+
{
2533+
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvGetWriteInfoRequest>();
2534+
UNIT_ASSERT(event != nullptr);
2535+
//UNIT_ASSERT_VALUES_EQUAL(event->OriginalPartition, ActorId);
2536+
}
2537+
2538+
void TPartitionFixture::SendGetWriteInfoError(ui32 internalPartitionId,
2539+
TString message,
2540+
const TActorId& suppPartitionId)
2541+
{
2542+
auto event = MakeHolder<TEvPQ::TEvGetWriteInfoError>(internalPartitionId,
2543+
std::move(message));
2544+
//event->SupportivePartition = suppPartitionId;
2545+
2546+
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, suppPartitionId, event.Release()));
2547+
}
2548+
2549+
void TPartitionFixture::WaitForCalcPredicateResult(ui64 txId, bool predicate)
2550+
{
2551+
while (true) {
2552+
TAutoPtr<IEventHandle> handle;
2553+
auto events =
2554+
Ctx->Runtime->GrabEdgeEvents<TEvPQ::TEvTxCalcPredicateResult, TEvKeyValue::TEvRequest>(handle,
2555+
TDuration::Seconds(1));
2556+
if (std::get<TEvKeyValue::TEvRequest*>(events)) {
2557+
SendDiskStatusResponse(nullptr);
2558+
} else if (auto* event = std::get<TEvPQ::TEvTxCalcPredicateResult*>(events)) {
2559+
UNIT_ASSERT_VALUES_EQUAL(event->TxId, txId);
2560+
UNIT_ASSERT_VALUES_EQUAL(event->Predicate, predicate);
2561+
break;
2562+
}
2563+
}
2564+
}
25152565

25162566
Y_UNIT_TEST_F(ReserveSubDomainOutOfSpace, TPartitionFixture)
25172567
{
@@ -3614,6 +3664,20 @@ Y_UNIT_TEST_F(The_DeletePartition_Message_Arrives_Before_The_ApproveWriteQuota_M
36143664
WaitForWriteError(2, NPersQueue::NErrorCode::ERROR);
36153665
}
36163666

3667+
Y_UNIT_TEST_F(After_TEvGetWriteInfoError_Comes_TEvTxCalcPredicateResult, TPartitionFixture)
3668+
{
3669+
const TPartitionId partitionId{1};
3670+
const ui64 step = 12345;
3671+
const ui64 txId = 67890;
3672+
3673+
CreatePartition({.Partition=partitionId});
3674+
3675+
SendCalcPredicate(step, txId, Ctx->Edge);
3676+
WaitForGetWriteInfoRequest();
3677+
SendGetWriteInfoError(31415, "error", Ctx->Edge);
3678+
WaitForCalcPredicateResult(txId, false);
3679+
}
3680+
36173681
} // End of suite
36183682

36193683
} // namespace

0 commit comments

Comments
 (0)