diff --git a/client/api.go b/client/api.go new file mode 100644 index 0000000..6e33e6a --- /dev/null +++ b/client/api.go @@ -0,0 +1,210 @@ +// Copyright (c) 2022-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package client + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "time" + + "github.com/pion/webrtc/v3" +) + +const ( + httpRequestTimeout = 10 * time.Second + httpResponseBodyMaxSizeBytes = 1024 * 1024 // 1MB +) + +func (c *Client) Unmute(track webrtc.TrackLocal) error { + if track == nil { + return fmt.Errorf("invalid nil track") + } + + c.mut.RLock() + sender := c.voiceSender + c.mut.RUnlock() + + if sender == nil { + snd, err := c.pc.AddTrack(track) + if err != nil { + return fmt.Errorf("failed to add track: %w", err) + } + c.mut.Lock() + c.voiceSender = snd + sender = snd + c.mut.Unlock() + } else { + if err := sender.ReplaceTrack(track); err != nil { + return fmt.Errorf("failed to replace track: %w", err) + } + } + + go func() { + defer log.Printf("exiting RTCP handler") + rtcpBuf := make([]byte, receiveMTU) + for { + if _, _, rtcpErr := sender.Read(rtcpBuf); rtcpErr != nil { + log.Printf("failed to read rtcp: %s", rtcpErr.Error()) + return + } + } + }() + + return c.SendWS(wsEventUnmute, nil, false) +} + +func (c *Client) Mute() error { + c.mut.Lock() + defer c.mut.Unlock() + + if c.voiceSender != nil { + if err := c.voiceSender.ReplaceTrack(nil); err != nil { + return fmt.Errorf("failed to replace track: %w", err) + } + } + + return c.sendWS(wsEventMute, nil, false) +} + +func (c *Client) StartScreenShare(tracks []webrtc.TrackLocal) (*webrtc.RTPTransceiver, error) { + if len(tracks) == 0 { + return nil, fmt.Errorf("invalid empty tracks") + } + + if len(tracks) > 2 { + return nil, fmt.Errorf("too many tracks") + } + + data, err := json.Marshal(map[string]string{ + "screenStreamID": tracks[0].StreamID(), + }) + if err != nil { + return nil, fmt.Errorf("failed to marshal data: %w", err) + } + + if err := c.SendWS(wsEventScreenOn, map[string]any{ + "data": string(data), + }, false); err != nil { + return nil, fmt.Errorf("failed to send screen on event: %w", err) + } + + trx, err := c.pc.AddTransceiverFromTrack(tracks[0], webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly}) + if err != nil { + return nil, fmt.Errorf("failed to add transceiver for track: %w", err) + } + + // Simulcast + if len(tracks) > 1 { + if err := trx.Sender().AddEncoding(tracks[1]); err != nil { + return nil, fmt.Errorf("failed to add encoding: %w", err) + } + } + + c.mut.Lock() + c.screenTransceiver = trx + c.mut.Unlock() + + sender := trx.Sender() + + go func() { + defer log.Printf("exiting RTCP handler") + rtcpBuf := make([]byte, receiveMTU) + for { + if _, _, rtcpErr := sender.Read(rtcpBuf); rtcpErr != nil { + log.Printf("failed to read rtcp: %s", rtcpErr.Error()) + return + } + } + }() + + return trx, nil +} + +func (c *Client) StopScreenShare() error { + c.mut.Lock() + defer c.mut.Unlock() + + if c.screenTransceiver != nil { + if err := c.pc.RemoveTrack(c.screenTransceiver.Sender()); err != nil { + return fmt.Errorf("failed to remove track: %w", err) + } + c.screenTransceiver = nil + } + + return c.sendWS(wsEventScreenOff, nil, false) +} + +func (c *Client) RaiseHand() error { + return c.SendWS(wsEventRaiseHand, nil, false) +} + +func (c *Client) LowerHand() error { + return c.SendWS(wsEventLowerHand, nil, false) +} + +func (c *Client) StartRecording() error { + ctx, cancel := context.WithTimeout(context.Background(), httpRequestTimeout) + defer cancel() + res, err := c.apiClient.DoAPIRequest(ctx, http.MethodPost, + fmt.Sprintf("%s/plugins/%s/calls/%s/recording/start", c.cfg.SiteURL, pluginID, c.cfg.ChannelID), "", "") + if err != nil { + return fmt.Errorf("request failed: %w", err) + } + defer res.Body.Close() + + if res.StatusCode != 200 { + return fmt.Errorf("unexpected response status code %d", res.StatusCode) + } + + return nil +} + +func (c *Client) StopRecording() error { + ctx, cancel := context.WithTimeout(context.Background(), httpRequestTimeout) + defer cancel() + res, err := c.apiClient.DoAPIRequest(ctx, http.MethodPost, + fmt.Sprintf("%s/plugins/%s/calls/%s/recording/stop", c.cfg.SiteURL, pluginID, c.cfg.ChannelID), "", "") + if err != nil { + return fmt.Errorf("request failed: %w", err) + } + defer res.Body.Close() + + if res.StatusCode != 200 { + return fmt.Errorf("unexpected response status code %d", res.StatusCode) + } + + return nil +} + +// TODO: return a proper Config object, ideally exposed in github.com/mattermost/mattermost-plugin-calls/server/public. +func (c *Client) GetCallsConfig() (map[string]any, error) { + ctx, cancel := context.WithTimeout(context.Background(), httpRequestTimeout) + defer cancel() + res, err := c.apiClient.DoAPIRequest(ctx, http.MethodGet, + fmt.Sprintf("%s/plugins/%s/config", c.cfg.SiteURL, pluginID), "", "") + if err != nil { + return nil, fmt.Errorf("request failed: %w", err) + } + defer res.Body.Close() + + if res.StatusCode != 200 { + return nil, fmt.Errorf("unexpected response status code %d", res.StatusCode) + } + + dec := json.NewDecoder(&io.LimitedReader{ + R: res.Body, + N: httpResponseBodyMaxSizeBytes, + }) + + var config map[string]any + if err := dec.Decode(&config); err != nil { + return nil, fmt.Errorf("failed to decode response: %w", err) + } + + return config, nil +} diff --git a/client/client.go b/client/client.go index b69e999..796cb7c 100644 --- a/client/client.go +++ b/client/client.go @@ -13,6 +13,8 @@ import ( "github.com/mattermost/rtcd/service/ws" + "github.com/mattermost/mattermost/server/public/model" + "github.com/pion/webrtc/v3" ) @@ -21,22 +23,23 @@ type EventHandler func(ctx any) error type EventType string const ( - WSConnectEvent EventType = "WSConnect" - WSDisconnectEvent EventType = "WSDisconnect" - WSCallJoinEvent EventType = "WSCallJoin" - WSCallRecordingState EventType = "WSCallRecordingState" - WSCallJobState EventType = "WSCallJobState" - WSJobStopEvent EventType = "WSStopJobEvent" - RTCConnectEvent EventType = "RTCConnect" - RTCDisconnectEvent EventType = "RTCDisconnect" - RTCTrackEvent EventType = "RTCTrack" - CloseEvent EventType = "Close" - ErrorEvent EventType = "Error" + WSConnectEvent EventType = "WSConnect" + WSDisconnectEvent EventType = "WSDisconnect" + WSCallJoinEvent EventType = "WSCallJoin" + WSCallRecordingState EventType = "WSCallRecordingState" + WSCallJobState EventType = "WSCallJobState" + WSJobStopEvent EventType = "WSStopJobEvent" + RTCConnectEvent EventType = "RTCConnect" + RTCDisconnectEvent EventType = "RTCDisconnect" + RTCTrackEvent EventType = "RTCTrack" + CloseEvent EventType = "Close" + ErrorEvent EventType = "Error" + WSCallHostChangedEvent EventType = "WSCallHostChanged" ) func (e EventType) IsValid() bool { switch e { - case WSConnectEvent, WSDisconnectEvent, WSCallJoinEvent, WSCallRecordingState, + case WSConnectEvent, WSDisconnectEvent, WSCallJoinEvent, WSCallRecordingState, WSCallHostChangedEvent, WSCallJobState, WSJobStopEvent, RTCConnectEvent, RTCDisconnectEvent, RTCTrackEvent, CloseEvent, ErrorEvent: return true @@ -62,6 +65,9 @@ type Client struct { handlers map[EventType]EventHandler + // HTTP API + apiClient *model.Client4 + // WebSocket ws *ws.Client wsDoneCh chan struct{} @@ -73,10 +79,12 @@ type Client struct { currentConnID string // WebRTC - pc *webrtc.PeerConnection - dc *webrtc.DataChannel - iceCh chan webrtc.ICECandidateInit - receivers map[string]*webrtc.RTPReceiver + pc *webrtc.PeerConnection + dc *webrtc.DataChannel + iceCh chan webrtc.ICECandidateInit + receivers map[string]*webrtc.RTPReceiver + voiceSender *webrtc.RTPSender + screenTransceiver *webrtc.RTPTransceiver state int32 @@ -91,6 +99,9 @@ func New(cfg Config, opts ...Option) (*Client, error) { return nil, fmt.Errorf("failed to validate config: %w", err) } + apiClient := model.NewAPIv4Client(cfg.SiteURL) + apiClient.SetToken(cfg.AuthToken) + c := &Client{ cfg: cfg, handlers: make(map[EventType]EventHandler), @@ -99,6 +110,7 @@ func New(cfg Config, opts ...Option) (*Client, error) { wsClientSeqNo: 1, iceCh: make(chan webrtc.ICECandidateInit, iceChSize), receivers: make(map[string]*webrtc.RTPReceiver), + apiClient: apiClient, } for _, opt := range opts { diff --git a/client/rtc.go b/client/rtc.go index 87979fe..9144652 100644 --- a/client/rtc.go +++ b/client/rtc.go @@ -13,6 +13,7 @@ import ( "log" "sync/atomic" + "github.com/pion/interceptor" "github.com/pion/webrtc/v3" ) @@ -25,6 +26,14 @@ const ( receiveMTU = 1460 ) +var ( + rtpVideoExtensions = []string{ + "urn:ietf:params:rtp-hdrext:sdes:mid", + "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id", + "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id", + } +) + func (c *Client) handleWSEventSignal(evData map[string]any) error { data, ok := evData["data"].(string) if !ok { @@ -138,7 +147,25 @@ func (c *Client) initRTCSession() error { SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, } - pc, err := webrtc.NewPeerConnection(cfg) + var m webrtc.MediaEngine + if err := m.RegisterDefaultCodecs(); err != nil { + return fmt.Errorf("failed to register default codecs: %w", err) + } + + i := interceptor.Registry{} + if err := webrtc.RegisterDefaultInterceptors(&m, &i); err != nil { + return fmt.Errorf("failed to register default interceptors: %w", err) + } + + for _, ext := range rtpVideoExtensions { + if err := m.RegisterHeaderExtension(webrtc.RTPHeaderExtensionCapability{URI: ext}, webrtc.RTPCodecTypeVideo); err != nil { + return fmt.Errorf("failed to register header extension: %w", err) + } + } + + api := webrtc.NewAPI(webrtc.WithMediaEngine(&m), webrtc.WithInterceptorRegistry(&i)) + + pc, err := api.NewPeerConnection(cfg) if err != nil { return fmt.Errorf("failed to create new peer connection: %s", err) } diff --git a/client/websocket.go b/client/websocket.go index 2e97d82..a6562b1 100644 --- a/client/websocket.go +++ b/client/websocket.go @@ -28,17 +28,25 @@ const ( ) const ( - wsEventJoin = wsEvPrefix + "join" - wsEventLeave = wsEvPrefix + "leave" - wsEventReconnect = wsEvPrefix + "reconnect" - wsEventSignal = wsEvPrefix + "signal" - wsEventICE = wsEvPrefix + "ice" - wsEventSDP = wsEvPrefix + "sdp" - wsEventError = wsEvPrefix + "error" - wsEventUserLeft = wsEvPrefix + "user_left" - wsEventCallEnd = wsEvPrefix + "call_end" - wsEventCallJobState = wsEvPrefix + "call_job_state" - wsEventJobStop = wsEvPrefix + "job_stop" + wsEventJoin = wsEvPrefix + "join" + wsEventLeave = wsEvPrefix + "leave" + wsEventReconnect = wsEvPrefix + "reconnect" + wsEventSignal = wsEvPrefix + "signal" + wsEventICE = wsEvPrefix + "ice" + wsEventSDP = wsEvPrefix + "sdp" + wsEventError = wsEvPrefix + "error" + wsEventUserLeft = wsEvPrefix + "user_left" + wsEventCallEnd = wsEvPrefix + "call_end" + wsEventCallJobState = wsEvPrefix + "call_job_state" + wsEventJobStop = wsEvPrefix + "job_stop" + wsEventMute = wsEvPrefix + "mute" + wsEventUnmute = wsEvPrefix + "unmute" + wsEventScreenOn = wsEvPrefix + "screen_on" + wsEventScreenOff = wsEvPrefix + "screen_off" + wsEventRaiseHand = wsEvPrefix + "raise_hand" + wsEventLowerHand = wsEvPrefix + "unraise_hand" + wsEventReact = wsEvPrefix + "react" + wsEventCallHostChanged = wsEvPrefix + "call_host_changed" ) var ( @@ -46,10 +54,7 @@ var ( errCallEnded = errors.New("call ended") ) -func (c *Client) SendWS(ev string, msg any, binary bool) error { - c.mut.Lock() - defer c.mut.Unlock() - +func (c *Client) sendWS(ev string, msg any, binary bool) error { var err error var data []byte var msgType ws.MessageType @@ -80,6 +85,13 @@ func (c *Client) SendWS(ev string, msg any, binary bool) error { return nil } +func (c *Client) SendWS(ev string, msg any, binary bool) error { + c.mut.Lock() + defer c.mut.Unlock() + + return c.sendWS(ev, msg, binary) +} + func (c *Client) handleWSEventHello(ev *model.WebSocketEvent) (isReconnect bool, err error) { connID, ok := ev.GetData()["connection_id"].(string) if !ok || connID == "" { @@ -184,6 +196,12 @@ func (c *Client) handleWSMsg(msg ws.Message) error { return errCallEnded } case wsEventCallJobState: + callID, _ := ev.GetData()["callID"].(string) + if callID != c.cfg.ChannelID { + // Ignore if the event is not for the current call/channel. + return nil + } + data, ok := ev.GetData()["jobState"].(map[string]any) if !ok { return fmt.Errorf("invalid recording state") @@ -200,6 +218,20 @@ func (c *Client) handleWSMsg(msg ws.Message) error { case wsEventJobStop: jobID, _ := ev.GetData()["job_id"].(string) c.emit(WSJobStopEvent, jobID) + case wsEventCallHostChanged: + channelID := ev.GetBroadcast().ChannelId + if channelID == "" { + channelID, _ = ev.GetData()["channelID"].(string) + } + if channelID != c.cfg.ChannelID { + return nil + } + hostID, _ := ev.GetData()["hostID"].(string) + if hostID == "" { + return fmt.Errorf("unexpected empty hostID") + } + + c.emit(WSCallHostChangedEvent, hostID) default: } case ws.BinaryMessage: