From d5a747c93b1c12adffecd7d7b8294ff559ab0266 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Wed, 21 May 2025 14:40:48 +0200 Subject: [PATCH] wip add support for dtmf --- examples/dtmf/.formatter.exs | 4 + examples/dtmf/.gitignore | 23 +++ examples/dtmf/README.md | 11 + examples/dtmf/config/config.exs | 8 + examples/dtmf/lib/dtmf.ex | 15 ++ examples/dtmf/lib/dtmf/peer_handler.ex | 207 +++++++++++++++++++ examples/dtmf/lib/dtmf/router.ex | 15 ++ examples/dtmf/mix.exs | 32 +++ examples/dtmf/mix.lock | 36 ++++ examples/dtmf/priv/static/index.html | 54 +++++ examples/dtmf/priv/static/script.js | 66 ++++++ lib/ex_webrtc/rtp/depayloader.ex | 8 +- lib/ex_webrtc/rtp/dtmf/depayloader.ex | 55 +++++ lib/ex_webrtc/rtp/g711/depayloader.ex | 2 - lib/ex_webrtc/rtp/g711/payloader.ex | 1 - lib/ex_webrtc/rtp/opus/depayloader.ex | 2 - lib/ex_webrtc/rtp/opus/payloader.ex | 1 - lib/ex_webrtc/rtp_receiver.ex | 25 ++- test/ex_webrtc/rtp/depayloader_test.exs | 14 ++ test/ex_webrtc/rtp/dtmf/depayloader_test.exs | 58 ++++++ test/ex_webrtc/rtp/vp8/depayloader_test.exs | 2 +- 21 files changed, 624 insertions(+), 15 deletions(-) create mode 100644 examples/dtmf/.formatter.exs create mode 100644 examples/dtmf/.gitignore create mode 100644 examples/dtmf/README.md create mode 100644 examples/dtmf/config/config.exs create mode 100644 examples/dtmf/lib/dtmf.ex create mode 100644 examples/dtmf/lib/dtmf/peer_handler.ex create mode 100644 examples/dtmf/lib/dtmf/router.ex create mode 100644 examples/dtmf/mix.exs create mode 100644 examples/dtmf/mix.lock create mode 100644 examples/dtmf/priv/static/index.html create mode 100644 examples/dtmf/priv/static/script.js create mode 100644 lib/ex_webrtc/rtp/dtmf/depayloader.ex create mode 100644 test/ex_webrtc/rtp/dtmf/depayloader_test.exs diff --git a/examples/dtmf/.formatter.exs b/examples/dtmf/.formatter.exs new file mode 100644 index 00000000..d2cda26e --- /dev/null +++ b/examples/dtmf/.formatter.exs @@ -0,0 +1,4 @@ +# Used by "mix format" +[ + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] +] diff --git a/examples/dtmf/.gitignore b/examples/dtmf/.gitignore new file mode 100644 index 00000000..cc5804b4 --- /dev/null +++ b/examples/dtmf/.gitignore @@ -0,0 +1,23 @@ +# The directory Mix will write compiled artifacts to. +/_build/ + +# If you run "mix test --cover", coverage assets end up here. +/cover/ + +# The directory Mix downloads your dependencies sources to. +/deps/ + +# Where third-party dependencies like ExDoc output generated docs. +/doc/ + +# If the VM crashes, it generates a dump, let's ignore it too. +erl_crash.dump + +# Also ignore archive artifacts (built via "mix archive.build"). +*.ez + +# Ignore package tarball (built via "mix hex.build"). +dtmf-*.tar + +# Temporary files, for example, from tests. +/tmp/ diff --git a/examples/dtmf/README.md b/examples/dtmf/README.md new file mode 100644 index 00000000..839b047d --- /dev/null +++ b/examples/dtmf/README.md @@ -0,0 +1,11 @@ +# DTMF + +Receive DTMF tones from a browser and log them in the console. + +While in `examples/dtmf` directory + +1. Run `mix deps.get` +2. Run `mix run --no-halt` +3. Visit `http://127.0.0.1:8829/index.html` in your browser and start sending events via dial pad. + +The IP and port of the app can be configured in `config/config.exs`. diff --git a/examples/dtmf/config/config.exs b/examples/dtmf/config/config.exs new file mode 100644 index 00000000..f1ab9a9c --- /dev/null +++ b/examples/dtmf/config/config.exs @@ -0,0 +1,8 @@ +import Config + +config :logger, level: :info + +# normally you take these from env variables in `config/runtime.exs` +config :dtmf, + ip: {127, 0, 0, 1}, + port: 8829 diff --git a/examples/dtmf/lib/dtmf.ex b/examples/dtmf/lib/dtmf.ex new file mode 100644 index 00000000..c4f24585 --- /dev/null +++ b/examples/dtmf/lib/dtmf.ex @@ -0,0 +1,15 @@ +defmodule Dtmf do + use Application + + @ip Application.compile_env!(:dtmf, :ip) + @port Application.compile_env!(:dtmf, :port) + + @impl true + def start(_type, _args) do + children = [ + {Bandit, plug: __MODULE__.Router, ip: @ip, port: @port} + ] + + Supervisor.start_link(children, strategy: :one_for_one) + end +end diff --git a/examples/dtmf/lib/dtmf/peer_handler.ex b/examples/dtmf/lib/dtmf/peer_handler.ex new file mode 100644 index 00000000..4fd561e8 --- /dev/null +++ b/examples/dtmf/lib/dtmf/peer_handler.ex @@ -0,0 +1,207 @@ +defmodule Dtmf.PeerHandler do + require Logger + + alias ExWebRTC.{ + ICECandidate, + MediaStreamTrack, + PeerConnection, + RTPCodecParameters, + RTP.Depayloader, + RTP.JitterBuffer, + SessionDescription + } + + @behaviour WebSock + + @ice_servers [ + %{urls: "stun:stun.l.google.com:19302"} + ] + + @audio_codecs [ + %RTPCodecParameters{ + payload_type: 111, + mime_type: "audio/opus", + clock_rate: 48_000, + channels: 2 + }, + %RTPCodecParameters{ + payload_type: 126, + mime_type: "audio/telephone-event", + clock_rate: 8000, + channels: 1 + } + ] + + @impl true + def init(_) do + {:ok, pc} = + PeerConnection.start_link( + ice_servers: @ice_servers, + video_codecs: [], + audio_codecs: @audio_codecs + ) + + state = %{ + peer_connection: pc, + in_audio_track_id: nil, + # The flow of this example is as follows: + # we first feed rtp packets into jitter buffer to + # wait for retransmissions and fix ordering. + # Once ordering and gaps are fixed, we feed packets + # to the depayloader, which detects DTMF events. + # Note that depayloader takes all RTP packets (both Opus and DTMF), + # but ignores those that are not DTMF ones. + # This is to avoid demuxing packets by the user. + jitter_buffer: nil, + jitter_timer: nil, + depayloader: nil + } + + {:ok, state} + end + + @impl true + def handle_in({msg, [opcode: :text]}, state) do + msg + |> Jason.decode!() + |> handle_ws_msg(state) + end + + @impl true + def handle_info({:ex_webrtc, _from, msg}, state) do + handle_webrtc_msg(msg, state) + end + + @impl true + def handle_info(:jitter_buffer_timeout, state) do + state = %{state | jitter_timer: nil} + + state.jitter_buffer + |> JitterBuffer.handle_timeout() + |> handle_jitter_buffer_result(state) + end + + @impl true + def handle_info({:EXIT, pc, reason}, %{peer_connection: pc} = state) do + # Bandit traps exits under the hood so our PeerConnection.start_link + # won't automatically bring this process down. + Logger.info("Peer connection process exited, reason: #{inspect(reason)}") + {:stop, {:shutdown, :pc_closed}, state} + end + + @impl true + def terminate(reason, _state) do + Logger.info("WebSocket connection was terminated, reason: #{inspect(reason)}") + end + + defp handle_ws_msg(%{"type" => "offer", "data" => data}, state) do + Logger.info("Received SDP offer:\n#{data["sdp"]}") + + offer = SessionDescription.from_json(data) + :ok = PeerConnection.set_remote_description(state.peer_connection, offer) + + {:ok, answer} = PeerConnection.create_answer(state.peer_connection) + :ok = PeerConnection.set_local_description(state.peer_connection, answer) + + answer_json = SessionDescription.to_json(answer) + + msg = + %{"type" => "answer", "data" => answer_json} + |> Jason.encode!() + + Logger.info("Sent SDP answer:\n#{answer_json["sdp"]}") + + {:push, {:text, msg}, state} + end + + defp handle_ws_msg(%{"type" => "ice", "data" => data}, state) do + Logger.info("Received ICE candidate: #{data["candidate"]}") + + candidate = ICECandidate.from_json(data) + :ok = PeerConnection.add_ice_candidate(state.peer_connection, candidate) + {:ok, state} + end + + defp handle_webrtc_msg({:connection_state_change, conn_state}, state) do + Logger.info("Connection state changed: #{conn_state}") + + if conn_state == :failed do + {:stop, {:shutdown, :pc_failed}, state} + else + {:ok, state} + end + end + + defp handle_webrtc_msg({:ice_candidate, candidate}, state) do + candidate_json = ICECandidate.to_json(candidate) + + msg = + %{"type" => "ice", "data" => candidate_json} + |> Jason.encode!() + + Logger.info("Sent ICE candidate: #{candidate_json["candidate"]}") + + {:push, {:text, msg}, state} + end + + defp handle_webrtc_msg({:track, %MediaStreamTrack{kind: :audio, id: id}}, state) do + # Find dtmf codec. Its config (payload type) might have changed during negotiation. + tr = + state.peer_connection + |> PeerConnection.get_transceivers() + |> Enum.find(fn tr -> tr.receiver.track.id == id end) + + codec = Enum.find(tr.codecs, fn codec -> codec.mime_type == "audio/telephone-event" end) + + if codec == nil do + raise "DTMF for the track has not been negotiated." + end + + jitter_buffer = JitterBuffer.new() + {:ok, depayloader} = Depayloader.new(codec) + + state = %{ + state + | in_audio_track_id: id, + jitter_buffer: jitter_buffer, + depayloader: depayloader + } + + {:ok, state} + end + + defp handle_webrtc_msg({:rtp, id, nil, packet}, %{in_audio_track_id: id} = state) do + state.jitter_buffer + |> JitterBuffer.insert(packet) + |> handle_jitter_buffer_result(state) + end + + defp handle_webrtc_msg(_msg, state), do: {:ok, state} + + defp handle_jitter_buffer_result({packets, timeout, jitter_buffer}, state) do + state = %{state | jitter_buffer: jitter_buffer} + + # set a new timer only if the previous one has expired + state = + if timeout != nil and state.jitter_timer == nil do + timer = Process.send_after(self(), :jitter_buffer_timeout, timeout) + %{state | jitter_timer: timer} + else + state + end + + state = + Enum.reduce(packets, state, fn packet, state -> + case Depayloader.depayload(state.depayloader, packet) do + {nil, depayloader} -> + %{state | depayloader: depayloader} + + {event, depayloader} -> + Logger.info("Received DTMF event: #{event.event}") + %{state | depayloader: depayloader} + end + end) + + {:ok, state} + end +end diff --git a/examples/dtmf/lib/dtmf/router.ex b/examples/dtmf/lib/dtmf/router.ex new file mode 100644 index 00000000..ddd3c639 --- /dev/null +++ b/examples/dtmf/lib/dtmf/router.ex @@ -0,0 +1,15 @@ +defmodule Dtmf.Router do + use Plug.Router + + plug(Plug.Static, at: "/", from: :dtmf) + plug(:match) + plug(:dispatch) + + get "/ws" do + WebSockAdapter.upgrade(conn, Dtmf.PeerHandler, %{}, []) + end + + match _ do + send_resp(conn, 404, "not found") + end +end diff --git a/examples/dtmf/mix.exs b/examples/dtmf/mix.exs new file mode 100644 index 00000000..d1b29cba --- /dev/null +++ b/examples/dtmf/mix.exs @@ -0,0 +1,32 @@ +defmodule Dtmf.MixProject do + use Mix.Project + + def project do + [ + app: :dtmf, + version: "0.1.0", + elixir: "~> 1.18", + start_permanent: Mix.env() == :prod, + deps: deps() + ] + end + + # Run "mix help compile.app" to learn about applications. + def application do + [ + extra_applications: [:logger], + mod: {Dtmf, []} + ] + end + + # Run "mix help deps" to learn about dependencies. + defp deps do + [ + {:plug, "~> 1.15.0"}, + {:bandit, "~> 1.2.0"}, + {:websock_adapter, "~> 0.5.0"}, + {:jason, "~> 1.4.0"}, + {:ex_webrtc, path: "../../."} + ] + end +end diff --git a/examples/dtmf/mix.lock b/examples/dtmf/mix.lock new file mode 100644 index 00000000..31f267e3 --- /dev/null +++ b/examples/dtmf/mix.lock @@ -0,0 +1,36 @@ +%{ + "bandit": {:hex, :bandit, "1.2.3", "a98d664a96fec23b68e776062296d76a94b4459795b38209f4ae89cb4225709c", [:mix], [{:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "3e29150245a9b5f56944434e5240966e75c917dad248f689ab589b32187a81af"}, + "bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"}, + "bunch_native": {:hex, :bunch_native, "0.5.0", "8ac1536789a597599c10b652e0b526d8833348c19e4739a0759a2bedfd924e63", [:mix], [{:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "24190c760e32b23b36edeb2dc4852515c7c5b3b8675b1a864e0715bdd1c8f80d"}, + "bundlex": {:hex, :bundlex, "1.5.4", "3726acd463f4d31894a59bbc177c17f3b574634a524212f13469f41c4834a1d9", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:req, ">= 0.4.0", [hex: :req, repo: "hexpm", optional: false]}, {:zarex, "~> 1.0", [hex: :zarex, repo: "hexpm", optional: false]}], "hexpm", "e745726606a560275182a8ac1c8ebd5e11a659bb7460d8abf30f397e59b4c5d2"}, + "crc": {:hex, :crc, "0.10.5", "ee12a7c056ac498ef2ea985ecdc9fa53c1bfb4e53a484d9f17ff94803707dfd8", [:mix, :rebar3], [{:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "3e673b6495a9525c5c641585af1accba59a1eb33de697bedf341e247012c2c7f"}, + "elixir_make": {:hex, :elixir_make, "0.9.0", "6484b3cd8c0cee58f09f05ecaf1a140a8c97670671a6a0e7ab4dc326c3109726", [:mix], [], "hexpm", "db23d4fd8b757462ad02f8aa73431a426fe6671c80b200d9710caf3d1dd0ffdb"}, + "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, + "ex_dtls": {:hex, :ex_dtls, "0.17.0", "dbe1d494583a307c26148cb5ea5d7c14e65daa8ec96cc73002cc3313ce4b9a81", [:mix], [{:bundlex, "~> 1.5.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "3eaa7221ec08fa9e4bc9430e426cbd5eb4feb8d8f450b203cf39b2114a94d713"}, + "ex_ice": {:hex, :ex_ice, "0.12.0", "b52ec3ff878d5fb632ef9facc7657dfdf59e2ff9f23e634b0918e6ce1a05af48", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.2.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "a86024a5fbf9431082784be4bb3606d3cde9218fb325a9f208ccd6e0abfd0d73"}, + "ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"}, + "ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"}, + "ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"}, + "ex_sdp": {:hex, :ex_sdp, "1.1.1", "1a7b049491e5ec02dad9251c53d960835dc5631321ae978ec331831f3e4f6d5f", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "1b13a72ac9c5c695b8824dbdffc671be8cbb4c0d1ccb4ff76a04a6826759f233"}, + "ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"}, + "ex_turn": {:hex, :ex_turn, "0.2.0", "4e1f9b089e9a5ee44928d12370cc9ea7a89b84b2f6256832de65271212eb80de", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "08e884f0af2c4a147e3f8cd4ffe33e3452a256389f0956e55a8c4d75bf0e74cd"}, + "finch": {:hex, :finch, "0.19.0", "c644641491ea854fc5c1bbaef36bfc764e3f08e7185e1f084e35e0672241b76d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "fc5324ce209125d1e2fa0fcd2634601c52a787aff1cd33ee833664a5af4ea2b6"}, + "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, + "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, + "membrane_precompiled_dependency_provider": {:hex, :membrane_precompiled_dependency_provider, "0.1.2", "8af73b7dc15ba55c9f5fbfc0453d4a8edfb007ade54b56c37d626be0d1189aba", [:mix], [{:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "7fe3e07361510445a29bee95336adde667c4162b76b7f4c8af3aeb3415292023"}, + "mime": {:hex, :mime, "2.0.7", "b8d739037be7cd402aee1ba0306edfdef982687ee7e9859bee6198c1e7e2f128", [:mix], [], "hexpm", "6171188e399ee16023ffc5b76ce445eb6d9672e2e241d2df6050f3c771e80ccd"}, + "mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"}, + "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, + "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, + "plug": {:hex, :plug, "1.15.3", "712976f504418f6dff0a3e554c40d705a9bcf89a7ccef92fc6a5ef8f16a30a97", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cc4365a3c010a56af402e0809208873d113e9c38c401cabd88027ef4f5c01fd2"}, + "plug_crypto": {:hex, :plug_crypto, "2.1.1", "19bda8184399cb24afa10be734f84a16ea0a2bc65054e23a62bb10f06bc89491", [:mix], [], "hexpm", "6470bce6ffe41c8bd497612ffde1a7e4af67f36a15eea5f921af71cf3e11247c"}, + "qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"}, + "req": {:hex, :req, "0.5.10", "a3a063eab8b7510785a467f03d30a8d95f66f5c3d9495be3474b61459c54376c", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "8a604815743f8a2d3b5de0659fa3137fa4b1cffd636ecb69b30b2b9b2c2559be"}, + "shmex": {:hex, :shmex, "0.5.1", "81dd209093416bf6608e66882cb7e676089307448a1afd4fc906c1f7e5b94cf4", [:mix], [{:bunch_native, "~> 0.5.0", [hex: :bunch_native, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "c29f8286891252f64c4e1dac40b217d960f7d58def597c4e606ff8fbe71ceb80"}, + "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, + "thousand_island": {:hex, :thousand_island, "1.3.13", "d598c609172275f7b1648c9f6eddf900e42312b09bfc2f2020358f926ee00d39", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "5a34bdf24ae2f965ddf7ba1a416f3111cfe7df50de8d66f6310e01fc2e80b02a"}, + "unifex": {:hex, :unifex, "1.2.1", "6841c170a6e16509fac30b19e4e0a19937c33155a59088b50c15fc2c36251b6b", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}], "hexpm", "8c9d2e3c48df031e9995dd16865bab3df402c0295ba3a31f38274bb5314c7d37"}, + "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, + "websock_adapter": {:hex, :websock_adapter, "0.5.8", "3b97dc94e407e2d1fc666b2fb9acf6be81a1798a2602294aac000260a7c4a47d", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "315b9a1865552212b5f35140ad194e67ce31af45bcee443d4ecb96b5fd3f3782"}, + "zarex": {:hex, :zarex, "1.0.5", "58239e3ee5d75f343262bb4df5cf466555a1c689f920e5d3651a9333972f7c7e", [:mix], [], "hexpm", "9fb72ef0567c2b2742f5119a1ba8a24a2fabb21b8d09820aefbf3e592fa9a46a"}, +} diff --git a/examples/dtmf/priv/static/index.html b/examples/dtmf/priv/static/index.html new file mode 100644 index 00000000..fb649e95 --- /dev/null +++ b/examples/dtmf/priv/static/index.html @@ -0,0 +1,54 @@ + + + + + + + + Elixir WebRTC Dtmf Example + + + +

Elixir WebRTC Dtmf Example

+

A simple example showing how to send DTMF tones using JS API and receive them in Elixir WebRTC.
+ Click buttons on the dial pad and observe that DTMF events are logged in the terminal runnig Elixir code. +

+
+
+ + + + +
+
+ + + + +
+
+ + + + +
+
+ + + + +
+
+ +
+ Sent tones: + +
+ +
+
+ + + + + \ No newline at end of file diff --git a/examples/dtmf/priv/static/script.js b/examples/dtmf/priv/static/script.js new file mode 100644 index 00000000..6588f217 --- /dev/null +++ b/examples/dtmf/priv/static/script.js @@ -0,0 +1,66 @@ +const pcConfig = { iceServers: [{ urls: "stun:stun.l.google.com:19302" }] }; +const mediaConstraints = { video: false, audio: true }; + +const proto = window.location.protocol === "https:" ? "wss:" : "ws:"; +const ws = new WebSocket(`${proto}//${window.location.host}/ws`); +ws.onopen = (_) => start_connection(ws); +ws.onclose = (event) => + console.log("WebSocket connection was terminated:", event); + +const start_connection = async (ws) => { + const pc = new RTCPeerConnection(pcConfig); + // expose pc for easier debugging and experiments + window.pc = pc; + pc.onicecandidate = (event) => { + if (event.candidate === null) return; + + console.log("Sent ICE candidate:", event.candidate); + ws.send(JSON.stringify({ type: "ice", data: event.candidate })); + }; + + pc.onconnectionstatechange = () => { + document.getElementById( + "connection-state" + ).innerText += `Connection state change: ${pc.connectionState}\n`; + + if (pc.connectionState === "connected") { + pc.getSenders()[0].dtmf.ontonechange = (ev) => { + if (ev.tone !== "") { + document.getElementById("sent-tones").value += `${ev.tone}`; + } + }; + + const dialPad = document.getElementById("dial-pad"); + const buttons = dialPad.getElementsByTagName("button"); + for (let i = 0; i !== buttons.length; i++) { + buttons[i].onclick = (event) => { + pc.getSenders()[0].dtmf.insertDTMF(event.target.textContent); + }; + } + } + }; + + const localStream = await navigator.mediaDevices.getUserMedia( + mediaConstraints + ); + pc.addTrack(localStream.getAudioTracks()[0]); + + ws.onmessage = async (event) => { + const { type, data } = JSON.parse(event.data); + + switch (type) { + case "answer": + console.log("Received SDP answer:", data); + await pc.setRemoteDescription(data); + break; + case "ice": + console.log("Received ICE candidate:", data); + await pc.addIceCandidate(data); + } + }; + + const offer = await pc.createOffer(); + await pc.setLocalDescription(offer); + console.log("Sent SDP offer:", offer); + ws.send(JSON.stringify({ type: "offer", data: offer })); +}; diff --git a/lib/ex_webrtc/rtp/depayloader.ex b/lib/ex_webrtc/rtp/depayloader.ex index d8aca10d..79b067cd 100644 --- a/lib/ex_webrtc/rtp/depayloader.ex +++ b/lib/ex_webrtc/rtp/depayloader.ex @@ -24,10 +24,11 @@ defmodule ExWebRTC.RTP.Depayloader do @doc """ Processes binary data from a single RTP packet, and outputs a frame if assembled. - Returns the frame (or `nil` if a frame could not be depayloaded yet) - together with the updated depayloader. + Returns a tuple where the first element is a frame, dtmf event (map), or `nil` if a frame/dtmf event + could not be depayloaded yet, and the second element is the updated depayloader. """ - @spec depayload(depayloader(), ExRTP.Packet.t()) :: {binary() | nil, depayloader()} + @spec depayload(depayloader(), ExRTP.Packet.t()) :: + {(frame :: binary()) | (dtmf_event :: map()) | nil, depayloader()} def depayload(%module{} = depayloader, packet) do module.depayload(depayloader, packet) end @@ -38,6 +39,7 @@ defmodule ExWebRTC.RTP.Depayloader do "audio/opus" -> {:ok, ExWebRTC.RTP.Depayloader.Opus} "audio/pcma" -> {:ok, ExWebRTC.RTP.Depayloader.G711} "audio/pcmu" -> {:ok, ExWebRTC.RTP.Depayloader.G711} + "audio/telephone-event" -> {:ok, ExWebRTC.RTP.Depayloader.DTMF} _other -> {:error, :no_depayloader_for_codec} end end diff --git a/lib/ex_webrtc/rtp/dtmf/depayloader.ex b/lib/ex_webrtc/rtp/dtmf/depayloader.ex new file mode 100644 index 00000000..1d31a6fd --- /dev/null +++ b/lib/ex_webrtc/rtp/dtmf/depayloader.ex @@ -0,0 +1,55 @@ +defmodule ExWebRTC.RTP.Depayloader.DTMF do + @moduledoc false + # Decapsulates DTMF tones out of RTP packet. + # + # Notes: + # * this depayloader does only return notification about the start of the new DTMF event + # * there is no support for detecting the end of the event. + # In particular there is no support for the duration of the event. + # * we assume there is always only one DTMF event in one RTP packet + # + # Based on [RFC 4733](https://datatracker.ietf.org/doc/html/rfc4733) + + alias ExRTP.Packet + + @behaviour ExWebRTC.RTP.Depayloader.Behaviour + + @type t :: %__MODULE__{} + + defstruct last_event_timestamp: nil + + @impl true + def new() do + %__MODULE__{last_event_timestamp: nil} + end + + @impl true + def depayload(%__MODULE__{} = depayloader, %Packet{payload: payload} = packet) do + case payload do + <> -> + # As described in RFC 4733, sec. 2.5.1.4: + # The final packet for each event and for each segment SHOULD be sent a + # total of three times at the interval used by the source for updates. + # Hence, we need to check against timestamp, not to report the same event multiple times. + if packet.marker == true and + (depayloader.last_event_timestamp == nil or + depayloader.last_event_timestamp < packet.timestamp) do + depayloader = %{depayloader | last_event_timestamp: packet.timestamp} + {%{code: event, event: event_to_string(event)}, depayloader} + else + {nil, depayloader} + end + + _ -> + {nil, depayloader} + end + end + + defp event_to_string(num) when num in 0..9, do: "#{num}" + defp event_to_string(10), do: "*" + defp event_to_string(11), do: "#" + defp event_to_string(12), do: "A" + defp event_to_string(13), do: "B" + defp event_to_string(14), do: "C" + defp event_to_string(15), do: "D" +end diff --git a/lib/ex_webrtc/rtp/g711/depayloader.ex b/lib/ex_webrtc/rtp/g711/depayloader.ex index 4abab631..9e295dd0 100644 --- a/lib/ex_webrtc/rtp/g711/depayloader.ex +++ b/lib/ex_webrtc/rtp/g711/depayloader.ex @@ -13,13 +13,11 @@ defmodule ExWebRTC.RTP.Depayloader.G711 do defstruct [] @impl true - @spec new() :: t() def new() do %__MODULE__{} end @impl true - @spec depayload(t(), Packet.t()) :: {binary(), t()} def depayload(%__MODULE__{} = depayloader, %Packet{payload: payload}), do: {payload, depayloader} end diff --git a/lib/ex_webrtc/rtp/g711/payloader.ex b/lib/ex_webrtc/rtp/g711/payloader.ex index 82bed3bc..dccfdd60 100644 --- a/lib/ex_webrtc/rtp/g711/payloader.ex +++ b/lib/ex_webrtc/rtp/g711/payloader.ex @@ -16,7 +16,6 @@ defmodule ExWebRTC.RTP.Payloader.G711 do end @impl true - @spec payload(t(), binary()) :: {[ExRTP.Packet.t()], t()} def payload(%__MODULE__{} = payloader, packet) when packet != <<>> do {[ExRTP.Packet.new(packet)], payloader} end diff --git a/lib/ex_webrtc/rtp/opus/depayloader.ex b/lib/ex_webrtc/rtp/opus/depayloader.ex index 1bb36bcf..17ade7eb 100644 --- a/lib/ex_webrtc/rtp/opus/depayloader.ex +++ b/lib/ex_webrtc/rtp/opus/depayloader.ex @@ -13,13 +13,11 @@ defmodule ExWebRTC.RTP.Depayloader.Opus do defstruct [] @impl true - @spec new() :: t() def new() do %__MODULE__{} end @impl true - @spec depayload(t(), Packet.t()) :: {binary(), t()} def depayload(%__MODULE__{} = depayloader, %Packet{payload: payload}), do: {payload, depayloader} end diff --git a/lib/ex_webrtc/rtp/opus/payloader.ex b/lib/ex_webrtc/rtp/opus/payloader.ex index a0e8215a..34e235c5 100644 --- a/lib/ex_webrtc/rtp/opus/payloader.ex +++ b/lib/ex_webrtc/rtp/opus/payloader.ex @@ -16,7 +16,6 @@ defmodule ExWebRTC.RTP.Payloader.Opus do end @impl true - @spec payload(t(), binary()) :: {[ExRTP.Packet.t()], t()} def payload(%__MODULE__{} = payloader, packet) when packet != <<>> do {[ExRTP.Packet.new(packet)], payloader} end diff --git a/lib/ex_webrtc/rtp_receiver.ex b/lib/ex_webrtc/rtp_receiver.ex index 64e63828..079de4a1 100644 --- a/lib/ex_webrtc/rtp_receiver.ex +++ b/lib/ex_webrtc/rtp_receiver.ex @@ -17,6 +17,7 @@ defmodule ExWebRTC.RTPReceiver do id: id(), track: MediaStreamTrack.t(), codec: RTPCodecParameters.t() | nil, + codecs: %{(payload_type :: non_neg_integer()) => RTPCodecParameters.t()}, simulcast_demuxer: SimulcastDemuxer.t(), reports?: boolean(), inbound_rtx?: boolean(), @@ -46,17 +47,17 @@ defmodule ExWebRTC.RTPReceiver do @type t() :: %__MODULE__{ id: id(), track: MediaStreamTrack.t(), - codec: RTPCodecParameters.t() | nil + codecs: %{(pt :: non_neg_integer()) => RTPCodecParameters.t()} } - @enforce_keys [:id, :track, :codec] + @enforce_keys [:id, :track, :codec, :codecs] defstruct @enforce_keys @doc false @spec to_struct(receiver()) :: t() def to_struct(receiver) do receiver - |> Map.take([:id, :track, :codec]) + |> Map.take([:id, :track, :codec, :codecs]) |> then(&struct!(__MODULE__, &1)) end @@ -65,12 +66,14 @@ defmodule ExWebRTC.RTPReceiver do def new(track, codecs, rtp_hdr_exts, features) do {_rtx_codecs, media_codecs} = Utils.split_rtx_codecs(codecs) codec = List.first(media_codecs) + codecs = Map.new(media_codecs, fn codec -> {codec.payload_type, codec} end) # layer `nil` is for the packets without RID/ no simulcast %{ id: Utils.generate_id(), track: track, codec: codec, + codecs: codecs, simulcast_demuxer: SimulcastDemuxer.new(rtp_hdr_exts), reports?: :rtcp_reports in features, inbound_rtx?: :inbound_rtx in features, @@ -83,6 +86,7 @@ defmodule ExWebRTC.RTPReceiver do def update(receiver, codecs, rtp_hdr_exts, stream_ids) do {_rtx_codecs, media_codecs} = Utils.split_rtx_codecs(codecs) codec = List.first(media_codecs) + codecs = Map.new(media_codecs, fn codec -> {codec.payload_type, codec} end) simulcast_demuxer = SimulcastDemuxer.update(receiver.simulcast_demuxer, rtp_hdr_exts) track = %MediaStreamTrack{receiver.track | streams: stream_ids} @@ -99,6 +103,7 @@ defmodule ExWebRTC.RTPReceiver do %{ receiver | codec: codec, + codecs: codecs, simulcast_demuxer: simulcast_demuxer, layers: layers, track: track @@ -109,9 +114,19 @@ defmodule ExWebRTC.RTPReceiver do @spec receive_packet(receiver(), ExRTP.Packet.t(), non_neg_integer()) :: {String.t() | nil, receiver()} def receive_packet(receiver, packet, size) do - if packet.payload_type != receiver.codec.payload_type do + packet_codec = Map.get(receiver.codecs, packet.payload_type) + + if packet_codec == nil do Logger.warning("Received packet with unexpected payload_type \ -(received #{packet.payload_type}, expected #{receiver.codec.payload_type})") +(received #{packet.payload_type}, expected #{inspect(Map.keys(receiver.codecs))})") + end + + if receiver.codec.clock_rate != packet_codec.clock_rate do + Logger.warning(""" + Received packet with non-matching clock-rate. This can result + in incorrect jitter in RTCP reports. Expected: #{receiver.codec.clock_rate}, \ + received: #{packet_codec.clock_rate}\ + """) end {rid, simulcast_demuxer} = SimulcastDemuxer.demux_packet(receiver.simulcast_demuxer, packet) diff --git a/test/ex_webrtc/rtp/depayloader_test.exs b/test/ex_webrtc/rtp/depayloader_test.exs index 003868d1..f25df040 100644 --- a/test/ex_webrtc/rtp/depayloader_test.exs +++ b/test/ex_webrtc/rtp/depayloader_test.exs @@ -61,6 +61,20 @@ defmodule ExWebRTC.RTP.DepayloaderTest do Depayloader.G711.depayload(depayloader, @packet) end + test "creates an DTMF depayloader and dispatches calls to its module" do + assert {:ok, depayloader} = + %RTPCodecParameters{ + payload_type: 110, + mime_type: "audio/telephone-event", + clock_rate: 8_000, + channels: 1 + } + |> Depayloader.new() + + assert Depayloader.depayload(depayloader, @packet) == + Depayloader.DTMF.depayload(depayloader, @packet) + end + test "returns error if no depayloader exists for given codec" do assert {:error, :no_depayloader_for_codec} = %RTPCodecParameters{payload_type: 97, mime_type: "video/H264", clock_rate: 90_000} diff --git a/test/ex_webrtc/rtp/dtmf/depayloader_test.exs b/test/ex_webrtc/rtp/dtmf/depayloader_test.exs new file mode 100644 index 00000000..4732387b --- /dev/null +++ b/test/ex_webrtc/rtp/dtmf/depayloader_test.exs @@ -0,0 +1,58 @@ +defmodule ExWebRTC.RTP.DTMF.DepayloaderTest do + use ExUnit.Case, async: true + + alias ExWebRTC.RTP.Depayloader + alias ExRTP.Packet + + test "does not return multiple events when timestamp does not change" do + depayloader = Depayloader.DTMF.new() + + # Marker denotes beginning of a new event. + # The last packet is transmitted 3 times. + ev = <<0::8, 1::1, 0::1, 0::6, 8000::16>> + packet1 = Packet.new(ev, marker: true, sequence_number: 1234, timestamp: 1234) + packet2 = Packet.new(ev, marker: true, sequence_number: 1235, timestamp: 1234) + packet3 = Packet.new(ev, marker: true, sequence_number: 1236, timestamp: 1234) + + assert {%{event: "0", code: 0}, depayloader} = + Depayloader.DTMF.depayload(depayloader, packet1) + + assert {nil, depayloader} = Depayloader.DTMF.depayload(depayloader, packet2) + assert {nil, _depayloader} = Depayloader.DTMF.depayload(depayloader, packet3) + end + + test "does not return multiple events when the event is split across multiple RTP packets" do + depayloader = Depayloader.DTMF.new() + + packet1 = + Packet.new(<<0::8, 0::1, 0::1, 0::6, 0xFF::16>>, + marker: true, + sequence_number: 1234, + timestamp: 1234 + ) + + assert {%{event: "0", code: 0}, depayloader} = + Depayloader.DTMF.depayload(depayloader, packet1) + + packet2 = + Packet.new(<<0::8, 1::1, 0::1, 0::6, 8000::16>>, + marker: false, + sequence_number: 1235, + timestamp: 1234 + 0xFF + ) + + assert {nil, _depayloader} = Depayloader.DTMF.depayload(depayloader, packet2) + end + + test "ignores invalid packets" do + depayloader = Depayloader.DTMF.new() + + ev = <<>> + packet = Packet.new(ev, marker: true) + assert {nil, depayloader} = Depayloader.DTMF.depayload(depayloader, packet) + + ev = <<1, 2, 3, 4, 5>> + packet = Packet.new(ev, marker: true) + assert {nil, _depayloader} = Depayloader.DTMF.depayload(depayloader, packet) + end +end diff --git a/test/ex_webrtc/rtp/vp8/depayloader_test.exs b/test/ex_webrtc/rtp/vp8/depayloader_test.exs index ff656194..ff4e1787 100644 --- a/test/ex_webrtc/rtp/vp8/depayloader_test.exs +++ b/test/ex_webrtc/rtp/vp8/depayloader_test.exs @@ -4,7 +4,7 @@ defmodule ExWebRTC.RTP.VP8.DepayloaderTest do alias ExWebRTC.RTP.Depayloader alias ExWebRTC.RTP.VP8.Payload - test "write/2" do + test "depayload/2" do depayloader = Depayloader.VP8.new() # random vp8 data, not necessarily correct data = <<0, 1, 2, 3>>