diff --git a/ydb/core/kafka_proxy/actors/kafka_balance_actor_sql.cpp b/ydb/core/kafka_proxy/actors/kafka_balance_actor_sql.cpp index d207b9dfb95b..914a9856e329 100644 --- a/ydb/core/kafka_proxy/actors/kafka_balance_actor_sql.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_balance_actor_sql.cpp @@ -157,7 +157,7 @@ const TString SELECT_ALIVE_MEMBERS = R"sql( DECLARE $PaginationMemberId AS Utf8; DECLARE $Limit AS Uint64; - SELECT member_id, instance_id, rebalance_timeout_ms + SELECT member_id, instance_id, rebalance_timeout_ms, session_timeout_ms, heartbeat_deadline FROM `%s` VIEW PRIMARY KEY WHERE database = $Database diff --git a/ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp index c4e09579f7a9..c70f26177265 100644 --- a/ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp @@ -602,11 +602,16 @@ void TKafkaBalancerActor::JoinStepWaitMembersAndChooseProtocol(NKqp::TEvKqp::TEv // check all clients have joined or their timeout has expired for (auto prevGenerationMembersAndTimeoutsIt = WaitedMemberIdsAndTimeouts.begin(); prevGenerationMembersAndTimeoutsIt != WaitedMemberIdsAndTimeouts.end();) { + ui32 memberRebalanceTimeoutMs = prevGenerationMembersAndTimeoutsIt->second.RebalanceTimeoutMs; + const TInstant& memberHeartbeatDeadline = prevGenerationMembersAndTimeoutsIt->second.HeartbeatDeadline; if (AllWorkerStates.count(prevGenerationMembersAndTimeoutsIt->first) == 1) { KAFKA_LOG_D(TStringBuilder() << "Waited member connected: " << prevGenerationMembersAndTimeoutsIt->first); prevGenerationMembersAndTimeoutsIt = WaitedMemberIdsAndTimeouts.erase(prevGenerationMembersAndTimeoutsIt); - } else if ((RebalanceStartTime + TDuration::MilliSeconds(prevGenerationMembersAndTimeoutsIt->second)) < now) { - KAFKA_LOG_D(TStringBuilder() << "Waited member connect deadline: " << prevGenerationMembersAndTimeoutsIt->first); + } else if ((RebalanceStartTime + TDuration::MilliSeconds(memberRebalanceTimeoutMs)) < now) { + KAFKA_LOG_D(TStringBuilder() << "Rebalance deadline: " << prevGenerationMembersAndTimeoutsIt->first); + prevGenerationMembersAndTimeoutsIt = WaitedMemberIdsAndTimeouts.erase(prevGenerationMembersAndTimeoutsIt); + } else if (memberHeartbeatDeadline < now) { + KAFKA_LOG_D(TStringBuilder() << "Waited member connect session deadline: " << prevGenerationMembersAndTimeoutsIt->first); prevGenerationMembersAndTimeoutsIt = WaitedMemberIdsAndTimeouts.erase(prevGenerationMembersAndTimeoutsIt); } else { ++prevGenerationMembersAndTimeoutsIt; @@ -938,7 +943,7 @@ bool TKafkaBalancerActor::ParseAssignments( bool TKafkaBalancerActor::ParseMembersAndRebalanceTimeouts( NKqp::TEvKqp::TEvQueryResponse::TPtr ev, - std::unordered_map& membersAndRebalanceTimeouts, + std::unordered_map& membersAndTimeouts, TString& lastMemberId) { if (!ev) { @@ -956,7 +961,9 @@ bool TKafkaBalancerActor::ParseMembersAndRebalanceTimeouts( TString memberId = TString(parser.ColumnParser("member_id").GetUtf8()); TString instanceId = parser.ColumnParser("instance_id").GetOptionalUtf8().value_or(""); ui32 rebalanceTimeoutMs = parser.ColumnParser("rebalance_timeout_ms").GetOptionalUint32().value_or(DEFAULT_REBALANCE_TIMEOUT_MS); - membersAndRebalanceTimeouts[memberId] = rebalanceTimeoutMs; + ui32 sessionTimeoutMs = parser.ColumnParser("session_timeout_ms").GetOptionalUint32().value_or(DEFAULT_SESSION_TIMEOUT_MS); + TInstant heartbeatDeadline = parser.ColumnParser("heartbeat_deadline").GetOptionalDatetime().value_or(TInstant::Now() + TDuration::MilliSeconds(sessionTimeoutMs)); + membersAndTimeouts[memberId] = {rebalanceTimeoutMs, heartbeatDeadline}; lastMemberId = memberId; } @@ -1224,7 +1231,7 @@ NYdb::TParamsBuilder TKafkaBalancerActor::BuildInsertMemberParams() { params.AddParam("$MemberId").Utf8(MemberId).Build(); params.AddParam("$InstanceId").Utf8(InstanceId).Build(); params.AddParam("$Database").Utf8(Kqp->DataBase).Build(); - params.AddParam("$HeartbeatDeadline").Datetime(TInstant::Now() + TDuration::MilliSeconds(RebalanceTimeoutMs + SessionTimeoutMs)).Build(); + params.AddParam("$HeartbeatDeadline").Datetime(TInstant::Now() + TDuration::MilliSeconds(SessionTimeoutMs)).Build(); params.AddParam("$SessionTimeoutMs").Uint32(SessionTimeoutMs).Build(); params.AddParam("$RebalanceTimeoutMs").Uint32(RebalanceTimeoutMs).Build(); diff --git a/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h b/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h index 7086cf6b5ab0..daaebff6d638 100644 --- a/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h @@ -114,6 +114,11 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped message, ui8 retryNum = 0) : Context(context) , CorrelationId(corellationId) @@ -309,7 +314,7 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped ParseGroupState(NKqp::TEvKqp::TEvQueryResponse::TPtr ev); bool ParseAssignments(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, TString& assignments); bool ParseWorkerStates(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, std::unordered_map& workerStates, TString& outLastMemberId); - bool ParseMembersAndRebalanceTimeouts(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, std::unordered_map& membersAndRebalanceTimeouts, TString& lastMemberId); + bool ParseMembersAndRebalanceTimeouts(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, std::unordered_map& membersAndRebalanceTimeouts, TString& lastMemberId); bool ParseDeadsAndSessionTimeout(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, ui64& deadsCount, ui32& outSessionTimeoutMs); bool ParseGroupsCount(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, ui64& groupsCount); bool ParseMemberGeneration(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, ui64& generation); @@ -363,7 +368,7 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped WorkerStates; std::unordered_map AllWorkerStates; - std::unordered_map WaitedMemberIdsAndTimeouts; + std::unordered_map WaitedMemberIdsAndTimeouts; TInstant RebalanceStartTime = TInstant::Now(); TString Protocol; TString ProtocolType;