Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove backward compat for source names. #2087

Merged
merged 5 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions jvb/src/main/java/org/jitsi/videobridge/Conference.java
Original file line number Diff line number Diff line change
Expand Up @@ -718,15 +718,13 @@ public AbstractEndpoint findSourceOwner(@NotNull String sourceName)
* @param iceControlling {@code true} if the ICE agent of this endpoint's
* transport will be initialized to serve as a controlling ICE agent;
* otherwise, {@code false}
* @param sourceNames whether this endpoint signaled the source names support.
* @param doSsrcRewriting whether this endpoint signaled SSRC rewriting support.
* @return an <tt>Endpoint</tt> participating in this <tt>Conference</tt>
*/
@NotNull
public Endpoint createLocalEndpoint(
String id,
boolean iceControlling,
boolean sourceNames,
boolean doSsrcRewriting,
boolean visitor,
boolean privateAddresses)
Expand All @@ -738,7 +736,7 @@ public Endpoint createLocalEndpoint(
}

final Endpoint endpoint = new Endpoint(
id, this, logger, iceControlling, sourceNames, doSsrcRewriting, visitor, privateAddresses);
id, this, logger, iceControlling, doSsrcRewriting, visitor, privateAddresses);
videobridge.localEndpointCreated(visitor);

subscribeToEndpointEvents(endpoint);
Expand Down
61 changes: 9 additions & 52 deletions jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ import org.jitsi.videobridge.datachannel.DataChannelStack
import org.jitsi.videobridge.datachannel.protocol.DataChannelPacket
import org.jitsi.videobridge.datachannel.protocol.DataChannelProtocolConstants
import org.jitsi.videobridge.message.BridgeChannelMessage
import org.jitsi.videobridge.message.ForwardedEndpointsMessage
import org.jitsi.videobridge.message.ForwardedSourcesMessage
import org.jitsi.videobridge.message.ReceiverVideoConstraintsMessage
import org.jitsi.videobridge.message.SenderSourceConstraintsMessage
Expand Down Expand Up @@ -107,7 +106,6 @@ class Endpoint @JvmOverloads constructor(
* as a controlling ICE agent, false otherwise
*/
iceControlling: Boolean,
private val isUsingSourceNames: Boolean,
private val doSsrcRewriting: Boolean,
/**
* Whether this endpoint is in "visitor" mode, i.e. should be invisible to other endpoints.
Expand Down Expand Up @@ -205,9 +203,6 @@ class Endpoint @JvmOverloads constructor(
// Intentional no-op
}

override fun forwardedEndpointsChanged(forwardedEndpoints: Set<String>) =
sendForwardedEndpointsMessage(forwardedEndpoints)

override fun forwardedSourcesChanged(forwardedSources: Set<String>) {
sendForwardedSourcesMessage(forwardedSources)
}
Expand All @@ -221,8 +216,7 @@ class Endpoint @JvmOverloads constructor(
},
{ getOrderedEndpoints() },
diagnosticContext,
logger,
isUsingSourceNames,
logger
)

/** Whether any sources are suspended from being sent to this endpoint because of BWE. */
Expand Down Expand Up @@ -329,7 +323,7 @@ class Endpoint @JvmOverloads constructor(
conference.videobridge.statistics.totalVisitors.inc()
}

logger.info("Created new endpoint isUsingSourceNames=$isUsingSourceNames, iceControlling=$iceControlling")
logger.info("Created new endpoint, iceControlling=$iceControlling")
}

override var mediaSources: Array<MediaSourceDesc>
Expand Down Expand Up @@ -514,11 +508,7 @@ class Endpoint @JvmOverloads constructor(
// TODO: this should be part of an EndpointMessageTransport.EventHandler interface
fun endpointMessageTransportConnected() {
sendAllVideoConstraints()
if (isUsingSourceNames) {
sendForwardedSourcesMessage(bitrateController.forwardedSources)
} else {
sendForwardedEndpointsMessage(bitrateController.forwardedEndpoints)
}
sendForwardedSourcesMessage(bitrateController.forwardedSources)
videoSsrcs.sendAllMappings()
audioSsrcs.sendAllMappings()
}
Expand Down Expand Up @@ -585,17 +575,10 @@ class Endpoint @JvmOverloads constructor(
"Suppressing sending a SenderVideoConstraints message, endpoint has no such source: $sourceName"
}
} else {
if (isUsingSourceNames) {
val senderSourceConstraintsMessage =
SenderSourceConstraintsMessage(sourceName, maxVideoConstraints.maxHeight)
logger.cdebug { "Sender constraints changed: ${senderSourceConstraintsMessage.toJson()}" }
sendMessage(senderSourceConstraintsMessage)
} else {
maxReceiverVideoConstraints[sourceName]?.let {
sendVideoConstraints(it)
}
?: logger.error("No max receiver constraints mapping found for: $sourceName")
}
val senderSourceConstraintsMessage =
SenderSourceConstraintsMessage(sourceName, maxVideoConstraints.maxHeight)
logger.cdebug { "Sender constraints changed: ${senderSourceConstraintsMessage.toJson()}" }
sendMessage(senderSourceConstraintsMessage)
}
}

Expand Down Expand Up @@ -726,39 +709,13 @@ class Endpoint @JvmOverloads constructor(
return true
}

/**
* Sends a message to this endpoint in order to notify it that the set of endpoints for which the bridge
* is sending video has changed.
*
* @param forwardedEndpoints the collection of forwarded endpoints.
*/
@Deprecated("", ReplaceWith("sendForwardedSourcesMessage"), DeprecationLevel.WARNING)
fun sendForwardedEndpointsMessage(forwardedEndpoints: Collection<String>) {
if (isUsingSourceNames) {
return
}

val msg = ForwardedEndpointsMessage(forwardedEndpoints)
TaskPools.IO_POOL.execute {
try {
sendMessage(msg)
} catch (t: Throwable) {
logger.warn("Failed to send message:", t)
}
}
}

/**
* Sends a message to this endpoint in order to notify it that the set of media sources for which the bridge
* is sending video has changed.
*
* @param forwardedSources the collection of forwarded media sources (by name).
*/
fun sendForwardedSourcesMessage(forwardedSources: Collection<String>) {
if (!isUsingSourceNames) {
return
}

val msg = ForwardedSourcesMessage(forwardedSources)
TaskPools.IO_POOL.execute {
try {
Expand Down Expand Up @@ -851,9 +808,9 @@ class Endpoint @JvmOverloads constructor(
fun isOversending(): Boolean = bitrateController.isOversending()

/**
* Returns how many endpoints this Endpoint is currently forwarding video for
* Returns how many video sources are currently forwarding to this endpoint.
*/
fun numForwardedEndpoints(): Int = bitrateController.numForwardedEndpoints()
fun numForwardedSources(): Int = bitrateController.numForwardedSources()

fun setBandwidthAllocationSettings(message: ReceiverVideoConstraintsMessage) {
initialReceiverConstraintsReceived = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.jitsi.utils.logging2.LoggerImpl
import org.jitsi.utils.logging2.createChildLogger
import org.jitsi.videobridge.cc.config.BitrateControllerConfig.Companion.config
import org.jitsi.videobridge.message.ReceiverVideoConstraintsMessage
import org.jitsi.videobridge.util.endpointIdToSourceName

/**
* This class encapsulates all of the client-controlled settings for bandwidth allocation.
Expand Down Expand Up @@ -60,17 +59,10 @@ data class AllocationSettings @JvmOverloads constructor(
* the overall state changed.
*/
internal class AllocationSettingsWrapper(
private val useSourceNames: Boolean,
parentLogger: Logger = LoggerImpl(AllocationSettingsWrapper::class.java.name)
) {
private val logger = createChildLogger(parentLogger)

/**
* The last selected endpoints set signaled by the receiving endpoint.
*/
@Deprecated("", ReplaceWith("selectedSources"), DeprecationLevel.WARNING)
private var selectedEndpoints = emptyList<String>()

/**
* The last selected sources set signaled by the receiving endpoint.
*/
Expand All @@ -84,9 +76,6 @@ internal class AllocationSettingsWrapper(

private var assumedBandwidthBps: Long = -1

@Deprecated("", ReplaceWith("onStageSources"), DeprecationLevel.WARNING)
private var onStageEndpoints: List<String> = emptyList()

private var onStageSources: List<String> = emptyList()

private var allocationSettings = create()
Expand All @@ -111,35 +100,16 @@ internal class AllocationSettingsWrapper(
changed = true
}
}
if (useSourceNames) {
message.selectedSources?.let {
if (selectedSources != it) {
selectedSources = it
changed = true
}
}
message.onStageSources?.let {
if (onStageSources != it) {
onStageSources = it
changed = true
}
}
} else {
message.selectedEndpoints?.let {
logger.warn("Setting deprecated selectedEndpoints=$it")
val newSelectedSources = it.map { endpoint -> endpointIdToSourceName(endpoint) }
if (selectedSources != newSelectedSources) {
selectedSources = newSelectedSources
changed = true
}
message.selectedSources?.let {
if (selectedSources != it) {
selectedSources = it
changed = true
}
message.onStageEndpoints?.let {
logger.warn("Setting deprecated onStateEndpoints=$it")
val newOnStageSources = it.map { endpoint -> endpointIdToSourceName(endpoint) }
if (onStageSources != newOnStageSources) {
onStageSources = newOnStageSources
changed = true
}
}
message.onStageSources?.let {
if (onStageSources != it) {
onStageSources = it
changed = true
}
}
message.defaultConstraints?.let {
Expand All @@ -149,19 +119,8 @@ internal class AllocationSettingsWrapper(
}
}
message.constraints?.let {
var newConstraints = it

// Convert endpoint IDs to source names
if (!useSourceNames) {
newConstraints = HashMap(it.size)
it.entries.forEach {
entry ->
newConstraints[endpointIdToSourceName(entry.key)] = entry.value
}
}

if (this.videoConstraints != newConstraints) {
this.videoConstraints = newConstraints
if (this.videoConstraints != it) {
this.videoConstraints = it
changed = true
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,12 @@ class BitrateController<T : MediaSourceContainer> @JvmOverloads constructor(
endpointsSupplier: Supplier<List<T>>,
private val diagnosticContext: DiagnosticContext,
parentLogger: Logger,
private val useSourceNames: Boolean,
private val clock: Clock = Clock.systemUTC()
) {
val eventEmitter = SyncEventEmitter<EventHandler>()

private val bitrateAllocatorEventHandler = BitrateAllocatorEventHandler()

/**
* Keep track of the "forwarded" endpoints, i.e. the endpoints for which we are forwarding *some* layer.
*/
@Deprecated("", ReplaceWith("forwardedSources"), DeprecationLevel.WARNING)
var forwardedEndpoints: Set<String> = emptySet()
private set

/**
* Keep track of the "forwarded" sources, i.e. the media sources for which we are forwarding *some* layer.
*/
Expand Down Expand Up @@ -99,7 +91,7 @@ class BitrateController<T : MediaSourceContainer> @JvmOverloads constructor(
)
fun hasSuspendedSources() = bandwidthAllocator.allocation.hasSuspendedSources

private val allocationSettingsWrapper = AllocationSettingsWrapper(useSourceNames, parentLogger)
private val allocationSettingsWrapper = AllocationSettingsWrapper(parentLogger)
val allocationSettings
get() = allocationSettingsWrapper.get()

Expand Down Expand Up @@ -130,7 +122,7 @@ class BitrateController<T : MediaSourceContainer> @JvmOverloads constructor(
/**
* Return the number of endpoints whose streams are currently being forwarded.
*/
fun numForwardedEndpoints(): Int = forwardedEndpoints.size
fun numForwardedSources(): Int = forwardedSources.size
fun getTotalOversendingTime(): Duration = oversendingTimeTracker.totalTimeOn()
fun isOversending() = oversendingTimeTracker.state
fun bandwidthChanged(newBandwidthBps: Long) {
Expand All @@ -154,7 +146,6 @@ class BitrateController<T : MediaSourceContainer> @JvmOverloads constructor(
get() = JSONObject().apply {
put("bitrate_allocator", bandwidthAllocator.debugState)
put("packet_handler", packetHandler.debugState)
put("forwardedEndpoints", forwardedEndpoints.toString())
put("forwardedSources", forwardedSources.toString())
put("oversending", oversendingTimeTracker.state)
put("total_oversending_time_secs", oversendingTimeTracker.totalTimeOn().seconds)
Expand Down Expand Up @@ -256,7 +247,6 @@ class BitrateController<T : MediaSourceContainer> @JvmOverloads constructor(
}

interface EventHandler {
fun forwardedEndpointsChanged(forwardedEndpoints: Set<String>)
fun forwardedSourcesChanged(forwardedSources: Set<String>)
fun effectiveVideoConstraintsChanged(
oldEffectiveConstraints: EffectiveConstraintsMap,
Expand All @@ -276,18 +266,10 @@ class BitrateController<T : MediaSourceContainer> @JvmOverloads constructor(
// Actually implement the allocation (configure the packet filter to forward the chosen target layers).
packetHandler.allocationChanged(allocation)

if (useSourceNames) {
val newForwardedSources = allocation.forwardedSources
if (forwardedSources != newForwardedSources) {
forwardedSources = newForwardedSources
eventEmitter.fireEvent { forwardedSourcesChanged(newForwardedSources) }
}
} else {
val newForwardedEndpoints = allocation.forwardedEndpoints
if (forwardedEndpoints != newForwardedEndpoints) {
forwardedEndpoints = newForwardedEndpoints
eventEmitter.fireEvent { forwardedEndpointsChanged(newForwardedEndpoints) }
}
val newForwardedSources = allocation.forwardedSources
if (forwardedSources != newForwardedSources) {
forwardedSources = newForwardedSources
eventEmitter.fireEvent { forwardedSourcesChanged(newForwardedSources) }
}

oversendingTimeTracker.setState(allocation.oversending)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,15 @@ class Colibri2ConferenceHandler(
Condition.bad_request,
"Attempt to create endpoint ${c2endpoint.id} with no <transport>"
)
val sourceNames = c2endpoint.hasCapability(Capability.CAP_SOURCE_NAME_SUPPORT)
val ssrcRewriting = sourceNames && c2endpoint.hasCapability(Capability.CAP_SSRC_REWRITING_SUPPORT)
if (!c2endpoint.hasCapability(Capability.CAP_SOURCE_NAME_SUPPORT)) {
throw IqProcessingException(Condition.bad_request, "Source name support is mandatory.")
}

val ssrcRewriting = c2endpoint.hasCapability(Capability.CAP_SSRC_REWRITING_SUPPORT)
val privateAddresses = c2endpoint.hasCapability(Capability.CAP_PRIVATE_ADDRESS_CONNECTIVITY)
conference.createLocalEndpoint(
c2endpoint.id,
transport.iceControlling,
sourceNames,
ssrcRewriting,
c2endpoint.mucRole == MUCRole.visitor,
privateAddresses
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class LastNReducer(
.asSequence()
.filterIsInstance<Endpoint>()
.map {
it.numForwardedEndpoints()
it.numForwardedSources()
}
.maxOrNull()
}
Expand Down
Loading
Loading