Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Mark bridge unhealthy based on restart requests from endpoints #1210

Merged
merged 13 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 116 additions & 75 deletions jicofo-selector/src/main/kotlin/org/jitsi/jicofo/bridge/Bridge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import org.jxmpp.jid.Jid
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.math.max
import org.jitsi.jicofo.bridge.BridgeConfig.Companion.config as config

/**
Expand All @@ -48,36 +51,34 @@ class Bridge @JvmOverloads internal constructor(
private val clock: Clock = Clock.systemUTC()
) : Comparable<Bridge> {

/**
* Keep track of the recently added endpoints.
*/
/** Keep track of the recently added endpoints. */
private val newEndpointsRate = RateTracker(
config.participantRampupInterval,
Duration.ofMillis(100),
clock
)

/**
* The last report stress level
*/
private val endpointRestartRequestRate = RateTracker(
config.iceFailureDetection.interval,
Duration.ofSeconds(1),
clock
)

/** Number of endpoints currently allocated on this bridge by this jicofo instance. */
val endpoints = AtomicInteger(0)

/** The last report stress level */
var lastReportedStressLevel = 0.0
private set

/**
* Holds bridge version (if known - not all bridge version are capable of
* reporting it).
*/
/** Holds bridge version (if known - not all bridge version are capable of reporting it). */
private var version: String? = null

/**
* Whether the last received presence indicated the bridge is healthy.
*/
/** Whether the last received presence indicated the bridge is healthy. */
var isHealthy = true
private set

/**
* Holds bridge release ID, or null if not known.
*/
/** Holds bridge release ID, or null if not known. */
private var releaseId: String? = null

/**
Expand Down Expand Up @@ -108,25 +109,16 @@ class Bridge @JvmOverloads internal constructor(
}
}

/**
* Start out with the configured value, update if the bridge reports a value.
*/
/** Start out with the configured value, update if the bridge reports a value. */
private var averageParticipantStress = config.averageParticipantStress

/**
* Stores a boolean that indicates whether the bridge is in graceful shutdown mode.
*/
/** Stores a boolean that indicates whether the bridge is in graceful shutdown mode. */
var isInGracefulShutdown = false // we assume it is not shutting down

/**
* Whether the bridge is in SHUTTING_DOWN mode.
*/
/** Whether the bridge is in SHUTTING_DOWN mode. */
var isShuttingDown = false
private set

/**
* @return true if the bridge is currently in drain mode
*/
/**
* Stores a boolean that indicates whether the bridge is in drain mode.
*/
Expand All @@ -140,19 +132,27 @@ class Bridge @JvmOverloads internal constructor(
*/
private var failureInstant: Instant? = null

/**
* @return the region of this [Bridge].
*/
/** @return the region of this [Bridge]. */
var region: String? = null
private set

/**
* @return the relay ID advertised by the bridge, or `null` if
* none was advertised.
*/
/** @return the relay ID advertised by the bridge, or `null` if none was advertised. */
var relayId: String? = null
private set

/**
* If this [Bridge] has been removed from the list of bridges. Once removed, the metrics specific to this instance
* are cleared and no longer emitted. If the bridge re-connects, a new [Bridge] instance will be created.
*/
val removed = AtomicBoolean(false)

/**
* The last instant at which we detected, based on restart requests from endpoints, that this bridge is failing ICE
*/
private var lastIceFailed = Instant.MIN
private val failingIce: Boolean
get() = Duration.between(lastIceFailed, clock.instant()) < config.iceFailureDetection.timeout

private val logger: Logger = LoggerImpl(Bridge::class.java.name)

init {
Expand Down Expand Up @@ -237,37 +237,75 @@ class Bridge @JvmOverloads internal constructor(
return compare(this, other)
}

/**
* Notifies this [Bridge] that it was used for a new endpoint.
*/
/** Notifies this [Bridge] that it was used for a new endpoint. */
fun endpointAdded() {
newEndpointsRate.update(1)
endpoints.incrementAndGet()
if (!removed.get()) {
BridgeMetrics.endpoints.set(endpoints.get().toLong(), listOf(jid.resourceOrEmpty.toString()))
}
}

/**
* Returns the net number of video channels recently allocated or removed
* from this bridge.
*/
fun endpointRemoved() = endpointsRemoved(1)
fun endpointsRemoved(count: Int) {
endpoints.addAndGet(-count)
if (!removed.get()) {
BridgeMetrics.endpoints.set(endpoints.get().toLong(), listOf(jid.resourceOrEmpty.toString()))
}
if (endpoints.get() < 0) {
logger.error("Removed more endpoints than were allocated. Resetting to 0.", Throwable())
endpoints.set(0)
}
}
internal fun markRemoved() {
if (removed.compareAndSet(false, true)) {
BridgeMetrics.restartRequestsMetric.remove(listOf(jid.resourceOrEmpty.toString()))
BridgeMetrics.endpoints.remove(listOf(jid.resourceOrEmpty.toString()))
}
}
internal fun updateMetrics() {
if (!removed.get()) {
BridgeMetrics.failingIce.set(failingIce, listOf(jid.resourceOrEmpty.toString()))
}
}

fun endpointRequestedRestart() {
endpointRestartRequestRate.update(1)
if (!removed.get()) {
BridgeMetrics.restartRequestsMetric.inc(listOf(jid.resourceOrEmpty.toString()))
}

if (config.iceFailureDetection.enabled) {
val restartCount = endpointRestartRequestRate.getAccumulatedCount()
val endpoints = endpoints.get()
if (endpoints >= config.iceFailureDetection.minEndpoints &&
restartCount > endpoints * config.iceFailureDetection.threshold
) {
// Reset the timeout regardless of the previous state, but only log if the state changed.
if (!failingIce) {
logger.info("Detected an ICE failing state.")
}
lastIceFailed = clock.instant()
}
}
}

/** Returns the net number of video channels recently allocated or removed from this bridge. */
private val recentlyAddedEndpointCount: Long
get() = newEndpointsRate.getAccumulatedCount()

/**
* The version of this bridge (with embedded release ID, if available).
*/
/** The version of this bridge (with embedded release ID, if available). */
val fullVersion: String?
get() = if (version != null && releaseId != null) "$version-$releaseId" else version

/**
* {@inheritDoc}
*/
override fun toString(): String {
return String.format(
"Bridge[jid=%s, version=%s, relayId=%s, region=%s, stress=%.2f]",
"Bridge[jid=%s, version=%s, relayId=%s, region=%s, correctedStress=%.2f]",
jid.toString(),
fullVersion,
relayId,
region,
stress
correctedStress
)
}

Expand All @@ -276,34 +314,37 @@ class Bridge @JvmOverloads internal constructor(
* can exceed 1).
* @return this bridge's stress level
*/
val stress: Double
get() =
// While a stress of 1 indicates a bridge is fully loaded, we allow
// larger values to keep sorting correctly.
lastReportedStressLevel +
recentlyAddedEndpointCount.coerceAtLeast(0) * averageParticipantStress
val correctedStress: Double
get() {
// Correct for recently added endpoints.
// While a stress of 1 indicates a bridge is fully loaded, we allow larger values to keep sorting correctly.
val s = lastReportedStressLevel + recentlyAddedEndpointCount.coerceAtLeast(0) * averageParticipantStress

/**
* @return true if the stress of the bridge is greater-than-or-equal to the threshold.
*/
// Correct for failing ICE.
return if (failingIce) max(s, config.stressThreshold + 0.01) else s
}

/** @return true if the stress of the bridge is greater-than-or-equal to the threshold. */
val isOverloaded: Boolean
get() = stress >= config.stressThreshold
get() = correctedStress >= config.stressThreshold

val debugState: OrderedJsonObject
get() {
val o = OrderedJsonObject()
o["version"] = version.toString()
o["release"] = releaseId.toString()
o["stress"] = stress
o["operational"] = isOperational
o["region"] = region.toString()
o["drain"] = isDraining
o["graceful-shutdown"] = isInGracefulShutdown
o["shutting-down"] = isShuttingDown
o["overloaded"] = isOverloaded
o["relay-id"] = relayId.toString()
o["healthy"] = isHealthy
return o
get() = OrderedJsonObject().apply {
this["corrected-stress"] = correctedStress
this["drain"] = isDraining
this["endpoints"] = endpoints.get()
this["endpoint-restart-requests"] = endpointRestartRequestRate.getAccumulatedCount()
this["failing-ice"] = failingIce
this["graceful-shutdown"] = isInGracefulShutdown
this["healthy"] = isHealthy
this["operational"] = isOperational
this["overloaded"] = isOverloaded
this["region"] = region.toString()
this["relay-id"] = relayId.toString()
this["release"] = releaseId.toString()
this["shutting-down"] = isShuttingDown
this["stress"] = lastReportedStressLevel
this["version"] = version.toString()
}

companion object {
Expand All @@ -327,7 +368,7 @@ class Bridge @JvmOverloads internal constructor(
return if (myPriority != otherPriority) {
myPriority - otherPriority
} else {
b1.stress.compareTo(b2.stress)
b1.correctedStress.compareTo(b2.correctedStress)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,34 @@ class BridgeConfig private constructor() {
fun getRegionGroup(region: String?): Set<String> =
if (region == null) emptySet() else regionGroups[region] ?: setOf(region)

val iceFailureDetection = IceFailureDetectionConfig()

companion object {
const val BASE = "jicofo.bridge"

@JvmField
val config = BridgeConfig()
}
}

class IceFailureDetectionConfig {
val enabled: Boolean by config {
"$BASE.enabled".from(JitsiConfig.newConfig)
}
val interval: Duration by config {
"$BASE.interval".from(JitsiConfig.newConfig)
}
val minEndpoints: Int by config {
"$BASE.min-endpoints".from(JitsiConfig.newConfig)
}
val threshold: Double by config {
"$BASE.threshold".from(JitsiConfig.newConfig)
}
val timeout: Duration by config {
"$BASE.timeout".from(JitsiConfig.newConfig)
}

companion object {
const val BASE = "jicofo.bridge.ice-failure-detection"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.jitsi.jicofo.bridge

import org.jitsi.jicofo.metrics.JicofoMetricsContainer.Companion.instance as metricsContainer
class BridgeMetrics {
companion object {
/** Total number of participants that requested a restart for a specific bridge. */
val restartRequestsMetric = metricsContainer.registerCounter(
"bridge_restart_requests_total",
"Total number of requests to restart a bridge",
labelNames = listOf("jvb")
)
val endpoints = metricsContainer.registerLongGauge(
"bridge_endpoints",
"The number of endpoints on a bridge.",
labelNames = listOf("jvb")
)
val failingIce = metricsContainer.registerBooleanMetric(
"bridge_failing_ice",
"Whether a bridge is currently in the failing ICE state.",
labelNames = listOf("jvb")
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class BridgeSelector @JvmOverloads constructor(
logger.warn("Lost a bridge: $bridgeJid")
lostBridges.inc()
}
it.markRemoved()
bridgeCount.dec()
eventEmitter.fireEvent { bridgeRemoved(it) }
}
Expand Down Expand Up @@ -214,10 +215,7 @@ class BridgeSelector @JvmOverloads constructor(
conferenceBridges,
participantProperties,
OctoConfig.config.enabled
).also {
// The bridge was selected for an endpoint, increment its counter.
it?.endpointAdded()
}
)
}

val stats: JSONObject
Expand Down Expand Up @@ -245,6 +243,7 @@ class BridgeSelector @JvmOverloads constructor(
inShutdownBridgeCountMetric.set(bridges.values.count { it.isInGracefulShutdown }.toLong())
operationalBridgeCountMetric.set(bridges.values.count { it.isOperational }.toLong())
bridgeVersionCount.set(bridges.values.map { it.fullVersion }.toSet().size.toLong())
bridges.values.forEach { it.updateMetrics() }
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ class VisitorTopologyStrategy : TopologySelectionStrategy() {
cascade.getDistanceFrom(it) { node -> !node.visitor }
}

val sortedNodes = nodesWithDistance.entries.sortedWith(compareBy({ it.value }, { it.key.bridge.stress }))
.map { it.key }
val sortedNodes = nodesWithDistance.entries.sortedWith(
compareBy({ it.value }, { it.key.bridge.correctedStress })
).map { it.key }

/* TODO: this logic looks a lot like bridge selection. Do we want to try to share logic with that code? */
val nonOverloadedInRegion = sortedNodes.filter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ interface ColibriSessionManager {
suppressLocalBridgeUpdate: Boolean = false
)

fun getBridgeSessionId(participantId: String): String?
fun getBridgeSessionId(participantId: String): Pair<Bridge?, String?>

/**
* Stop using [bridge], expiring all endpoints on it (e.g. because it was detected to have failed).
Expand Down
Loading