Skip to content

Commit

Permalink
Use the new JVB stress level stat (#592)
Browse files Browse the repository at this point in the history
* use jvb stress level stat

* fall back to packet rate for stress calculation

* address PR feedback

* rename var/config prop

* remove redundant constants

* fix test comment

* fix log, formatting
  • Loading branch information
bbaldino authored Sep 15, 2020
1 parent 15bdbbf commit a382d32
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 71 deletions.
69 changes: 51 additions & 18 deletions src/main/java/org/jitsi/jicofo/bridge/Bridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,6 @@ public class Bridge
private static final ColibriStatsExtension EMPTY_STATS
= new ColibriStatsExtension();

/**
* We assume that each recently added participant contributes this much
* to the bridge's packet rate.
*/
private static final int AVG_PARTICIPANT_PACKET_RATE_PPS = config.averageParticipantPacketRatePps();

/**
* We assume this is the maximum packet rate that a bridge can handle.
*/
public static final int MAX_TOTAL_PACKET_RATE_PPS = config.maxBridgePacketRatePps();

/**
* This is static for the purposes of tests.
* TODO: just use the config and port the tests.
Expand All @@ -81,6 +70,17 @@ static void setFailureResetThreshold(long newValue)
*/
private int lastReportedPacketRatePps = 0;

/**
* The last reported stress level.
*/
private double lastReportedStressLevel = 0.0;

/**
* For older bridges which don't support reporting their stress level we'll fall back
* to calculating the stress manually via the packet rate.
*/
private boolean usePacketRateStatForStress = true;

/**
* Holds bridge version (if known - not all bridge versions are capable of
* reporting it).
Expand Down Expand Up @@ -138,14 +138,28 @@ void setStats(ColibriStatsExtension stats)
}
stats = this.stats;

double stressLevel;
String stressLevelStr = stats.getValueAsString("stress_level");
if (stressLevelStr != null)
{
try
{
stressLevel = Double.parseDouble(stressLevelStr);
lastReportedStressLevel = stressLevel;
usePacketRateStatForStress = false;
}
catch (Exception ignored)
{
}
}
Integer packetRateDown = null;
Integer packetRateUp = null;
try
{
packetRateDown = stats.getValueAsInt(PACKET_RATE_DOWNLOAD);
packetRateUp = stats.getValueAsInt(PACKET_RATE_UPLOAD);
}
catch (NumberFormatException nfe)
catch (NumberFormatException ignored)
{
}

Expand Down Expand Up @@ -284,22 +298,41 @@ public String toString()
getStress());
}

/**
 * Returns the current "stress" of this bridge. A value of 1 indicates a fully loaded bridge, but the
 * result is not capped and may exceed 1.
 *
 * While the packet-rate fallback flag is set, the value is delegated to
 * {@link #getStressFromPacketRate()}. Otherwise it is the last reported stress level plus the
 * configured average per-participant stress for each recently added endpoint.
 *
 * @return this bridge's stress level
 */
public double getStress()
{
    if (!usePacketRateStatForStress)
    {
        // A negative endpoint count is clamped to zero so it can never lower the reported value.
        double recentlyAddedStress
            = Math.max(0, getRecentlyAddedEndpointCount()) * config.getAverageParticipantStress();
        // Deliberately not capped at 1: values above a full load keep bridges sorting correctly.
        return lastReportedStressLevel + recentlyAddedStress;
    }
    return getStressFromPacketRate();
}

/**
* Returns the "stress" of the bridge. The stress is computed based on the
* total packet rate reported by the bridge and the video stream diff
* estimation since the last update from the bridge.
* estimation since the last update from the bridge. Note that this is technically
* deprecated and only exists for backwards compatibility with bridges that don't
* yet support reporting their stress level directly.
*
* @return the sum of the last total reported packet rate (in pps) and an
* estimation of the packet rate of the streams that we estimate that the bridge
* hasn't reported to Jicofo yet. The estimation is the product of the
* number of unreported streams and a constant C (which we set to 500 pps).
*/
public double getStress()
private double getStressFromPacketRate()
{
double stress =
(lastReportedPacketRatePps
+ Math.max(0, getRecentlyAddedEndpointCount()) * AVG_PARTICIPANT_PACKET_RATE_PPS)
/ (double) MAX_TOTAL_PACKET_RATE_PPS;
+ Math.max(0, getRecentlyAddedEndpointCount()) * config.averageParticipantPacketRatePps())
/ (double) config.maxBridgePacketRatePps();
// While a stress of 1 indicates a bridge is fully loaded, we allow
// larger values to keep sorting correctly.
return stress;
Expand All @@ -313,9 +346,9 @@ public boolean isOverloaded()
return getStress() >= config.stressThreshold();
}

public int getLastReportedPacketRatePps()
public double getLastReportedStressLevel()
{
return lastReportedPacketRatePps;
return lastReportedStressLevel;
}

public int getOctoVersion()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public Bridge select(
if (bridge != null)
{
logger.info("Selected initial bridge " + bridge
+ " with packetRate=" + bridge.getLastReportedPacketRatePps()
+ " with stress=" + bridge.getLastReportedStressLevel()
+ " for participantRegion=" + participantRegion);
}
else
Expand Down Expand Up @@ -133,7 +133,7 @@ public Bridge select(
if (bridge != null)
{
logger.info("Selected bridge " + bridge
+ " with packetRate=" + bridge.getLastReportedPacketRatePps()
+ " with stress=" + bridge.getLastReportedStressLevel()
+ " for participantRegion=" + participantRegion);
}
else
Expand Down
8 changes: 7 additions & 1 deletion src/main/kotlin/org/jitsi/jicofo/bridge/BridgeConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,16 @@ class BridgeConfig {

val averageParticipantPacketRatePps: Int by config {
"org.jitsi.jicofo.BridgeSelector.AVG_PARTICIPANT_PACKET_RATE".from(JitsiConfig.legacyConfig)
"$BASE.average-participant-packet-rate-pps".from(JitsiConfig.newConfig)
"$BASE.average-participant-packet-rate-pps"
.from(JitsiConfig.newConfig).softDeprecated("use $BASE.average-participant-stress")
}
fun averageParticipantPacketRatePps() = averageParticipantPacketRatePps

/**
 * The assumed average stress contributed by a single participant, read from
 * `$BASE.average-participant-stress` in the new config.
 */
val averageParticipantStress: Double by config {
"$BASE.average-participant-stress".from(JitsiConfig.newConfig)
}
// Function-style accessor that returns the value of the property above.
fun averageParticipantStress() = averageParticipantStress

/**
 * The stress threshold read from `$BASE.stress-threshold` in the new config.
 */
val stressThreshold: Double by config { "$BASE.stress-threshold".from(JitsiConfig.newConfig) }
// Function-style accessor that returns the value of the property above.
fun stressThreshold() = stressThreshold

Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ jicofo {
max-bridge-packet-rate = 50000
// The assumed average packet rate per participant.
average-participant-packet-rate-pps = 500
// The assumed average stress per participant
average-participant-stress = 0.01
// The stress level above which a bridge is considered overstressed.
stress-threshold = 0.8
// The amount of time to wait before retrying using a failed bridge.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

public class BridgeSelectionStrategyTest
{
private static Random RANDOM = new Random(23571113);
private static final Random RANDOM = new Random(23571113);

private static Bridge createBridge(String region, double stress)
{
Expand All @@ -49,17 +49,14 @@ private static Bridge createBridge(String region, double stress, String jid)

private static ColibriStatsExtension createJvbStats(String region, double stress)
{
// Divide by two because we use half of it for upload, half for download
int packetRate = (int) (Bridge.MAX_TOTAL_PACKET_RATE_PPS * stress / 2);

ColibriStatsExtension statsExtension = new ColibriStatsExtension();

statsExtension.addStat(
new ColibriStatsExtension.Stat(
PACKET_RATE_DOWNLOAD, packetRate));
statsExtension.addStat(
new ColibriStatsExtension.Stat(
PACKET_RATE_UPLOAD, packetRate));
new ColibriStatsExtension.Stat(
"stress_level",
stress
)
);

if (region != null)
{
Expand Down
79 changes: 38 additions & 41 deletions src/test/java/org/jitsi/jicofo/bridge/BridgeSelectorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,15 @@ public void selectorTest()
jvb2.setIsOperational(true);
jvb3.setIsOperational(true);

// Jvb 1 and 3 are occupied by some conferences, 2 is free
jvb1.setStats(createJvbStats(10));
jvb2.setStats(createJvbStats(23));
// Jvb 1 and 2 are occupied by some conferences, 3 is free
jvb1.setStats(createJvbStats(.1));
jvb2.setStats(createJvbStats(.23));
jvb3.setStats(createJvbStats(0));

assertEquals(jvb3Jid, selector.selectBridge(conference).getJid());

// Now Jvb 3 gets occupied the most
jvb3.setStats(createJvbStats(300));
jvb3.setStats(createJvbStats(.3));

assertEquals(jvb1Jid, selector.selectBridge(conference).getJid());

Expand All @@ -152,17 +152,17 @@ public void selectorTest()
jvb2.setIsOperational(true);
jvb3.setIsOperational(true);

jvb1.setStats(createJvbStats(1));
jvb1.setStats(createJvbStats(.01));
jvb2.setStats(createJvbStats(0));
jvb3.setStats(createJvbStats(0));

// JVB 1 should not be in front
assertNotEquals(jvb1Jid, selector.selectBridge(conference).getJid());

// JVB 2 least occupied
jvb1.setStats(createJvbStats(1));
jvb1.setStats(createJvbStats(.01));
jvb2.setStats(createJvbStats(0));
jvb3.setStats(createJvbStats(1));
jvb3.setStats(createJvbStats(.01));

assertEquals(jvb2Jid, selector.selectBridge(conference).getJid());

Expand Down Expand Up @@ -199,7 +199,7 @@ public void notOperationalThresholdTest()
boolean isTestNode = idx == testedIdx;

// Test node has 0 load...
bridges[idx].setStats(createJvbStats(isTestNode ? 0 : 100));
bridges[idx].setStats(createJvbStats(isTestNode ? 0 : .1));

// ... and is not operational
bridges[idx].setIsOperational(!isTestNode);
Expand All @@ -224,43 +224,33 @@ public void notOperationalThresholdTest()
Thread.sleep(150);
// Test node should recover
assertEquals(
bridges[testedIdx].getJid(),
selector.selectBridge(new MockJitsiMeetConference()).getJid());
bridges[testedIdx].getJid(),
selector.selectBridge(new MockJitsiMeetConference()).getJid()
);
}

Bridge.setFailureResetThreshold(BridgeConfig.config.failureResetThreshold().toMillis());
}

private ColibriStatsExtension createJvbStats(int bitrate)
private ColibriStatsExtension createJvbStats(double stress)
{
return createJvbStats(bitrate, null);
return createJvbStats(stress, null);
}

private ColibriStatsExtension createJvbStats(int bitrate, String region)
private ColibriStatsExtension createJvbStats(double stress, String region)
{
ColibriStatsExtension statsExtension = new ColibriStatsExtension();

statsExtension.addStat(
new ColibriStatsExtension.Stat(
BITRATE_DOWNLOAD, bitrate));
statsExtension.addStat(
new ColibriStatsExtension.Stat(
BITRATE_UPLOAD, bitrate));
statsExtension.addStat(
new ColibriStatsExtension.Stat(
PACKET_RATE_DOWNLOAD, bitrate));
statsExtension.addStat(
new ColibriStatsExtension.Stat(
PACKET_RATE_UPLOAD, bitrate));
"stress_level", stress
)
);

if (region != null)
{
statsExtension.addStat(
new ColibriStatsExtension.Stat(
REGION, region));
statsExtension.addStat(
new ColibriStatsExtension.Stat(
RELAY_ID, region));
statsExtension.addStat(new ColibriStatsExtension.Stat(REGION, region));
statsExtension.addStat(new ColibriStatsExtension.Stat(RELAY_ID, region));
}

return statsExtension;
Expand Down Expand Up @@ -303,37 +293,44 @@ public void testRegionBasedSelection()
// Or a bridge in the local region otherwise
assertEquals(
localBridge,
strategy.select(allBridges, conferenceBridges, "invalid region", true));
strategy.select(allBridges, conferenceBridges, "invalid region", true)
);
assertEquals(
localBridge,
strategy.select(allBridges, conferenceBridges, null, true));
strategy.select(allBridges, conferenceBridges, null, true)
);

conferenceBridges.put(bridge3, 1);
assertEquals(
bridge3,
strategy.select(allBridges, conferenceBridges, region3, true));
bridge3,
strategy.select(allBridges, conferenceBridges, region3, true)
);
assertEquals(
bridge2,
strategy.select(allBridges, conferenceBridges, region2, true));
bridge2,
strategy.select(allBridges, conferenceBridges, region2, true)
);
// A participant in an unknown region should be allocated on the existing
// conference bridge.
assertEquals(
bridge3,
strategy.select(allBridges, conferenceBridges, null, true));
bridge3,
strategy.select(allBridges, conferenceBridges, null, true)
);

conferenceBridges.put(bridge2, 1);
// A participant in an unknown region should be allocated on the least
// loaded (according to the order of 'allBridges') existing conference
// bridge.
assertEquals(
bridge2,
strategy.select(allBridges, conferenceBridges, null, true));
bridge2,
strategy.select(allBridges, conferenceBridges, null, true)
);
// A participant in a region with no bridges should also be allocated
// on the least loaded (according to the order of 'allBridges') existing
// conference bridge.
assertEquals(
bridge2,
strategy.select(allBridges, conferenceBridges, "invalid region", true));
bridge2,
strategy.select(allBridges, conferenceBridges, "invalid region", true)
);
}
}

0 comments on commit a382d32

Please sign in to comment.