Skip to content

Commit

Permalink
Implement more public API functionalities
Browse files Browse the repository at this point in the history
  • Loading branch information
streamer45 committed Apr 12, 2024
1 parent 319f557 commit 815781e
Show file tree
Hide file tree
Showing 4 changed files with 313 additions and 32 deletions.
210 changes: 210 additions & 0 deletions client/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// Copyright (c) 2022-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.

package client

import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"time"

"github.com/pion/webrtc/v3"
)

const (
httpRequestTimeout = 10 * time.Second
httpResponseBodyMaxSizeBytes = 1024 * 1024 // 1MB
)

func (c *Client) Unmute(track webrtc.TrackLocal) error {
if track == nil {
return fmt.Errorf("invalid nil track")
}

c.mut.RLock()
sender := c.voiceSender
c.mut.RUnlock()

if sender == nil {
snd, err := c.pc.AddTrack(track)
if err != nil {
return fmt.Errorf("failed to add track: %w", err)
}
c.mut.Lock()
c.voiceSender = snd
sender = snd
c.mut.Unlock()
} else {
if err := sender.ReplaceTrack(track); err != nil {
return fmt.Errorf("failed to replace track: %w", err)
}
}

go func() {
defer log.Printf("exiting RTCP handler")
rtcpBuf := make([]byte, receiveMTU)
for {
if _, _, rtcpErr := sender.Read(rtcpBuf); rtcpErr != nil {
log.Printf("failed to read rtcp: %s", rtcpErr.Error())
return
}
}
}()

return c.SendWS(wsEventUnmute, nil, false)
}

func (c *Client) Mute() error {
c.mut.Lock()
defer c.mut.Unlock()

if c.voiceSender != nil {
if err := c.voiceSender.ReplaceTrack(nil); err != nil {
return fmt.Errorf("failed to replace track: %w", err)
}
}

return c.sendWS(wsEventMute, nil, false)
}

func (c *Client) StartScreenShare(tracks []webrtc.TrackLocal) (*webrtc.RTPTransceiver, error) {
if len(tracks) == 0 {
return nil, fmt.Errorf("invalid empty tracks")
}

if len(tracks) > 2 {
return nil, fmt.Errorf("too many tracks")
}

data, err := json.Marshal(map[string]string{
"screenStreamID": tracks[0].StreamID(),
})
if err != nil {
return nil, fmt.Errorf("failed to marshal data: %w", err)
}

if err := c.SendWS(wsEventScreenOn, map[string]any{
"data": string(data),
}, false); err != nil {
return nil, fmt.Errorf("failed to send screen on event: %w", err)
}

trx, err := c.pc.AddTransceiverFromTrack(tracks[0], webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly})
if err != nil {
return nil, fmt.Errorf("failed to add transceiver for track: %w", err)
}

// Simulcast
if len(tracks) > 1 {
if err := trx.Sender().AddEncoding(tracks[1]); err != nil {
return nil, fmt.Errorf("failed to add encoding: %w", err)
}
}

c.mut.Lock()
c.screenTransceiver = trx
c.mut.Unlock()

sender := trx.Sender()

go func() {
defer log.Printf("exiting RTCP handler")
rtcpBuf := make([]byte, receiveMTU)
for {
if _, _, rtcpErr := sender.Read(rtcpBuf); rtcpErr != nil {
log.Printf("failed to read rtcp: %s", rtcpErr.Error())
return
}
}
}()

return trx, nil
}

func (c *Client) StopScreenShare() error {
c.mut.Lock()
defer c.mut.Unlock()

if c.screenTransceiver != nil {
if err := c.pc.RemoveTrack(c.screenTransceiver.Sender()); err != nil {
return fmt.Errorf("failed to remove track: %w", err)
}
c.screenTransceiver = nil
}

return c.sendWS(wsEventScreenOff, nil, false)
}

func (c *Client) RaiseHand() error {
return c.SendWS(wsEventRaiseHand, nil, false)
}

func (c *Client) LowerHand() error {
return c.SendWS(wsEventLowerHand, nil, false)
}

func (c *Client) StartRecording() error {
ctx, cancel := context.WithTimeout(context.Background(), httpRequestTimeout)
defer cancel()
res, err := c.apiClient.DoAPIRequest(ctx, http.MethodPost,
fmt.Sprintf("%s/plugins/%s/calls/%s/recording/start", c.cfg.SiteURL, pluginID, c.cfg.ChannelID), "", "")
if err != nil {
return fmt.Errorf("request failed: %w", err)
}
defer res.Body.Close()

if res.StatusCode != 200 {
return fmt.Errorf("unexpected response status code %d", res.StatusCode)
}

return nil
}

func (c *Client) StopRecording() error {
ctx, cancel := context.WithTimeout(context.Background(), httpRequestTimeout)
defer cancel()
res, err := c.apiClient.DoAPIRequest(ctx, http.MethodPost,
fmt.Sprintf("%s/plugins/%s/calls/%s/recording/stop", c.cfg.SiteURL, pluginID, c.cfg.ChannelID), "", "")
if err != nil {
return fmt.Errorf("request failed: %w", err)
}
defer res.Body.Close()

if res.StatusCode != 200 {
return fmt.Errorf("unexpected response status code %d", res.StatusCode)
}

return nil
}

// TODO: return a proper Config object, ideally exposed in github.com/mattermost/mattermost-plugin-calls/server/public.
func (c *Client) GetCallsConfig() (map[string]any, error) {
ctx, cancel := context.WithTimeout(context.Background(), httpRequestTimeout)
defer cancel()
res, err := c.apiClient.DoAPIRequest(ctx, http.MethodGet,
fmt.Sprintf("%s/plugins/%s/config", c.cfg.SiteURL, pluginID), "", "")
if err != nil {
return nil, fmt.Errorf("request failed: %w", err)
}
defer res.Body.Close()

if res.StatusCode != 200 {
return nil, fmt.Errorf("unexpected response status code %d", res.StatusCode)
}

dec := json.NewDecoder(&io.LimitedReader{
R: res.Body,
N: httpResponseBodyMaxSizeBytes,
})

var config map[string]any
if err := dec.Decode(&config); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}

return config, nil
}
44 changes: 28 additions & 16 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

"github.com/mattermost/rtcd/service/ws"

"github.com/mattermost/mattermost/server/public/model"

"github.com/pion/webrtc/v3"
)

Expand All @@ -21,22 +23,23 @@ type EventHandler func(ctx any) error
type EventType string

const (
WSConnectEvent EventType = "WSConnect"
WSDisconnectEvent EventType = "WSDisconnect"
WSCallJoinEvent EventType = "WSCallJoin"
WSCallRecordingState EventType = "WSCallRecordingState"
WSCallJobState EventType = "WSCallJobState"
WSJobStopEvent EventType = "WSStopJobEvent"
RTCConnectEvent EventType = "RTCConnect"
RTCDisconnectEvent EventType = "RTCDisconnect"
RTCTrackEvent EventType = "RTCTrack"
CloseEvent EventType = "Close"
ErrorEvent EventType = "Error"
WSConnectEvent EventType = "WSConnect"
WSDisconnectEvent EventType = "WSDisconnect"
WSCallJoinEvent EventType = "WSCallJoin"
WSCallRecordingState EventType = "WSCallRecordingState"
WSCallJobState EventType = "WSCallJobState"
WSJobStopEvent EventType = "WSStopJobEvent"
RTCConnectEvent EventType = "RTCConnect"
RTCDisconnectEvent EventType = "RTCDisconnect"
RTCTrackEvent EventType = "RTCTrack"
CloseEvent EventType = "Close"
ErrorEvent EventType = "Error"
WSCallHostChangedEvent EventType = "WSCallHostChanged"
)

func (e EventType) IsValid() bool {
switch e {
case WSConnectEvent, WSDisconnectEvent, WSCallJoinEvent, WSCallRecordingState,
case WSConnectEvent, WSDisconnectEvent, WSCallJoinEvent, WSCallRecordingState, WSCallHostChangedEvent,
WSCallJobState, WSJobStopEvent, RTCConnectEvent, RTCDisconnectEvent,
RTCTrackEvent, CloseEvent, ErrorEvent:
return true
Expand All @@ -62,6 +65,9 @@ type Client struct {

handlers map[EventType]EventHandler

// HTTP API
apiClient *model.Client4

// WebSocket
ws *ws.Client
wsDoneCh chan struct{}
Expand All @@ -73,10 +79,12 @@ type Client struct {
currentConnID string

// WebRTC
pc *webrtc.PeerConnection
dc *webrtc.DataChannel
iceCh chan webrtc.ICECandidateInit
receivers map[string]*webrtc.RTPReceiver
pc *webrtc.PeerConnection
dc *webrtc.DataChannel
iceCh chan webrtc.ICECandidateInit
receivers map[string]*webrtc.RTPReceiver
voiceSender *webrtc.RTPSender
screenTransceiver *webrtc.RTPTransceiver

state int32

Expand All @@ -91,6 +99,9 @@ func New(cfg Config, opts ...Option) (*Client, error) {
return nil, fmt.Errorf("failed to validate config: %w", err)
}

apiClient := model.NewAPIv4Client(cfg.SiteURL)
apiClient.SetToken(cfg.AuthToken)

c := &Client{
cfg: cfg,
handlers: make(map[EventType]EventHandler),
Expand All @@ -99,6 +110,7 @@ func New(cfg Config, opts ...Option) (*Client, error) {
wsClientSeqNo: 1,
iceCh: make(chan webrtc.ICECandidateInit, iceChSize),
receivers: make(map[string]*webrtc.RTPReceiver),
apiClient: apiClient,
}

for _, opt := range opts {
Expand Down
29 changes: 28 additions & 1 deletion client/rtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"log"
"sync/atomic"

"github.com/pion/interceptor"
"github.com/pion/webrtc/v3"
)

Expand All @@ -25,6 +26,14 @@ const (
receiveMTU = 1460
)

var (
rtpVideoExtensions = []string{
"urn:ietf:params:rtp-hdrext:sdes:mid",
"urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id",
"urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id",
}
)

func (c *Client) handleWSEventSignal(evData map[string]any) error {
data, ok := evData["data"].(string)
if !ok {
Expand Down Expand Up @@ -138,7 +147,25 @@ func (c *Client) initRTCSession() error {
SDPSemantics: webrtc.SDPSemanticsUnifiedPlan,
}

pc, err := webrtc.NewPeerConnection(cfg)
var m webrtc.MediaEngine
if err := m.RegisterDefaultCodecs(); err != nil {
return fmt.Errorf("failed to register default codecs: %w", err)
}

i := interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(&m, &i); err != nil {
return fmt.Errorf("failed to register default interceptors: %w", err)
}

for _, ext := range rtpVideoExtensions {
if err := m.RegisterHeaderExtension(webrtc.RTPHeaderExtensionCapability{URI: ext}, webrtc.RTPCodecTypeVideo); err != nil {
return fmt.Errorf("failed to register header extension: %w", err)
}
}

api := webrtc.NewAPI(webrtc.WithMediaEngine(&m), webrtc.WithInterceptorRegistry(&i))

pc, err := api.NewPeerConnection(cfg)
if err != nil {
return fmt.Errorf("failed to create new peer connection: %s", err)
}
Expand Down
Loading

0 comments on commit 815781e

Please sign in to comment.