Skip to content

Commit 4d5b4d5

Browse files
nerhneiroIrina Skvortsova
andauthored
add committedMetadata in PQ tablet (#18292)
Co-authored-by: Irina Skvortsova <nerhneiro@yandex-team.ru>
1 parent c1ac8fd commit 4d5b4d5

File tree

13 files changed

+124
-42
lines changed

13 files changed

+124
-42
lines changed

ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,16 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<
9696
std::unordered_map<TString, TEvKafka::PartitionConsumerOffset> consumerToOffset;
9797
for (auto& consumerResult : partResult.GetConsumerResult()) {
9898
if (consumerResult.GetErrorCode() == NPersQueue::NErrorCode::OK) {
99-
consumerToOffset.emplace(consumerResult.GetConsumer(), TEvKafka::PartitionConsumerOffset{static_cast<ui64>(partResult.GetPartition()), static_cast<ui64>(consumerResult.GetCommitedOffset()), consumerResult.GetCommittedMetadata()});
99+
std::optional<TString> committedMetadata = consumerResult.HasCommittedMetadata() ?
100+
static_cast<std::optional<TString>>(consumerResult.GetCommittedMetadata()) :
101+
std::nullopt;
102+
TEvKafka::PartitionConsumerOffset partitionConsumerOffset = TEvKafka::PartitionConsumerOffset{
103+
static_cast<ui64>(partResult.GetPartition()),
104+
static_cast<ui64>(consumerResult.GetCommitedOffset()),
105+
committedMetadata};
106+
consumerToOffset.emplace(
107+
consumerResult.GetConsumer(),
108+
partitionConsumerOffset);
100109
}
101110
}
102111
(*PartitionIdToOffsets)[partResult.GetPartition()] = consumerToOffset;

ydb/core/kafka_proxy/kafka_events.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,12 @@ struct TEvTopicOffsetsResponse : public NActors::TEventLocal<TEvTopicOffsetsResp
230230
struct PartitionConsumerOffset {
231231
ui64 PartitionIndex;
232232
ui64 Offset;
233-
TString Metadata;
233+
std::optional<TString> Metadata = std::nullopt;
234+
235+
PartitionConsumerOffset(ui64 partitionIndex, ui64 offset, std::optional<TString> metadata = std::nullopt) :
236+
PartitionIndex(partitionIndex),
237+
Offset(offset),
238+
Metadata(metadata) {}
234239
};
235240

236241
struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffsetsResponse, EvTopicOffsetsResponse>

ydb/core/kafka_proxy/ut/kafka_test_client.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,6 @@ struct TReadInfo {
3939
i32 GenerationId;
4040
};
4141

42-
struct TConsumerOffset {
43-
ui64 PartitionIndex;
44-
ui64 Offset;
45-
TString Metadata;
46-
};
47-
48-
4942
class TKafkaTestClient {
5043
public:
5144
TKafkaTestClient(ui16 port, const TString clientName = "TestClient");

ydb/core/kafka_proxy/ut/ut_protocol.cpp

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,13 +1165,29 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
11651165
UNIT_ASSERT_VALUES_UNEQUAL(partition0, partitions.end());
11661166
UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 0);
11671167
}
1168-
11691168
{
1170-
// Check commit
11711169
std::unordered_map<TString, std::vector<NKafka::TEvKafka::PartitionConsumerOffset>> offsets;
11721170
std::vector<NKafka::TEvKafka::PartitionConsumerOffset> partitionsAndOffsets;
1171+
{
1172+
// Check commit
1173+
11731174
for (ui64 i = 0; i < minActivePartitions; ++i) {
1174-
partitionsAndOffsets.emplace_back(i, static_cast<ui64>(recordsCount), commitedMetaData);
1175+
// check that if a partition has a non-zero committed offset (that doesn't exceed endoffset) and committed metadata
1176+
// or a zero committed offset and metadata
1177+
// than no error is thrown and metadata is updated
1178+
1179+
// check that otherwise, if the committed offset exceeds current endoffset of the partition
1180+
// than an error is returned and passed committed metadata is not saved
1181+
1182+
if (i == 0) {
1183+
partitionsAndOffsets.emplace_back(i, static_cast<ui64>(recordsCount), commitedMetaData);
1184+
} else if (i == 1) {
1185+
partitionsAndOffsets.emplace_back(i, 0, commitedMetaData);
1186+
} else if (i == 2) {
1187+
partitionsAndOffsets.emplace_back(i, static_cast<ui64>(recordsCount), commitedMetaData);
1188+
} else {
1189+
partitionsAndOffsets.emplace_back(i, static_cast<ui64>(recordsCount));
1190+
}
11751191
}
11761192
offsets[firstTopicName] = partitionsAndOffsets;
11771193
offsets[shortTopicName] = partitionsAndOffsets;
@@ -1181,13 +1197,21 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
11811197
UNIT_ASSERT_VALUES_EQUAL(topic.Partitions.size(), minActivePartitions);
11821198
for (const auto& partition : topic.Partitions) {
11831199
if (topic.Name.value() == firstTopicName) {
1184-
if (partition.PartitionIndex == 0) {
1200+
// in first topic
1201+
if (partition.PartitionIndex == 0 || partition.PartitionIndex == 1) {
11851202
UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
11861203
} else {
11871204
UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::OFFSET_OUT_OF_RANGE));
11881205
}
11891206
} else {
1190-
UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::OFFSET_OUT_OF_RANGE));
1207+
if (partition.PartitionIndex == 1) {
1208+
// nothing was produced in the second topic
1209+
// check that if a zero offset is committed no error occurs and committed metadata is saved
1210+
UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1211+
} else {
1212+
// otherwise, an error occurs, because committed offset exceeds endoffset
1213+
UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::OFFSET_OUT_OF_RANGE));
1214+
}
11911215
}
11921216
}
11931217
}
@@ -1205,11 +1229,24 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
12051229
auto partition0 = std::find_if(partitions.begin(), partitions.end(), [](const auto& partition) { return partition.PartitionIndex == 0; });
12061230
UNIT_ASSERT_VALUES_UNEQUAL(partition0, partitions.end());
12071231
UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 5);
1208-
for (auto p = partitions.begin(); p != partitions.end(); p++) {
1209-
UNIT_ASSERT_VALUES_EQUAL(p->Metadata, "");
1232+
UNIT_ASSERT_VALUES_EQUAL(partition0->Metadata, commitedMetaData);
1233+
int i = 0;
1234+
// checking committed metadata for the first topic
1235+
for (auto it = partitions.begin(); it != partitions.end(); it++) {
1236+
if (i != 2) {
1237+
// for i == 0 and i == 1 check that committed metadata == "additional-info" as committed offset didn't exceed endoffset
1238+
// for other i != 2 values check that committed metadata is empty as no metadata was committed
1239+
// that a new value of metadata is saved
1240+
UNIT_ASSERT_VALUES_EQUAL(it->Metadata, partitionsAndOffsets[i].Metadata);
1241+
} else {
1242+
// check that in case an error has occurred (because committed offset exceeded endoffset)
1243+
// committed metadata is not saved
1244+
UNIT_ASSERT_VALUES_EQUAL(it->Metadata, std::nullopt);
1245+
}
1246+
i += 1;
12101247
}
12111248
}
1212-
1249+
}
12131250
{
12141251
// Check fetch offsets with nonexistent topic
12151252
std::map<TString, std::vector<i32>> topicsToPartions;
@@ -1225,6 +1262,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
12251262

12261263
{
12271264
// Check commit with nonexistent topic
1265+
12281266
std::unordered_map<TString, std::vector<NKafka::TEvKafka::PartitionConsumerOffset>> offsets;
12291267
std::vector<NKafka::TEvKafka::PartitionConsumerOffset> partitionsAndOffsets;
12301268
for (ui64 i = 0; i < minActivePartitions; ++i) {

ydb/core/persqueue/events/internal.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,8 @@ struct TEvPQ {
363363

364364
TEvSetClientInfo(const ui64 cookie, const TString& clientId, const ui64 offset, const TString& sessionId, const ui64 partitionSessionId,
365365
const ui32 generation, const ui32 step, const TActorId& pipeClient,
366-
ESetClientInfoType type = ESCI_OFFSET, ui64 readRuleGeneration = 0, bool strict = false)
366+
ESetClientInfoType type = ESCI_OFFSET, ui64 readRuleGeneration = 0, bool strict = false,
367+
const std::optional<TString>& сommittedMetadata = std::nullopt)
367368
: Cookie(cookie)
368369
, ClientId(clientId)
369370
, Offset(offset)
@@ -375,6 +376,7 @@ struct TEvPQ {
375376
, ReadRuleGeneration(readRuleGeneration)
376377
, Strict(strict)
377378
, PipeClient(pipeClient)
379+
, CommittedMetadata(сommittedMetadata)
378380
{
379381
}
380382

@@ -389,6 +391,7 @@ struct TEvPQ {
389391
ui64 ReadRuleGeneration;
390392
bool Strict;
391393
TActorId PipeClient;
394+
std::optional<TString> CommittedMetadata;
392395
};
393396

394397

ydb/core/persqueue/partition.cpp

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,9 @@ void TPartition::ReplyOk(const TActorContext& ctx, const ui64 dst, NWilson::TSpa
169169
}
170170

171171
void TPartition::ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset,
172-
const TInstant writeTimestamp, const TInstant createTimestamp, bool consumerHasAnyCommits) {
173-
ctx.Send(Tablet, MakeReplyGetClientOffsetOk(dst, offset, writeTimestamp, createTimestamp, consumerHasAnyCommits).Release());
172+
const TInstant writeTimestamp, const TInstant createTimestamp, bool consumerHasAnyCommits,
173+
const std::optional<TString>& committedMetadata) {
174+
ctx.Send(Tablet, MakeReplyGetClientOffsetOk(dst, offset, writeTimestamp, createTimestamp, consumerHasAnyCommits, committedMetadata).Release());
174175
}
175176

176177
NKikimrClient::TKeyValueRequest::EStorageChannel GetChannel(ui32 i) {
@@ -770,6 +771,9 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
770771
clientInfo->SetConsumer(userInfo.User);
771772
clientInfo->set_errorcode(NPersQueue::NErrorCode::EErrorCode::OK);
772773
clientInfo->SetCommitedOffset(userInfo.Offset);
774+
if (userInfo.CommittedMetadata.has_value()) {
775+
clientInfo->SetCommittedMetadata(*userInfo.CommittedMetadata);
776+
}
773777
requiredConsumers.extract(userInfo.User);
774778
}
775779
continue;
@@ -2783,6 +2787,7 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx)
27832787
userInfo.Generation = actual->Generation;
27842788
userInfo.Step = actual->Step;
27852789
userInfo.Offset = actual->Offset;
2790+
userInfo.CommittedMetadata = actual->CommittedMetadata;
27862791
if (userInfo.Offset <= (i64)CompactionBlobEncoder.StartOffset) {
27872792
userInfo.AnyCommits = false;
27882793
}
@@ -3196,7 +3201,6 @@ void TPartition::CommitUserAct(TEvPQ::TEvSetClientInfo& act) {
31963201

31973202
if (!act.SessionId.empty() && act.Type == TEvPQ::TEvSetClientInfo::ESCI_OFFSET && (i64)act.Offset <= userInfo.Offset) { //this is stale request, answer ok for it
31983203
ScheduleReplyOk(act.Cookie);
3199-
32003204
return;
32013205
}
32023206

@@ -3266,6 +3270,7 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
32663270
{
32673271
const TString& user = act.ClientId;
32683272
ui64 offset = act.Offset;
3273+
const std::optional<TString>& committedMetadata = act.CommittedMetadata ? act.CommittedMetadata : userInfo.CommittedMetadata;
32693274
const TString& session = act.SessionId;
32703275
ui32 generation = act.Generation;
32713276
ui32 step = act.Step;
@@ -3304,6 +3309,7 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
33043309
} else {
33053310
if (createSession || dropSession) {
33063311
offset = userInfo.Offset;
3312+
33073313
auto *ui = UsersInfoStorage->GetIfExists(userInfo.User);
33083314
auto ts = ui ? GetTime(*ui, userInfo.Offset) : std::make_pair<TInstant, TInstant>(TInstant::Zero(), TInstant::Zero());
33093315

@@ -3335,6 +3341,7 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
33353341
);
33363342

33373343
userInfo.Offset = offset;
3344+
userInfo.CommittedMetadata = committedMetadata;
33383345
if (userInfo.Offset <= (i64)BlobEncoder.StartOffset) {
33393346
userInfo.AnyCommits = false;
33403347
}
@@ -3358,14 +3365,15 @@ void TPartition::ScheduleReplyGetClientOffsetOk(const ui64 dst,
33583365
const i64 offset,
33593366
const TInstant writeTimestamp,
33603367
const TInstant createTimestamp,
3361-
bool consumerHasAnyCommits)
3368+
bool consumerHasAnyCommits,
3369+
const std::optional<TString>& committedMetadata)
33623370
{
33633371
Replies.emplace_back(Tablet,
33643372
MakeReplyGetClientOffsetOk(dst,
33653373
offset,
33663374
writeTimestamp,
33673375
createTimestamp,
3368-
consumerHasAnyCommits).Release());
3376+
consumerHasAnyCommits, committedMetadata).Release());
33693377

33703378
}
33713379

@@ -3441,7 +3449,7 @@ void TPartition::AddCmdWrite(NKikimrClient::TKeyValueRequest& request,
34413449
ui64 offset, ui32 gen, ui32 step, const TString& session,
34423450
ui64 readOffsetRewindSum,
34433451
ui64 readRuleGeneration,
3444-
bool anyCommits)
3452+
bool anyCommits, const std::optional<TString>& committedMetadata)
34453453
{
34463454
TBuffer idata;
34473455
{
@@ -3453,6 +3461,9 @@ void TPartition::AddCmdWrite(NKikimrClient::TKeyValueRequest& request,
34533461
userData.SetOffsetRewindSum(readOffsetRewindSum);
34543462
userData.SetReadRuleGeneration(readRuleGeneration);
34553463
userData.SetAnyCommits(anyCommits);
3464+
if (committedMetadata.has_value()) {
3465+
userData.SetCommittedMetadata(*committedMetadata);
3466+
}
34563467

34573468
TString out;
34583469
Y_PROTOBUF_SUPPRESS_NODISCARD userData.SerializeToString(&out);
@@ -3514,7 +3525,7 @@ void TPartition::AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request)
35143525
userInfo->Session,
35153526
ui ? ui->ReadOffsetRewindSum : 0,
35163527
userInfo->ReadRuleGeneration,
3517-
userInfo->AnyCommits);
3528+
userInfo->AnyCommits, userInfo->CommittedMetadata);
35183529
} else {
35193530
AddCmdDeleteRange(request,
35203531
ikey, ikeyDeprecated);
@@ -3556,6 +3567,7 @@ TUserInfoBase& TPartition::GetOrCreatePendingUser(const TString& user,
35563567
newPendingUserIt->second.Generation = userIt->Generation;
35573568
newPendingUserIt->second.Step = userIt->Step;
35583569
newPendingUserIt->second.Offset = userIt->Offset;
3570+
newPendingUserIt->second.CommittedMetadata = userIt->CommittedMetadata;
35593571
newPendingUserIt->second.ReadRuleGeneration = userIt->ReadRuleGeneration;
35603572
newPendingUserIt->second.Important = userIt->Important;
35613573
newPendingUserIt->second.ReadFromTimestamp = userIt->ReadFromTimestamp;
@@ -3594,7 +3606,8 @@ THolder<TEvPQ::TEvProxyResponse> TPartition::MakeReplyGetClientOffsetOk(const ui
35943606
const i64 offset,
35953607
const TInstant writeTimestamp,
35963608
const TInstant createTimestamp,
3597-
bool consumerHasAnyCommits)
3609+
bool consumerHasAnyCommits,
3610+
const std::optional<TString>& committedMetadata)
35983611
{
35993612
auto response = MakeHolder<TEvPQ::TEvProxyResponse>(dst);
36003613
NKikimrClient::TResponse& resp = *response->Response;
@@ -3605,6 +3618,10 @@ THolder<TEvPQ::TEvProxyResponse> TPartition::MakeReplyGetClientOffsetOk(const ui
36053618
auto user = resp.MutablePartitionResponse()->MutableCmdGetClientOffsetResult();
36063619
if (offset > -1)
36073620
user->SetOffset(offset);
3621+
3622+
if (committedMetadata.has_value()) {
3623+
user->SetCommittedMetadata(*committedMetadata);
3624+
}
36083625
if (writeTimestamp)
36093626
user->SetWriteTimestampMS(writeTimestamp.MilliSeconds());
36103627
if (createTimestamp) {

ydb/core/persqueue/partition.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
161161
NKikimrPQ::TError::EKind kind, const TString& reason);
162162
void ReplyErrorForStoredWrites(const TActorContext& ctx);
163163

164-
void ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset, const TInstant writeTimestamp, const TInstant createTimestamp, bool consumerHasAnyCommits);
164+
void ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset, const TInstant writeTimestamp, const TInstant createTimestamp, bool consumerHasAnyCommits, const std::optional<TString>& committedMetadata);
165165
void ReplyOk(const TActorContext& ctx, const ui64 dst);
166166
void ReplyOk(const TActorContext& ctx, const ui64 dst, NWilson::TSpan& span);
167167
void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie, ui64 seqNo, NWilson::TSpan& span);
@@ -358,7 +358,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
358358
const i64 offset,
359359
const TInstant writeTimestamp,
360360
const TInstant createTimestamp,
361-
bool consumerHasAnyCommits);
361+
bool consumerHasAnyCommits,
362+
const std::optional<TString>& committedMetadata=std::nullopt);
362363
void ScheduleReplyError(const ui64 dst,
363364
NPersQueue::NErrorCode::EErrorCode errorCode,
364365
const TString& error);
@@ -375,7 +376,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
375376
ui64 offset, ui32 gen, ui32 step, const TString& session,
376377
ui64 readOffsetRewindSum,
377378
ui64 readRuleGeneration,
378-
bool anyCommits);
379+
bool anyCommits, const std::optional<TString>& committedMetadata);
379380
void AddCmdWriteTxMeta(NKikimrClient::TKeyValueRequest& request);
380381
void AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request);
381382
void AddCmdWriteConfig(NKikimrClient::TKeyValueRequest& request);
@@ -390,7 +391,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
390391
const i64 offset,
391392
const TInstant writeTimestamp,
392393
const TInstant createTimestamp,
393-
bool consumerHasAnyCommits);
394+
bool consumerHasAnyCommits,
395+
const std::optional<TString>& committedMetadata);
394396
THolder<TEvPQ::TEvError> MakeReplyError(const ui64 dst,
395397
NPersQueue::NErrorCode::EErrorCode errorCode,
396398
const TString& error);

0 commit comments

Comments
 (0)