diff --git a/README.md b/README.md index 5215f89..1ed0524 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ Add the following line to your deps in `mix.exs`: ```elixir def deps do [ - {:membrane_rtsp_plugin, "~> 0.2.0"} + {:membrane_rtsp_plugin, "~> 0.3.0"} ] end ``` @@ -68,4 +68,4 @@ defmodule RtspPipeline do {[], state} end end -``` \ No newline at end of file +``` diff --git a/lib/membrane_rtsp_plugin/source.ex b/lib/membrane_rtsp_plugin/source.ex index d26d079..ab788f3 100644 --- a/lib/membrane_rtsp_plugin/source.ex +++ b/lib/membrane_rtsp_plugin/source.ex @@ -12,9 +12,10 @@ defmodule Membrane.RTSP.Source do * `H264` * `H265` - ### Notifications - * `{:new_track, ssrc, track}` - sent when the track is parsed and available for consumption by the next - elements. An output pad `Pad.ref(:output, ssrc)` should be linked to receive the data. + When the element finishes setting up all tracks it will send a `t:set_up_tracks/0` notification. + Each time a track is parsed and available for further processing the element will send a + `t:new_track/0` notification. An output pad `Pad.ref(:output, ssrc)` should be linked to receive + the data. """ use Membrane.Bin @@ -25,6 +26,15 @@ defmodule Membrane.RTSP.Source do alias __MODULE__.ConnectionManager alias Membrane.{RTSP, Time} + @type set_up_tracks_notification :: {:set_up_tracks, [track()]} + @type new_track_notification :: {:new_track, ssrc :: pos_integer(), track :: track()} + @type track :: %{ + control_path: String.t(), + type: :video | :audio | :application, + fmtp: ExSDP.Attribute.FMTP.t() | nil, + rtpmap: ExSDP.Attribute.RTPMapping.t() | nil + } + @type transport :: {:udp, port_range_start :: non_neg_integer(), port_range_end :: non_neg_integer()} | :tcp @@ -62,6 +72,15 @@ defmodule Membrane.RTSP.Source do Interval of a heartbeat sent to the RTSP server at a regular interval to keep the session alive. """ + ], + on_connection_closed: [ + spec: :raise_error | :send_eos, + default: :raise_error, + description: """ + Defines the element's behavior if the TCP connection is closed by the RTSP server: + - `:raise_error` - Raise an error. + - `:send_eos` - Send an `:end_of_stream` to the output pad. + """ ] def_output_pad :output, @@ -79,16 +98,26 @@ defmodule Membrane.RTSP.Source do tracks: [ConnectionManager.track()], ssrc_to_track: %{non_neg_integer() => ConnectionManager.track()}, rtsp_session: Membrane.RTSP.t() | nil, - keep_alive_timer: reference() | nil + keep_alive_timer: reference() | nil, + on_connection_closed: :raise_error | :send_eos, + end_of_stream: boolean() } - @enforce_keys [:stream_uri, :allowed_media_types, :transport, :timeout, :keep_alive_interval] + @enforce_keys [ + :stream_uri, + :allowed_media_types, + :transport, + :timeout, + :keep_alive_interval, + :on_connection_closed + ] defstruct @enforce_keys ++ [ tracks: [], ssrc_to_track: %{}, rtsp_session: nil, - keep_alive_timer: nil + keep_alive_timer: nil, + end_of_stream: false ] end @@ -102,7 +131,19 @@ defmodule Membrane.RTSP.Source do @impl true def handle_setup(_ctx, state) do state = ConnectionManager.establish_connection(state) - {[spec: create_sources_spec(state)], state} + + {[spec: create_sources_spec(state), notify_parent: get_set_up_tracks_notification(state)], + state} + end + + @impl true + def handle_child_playing(:rtp_session, _ctx, state) do + {[], ConnectionManager.play(state)} + end + + @impl true + def handle_child_playing(_child, _ctx, state) do + {[], state} end @impl true @@ -114,7 +155,7 @@ defmodule Membrane.RTSP.Source do ) do case Enum.find(state.tracks, fn track -> track.rtpmap.payload_type == pt end) do nil -> - {[], state} + raise "No track of payload type #{inspect(pt)} has been requested with SETUP" track -> ssrc_to_track = Map.put(state.ssrc_to_track, ssrc, track) @@ -136,17 +177,31 @@ defmodule Membrane.RTSP.Source do end @impl true - def handle_child_playing(:rtp_session, _ctx, state) do - {[], ConnectionManager.play(state)} + def handle_info( + {:EXIT, rtsp_session, :socket_closed}, + ctx, + %State{rtsp_session: rtsp_session, on_connection_closed: :send_eos} = state + ) do + notify_udp_sources_actions = + ctx.children + |> Map.keys() + |> Enum.filter(&match?({:udp_source, _ref}, &1)) + |> Enum.map(&{:notify_child, {&1, :close_socket}}) + + {notify_udp_sources_actions, %{state | end_of_stream: true}} end @impl true - def handle_child_playing(_child, _ctx, state) do - {[], state} + def handle_info( + {:EXIT, rtsp_session, reason}, + _ctx, + %State{rtsp_session: rtsp_session} = state + ) do + {[terminate: {:rtsp_session_crash, reason}], state} end @impl true - def handle_info(:keep_alive, _ctx, state) do + def handle_info(:keep_alive, _ctx, %{end_of_stream: false} = state) do {[], ConnectionManager.keep_alive(state)} end @@ -173,13 +228,17 @@ defmodule Membrane.RTSP.Source do {[terminate: :normal], state} end + @spec get_set_up_tracks_notification(State.t()) :: set_up_tracks_notification() + def get_set_up_tracks_notification(state) do + {:set_up_tracks, Enum.map(state.tracks, &Map.delete(&1, :transport))} + end + @spec create_sources_spec(State.t()) :: Membrane.ChildrenSpec.t() defp create_sources_spec(state) do fmt_mapping = - Enum.map(state.tracks, fn %{rtpmap: rtpmap} -> + Map.new(state.tracks, fn %{rtpmap: rtpmap} -> {rtpmap.payload_type, {String.to_atom(rtpmap.encoding), rtpmap.clock_rate}} end) - |> Enum.into(%{}) case state.transport do :tcp -> @@ -187,7 +246,8 @@ defmodule Membrane.RTSP.Source do child(:tcp_source, %Membrane.TCP.Source{ connection_side: :client, - local_socket: socket + local_socket: socket, + on_connection_closed: state.on_connection_closed }) |> child(:tcp_depayloader, %RTSP.TCP.Decapsulator{rtsp_session: state.rtsp_session}) |> via_in(Pad.ref(:rtp_input, make_ref())) @@ -200,10 +260,10 @@ defmodule Membrane.RTSP.Source do {:udp, rtp_port, rtcp_port} = track.transport [ - child({:udp, make_ref()}, %Membrane.UDP.Source{local_port_no: rtp_port}) + child({:udp_source, make_ref()}, %Membrane.UDP.Source{local_port_no: rtp_port}) |> via_in(Pad.ref(:rtp_input, make_ref())) |> get_child(:rtp_session), - child({:udp, make_ref()}, %Membrane.UDP.Source{local_port_no: rtcp_port}) + child({:udp_source, make_ref()}, %Membrane.UDP.Source{local_port_no: rtcp_port}) |> via_in(Pad.ref(:rtp_input, make_ref())) |> get_child(:rtp_session) ] diff --git a/lib/membrane_rtsp_plugin/source/connection_manager.ex b/lib/membrane_rtsp_plugin/source/connection_manager.ex index 81d84e4..e4eef63 100644 --- a/lib/membrane_rtsp_plugin/source/connection_manager.ex +++ b/lib/membrane_rtsp_plugin/source/connection_manager.ex @@ -86,6 +86,7 @@ defmodule Membrane.RTSP.Source.ConnectionManager do response_timeout: Membrane.Time.as_milliseconds(state.timeout, :round) ) do {:ok, session} -> + Process.flag(:trap_exit, true) {:ok, %{state | rtsp_session: session}} {:error, reason} -> diff --git a/mix.exs b/mix.exs index c7dd66f..8c45dd4 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.RTSP.Plugin.Mixfile do use Mix.Project - @version "0.2.0" + @version "0.3.0" @github_url "https://github.com/gBillal/membrane_rtsp_plugin" def project do @@ -37,13 +37,13 @@ defmodule Membrane.RTSP.Plugin.Mixfile do defp deps do [ {:membrane_core, "~> 1.1"}, - {:membrane_rtsp, "~> 0.7.1"}, - {:membrane_rtp_plugin, "~> 0.28.0"}, + {:membrane_rtsp, "~> 0.9.0"}, + {:membrane_rtp_plugin, "~> 0.29.0"}, {:membrane_rtp_h264_plugin, "~> 0.19.0"}, {:membrane_rtp_h265_plugin, "~> 0.5.1"}, - {:membrane_tcp_plugin, "~> 0.4.0"}, + {:membrane_tcp_plugin, "~> 0.6.0"}, {:membrane_h26x_plugin, "~> 0.10.0"}, - {:membrane_udp_plugin, "~> 0.13.0"}, + {:membrane_udp_plugin, "~> 0.14.0"}, {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, {:dialyxir, ">= 0.0.0", only: :dev, runtime: false}, {:credo, ">= 0.0.0", only: :dev, runtime: false}, diff --git a/mix.lock b/mix.lock index 6e7c79e..c3ed528 100644 --- a/mix.lock +++ b/mix.lock @@ -3,40 +3,40 @@ "bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"}, "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, "coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"}, - "credo": {:hex, :credo, "1.7.6", "b8f14011a5443f2839b04def0b252300842ce7388f3af177157c86da18dfbeea", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "146f347fb9f8cbc5f7e39e3f22f70acbef51d441baa6d10169dd604bfbc55296"}, + "credo": {:hex, :credo, "1.7.7", "771445037228f763f9b2afd612b6aa2fd8e28432a95dbbc60d8e03ce71ba4446", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8bc87496c9aaacdc3f90f01b7b0582467b69b4bd2441fe8aae3109d843cc2f2e"}, "dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.41", "ab34711c9dc6212dda44fcd20ecb87ac3f3fce6f0ca2f28d4a00e4154f8cd599", [:mix], [], "hexpm", "a81a04c7e34b6617c2792e291b5a2e57ab316365c2644ddc553bb9ed863ebefa"}, "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, - "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, - "ex_doc": {:hex, :ex_doc, "0.34.0", "ab95e0775db3df71d30cf8d78728dd9261c355c81382bcd4cefdc74610bef13e", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "60734fb4c1353f270c3286df4a0d51e65a2c1d9fba66af3940847cc65a8066d7"}, - "ex_sdp": {:hex, :ex_sdp, "0.15.0", "53815fb5b5e4fae0f3b26de90f372446bb8e0eed62a3cc20394d3c29519698be", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "d3f23596b73e7057521ff0f0d55b1189c6320a2f04388aa3a80a0aa97ffb379f"}, - "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, + "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, + "ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"}, + "ex_sdp": {:hex, :ex_sdp, "0.17.0", "4c50e7814f01f149c0ccf258fba8428f8567dffecf1c416ec3f6aaaac607a161", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "c7fe0625902be2a835b5fe6834a189f7db7639d2625c8e9d8b3564e6d704145f"}, + "file_system": {:hex, :file_system, "1.0.1", "79e8ceaddb0416f8b8cd02a0127bdbababe7bf4a23d2a395b983c1f8b3f73edd", [:mix], [], "hexpm", "4414d1f38863ddf9120720cd976fce5bdde8e91d8283353f0e31850fa89feb9e"}, "heap": {:hex, :heap, "2.0.2", "d98cb178286cfeb5edbcf17785e2d20af73ca57b5a2cf4af584118afbcf917eb", [:mix], [], "hexpm", "ba9ea2fe99eb4bcbd9a8a28eaf71cbcac449ca1d8e71731596aace9028c9d429"}, - "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, + "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "logger_backends": {:hex, :logger_backends, "1.0.0", "09c4fad6202e08cb0fbd37f328282f16539aca380f512523ce9472b28edc6bdf", [:mix], [], "hexpm", "1faceb3e7ec3ef66a8f5746c5afd020e63996df6fd4eb8cdb789e5665ae6c9ce"}, "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, - "makeup_erlang": {:hex, :makeup_erlang, "1.0.0", "6f0eff9c9c489f26b69b61440bf1b238d95badae49adac77973cbacae87e3c2e", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "ea7a9307de9d1548d2a72d299058d1fd2339e3d398560a0e46c27dab4891e4d2"}, - "membrane_core": {:hex, :membrane_core, "1.1.0", "c3bbaa5af7c26a7c3748e573efe343c2104801e3463b9e491a607e82860334a4", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0 or ~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "b3209d7f7e86d736cb7caffbba16b075c571cebb9439ab939ed6119c50fb59a5"}, - "membrane_file_plugin": {:hex, :membrane_file_plugin, "0.17.0", "e855a848e84eaed537b41fd4436712038fc5518059eadc8609c83cd2d819653a", [:mix], [{:logger_backends, "~> 1.0", [hex: :logger_backends, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "9c3653ca9f13bb409b36257d6094798d4625c739ab7a4035c12308622eb16e0b"}, + "makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"}, + "membrane_core": {:hex, :membrane_core, "1.1.1", "4dcff6e9f3b2ecd4f437c20e201e53957731772c0f15b3005062c41f7f58f500", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0 or ~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3802f3fc071505c59d48792487d9927e803d4edb4039710ffa52cdb60bb0aecc"}, + "membrane_file_plugin": {:hex, :membrane_file_plugin, "0.17.2", "650e134c2345d946f930082fac8bac9f5aba785a7817d38a9a9da41ffc56fa92", [:mix], [{:logger_backends, "~> 1.0", [hex: :logger_backends, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "df50c6040004cd7b901cf057bd7e99c875bbbd6ae574efc93b2c753c96f43b9d"}, "membrane_funnel_plugin": {:hex, :membrane_funnel_plugin, "0.9.0", "9cfe09e44d65751f7d9d8d3c42e14797f7be69e793ac112ea63cd224af70a7bf", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "988790aca59d453a6115109f050699f7f45a2eb6a7f8dc5c96392760cddead54"}, "membrane_h264_format": {:hex, :membrane_h264_format, "0.6.1", "44836cd9de0abe989b146df1e114507787efc0cf0da2368f17a10c47b4e0738c", [:mix], [], "hexpm", "4b79be56465a876d2eac2c3af99e115374bbdc03eb1dea4f696ee9a8033cd4b0"}, "membrane_h265_format": {:hex, :membrane_h265_format, "0.2.0", "1903c072cf7b0980c4d0c117ab61a2cd33e88782b696290de29570a7fab34819", [:mix], [], "hexpm", "6df418bdf242c0d9f7dbf2e5aea4c2d182e34ac9ad5a8b8cef2610c290002e83"}, "membrane_h26x_plugin": {:hex, :membrane_h26x_plugin, "0.10.2", "caf2790d8c107df35f8d456b45f4e09fb9c56ce6c7669a3a03f7d59972e6ed82", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}], "hexpm", "becf1ac4a589adecd850137ccd61a33058f686083a514a7e39fcd721bcf9fb2e"}, "membrane_rtp_format": {:hex, :membrane_rtp_format, "0.8.0", "828924bbd27efcf85b2015ae781e824c4a9928f0a7dc132abc66817b2c6edfc4", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "bc75d2a649dfaef6df563212fbb9f9f62eebc871393692f9dae8d289bd4f94bb"}, - "membrane_rtp_h264_plugin": {:hex, :membrane_rtp_h264_plugin, "0.19.1", "8c61b3d2968e54e1b459a42f070ea71f597056eba4059df780eaa8da8aee6035", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}], "hexpm", "fb2bb5d4ed18f38523851cb449a2c78ed533e58028dc058342b8cfd659f812d5"}, + "membrane_rtp_h264_plugin": {:hex, :membrane_rtp_h264_plugin, "0.19.2", "de3eeaf35052f9f709d469fa7630d9ecc8f5787019f7072516eae1fd881bc792", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}], "hexpm", "d298e9cd471ab3601366c48ca0fec84135966707500152bbfcf3f968700647ae"}, "membrane_rtp_h265_plugin": {:hex, :membrane_rtp_h265_plugin, "0.5.1", "1e72309e340eaae5fce04f47b7b563accd563ab10bac139596626f0f0b4c72af", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}], "hexpm", "283d4b1b0271719f300b3bad4e05bef4db1cf3190f87291785e3f973106a1476"}, - "membrane_rtp_plugin": {:hex, :membrane_rtp_plugin, "0.28.0", "46654f458033a2c5b0c44f528cd6723df4f78705ae64ad0d98bcd58d93b2feda", [:mix], [{:bimap, "~> 1.2", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_libsrtp, "~> 0.6.0 or ~> 0.7.0", [hex: :ex_libsrtp, repo: "hexpm", optional: true]}, {:heap, "~> 2.0.2", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_funnel_plugin, "~> 0.9.0", [hex: :membrane_funnel_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_rtsp, "~> 0.7.0", [hex: :membrane_rtsp, repo: "hexpm", optional: false]}, {:membrane_telemetry_metrics, "~> 0.1.0", [hex: :membrane_telemetry_metrics, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}], "hexpm", "6413aebfc6942c38ba71bd19f3fbf6eaa2ba320555d724183350d96ecc153a16"}, - "membrane_rtsp": {:hex, :membrane_rtsp, "0.7.1", "4f54d0d335fe9c2fb6e6e5c10d244db78f7e3ba781b6daf18094c9feff296c6c", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_sdp, "~> 0.15.0", [hex: :ex_sdp, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3", [hex: :mockery, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.4.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e47f9cd0caf02730d723dc3785bcce4a1cfafcc3bb0f434c63abb5a5db0f7978"}, - "membrane_tcp_plugin": {:hex, :membrane_tcp_plugin, "0.4.0", "95a9a2fcbe7c94e3f18b61ef1d35ea07ccb96838185b8c2f2e3ea890a33f3473", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3.0", [hex: :mockery, repo: "hexpm", optional: false]}], "hexpm", "27df6c36381cbfabf039a6bfd53469e1de61d10cda54f1015d1e4ac12f152834"}, + "membrane_rtp_plugin": {:hex, :membrane_rtp_plugin, "0.29.0", "0277310eb599b8e6de9e0b864807f23b3b245865e39a28f0cbab695d1f2c157e", [:mix], [{:bimap, "~> 1.2", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_libsrtp, "~> 0.6.0 or ~> 0.7.0", [hex: :ex_libsrtp, repo: "hexpm", optional: true]}, {:heap, "~> 2.0.2", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_funnel_plugin, "~> 0.9.0", [hex: :membrane_funnel_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_telemetry_metrics, "~> 0.1.0", [hex: :membrane_telemetry_metrics, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}], "hexpm", "1b3fd808114e06332b6a4e000238998a9188d1ef625c414ca3239aee70f0775d"}, + "membrane_rtsp": {:hex, :membrane_rtsp, "0.9.0", "6232f716bdf128b9893bc8302d6d81a0374eef2f114a65bf0eafcdb8d681a98f", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_sdp, "~> 0.17.0", [hex: :ex_sdp, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3", [hex: :mockery, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.4.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "b1c4c91dd38b3c4c41f39c7ba025c892e6c85a23eabd1a5e2bdc6f555cbc39e9"}, + "membrane_tcp_plugin": {:hex, :membrane_tcp_plugin, "0.6.0", "1f8dba5525504fb2d49070932f24113d1b26c7e5429c700671ed80433ac83f2f", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3.0", [hex: :mockery, repo: "hexpm", optional: false]}], "hexpm", "820440f5a8181a96cff461ad2d5ed426d47eacfdd7764dd9596dad68ad892d3d"}, "membrane_telemetry_metrics": {:hex, :membrane_telemetry_metrics, "0.1.0", "cb93d28356b436b0597736c3e4153738d82d2a14ff547f831df7e9051e54fc06", [:mix], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6.1", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "aba28dc8311f70ced95d984509be930fac55857d2d18bffcf768815e627be3f0"}, - "membrane_udp_plugin": {:hex, :membrane_udp_plugin, "0.13.0", "c4d10b4cb152a95779e36fac4338e11ef0b0cb545c78ca337d7676f6df5d5709", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3.0", [hex: :mockery, repo: "hexpm", optional: false]}], "hexpm", "47a1661038ef65025fe36cfcae8ce23c022f9dc0867b8340c46dd4963f5a1bcb"}, - "mimic": {:hex, :mimic, "1.7.4", "cd2772ffbc9edefe964bc668bfd4059487fa639a5b7f1cbdf4fd22946505aa4f", [:mix], [], "hexpm", "437c61041ecf8a7fae35763ce89859e4973bb0666e6ce76d75efc789204447c3"}, - "mockery": {:hex, :mockery, "2.3.1", "a02fd60b10ac9ed37a7a2ecf6786c1f1dd5c75d2b079a60594b089fba32dc087", [:mix], [], "hexpm", "1d0971d88ebf084e962da3f2cfee16f0ea8e04ff73a7710428500d4500b947fa"}, + "membrane_udp_plugin": {:hex, :membrane_udp_plugin, "0.14.0", "d533ee5f6fcdd0551ad690045cdb6c1a76307a155d9255cc4a4606f85774bc37", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3.0", [hex: :mockery, repo: "hexpm", optional: false]}], "hexpm", "902d1a7aa228ec377482d53a605b100e20e0b6e59196f94f94147bb62b23c47e"}, + "mimic": {:hex, :mimic, "1.9.0", "c96367749a884556718f64657a4bdc99ce0cb5d19333aa04308fbd061c31b8b7", [:mix], [], "hexpm", "92107697938490b300566317c2a1490ef52e23aeac16632c0e56740721189116"}, + "mockery": {:hex, :mockery, "2.3.3", "3dba87bd0422a513e6af6e0d811383f38f82ac6be5d3d285a5fcca9c299bd0ac", [:mix], [], "hexpm", "17282be00613286254298117cd25e607a39f15ac03b41c631f60e52f5b5ec974"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"}, "qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"}, "ratio": {:hex, :ratio, "4.0.1", "3044166f2fc6890aa53d3aef0c336f84b2bebb889dc57d5f95cc540daa1912f8", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "c60cbb3ccdff9ffa56e7d6d1654b5c70d9f90f4d753ab3a43a6bf40855b881ce"}, - "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, "telemetry_metrics": {:hex, :telemetry_metrics, "0.6.2", "2caabe9344ec17eafe5403304771c3539f3b6e2f7fb6a6f602558c825d0d0bfb", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9b43db0dc33863930b9ef9d27137e78974756f5f198cae18409970ed6fa5b561"}, } diff --git a/test/membrane_rtsp_plugin/source/connection_manager_test.exs b/test/membrane_rtsp_plugin/source/connection_manager_test.exs index 79b992e..0197cc5 100644 --- a/test/membrane_rtsp_plugin/source/connection_manager_test.exs +++ b/test/membrane_rtsp_plugin/source/connection_manager_test.exs @@ -31,7 +31,8 @@ defmodule Membrane.RTSP.Source.ConnectionManagerTest do allowed_media_types: @allowed_media_types, transport: :tcp, timeout: Membrane.Time.seconds(15), - keep_alive_interval: Membrane.Time.seconds(15) + keep_alive_interval: Membrane.Time.seconds(15), + on_connection_closed: :raise_error } %{state: state} diff --git a/test/membrane_rtsp_plugin/source_test.exs b/test/membrane_rtsp_plugin/source_test.exs index e026b52..ec838d4 100644 --- a/test/membrane_rtsp_plugin/source_test.exs +++ b/test/membrane_rtsp_plugin/source_test.exs @@ -26,12 +26,14 @@ defmodule Membrane.RTSP.SourceTest do allowed_media_types: opts[:allowed_media_types] || [:video, :audio, :application], stream_uri: "rtsp://localhost:#{opts[:port]}/", timeout: opts[:timeout] || Membrane.Time.seconds(15), - keep_alive_interval: opts[:keep_alive_interval] || Membrane.Time.seconds(15) + keep_alive_interval: opts[:keep_alive_interval] || Membrane.Time.seconds(15), + on_connection_closed: :send_eos }) {[spec: spec], %{dest_folder: opts[:dest_folder]}} end + @impl true def handle_child_notification({:new_track, ssrc, track}, _element, _ctx, state) do file_name = case track.rtpmap.encoding do @@ -83,6 +85,14 @@ defmodule Membrane.RTSP.SourceTest do pid = Membrane.Testing.Pipeline.start_link_supervised!(options) + assert_pipeline_notified(pid, :source, {:set_up_tracks, tracks}) + + assert [ + %{type: :video, rtpmap: %{encoding: "H264"}}, + %{type: :video, rtpmap: %{encoding: "H265"}}, + %{type: :application, rtpmap: %{encoding: "plain"}} + ] = Enum.sort_by(tracks, fn %{rtpmap: %{encoding: encoding}} -> encoding end) + assert_pipeline_notified( pid, :source, @@ -124,11 +134,21 @@ defmodule Membrane.RTSP.SourceTest do test "stream specific media using tcp", %{server_port: port, tmp_dir: tmp_dir} do options = [ module: TestPipeline, - custom_args: %{port: port, dest_folder: tmp_dir, allowed_media_types: [:application]} + custom_args: %{ + port: port, + dest_folder: tmp_dir, + allowed_media_types: [:application] + } ] pid = Membrane.Testing.Pipeline.start_link_supervised!(options) + assert_pipeline_notified( + pid, + :source, + {:set_up_tracks, [%{type: :application, rtpmap: %{encoding: "plain"}}]} + ) + assert_pipeline_notified( pid, :source, @@ -164,6 +184,14 @@ defmodule Membrane.RTSP.SourceTest do pid = Membrane.Testing.Pipeline.start_link_supervised!(options) + assert_pipeline_notified(pid, :source, {:set_up_tracks, tracks}) + + assert [ + %{type: :video, rtpmap: %{encoding: "H264"}}, + %{type: :video, rtpmap: %{encoding: "H265"}}, + %{type: :application, rtpmap: %{encoding: "plain"}} + ] = Enum.sort_by(tracks, fn %{rtpmap: %{encoding: encoding}} -> encoding end) + assert_pipeline_notified( pid, :source, @@ -182,7 +210,10 @@ defmodule Membrane.RTSP.SourceTest do {:new_track, _ssrc, %{type: :application, rtpmap: %{encoding: "plain"}}} ) - Process.sleep(500) + assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000) + assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000) + assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000) + :ok = Membrane.Testing.Pipeline.terminate(pid) :gen_udp.close(blocking_socket1) diff --git a/test/support/handler.ex b/test/support/handler.ex index f8f85a5..dc95278 100644 --- a/test/support/handler.ex +++ b/test/support/handler.ex @@ -1,7 +1,7 @@ defmodule Membrane.RTSP.RequestHandler do @moduledoc false - @behaviour Membrane.RTSP.Server.Handler + use Membrane.RTSP.Server.Handler require Logger @@ -26,7 +26,7 @@ defmodule Membrane.RTSP.RequestHandler do """ @impl true - def handle_open_connection(_conn) do + def init(_config) do sources = %{ H264: %{ encoding: :H264, @@ -50,10 +50,16 @@ defmodule Membrane.RTSP.RequestHandler do %{ sources: sources, - pipeline_pid: nil + pipeline_pid: nil, + socket: nil } end + @impl true + def handle_open_connection(conn, state) do + %{state | socket: conn} + end + @impl true def handle_describe(_req, state) do Response.new(200) @@ -80,8 +86,10 @@ defmodule Membrane.RTSP.RequestHandler do Map.put(sources, source_key, Map.merge(state.sources[source_key], config)) end) + arg = %{sources: Map.values(sources), socket: state.socket} + {:ok, _sup_pid, pipeline_pid} = - Membrane.RTSP.ServerPipeline.start_link(Map.values(sources)) + Membrane.RTSP.ServerPipeline.start_link(arg) {Response.new(200), %{state | pipeline_pid: pipeline_pid}} end @@ -93,6 +101,7 @@ defmodule Membrane.RTSP.RequestHandler do @impl true def handle_teardown(state) do + :gen_tcp.close(state.socket) {Response.new(200), state} end diff --git a/test/support/server_pipeline.ex b/test/support/server_pipeline.ex index cd669c5..930bb43 100644 --- a/test/support/server_pipeline.ex +++ b/test/support/server_pipeline.ex @@ -87,9 +87,9 @@ defmodule Membrane.RTSP.ServerPipeline do plain: Membrane.RTP.Plain.Payloader } - @spec start_link(list()) :: Membrane.Pipeline.on_start() - def start_link(sources) do - Membrane.Pipeline.start_link(__MODULE__, sources) + @spec start_link(%{sources: list(), socket: :gen_tcp.socket()}) :: Membrane.Pipeline.on_start() + def start_link(config) do + Membrane.Pipeline.start_link(__MODULE__, config) end @impl true @@ -99,7 +99,7 @@ defmodule Membrane.RTSP.ServerPipeline do end @impl true - def handle_init(_ctx, sources) do + def handle_init(_ctx, %{sources: sources, socket: socket}) do spec = [child(:session_bin, Membrane.RTP.SessionBin)] ++ Enum.flat_map(sources, fn source -> @@ -117,12 +117,13 @@ defmodule Membrane.RTSP.ServerPipeline do ] end) - {[spec: spec], %{total_sources: length(sources), ended_stream: 0}} + {[spec: spec], %{total_sources: length(sources), ended_stream: 0, socket: socket}} end @impl true def handle_element_end_of_stream({:sink, _ref}, _pad, _ctx, state) do if state.ended_stream + 1 == state.total_sources do + :gen_tcp.close(state.socket) {[terminate: :shutdown], state} else {[], %{state | ended_stream: state.ended_stream + 1}}