Skip to content

Ydb bug fix session timeout leave group stable 25 1 #19785

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: stable-25-1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/kafka_proxy/actors/kafka_balance_actor_sql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ const TString SELECT_ALIVE_MEMBERS = R"sql(
DECLARE $PaginationMemberId AS Utf8;
DECLARE $Limit AS Uint64;

SELECT member_id, instance_id, rebalance_timeout_ms
SELECT member_id, instance_id, rebalance_timeout_ms, session_timeout_ms, heartbeat_deadline
FROM `%s`
VIEW PRIMARY KEY
WHERE database = $Database
Expand Down
17 changes: 12 additions & 5 deletions ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -602,11 +602,16 @@ void TKafkaBalancerActor::JoinStepWaitMembersAndChooseProtocol(NKqp::TEvKqp::TEv

// Check whether every client has either joined or had its timeout expire.
for (auto prevGenerationMembersAndTimeoutsIt = WaitedMemberIdsAndTimeouts.begin(); prevGenerationMembersAndTimeoutsIt != WaitedMemberIdsAndTimeouts.end();) {
ui32 memberRebalanceTimeoutMs = prevGenerationMembersAndTimeoutsIt->second.RebalanceTimeoutMs;
const TInstant& memberHeartbeatDeadline = prevGenerationMembersAndTimeoutsIt->second.HeartbeatDeadline;
if (AllWorkerStates.count(prevGenerationMembersAndTimeoutsIt->first) == 1) {
KAFKA_LOG_D(TStringBuilder() << "Waited member connected: " << prevGenerationMembersAndTimeoutsIt->first);
prevGenerationMembersAndTimeoutsIt = WaitedMemberIdsAndTimeouts.erase(prevGenerationMembersAndTimeoutsIt);
} else if ((RebalanceStartTime + TDuration::MilliSeconds(prevGenerationMembersAndTimeoutsIt->second)) < now) {
KAFKA_LOG_D(TStringBuilder() << "Waited member connect deadline: " << prevGenerationMembersAndTimeoutsIt->first);
} else if ((RebalanceStartTime + TDuration::MilliSeconds(memberRebalanceTimeoutMs)) < now) {
KAFKA_LOG_D(TStringBuilder() << "Rebalance deadline: " << prevGenerationMembersAndTimeoutsIt->first);
prevGenerationMembersAndTimeoutsIt = WaitedMemberIdsAndTimeouts.erase(prevGenerationMembersAndTimeoutsIt);
} else if (memberHeartbeatDeadline < now) {
KAFKA_LOG_D(TStringBuilder() << "Waited member connect session deadline: " << prevGenerationMembersAndTimeoutsIt->first);
prevGenerationMembersAndTimeoutsIt = WaitedMemberIdsAndTimeouts.erase(prevGenerationMembersAndTimeoutsIt);
} else {
++prevGenerationMembersAndTimeoutsIt;
Expand Down Expand Up @@ -938,7 +943,7 @@ bool TKafkaBalancerActor::ParseAssignments(

bool TKafkaBalancerActor::ParseMembersAndRebalanceTimeouts(
NKqp::TEvKqp::TEvQueryResponse::TPtr ev,
std::unordered_map<TString, ui32>& membersAndRebalanceTimeouts,
std::unordered_map<TString, MemberTimeoutsMs>& membersAndTimeouts,
TString& lastMemberId)
{
if (!ev) {
Expand All @@ -956,7 +961,9 @@ bool TKafkaBalancerActor::ParseMembersAndRebalanceTimeouts(
TString memberId = TString(parser.ColumnParser("member_id").GetUtf8());
TString instanceId = parser.ColumnParser("instance_id").GetOptionalUtf8().value_or("");
ui32 rebalanceTimeoutMs = parser.ColumnParser("rebalance_timeout_ms").GetOptionalUint32().value_or(DEFAULT_REBALANCE_TIMEOUT_MS);
membersAndRebalanceTimeouts[memberId] = rebalanceTimeoutMs;
ui32 sessionTimeoutMs = parser.ColumnParser("session_timeout_ms").GetOptionalUint32().value_or(DEFAULT_SESSION_TIMEOUT_MS);
TInstant heartbeatDeadline = parser.ColumnParser("heartbeat_deadline").GetOptionalDatetime().value_or(TInstant::Now() + TDuration::MilliSeconds(sessionTimeoutMs));
membersAndTimeouts[memberId] = {rebalanceTimeoutMs, heartbeatDeadline};

lastMemberId = memberId;
}
Expand Down Expand Up @@ -1224,7 +1231,7 @@ NYdb::TParamsBuilder TKafkaBalancerActor::BuildInsertMemberParams() {
params.AddParam("$MemberId").Utf8(MemberId).Build();
params.AddParam("$InstanceId").Utf8(InstanceId).Build();
params.AddParam("$Database").Utf8(Kqp->DataBase).Build();
params.AddParam("$HeartbeatDeadline").Datetime(TInstant::Now() + TDuration::MilliSeconds(RebalanceTimeoutMs + SessionTimeoutMs)).Build();
params.AddParam("$HeartbeatDeadline").Datetime(TInstant::Now() + TDuration::MilliSeconds(SessionTimeoutMs)).Build();
params.AddParam("$SessionTimeoutMs").Uint32(SessionTimeoutMs).Build();
params.AddParam("$RebalanceTimeoutMs").Uint32(RebalanceTimeoutMs).Build();

Expand Down
9 changes: 7 additions & 2 deletions ydb/core/kafka_proxy/actors/kafka_balancer_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped<TKafkaBalancerAct
LEAVE_SET_DEAD
};

// Timing information kept for each member that the balancer is still waiting
// on during a rebalance. Instances are stored as the mapped value of
// WaitedMemberIdsAndTimeouts and filled in by ParseMembersAndRebalanceTimeouts.
//
// NOTE(review): despite the "Ms" suffix in the type name, only
// RebalanceTimeoutMs is a millisecond count; HeartbeatDeadline is a point in
// time. The name is kept unchanged so existing users keep compiling.
struct MemberTimeoutsMs {
    // Rebalance timeout in milliseconds. The join step adds it to
    // RebalanceStartTime and compares the result with the current time.
    // Explicitly zero-initialized so a default-initialized instance never
    // carries an indeterminate value.
    ui32 RebalanceTimeoutMs = 0;

    // Instant that the join step compares against the current time; once it
    // is in the past the member is no longer waited for.
    TInstant HeartbeatDeadline;
};

TKafkaBalancerActor(const TContext::TPtr context, ui64 cookie, ui64 corellationId, TMessagePtr<TJoinGroupRequestData> message, ui8 retryNum = 0)
: Context(context)
, CorrelationId(corellationId)
Expand Down Expand Up @@ -309,7 +314,7 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped<TKafkaBalancerAct
std::optional<TGroupStatus> ParseGroupState(NKqp::TEvKqp::TEvQueryResponse::TPtr ev);
bool ParseAssignments(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, TString& assignments);
bool ParseWorkerStates(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, std::unordered_map<TString, NKafka::TWorkerState>& workerStates, TString& outLastMemberId);
bool ParseMembersAndRebalanceTimeouts(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, std::unordered_map<TString, ui32>& membersAndRebalanceTimeouts, TString& lastMemberId);
bool ParseMembersAndRebalanceTimeouts(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, std::unordered_map<TString, MemberTimeoutsMs>& membersAndRebalanceTimeouts, TString& lastMemberId);
bool ParseDeadsAndSessionTimeout(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, ui64& deadsCount, ui32& outSessionTimeoutMs);
bool ParseGroupsCount(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, ui64& groupsCount);
bool ParseMemberGeneration(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, ui64& generation);
Expand Down Expand Up @@ -363,7 +368,7 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped<TKafkaBalancerAct
TString Assignments;
std::unordered_map<TString, TString> WorkerStates;
std::unordered_map<TString, NKafka::TWorkerState> AllWorkerStates;
std::unordered_map<TString, ui32> WaitedMemberIdsAndTimeouts;
std::unordered_map<TString, MemberTimeoutsMs> WaitedMemberIdsAndTimeouts;
TInstant RebalanceStartTime = TInstant::Now();
TString Protocol;
TString ProtocolType;
Expand Down
Loading