Skip to content

Commit

Permalink
Add get_stats
Browse files Browse the repository at this point in the history
  • Loading branch information
mickel8 committed Jan 28, 2024
1 parent 7f85ac4 commit ff8c678
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 20 deletions.
9 changes: 9 additions & 0 deletions lib/ex_webrtc/app.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule ExWebRTC.App do

Check warning on line 1 in lib/ex_webrtc/app.ex

View workflow job for this annotation

GitHub Actions / Lint (OTP 26 / Elixir 1.15)

Modules should have a @moduledoc tag.
use Application

def start(_type, _args) do
IO.inspect(:loading_ex_webrtc_app)

Check warning on line 5 in lib/ex_webrtc/app.ex

View workflow job for this annotation

GitHub Actions / Lint (OTP 26 / Elixir 1.15)

There should be no calls to IO.inspect/1.
children = [{Registry, keys: :unique, name: ExWebRTC.Registry}]
Supervisor.start_link(children, strategy: :one_for_one)
end
end
51 changes: 51 additions & 0 deletions lib/ex_webrtc/dtls_transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ defmodule ExWebRTC.DTLSTransport do
GenServer.call(dtls_transport, :set_ice_connected)
end

@spec get_local_cert_info(dtls_transport()) :: map()
def get_local_cert_info(dtls_transport) do
GenServer.call(dtls_transport, :get_local_cert_info)
end

@spec get_remote_cert_info(dtls_transport()) :: map()
def get_remote_cert_info(dtls_transport) do
GenServer.call(dtls_transport, :get_remote_cert_info)
end

@spec get_fingerprint(dtls_transport()) :: binary()
def get_fingerprint(dtls_transport) do
GenServer.call(dtls_transport, :get_fingerprint)
Expand Down Expand Up @@ -87,8 +97,12 @@ defmodule ExWebRTC.DTLSTransport do
ice_connected: false,
buffered_packets: nil,
cert: cert,
base64_cert: Base.encode64(cert),
pkey: pkey,
fingerprint: fingerprint,
remote_cert: nil,
remote_base64_cert: nil,
remote_fingerprint: nil,
in_srtp: ExLibSRTP.new(),
out_srtp: ExLibSRTP.new(),
# sha256 hex dump
Expand Down Expand Up @@ -133,6 +147,33 @@ defmodule ExWebRTC.DTLSTransport do
end
end

@impl true
def handle_call(:get_local_cert_info, _from, state) do
cert_info = %{
fingerprint: state.fingerprint,
fingerprint_algorithm: :sha_256,
base64_certificate: state.base64_cert
}

{:reply, cert_info, state}
end

@impl true
def handle_call(:get_remote_cert_info, _from, %{dtls_state: :connected} = state) do
cert_info = %{
fingerprint: state.remote_fingerprint,
fingerprint_algorithm: :sha_256,
base64_certificate: state.remote_base64_cert
}

{:reply, cert_info, state}
end

@impl true
def handle_call(:get_remote_cert_info, _from, state) do
{:reply, nil, state}
end

@impl true
def handle_call(:get_fingerprint, _from, state) do
{:reply, state.fingerprint, state}
Expand Down Expand Up @@ -247,6 +288,7 @@ defmodule ExWebRTC.DTLSTransport do

{:handshake_finished, lkm, rkm, profile, packets} ->
Logger.debug("DTLS handshake finished")
state = update_remote_cert_info(state)
state.ice_transport.send_data(state.ice_pid, packets)

peer_fingerprint =
Expand All @@ -269,6 +311,7 @@ defmodule ExWebRTC.DTLSTransport do
Logger.debug("DTLS handshake finished")
:ok = setup_srtp(state, lkm, rkm, profile)
state = update_dtls_state(state, :connected)
state = update_remote_cert_info(state)
{:ok, state}

:handshake_want_read ->
Expand Down Expand Up @@ -340,5 +383,13 @@ defmodule ExWebRTC.DTLSTransport do
%{state | dtls_state: new_dtls_state}
end

defp update_remote_cert_info(state) do
cert = ExDTLS.get_cert(state.dtls)
fingerprint = ExDTLS.get_cert_fingerprint(cert)
base64_cert = Base.encode64(cert)

%{state | remote_cert: cert, remote_base64_cert: base64_cert, remote_fingerprint: fingerprint}
end

defp notify(dst, msg), do: send(dst, {:dtls_transport, self(), msg})
end
3 changes: 3 additions & 0 deletions lib/ex_webrtc/ice_transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule ExWebRTC.ICETransport do
@callback restart(pid()) :: :ok
@callback send_data(pid(), binary()) :: :ok
@callback set_remote_credentials(pid(), ufrag :: binary(), pwd :: binary()) :: :ok
@callback get_stats(pid()) :: map()
@callback stop(pid()) :: :ok
end

Expand Down Expand Up @@ -43,5 +44,7 @@ defmodule ExWebRTC.DefaultICETransport do
@impl true
defdelegate set_remote_credentials(pid, ufrag, pwd), to: ICEAgent
@impl true
defdelegate get_stats(pid), to: ICEAgent
@impl true
defdelegate stop(pid), to: ICEAgent
end
111 changes: 99 additions & 12 deletions lib/ex_webrtc/peer_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ defmodule ExWebRTC.PeerConnection do
MediaStreamTrack,
RTPTransceiver,
RTPSender,
RTPReceiver,
SDPUtils,
SessionDescription,
Utils
Expand Down Expand Up @@ -59,6 +60,11 @@ defmodule ExWebRTC.PeerConnection do
@type connection_state() :: :closed | :failed | :disconnected | :new | :connecting | :connected

#### API ####
@spec get_all_peer_connections() :: [pid()]
def get_all_peer_connections() do
Registry.select(ExWebRTC.Registry, [{{:_, :"$1", :_}, [], [:"$1"]}])
end

@spec start_link(Configuration.options()) :: GenServer.on_start()
def start_link(options \\ []) do
configuration = Configuration.from_options!(options)
Expand Down Expand Up @@ -157,6 +163,11 @@ defmodule ExWebRTC.PeerConnection do
GenServer.call(peer_connection, {:remove_track, sender_id})
end

@spec get_stats(peer_connection()) :: %{String.t() => term()}
def get_stats(peer_connection) do
GenServer.call(peer_connection, :get_stats)
end

@spec send_rtp(peer_connection(), String.t(), ExRTP.Packet.t()) :: :ok
def send_rtp(peer_connection, track_id, packet) do
GenServer.cast(peer_connection, {:send_rtp, track_id, packet})
Expand All @@ -171,6 +182,7 @@ defmodule ExWebRTC.PeerConnection do

@impl true
def init({owner, config}) do
{:ok, _} = Registry.register(ExWebRTC.Registry, self(), self())
ice_config = [stun_servers: config.ice_servers, ip_filter: config.ice_ip_filter, on_data: nil]
{:ok, ice_pid} = DefaultICETransport.start_link(:controlled, ice_config)
{:ok, dtls_transport} = DTLSTransport.start_link(DefaultICETransport, ice_pid)
Expand All @@ -182,6 +194,7 @@ defmodule ExWebRTC.PeerConnection do

state = %{
owner: owner,
stats_id: Utils.generate_id(),
config: config,
current_local_desc: nil,
pending_local_desc: nil,
Expand Down Expand Up @@ -519,7 +532,7 @@ defmodule ExWebRTC.PeerConnection do
{:reply, :ok, state}

true ->
# that's not compliant with the W3C but it's safer not
# that's not compliant with the W3C but it's safer not
# to allow for this until we have clear use case
{:reply, {:error, :invalid_transceiver_direction}, state}
end
Expand Down Expand Up @@ -558,6 +571,78 @@ defmodule ExWebRTC.PeerConnection do
end
end

@impl true
def handle_call(:get_stats, _from, state) do
timestamp = System.os_time(:millisecond)

ice_stats = state.ice_transport.get_stats(state.ice_pid)
local_cert_info = DTLSTransport.get_local_cert_info(state.dtls_transport)
remote_cert_info = DTLSTransport.get_remote_cert_info(state.dtls_transport)

remote_certificate = if remote_cert_info != nil do
%{
id: :remote_certificate,
type: :certificate,
timestamp: timestamp,
fingerprint: Utils.hex_dump(remote_cert_info.fingerprint),
fingerprint_algorithm: remote_cert_info.fingerprint_algorithm,
base64_certificate: remote_cert_info.base64_certificate
}
end

rtp_stats =
Enum.flat_map(state.transceivers, fn tr ->
case tr.current_direction do
:sendonly ->
[RTPSender.get_stats(tr.sender, timestamp)]

:recvonly ->
[RTPReceiver.get_stats(tr.receiver, timestamp)]

:sendrecv ->
[
RTPSender.get_stats(tr.sender, timestamp),
RTPReceiver.get_stats(tr.receiver, timestamp)
]

_other ->
[]
end
end)
# TODO revisit this reject
|> Enum.reject(fn stats -> stats == nil end)
|> Map.new(fn stats -> {stats.id, stats} end)

stats = %{
state.stats_id => %{
id: state.stats_id,
type: :peer_connection,
timestamp: timestamp,
datachannels_opened: 0,
datachannels_closed: 0
},
:transport => %{
id: :transport,
type: :transport,
timestamp: timestamp,
bytes_sent: ice_stats.bytes_sent,
bytes_received: ice_stats.bytes_received
},
:local_certificate => %{
id: :local_certificate,
type: :certificate,
timestamp: timestamp,
fingerprint: Utils.hex_dump(local_cert_info.fingerprint),
fingerprint_algorithm: local_cert_info.fingerprint_algorithm,
base64_certificate: local_cert_info.base64_certificate
},
:remote_certificate => remote_certificate
}

stats = Map.merge(stats, rtp_stats)
{:reply, stats, state}
end

@impl true
def handle_cast({:send_rtp, track_id, packet}, state) do
# TODO: iterating over transceivers is not optimal
Expand Down Expand Up @@ -638,10 +723,12 @@ defmodule ExWebRTC.PeerConnection do
@impl true
def handle_info({:dtls_transport, _pid, {:rtp, data}}, state) do
with {:ok, demuxer, mid, packet} <- Demuxer.demux(state.demuxer, data),
%RTPTransceiver{} = t <- Enum.find(state.transceivers, &(&1.mid == mid)) do
track_id = t.receiver.track.id
notify(state.owner, {:rtp, track_id, packet})
{:noreply, %{state | demuxer: demuxer}}
{idx, %RTPTransceiver{} = t} <- find_transceiver(state.transceivers, mid) do
receiver = RTPReceiver.receive(t.receiver, packet, data)
transceivers = List.update_at(state.transceivers, idx, &%{&1 | receiver: receiver})
state = %{state | demuxer: demuxer, transceivers: transceivers}
notify(state.owner, {:rtp, t.receiver.track.id, packet})
{:noreply, state}
else
nil ->
Logger.warning("Received RTP with unrecognized MID: #{inspect(data)}")
Expand Down Expand Up @@ -717,7 +804,7 @@ defmodule ExWebRTC.PeerConnection do
# mline from the last offer/answer, do it (i.e. recycle free mline)
# * If there is no transceiver's mline, just rewrite
# mline from the offer/answer respecting its port number i.e. whether
# it is rejected or not.
# it is rejected or not.
# This is to preserve the same number of mlines
# between subsequent offer/anser exchanges.
# * At the end, add remaining transceiver mlines
Expand Down Expand Up @@ -751,7 +838,7 @@ defmodule ExWebRTC.PeerConnection do
end

# next_mline_idx is future mline idx to use if there are no mlines to recycle
# next_mid is the next free mid
# next_mid is the next free mid
defp assign_mlines(
transceivers,
last_answer,
Expand Down Expand Up @@ -1154,7 +1241,7 @@ defmodule ExWebRTC.PeerConnection do

# If signaling state is not stable i.e. we are during negotiation,
# don't fire negotiation needed notification.
# We will do this when moving to the stable state as part of the
# We will do this when moving to the stable state as part of the
# steps for setting remote description.
defp update_negotiation_needed(%{signaling_state: sig_state} = state) when sig_state != :stable,
do: state
Expand All @@ -1172,14 +1259,14 @@ defmodule ExWebRTC.PeerConnection do

negotiation_needed == false ->
# We need to clear the flag.
# Consider scenario where we add a transceiver and then
# remove it without performing negotiation.
# Consider scenario where we add a transceiver and then
# remove it without performing negotiation.
# At the end of the day, negotiation_needed flag has to be cleared.
%{state | negotiation_needed: false}
end
end

# We don't support MSIDs and stopping transceivers so
# We don't support MSIDs and stopping transceivers so
# we only check 5.2 and 5.3 from 4.7.3#check-if-negotiation-is-needed
# https://www.w3.org/TR/webrtc/#dfn-check-if-negotiation-is-needed
defp negotiation_needed?([], _), do: false
Expand All @@ -1199,7 +1286,7 @@ defmodule ExWebRTC.PeerConnection do
cond do
# Consider the following scenario:
# 1. offerer offers sendrecv
# 2. answerer answers with recvonly
# 2. answerer answers with recvonly
# 3. offerer changes from sendrecv to sendonly
# We don't need to renegotiate in such a case.
local_desc_type == :offer and
Expand Down
30 changes: 28 additions & 2 deletions lib/ex_webrtc/rtp_receiver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,34 @@ defmodule ExWebRTC.RTPReceiver do
alias ExWebRTC.MediaStreamTrack

@type t() :: %__MODULE__{
track: MediaStreamTrack.t() | nil
track: MediaStreamTrack.t(),
ssrc: non_neg_integer() | nil,
bytes_received: non_neg_integer(),
packets_received: non_neg_integer()
}

defstruct [:track]
defstruct [:track, :ssrc, bytes_received: 0, packets_received: 0]

@doc false
def receive(receiver, packet, raw_packet) do
%__MODULE__{
receiver
| ssrc: packet.ssrc,
bytes_received: receiver.bytes_received + byte_size(raw_packet),
packets_received: receiver.packets_received + 1
}
end

@doc false
@spec get_stats(t(), non_neg_integer()) :: map()
def get_stats(receiver, timestamp) do
%{
id: receiver.track.id,
type: :inbound_rtp,
timestamp: timestamp,
ssrc: receiver.ssrc,
bytes_received: receiver.bytes_received,
packets_received: receiver.packets_received
}
end
end
Loading

0 comments on commit ff8c678

Please sign in to comment.