Skip to content

Commit

Permalink
feat: add support for session-terminate (#562)
Browse files Browse the repository at this point in the history
Implements 'session-terminate' which allows the clients to end the session.
If there's the restart attribute set to true in the <bridge-session> extension,
then Jicofo will start a new session after tearing down the existing one.

Adds rate limiting on the restart requests to prevent abuse and/or client bugs.
There must be at least 10 second gap between the requests and no more than
3 requests are allowed in the last minute.
  • Loading branch information
paweldomas authored Jul 13, 2020
1 parent 902880b commit ef04a4e
Show file tree
Hide file tree
Showing 10 changed files with 295 additions and 37 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jitsi-xmpp-extensions</artifactId>
<version>1.0-6-g009420d</version>
<version>1.0-13-ga40f9c3</version>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ public XMPPError onSessionAccept(JingleSession jingleSession,
return null;
}

@Override
public XMPPError onSessionTerminate(JingleSession jingleSession, JingleIQ iq)
{
logger.warn("Ignored Jingle 'session-terminate'");

return null;
}

@Override
public XMPPError onSessionInfo(JingleSession session, JingleIQ iq)
{
Expand Down
110 changes: 96 additions & 14 deletions src/main/java/org/jitsi/jicofo/JitsiMeetConferenceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -1286,7 +1286,11 @@ protected void onMemberLeft(ChatRoomMember chatRoomMember)
= findParticipantForChatMember(chatRoomMember);
if (leftParticipant != null)
{
terminateParticipant(leftParticipant, Reason.GONE, null);
terminateParticipant(
leftParticipant,
Reason.GONE,
null,
/* no need to send session-terminate - gone */ false);
}
else
{
Expand All @@ -1308,31 +1312,38 @@ else if (participants.size() == 0)

private void terminateParticipant(Participant participant,
Reason reason,
String message)
String message,
boolean sendSessionTerminate)
{
logger.info(String.format(
"Terminating %s, reason: %s, send st: %s",
participant,
reason,
sendSessionTerminate));

BridgeSession bridgeSession;
synchronized (participantLock)
{
Jid contactAddress = participant.getMucJid();
if (participant.isSessionEstablished())
{
JingleSession jingleSession = participant.getJingleSession();
logger.info("Terminating: " + contactAddress);

jingle.terminateSession(jingleSession, reason, message);
jingle.terminateSession(jingleSession, reason, message, sendSessionTerminate);

removeSources(
jingleSession,
participant.getSourcesCopy(),
participant.getSourceGroupsCopy(),
false /* no JVB update - will expire */);

participant.setJingleSession(null);
}

bridgeSession = participant.terminateBridgeSession();

boolean removed = participants.remove(participant);
logger.info(
"Removed participant: " + removed + ", " + contactAddress);
logger.info("Removed participant: " + removed + ", " + contactAddress);
}

if (bridgeSession != null)
Expand Down Expand Up @@ -1523,7 +1534,7 @@ public XMPPError onSessionInfo(JingleSession session, JingleIQ iq)
String bridgeSessionId = bsPE != null ? bsPE.getId() : null;
BridgeSession bridgeSession = findBridgeSession(participant);

if (bridgeSession != null)
if (bridgeSession != null && bridgeSession.id.equals(bridgeSessionId))
{
logger.info(String.format(
"Received ICE failed notification from %s, session: %s",
Expand All @@ -1543,6 +1554,75 @@ public XMPPError onSessionInfo(JingleSession session, JingleIQ iq)
return null;
}

/**
* Handles 'session-terminate' received from the client.
*
* {@inheritDoc}
*/
@Override
public XMPPError onSessionTerminate(JingleSession session, JingleIQ iq)
{
Participant participant = findParticipantForJingleSession(session);

// FIXME: (duplicate) there's very similar logic in onSessionAccept/onSessionInfo
if (participant == null)
{
String errorMsg = "No participant for " + session.getAddress();

logger.warn("onSessionTerminate: " + errorMsg);

return XMPPError.from(
XMPPError.Condition.item_not_found, errorMsg).build();
}

BridgeSessionPacketExtension bsPE
= iq.getExtension(
BridgeSessionPacketExtension.ELEMENT_NAME,
BridgeSessionPacketExtension.NAMESPACE);
String bridgeSessionId = bsPE != null ? bsPE.getId() : null;
BridgeSession bridgeSession = findBridgeSession(participant);
boolean restartRequested = bsPE != null ? bsPE.isRestart() : false;

if (bridgeSession == null || !bridgeSession.id.equals(bridgeSessionId))
{
logger.info(String.format(
"Ignored session-terminate for invalid session: %s, bridge session ID: %s restart: %s",
participant,
bridgeSessionId,
restartRequested));

return XMPPError.from(XMPPError.Condition.item_not_found, "invalid bridge session ID").build();
}

logger.info(String.format(
"Received session-terminate from %s, session: %s, restart: %s",
participant,
bridgeSession,
restartRequested));

synchronized (participantLock)
{
terminateParticipant(participant, null, null, /* do not send session-terminate */ false);

if (restartRequested)
{
if (participant.incrementAndCheckRestartRequests())
{
participants.add(participant);
inviteParticipant(participant, false, hasToStartMuted(participant, false));
}
else
{
logger.warn(String.format("Rate limiting %s for restart requests", participant));

return XMPPError.from(XMPPError.Condition.resource_constraint, "rate-limited").build();
}
}
}

return null;
}

/**
* Advertises new sources across all conference participants by using
* 'source-add' Jingle notification.
Expand Down Expand Up @@ -2507,7 +2587,8 @@ void onInviteFailed(ParticipantChannelAllocator channelAllocator)
terminateParticipant(
channelAllocator.getParticipant(),
Reason.GENERAL_ERROR,
"jingle session failed");
"jingle session failed",
/* send session-terminate */ true);
}

/**
Expand Down Expand Up @@ -2712,19 +2793,20 @@ public void run()
if (participants.size() == 1)
{
Participant p = participants.get(0);
logger.info(
"Timing out single participant: " + p.getMucJid());

logger.info("Timing out single participant: " + p.getMucJid());

terminateParticipant(
p, Reason.EXPIRED, "Idle session timeout");
p,
Reason.EXPIRED,
"Idle session timeout",
/* send session-terminate */ true);

disposeConference();
}
else
{
logger.error(
"Should never execute if more than 1 participant? "
+ getRoomName());
logger.error("Should never execute if more than 1 participant? " + getRoomName());
}
singleParticipantTout = null;
}
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/jitsi/jicofo/LipSyncHack.java
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,10 @@ public void sendRemoveSourceIQ(
@Override
public void terminateSession(JingleSession session,
Reason reason,
String msg)
String msg,
boolean sendTerminate)
{
jingleImpl.terminateSession(session, reason, msg);
jingleImpl.terminateSession(session, reason, msg, sendTerminate);
}

/**
Expand Down
69 changes: 69 additions & 0 deletions src/main/java/org/jitsi/jicofo/Participant.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
import org.jitsi.utils.logging.*;
import org.jxmpp.jid.*;

import java.time.*;
import java.util.*;

import static java.time.temporal.ChronoUnit.SECONDS;

/**
* Class represent Jitsi Meet conference participant. Stores information about
* Colibri channels allocated, Jingle session and media sources.
Expand Down Expand Up @@ -66,6 +69,17 @@ public static String getEndpointId(XmppChatMember chatRoomMember)
*/
private JitsiMeetConferenceImpl.BridgeSession bridgeSession;

/**
* The {@link Clock} used by this participant.
*/
private Clock clock = Clock.systemUTC();

/**
* The list stored the timestamp when the last restart requests have been received for this participant and is used
* for rate limiting. See {@link #incrementAndCheckRestartRequests()} for more details.
*/
private final Deque<Instant> restartRequests = new LinkedList<>();

/**
* MUC chat member of this participant.
*/
Expand Down Expand Up @@ -152,6 +166,15 @@ void setBridgeSession(JitsiMeetConferenceImpl.BridgeSession bridgeSession)
this.bridgeSession = bridgeSession;
}

/**
* Sets the new clock instance to be used by this participant. Meant for testing.
* @param newClock - the new {@link Clock}
*/
public void setClock(Clock newClock)
{
this.clock = newClock;
}

/**
* Sets {@link JingleSession} established with this peer.
* @param jingleSession the new Jingle session to be assigned to this peer.
Expand All @@ -170,6 +193,14 @@ public XmppChatMember getChatMember()
return roomMember;
}

/**
* @return {@link Clock} used by this participant instance.
*/
public Clock getClock()
{
return clock;
}

/**
* Returns <tt>true</tt> if this participant supports RTP bundle and RTCP
* mux.
Expand Down Expand Up @@ -205,6 +236,44 @@ public boolean hasRtxSupport()
return supportedFeatures.contains(DiscoveryUtil.FEATURE_RTX);
}

/**
* Rate limiting mechanism for session restart requests received from participants.
* The rules ar as follows:
* - must be at least 10 second gap between the requests
* - no more than 3 requests within the last minute
*
* @return {@code true} if it's okay to process the request, as in it doesn't violate the current rate limiting
* policy, or {@code false} if the request should be denied.
*/
public boolean incrementAndCheckRestartRequests()
{
final Instant now = Instant.now(clock);
Instant previousRequest = this.restartRequests.peekLast();

if (previousRequest == null)
{
this.restartRequests.add(now);

return true;
}

if (previousRequest.until(now, SECONDS) < 10)
{
return false;
}

// Allow only 3 requests within the last minute
this.restartRequests.removeIf(requestTime -> requestTime.until(now, SECONDS) > 60);
if (this.restartRequests.size() > 2)
{
return false;
}

this.restartRequests.add(now);

return true;
}

/**
* FIXME: we need to remove "is SIP gateway code", but there are still
* situations where we need to know whether given peer is a human or not.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@ protected IQ processJingleIQ(JingleIQ iq)
case SESSION_INFO:
error = requestHandler.onSessionInfo(session, iq);
break;
case SESSION_TERMINATE:
error = requestHandler.onSessionTerminate(session, iq);
break;
case TRANSPORT_ACCEPT:
error = requestHandler.onTransportAccept(session, iq.getContentList());
break;
Expand Down Expand Up @@ -558,39 +561,47 @@ public void terminateHandlersSessions(JingleRequestHandler requestHandler)
{
if (session.getRequestHandler() == requestHandler)
{
terminateSession(session, Reason.GONE, null);
terminateSession(session, Reason.GONE, null, true);
}
}
}

/**
* Terminates given Jingle session by sending 'session-terminate' with some
* {@link Reason} if provided.
* Terminates given Jingle session. This method is to be called either to send 'session-terminate' or to inform
* this operation set that the session has been terminated as a result of 'session-terminate' received from
* the other peer in which case {@code sendTerminate} should be set to {@code false}.
*
* @param session the <tt>JingleSession</tt> to terminate.
* @param reason one of {@link Reason} enum that indicates why the session
* is being ended or <tt>null</tt> to omit.
* @param sendTerminate when {@code true} it means that a 'session-terminate' is to be sent, otherwise it means
* the session is being ended on the remote peer's request.
* {@inheritDoc}
*/
@Override
public void terminateSession(JingleSession session,
Reason reason,
String message)
String message,
boolean sendTerminate)
{
logger.info("Terminate session: " + session.getAddress());
logger.info(String.format(
"Terminate session: %s, reason: %s, send terminate: %s",
session.getAddress(),
reason,
sendTerminate));

// we do not send session-terminate as muc addresses are invalid at this
// point
// FIXME: but there is also connection address available
JingleIQ terminate
= JinglePacketFactory.createSessionTerminate(
if (sendTerminate)
{
JingleIQ terminate
= JinglePacketFactory.createSessionTerminate(
getOurJID(),
session.getAddress(),
session.getSessionID(),
reason,
message);

getConnection().sendStanza(terminate);
getConnection().sendStanza(terminate);
}

sessions.remove(session.getSessionID());
}
Expand Down
Loading

0 comments on commit ef04a4e

Please sign in to comment.