Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Mark bridge unhealthy based on restart requests from endpoints #1210

Merged
merged 13 commits into from
Feb 21, 2025
Merged
72 changes: 55 additions & 17 deletions jicofo-selector/src/main/kotlin/org/jitsi/jicofo/bridge/Bridge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.jxmpp.jid.Jid
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.concurrent.atomic.AtomicInteger
import org.jitsi.jicofo.bridge.BridgeConfig.Companion.config as config

/**
Expand Down Expand Up @@ -57,6 +58,15 @@ class Bridge @JvmOverloads internal constructor(
clock
)

private val endpointRestartRequestRate = RateTracker(
config.endpointRestartRequestInterval,
Duration.ofMillis(100),
clock
)

/** Number of endpoints currently allocated on this bridge by this jicofo instance. */
val endpoints = AtomicInteger(0)

/**
* The last report stress level
*/
Expand Down Expand Up @@ -237,11 +247,39 @@ class Bridge @JvmOverloads internal constructor(
return compare(this, other)
}

/**
* Notifies this [Bridge] that it was used for a new endpoint.
*/
/** Notifies this [Bridge] that it was used for a new endpoint. */
fun endpointAdded() {
newEndpointsRate.update(1)
endpoints.incrementAndGet()
}

fun endpointRemoved() = endpointsRemoved(1)
fun endpointsRemoved(count: Int) {
endpoints.addAndGet(-count)
if (endpoints.get() < 0) {
logger.error("Removed more endpoints than were allocated. Resetting to 0.", Throwable())
endpoints.set(0)
}
}
fun endpointRequestedRestart() {
endpointRestartRequestRate.update(1)

if (config.endpointRestartRequestEnabled) {
val restartCount = endpointRestartRequestRate.getAccumulatedCount()
val endpoints = endpoints.get()
if (endpoints > config.endpointRestartRequestMinEndpoints &&
restartCount > endpoints * config.endpointRestartRequestThreshold
) {
// Reset the timeout regardless of the previous state, but only log if the state changed.
if (isOperational) {
logger.info(
"Marking as non-operational because of endpoint restart requests. " +
"Endpoints=$endpoints, restartCount=$restartCount"
)
}
isOperational = false
}
}
}

/**
Expand Down Expand Up @@ -290,20 +328,20 @@ class Bridge @JvmOverloads internal constructor(
get() = stress >= config.stressThreshold

val debugState: OrderedJsonObject
get() {
val o = OrderedJsonObject()
o["version"] = version.toString()
o["release"] = releaseId.toString()
o["stress"] = stress
o["operational"] = isOperational
o["region"] = region.toString()
o["drain"] = isDraining
o["graceful-shutdown"] = isInGracefulShutdown
o["shutting-down"] = isShuttingDown
o["overloaded"] = isOverloaded
o["relay-id"] = relayId.toString()
o["healthy"] = isHealthy
return o
get() = OrderedJsonObject().apply {
this["drain"] = isDraining
this["endpoints"] = endpoints.get()
this["endpoint-restart-requests"] = endpointRestartRequestRate.getAccumulatedCount()
this["graceful-shutdown"] = isInGracefulShutdown
this["healthy"] = isHealthy
this["operational"] = isOperational
this["overloaded"] = isOverloaded
this["region"] = region.toString()
this["relay-id"] = relayId.toString()
this["release"] = releaseId.toString()
this["shutting-down"] = isShuttingDown
this["stress"] = stress
this["version"] = version.toString()
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,19 @@ class BridgeConfig private constructor() {
fun getRegionGroup(region: String?): Set<String> =
if (region == null) emptySet() else regionGroups[region] ?: setOf(region)

val endpointRestartRequestEnabled: Boolean by config {
"$BASE.endpoint-restart-request.enabled".from(JitsiConfig.newConfig)
}
val endpointRestartRequestInterval: Duration by config {
"$BASE.endpoint-restart-request.interval".from(JitsiConfig.newConfig)
}
val endpointRestartRequestMinEndpoints: Int by config {
"$BASE.endpoint-restart-request.min-endpoints".from(JitsiConfig.newConfig)
}
val endpointRestartRequestThreshold: Int by config {
"$BASE.endpoint-restart-request.threshold".from(JitsiConfig.newConfig)
}

companion object {
const val BASE = "jicofo.bridge"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,7 @@ class BridgeSelector @JvmOverloads constructor(
conferenceBridges,
participantProperties,
OctoConfig.config.enabled
).also {
// The bridge was selected for an endpoint, increment its counter.
it?.endpointAdded()
}
)
}

val stats: JSONObject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ interface ColibriSessionManager {
suppressLocalBridgeUpdate: Boolean = false
)

fun getBridgeSessionId(participantId: String): String?
fun getBridgeSessionId(participantId: String): Pair<Bridge?, String?>

/**
* Stop using [bridge], expiring all endpoints on it (e.g. because it was detected to have failed).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class ColibriV2SessionManager(
logger.info("Expiring.")
sessions.values.forEach { session ->
logger.debug { "Expiring $session" }
session.bridge.endpointsRemoved(getSessionParticipants(session).size)
session.expire()
}
sessions.clear()
Expand All @@ -134,6 +135,7 @@ class ColibriV2SessionManager(

private fun removeSession(session: Colibri2Session): Set<ParticipantInfo> {
val participants = getSessionParticipants(session)
session.bridge.endpointsRemoved(participants.size)
session.expire()
removeNode(session, ::repairMesh)
sessions.remove(session.relayId)
Expand Down Expand Up @@ -162,6 +164,7 @@ class ColibriV2SessionManager(
sessionRemoved = true
} else {
session.expire(sessionParticipantsToRemove)
session.bridge.endpointRemoved()
sessionParticipantsToRemove.forEach { remove(it) }
participantsRemoved.addAll(sessionParticipantsToRemove)

Expand Down Expand Up @@ -334,6 +337,7 @@ class ColibriV2SessionManager(
)
}
participantInfo = ParticipantInfo(participant, session)
session.bridge.endpointAdded()
stanzaCollector = session.sendAllocationRequest(participantInfo)
add(participantInfo)
if (created) {
Expand Down Expand Up @@ -581,8 +585,9 @@ class ColibriV2SessionManager(
}
}

override fun getBridgeSessionId(participantId: String): String? = synchronized(syncRoot) {
return participants[participantId]?.session?.id
override fun getBridgeSessionId(participantId: String): Pair<Bridge?, String?> = synchronized(syncRoot) {
val session = participants[participantId]?.session
return Pair(session?.bridge, session?.id)
}

override fun removeBridge(bridge: Bridge): List<String> = synchronized(syncRoot) {
Expand Down
12 changes: 12 additions & 0 deletions jicofo-selector/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ jicofo {
// Note that if no separate Service connection has been configured, all services will automatically use the
// Client connection.
xmpp-connection-name = Service

// Detect bridge failure based on endpoints that request a restart due to an ICE failure.
endpoint-restart-request {
enabled = true
// Count the number of restart requests in the last `interval`.
interval = 2 minutes
// The minimum number of endpoints on the bridge before this check is enabled.
min-endpoints = 50
// The threshold, in terms of fraction of endpoints that requested a restart in the last `interval`, above which
// the bridge will be considered unhelathy.
threshold = 0.1
}
}
// Configure the codecs and RTP extensions to be used in the offer sent to clients.
codec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class BridgeSelectorTest : ShouldSpec() {
val bridge = bridgeSelector.addJvbAddress(jid1).apply { setStats() }
bridge.stress shouldBe 0
bridgeSelector.selectBridge()
bridge.endpointAdded()
// The stress should increase because it was recently selected.
bridge.stress shouldNotBe 0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.jitsi.jicofo.conference;

import kotlin.*;
import org.jetbrains.annotations.*;
import org.jitsi.jicofo.*;
import org.jitsi.jicofo.auth.*;
Expand Down Expand Up @@ -1169,13 +1170,18 @@ public MemberRole getRoleForMucJid(Jid mucJid)
*/
public void iceFailed(@NotNull Participant participant, String bridgeSessionId)
{
String existingBridgeSessionId = getColibriSessionManager().getBridgeSessionId(participant.getEndpointId());
if (Objects.equals(bridgeSessionId, existingBridgeSessionId))
Pair<Bridge, String> existingBridgeSession
= getColibriSessionManager().getBridgeSessionId(participant.getEndpointId());
if (Objects.equals(bridgeSessionId, existingBridgeSession.getSecond()))
{
logger.info(String.format(
"Received ICE failed notification from %s, bridge-session ID: %s",
participant.getEndpointId(),
bridgeSessionId));
if (existingBridgeSession.getFirst() != null)
{
existingBridgeSession.getFirst().endpointRequestedRestart();
}
reInviteParticipant(participant);
}
else
Expand Down Expand Up @@ -1203,12 +1209,18 @@ void terminateSession(
throws InvalidBridgeSessionIdException
{
// TODO: maybe move the bridgeSessionId logic to Participant
String existingBridgeSessionId = getColibriSessionManager().getBridgeSessionId(participant.getEndpointId());
if (!Objects.equals(bridgeSessionId, existingBridgeSessionId))
Pair<Bridge, String> existingBridgeSession
= getColibriSessionManager().getBridgeSessionId(participant.getEndpointId());
if (!Objects.equals(bridgeSessionId, existingBridgeSession.getSecond()))
{
throw new InvalidBridgeSessionIdException(bridgeSessionId + " is not a currently active session");
}

if (reinvite && existingBridgeSession.getFirst() != null)
{
existingBridgeSession.getFirst().endpointRequestedRestart();
}

synchronized (participantLock)
{
terminateParticipant(
Expand Down