From 1e4885c31faf03a3958bcc04f33ab2c421b5c00d Mon Sep 17 00:00:00 2001 From: Irina Skvortsova Date: Mon, 9 Jun 2025 11:25:20 +0000 Subject: [PATCH 1/3] add MemberTimeoutsMs struct for deadlines/timeouts change heartbeat_deadline from now + rebalaceTimeout + sessionTimeout to just now + sessionTimeout add checks to stop waiting ungracefully finished consumers after sessionTimeout has passed --- .../actors/kafka_balance_actor_sql.cpp | 2 +- .../kafka_proxy/actors/kafka_balancer_actor.cpp | 17 ++++++++++++----- .../kafka_proxy/actors/kafka_balancer_actor.h | 9 +++++++-- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/ydb/core/kafka_proxy/actors/kafka_balance_actor_sql.cpp b/ydb/core/kafka_proxy/actors/kafka_balance_actor_sql.cpp index d207b9dfb95b..914a9856e329 100644 --- a/ydb/core/kafka_proxy/actors/kafka_balance_actor_sql.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_balance_actor_sql.cpp @@ -157,7 +157,7 @@ const TString SELECT_ALIVE_MEMBERS = R"sql( DECLARE $PaginationMemberId AS Utf8; DECLARE $Limit AS Uint64; - SELECT member_id, instance_id, rebalance_timeout_ms + SELECT member_id, instance_id, rebalance_timeout_ms, session_timeout_ms, heartbeat_deadline FROM `%s` VIEW PRIMARY KEY WHERE database = $Database diff --git a/ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp index c4e09579f7a9..bd243e011eae 100644 --- a/ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp @@ -602,11 +602,16 @@ void TKafkaBalancerActor::JoinStepWaitMembersAndChooseProtocol(NKqp::TEvKqp::TEv // check all clients have joined or their timeout has expired for (auto prevGenerationMembersAndTimeoutsIt = WaitedMemberIdsAndTimeouts.begin(); prevGenerationMembersAndTimeoutsIt != WaitedMemberIdsAndTimeouts.end();) { + ui32 memberRebalanceTimeoutMs = prevGenerationMembersAndTimeoutsIt->second.RebalanceTimeoutMs; + TInstant memberHeartbeatDeadline = prevGenerationMembersAndTimeoutsIt->second.HeartbeatDeadline; if (AllWorkerStates.count(prevGenerationMembersAndTimeoutsIt->first) == 1) { KAFKA_LOG_D(TStringBuilder() << "Waited member connected: " << prevGenerationMembersAndTimeoutsIt->first); prevGenerationMembersAndTimeoutsIt = WaitedMemberIdsAndTimeouts.erase(prevGenerationMembersAndTimeoutsIt); - } else if ((RebalanceStartTime + TDuration::MilliSeconds(prevGenerationMembersAndTimeoutsIt->second)) < now) { - KAFKA_LOG_D(TStringBuilder() << "Waited member connect deadline: " << prevGenerationMembersAndTimeoutsIt->first); + } else if ((RebalanceStartTime + TDuration::MilliSeconds(memberRebalanceTimeoutMs)) < now) { + KAFKA_LOG_D(TStringBuilder() << "Waited member connect rebalance deadline: " << prevGenerationMembersAndTimeoutsIt->first); + prevGenerationMembersAndTimeoutsIt = WaitedMemberIdsAndTimeouts.erase(prevGenerationMembersAndTimeoutsIt); + } else if (memberHeartbeatDeadline < now) { + KAFKA_LOG_D(TStringBuilder() << "Waited member connect session deadline: " << prevGenerationMembersAndTimeoutsIt->first); prevGenerationMembersAndTimeoutsIt = WaitedMemberIdsAndTimeouts.erase(prevGenerationMembersAndTimeoutsIt); } else { ++prevGenerationMembersAndTimeoutsIt; @@ -938,7 +943,7 @@ bool TKafkaBalancerActor::ParseAssignments( bool TKafkaBalancerActor::ParseMembersAndRebalanceTimeouts( NKqp::TEvKqp::TEvQueryResponse::TPtr ev, - std::unordered_map& membersAndRebalanceTimeouts, + std::unordered_map& membersAndTimeouts, TString& lastMemberId) { if (!ev) { @@ -956,7 +961,9 @@ bool TKafkaBalancerActor::ParseMembersAndRebalanceTimeouts( TString memberId = TString(parser.ColumnParser("member_id").GetUtf8()); TString instanceId = parser.ColumnParser("instance_id").GetOptionalUtf8().value_or(""); ui32 rebalanceTimeoutMs = parser.ColumnParser("rebalance_timeout_ms").GetOptionalUint32().value_or(DEFAULT_REBALANCE_TIMEOUT_MS); - membersAndRebalanceTimeouts[memberId] = rebalanceTimeoutMs; + ui32 sessionTimeoutMs = parser.ColumnParser("session_timeout_ms").GetOptionalUint32().value_or(DEFAULT_SESSION_TIMEOUT_MS); + TInstant heartbeatDeadline = parser.ColumnParser("heartbeat_deadline").GetOptionalDatetime().value_or(TInstant::Now() + TDuration::MilliSeconds(sessionTimeoutMs)); + membersAndTimeouts[memberId] = {rebalanceTimeoutMs, heartbeatDeadline}; lastMemberId = memberId; } @@ -1224,7 +1231,7 @@ NYdb::TParamsBuilder TKafkaBalancerActor::BuildInsertMemberParams() { params.AddParam("$MemberId").Utf8(MemberId).Build(); params.AddParam("$InstanceId").Utf8(InstanceId).Build(); params.AddParam("$Database").Utf8(Kqp->DataBase).Build(); - params.AddParam("$HeartbeatDeadline").Datetime(TInstant::Now() + TDuration::MilliSeconds(RebalanceTimeoutMs + SessionTimeoutMs)).Build(); + params.AddParam("$HeartbeatDeadline").Datetime(TInstant::Now() + TDuration::MilliSeconds( SessionTimeoutMs)).Build(); params.AddParam("$SessionTimeoutMs").Uint32(SessionTimeoutMs).Build(); params.AddParam("$RebalanceTimeoutMs").Uint32(RebalanceTimeoutMs).Build(); diff --git a/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h b/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h index 7086cf6b5ab0..084e7b2fcc0c 100644 --- a/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h @@ -71,6 +71,11 @@ struct TGroupStatus { TString ProtocolType; }; +struct MemberTimeoutsMs { + ui32 RebalanceTimeoutMs; + TInstant HeartbeatDeadline; +}; + class TKafkaBalancerActor : public NActors::TActorBootstrapped { public: using TBase = NActors::TActorBootstrapped; @@ -309,7 +314,7 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped ParseGroupState(NKqp::TEvKqp::TEvQueryResponse::TPtr ev); bool ParseAssignments(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, TString& assignments); bool ParseWorkerStates(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, std::unordered_map& workerStates, TString& outLastMemberId); - bool ParseMembersAndRebalanceTimeouts(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, std::unordered_map& membersAndRebalanceTimeouts, TString& lastMemberId); + bool ParseMembersAndRebalanceTimeouts(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, std::unordered_map& membersAndRebalanceTimeouts, TString& lastMemberId); bool ParseDeadsAndSessionTimeout(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, ui64& deadsCount, ui32& outSessionTimeoutMs); bool ParseGroupsCount(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, ui64& groupsCount); bool ParseMemberGeneration(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, ui64& generation); @@ -363,7 +368,7 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped WorkerStates; std::unordered_map AllWorkerStates; - std::unordered_map WaitedMemberIdsAndTimeouts; + std::unordered_map WaitedMemberIdsAndTimeouts; TInstant RebalanceStartTime = TInstant::Now(); TString Protocol; TString ProtocolType; From 2f29ff00197f8850cafbbd460d305569c1a26e71 Mon Sep 17 00:00:00 2001 From: Irina Skvortsova Date: Mon, 9 Jun 2025 11:27:41 +0000 Subject: [PATCH 2/3] remove surplus space --- ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp index bd243e011eae..e70171807f57 100644 --- a/ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp @@ -1231,7 +1231,7 @@ NYdb::TParamsBuilder TKafkaBalancerActor::BuildInsertMemberParams() { params.AddParam("$MemberId").Utf8(MemberId).Build(); params.AddParam("$InstanceId").Utf8(InstanceId).Build(); params.AddParam("$Database").Utf8(Kqp->DataBase).Build(); - params.AddParam("$HeartbeatDeadline").Datetime(TInstant::Now() + TDuration::MilliSeconds( SessionTimeoutMs)).Build(); + params.AddParam("$HeartbeatDeadline").Datetime(TInstant::Now() + TDuration::MilliSeconds(SessionTimeoutMs)).Build(); params.AddParam("$SessionTimeoutMs").Uint32(SessionTimeoutMs).Build(); params.AddParam("$RebalanceTimeoutMs").Uint32(RebalanceTimeoutMs).Build(); From 91724186c6a18e55050b04a77ee48e904ffcc906 Mon Sep 17 00:00:00 2001 From: Irina Skvortsova Date: Tue, 17 Jun 2025 16:24:15 +0000 Subject: [PATCH 3/3] solve mistakes --- ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp | 6 +++--- ydb/core/kafka_proxy/actors/kafka_balancer_actor.h | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp index e70171807f57..c70f26177265 100644 --- a/ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp @@ -603,12 +603,12 @@ void TKafkaBalancerActor::JoinStepWaitMembersAndChooseProtocol(NKqp::TEvKqp::TEv // check all clients have joined or their timeout has expired for (auto prevGenerationMembersAndTimeoutsIt = WaitedMemberIdsAndTimeouts.begin(); prevGenerationMembersAndTimeoutsIt != WaitedMemberIdsAndTimeouts.end();) { ui32 memberRebalanceTimeoutMs = prevGenerationMembersAndTimeoutsIt->second.RebalanceTimeoutMs; - TInstant memberHeartbeatDeadline = prevGenerationMembersAndTimeoutsIt->second.HeartbeatDeadline; + const TInstant& memberHeartbeatDeadline = prevGenerationMembersAndTimeoutsIt->second.HeartbeatDeadline; if (AllWorkerStates.count(prevGenerationMembersAndTimeoutsIt->first) == 1) { KAFKA_LOG_D(TStringBuilder() << "Waited member connected: " << prevGenerationMembersAndTimeoutsIt->first); prevGenerationMembersAndTimeoutsIt = WaitedMemberIdsAndTimeouts.erase(prevGenerationMembersAndTimeoutsIt); } else if ((RebalanceStartTime + TDuration::MilliSeconds(memberRebalanceTimeoutMs)) < now) { - KAFKA_LOG_D(TStringBuilder() << "Waited member connect rebalance deadline: " << prevGenerationMembersAndTimeoutsIt->first); + KAFKA_LOG_D(TStringBuilder() << "Rebalance deadline: " << prevGenerationMembersAndTimeoutsIt->first); prevGenerationMembersAndTimeoutsIt = WaitedMemberIdsAndTimeouts.erase(prevGenerationMembersAndTimeoutsIt); } else if (memberHeartbeatDeadline < now) { KAFKA_LOG_D(TStringBuilder() << "Waited member connect session deadline: " << prevGenerationMembersAndTimeoutsIt->first); @@ -943,7 +943,7 @@ bool TKafkaBalancerActor::ParseAssignments( bool TKafkaBalancerActor::ParseMembersAndRebalanceTimeouts( NKqp::TEvKqp::TEvQueryResponse::TPtr ev, - std::unordered_map& membersAndTimeouts, + std::unordered_map& membersAndTimeouts, TString& lastMemberId) { if (!ev) { diff --git a/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h b/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h index 084e7b2fcc0c..daaebff6d638 100644 --- a/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h @@ -71,11 +71,6 @@ struct TGroupStatus { TString ProtocolType; }; -struct MemberTimeoutsMs { - ui32 RebalanceTimeoutMs; - TInstant HeartbeatDeadline; -}; - class TKafkaBalancerActor : public NActors::TActorBootstrapped { public: using TBase = NActors::TActorBootstrapped; @@ -119,6 +114,11 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped message, ui8 retryNum = 0) : Context(context) , CorrelationId(corellationId) @@ -314,7 +314,7 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped ParseGroupState(NKqp::TEvKqp::TEvQueryResponse::TPtr ev); bool ParseAssignments(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, TString& assignments); bool ParseWorkerStates(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, std::unordered_map& workerStates, TString& outLastMemberId); - bool ParseMembersAndRebalanceTimeouts(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, std::unordered_map& membersAndRebalanceTimeouts, TString& lastMemberId); + bool ParseMembersAndRebalanceTimeouts(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, std::unordered_map& membersAndRebalanceTimeouts, TString& lastMemberId); bool ParseDeadsAndSessionTimeout(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, ui64& deadsCount, ui32& outSessionTimeoutMs); bool ParseGroupsCount(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, ui64& groupsCount); bool ParseMemberGeneration(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, ui64& generation);