diff --git a/README.md b/README.md index fcb2aca..895a851 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ Add the following line to your deps in `mix.exs`: ```elixir def deps do [ - {:membrane_rtsp_plugin, "~> 0.6.1"} + {:membrane_rtsp_plugin, "~> 0.7.0"} ] end ``` @@ -45,15 +45,13 @@ defmodule RtspPipeline do end @impl true - def handle_child_notification({:new_track, ssrc, _track}, _element, _ctx, state) do - spec = [ - get_child(:source) - |> via_out(Pad.ref(:output, ssrc)) - |> child(:funnel, Membrane.Funnel) - |> child(:sink, , %Membrane.File.Source{ - location: "video.h264" - }) - ] + def handle_child_notification({:set_up_tracks, tracks}, _element, _ctx, state) do + spec = + Enum.map(track, fn track -> + get_child(:source) + |> via_out(Pad.ref(:output, track.control_path)) + |> child(:sink, %Membrane.File.Source{location: "video.h264"}) + end) {[spec: spec], state} end @@ -62,10 +60,5 @@ defmodule RtspPipeline do def handle_child_notification(_message, _element, _ctx, state) do {[], state} end - - @impl true - def handle_child_pad_removed(:source, _pad, _ctx, state) do - {[], state} - end end ``` diff --git a/lib/membrane_rtsp_plugin/source.ex b/lib/membrane_rtsp_plugin/source.ex index 1508a99..a18523b 100644 --- a/lib/membrane_rtsp_plugin/source.ex +++ b/lib/membrane_rtsp_plugin/source.ex @@ -2,7 +2,7 @@ defmodule Membrane.RTSP.Source do @moduledoc """ Source bin responsible for connecting to an RTSP server. - This element connects to an RTSP server, depayload and parses the received media if possible. + This element connects to an RTSP server, depayloads and parses the received media if possible. If there's no suitable depayloader and parser, the raw payload is sent to the subsequent elements in the pipeline. @@ -15,9 +15,8 @@ defmodule Membrane.RTSP.Source do * `Opus` When the element finishes setting up all tracks it will send a `t:set_up_tracks/0` notification. - Each time a track is parsed and available for further processing the element will send a - `t:new_track/0` notification. An output pad `Pad.ref(:output, ssrc)` should be linked to receive - the data. + To receive a track a corresponding `Pad.ref(:output, control_path)` pad has to be connected, + where each track's control path is provided in the `t:set_up_tracks/0` notification. """ use Membrane.Bin @@ -29,7 +28,6 @@ defmodule Membrane.RTSP.Source do alias Membrane.{RTSP, Time} @type set_up_tracks_notification :: {:set_up_tracks, [track()]} - @type new_track_notification :: {:new_track, ssrc :: pos_integer(), track :: track()} @type track :: %{ control_path: String.t(), type: :video | :audio | :application, @@ -99,11 +97,11 @@ defmodule Membrane.RTSP.Source do timeout: Time.t(), keep_alive_interval: Time.t(), tracks: [ConnectionManager.track()], - ssrc_to_track: %{non_neg_integer() => ConnectionManager.track()}, rtsp_session: Membrane.RTSP.t() | nil, keep_alive_timer: reference() | nil, on_connection_closed: :raise_error | :send_eos, - end_of_stream: boolean() + end_of_stream: boolean(), + play_request_sent: boolean() } @enforce_keys [ @@ -120,7 +118,8 @@ defmodule Membrane.RTSP.Source do ssrc_to_track: %{}, rtsp_session: nil, keep_alive_timer: nil, - end_of_stream: false + end_of_stream: false, + play_request_sent: false ] end @@ -140,7 +139,7 @@ defmodule Membrane.RTSP.Source do end @impl true - def handle_child_playing(:rtp_session, _ctx, state) do + def handle_child_playing(_child, _ctx, %State{play_request_sent: false} = state) do {[], ConnectionManager.play(state)} end @@ -149,25 +148,6 @@ defmodule Membrane.RTSP.Source do {[], state} end - @impl true - def handle_child_notification( - {:new_rtp_stream, ssrc, pt, _extensions}, - :rtp_session, - _ctx, - state - ) do - case Enum.find(state.tracks, fn track -> track.rtpmap.payload_type == pt end) do - nil -> - raise "No track of payload type #{inspect(pt)} has been requested with SETUP" - - track -> - ssrc_to_track = Map.put(state.ssrc_to_track, ssrc, track) - - {[notify_parent: {:new_track, ssrc, Map.delete(track, :transport)}], - %{state | ssrc_to_track: ssrc_to_track}} - end - end - @impl true def handle_child_notification({:request_socket_control, _socket, pid}, :tcp_source, _ctx, state) do RTSP.transfer_socket_control(state.rtsp_session, pid) @@ -175,7 +155,8 @@ defmodule Membrane.RTSP.Source do end @impl true - def handle_child_notification(_notification, _element, _ctx, state) do + def handle_child_notification(notification, _element, _ctx, state) do + Membrane.Logger.warning("Ignoring child notification: #{inspect(notification)}") {[], state} end @@ -216,12 +197,22 @@ defmodule Membrane.RTSP.Source do end @impl true - def handle_pad_added(Pad.ref(:output, ssrc) = pad, _ctx, state) do - track = Map.fetch!(state.ssrc_to_track, ssrc) + def handle_pad_added(Pad.ref(:output, control_path) = pad, _ctx, state) do + track = Enum.find(state.tracks, &(&1.control_path == control_path)) + + demuxer_name = + case state.transport do + :tcp -> :rtp_demuxer + {:udp, _port_range_start, _port_range_end} -> {:rtp_demuxer, track.control_path} + end spec = - get_child(:rtp_session) - |> via_out(Pad.ref(:output, ssrc), options: [depayloader: get_rtp_depayloader(track)]) + get_child(demuxer_name) + |> via_out(:output, options: [stream_id: {:payload_type, track.rtpmap.payload_type}]) + |> child({:jitter_buffer, make_ref()}, %Membrane.RTP.JitterBuffer{ + clock_rate: track.rtpmap.clock_rate + }) + |> depayloader(track) |> parser(track) |> bin_output(pad) @@ -240,11 +231,6 @@ defmodule Membrane.RTSP.Source do @spec create_sources_spec(State.t()) :: Membrane.ChildrenSpec.t() defp create_sources_spec(state) do - fmt_mapping = - Map.new(state.tracks, fn %{rtpmap: rtpmap} -> - {rtpmap.payload_type, {String.to_atom(rtpmap.encoding), rtpmap.clock_rate}} - end) - case state.transport do :tcp -> {:tcp, socket} = List.first(state.tracks).transport @@ -254,46 +240,58 @@ defmodule Membrane.RTSP.Source do local_socket: socket, on_connection_closed: state.on_connection_closed }) - |> child(:tcp_depayloader, %RTSP.TCP.Decapsulator{rtsp_session: state.rtsp_session}) - |> via_in(Pad.ref(:rtp_input, make_ref())) - |> child(:rtp_session, %Membrane.RTP.SessionBin{fmt_mapping: fmt_mapping}) + |> child(:tcp_decapsulator, %RTSP.TCP.Decapsulator{rtsp_session: state.rtsp_session}) + |> child(:rtp_demuxer, Membrane.RTP.Demuxer) {:udp, _port_range_start, _port_range_end} -> [ - child(:rtp_session, %Membrane.RTP.SessionBin{fmt_mapping: fmt_mapping}) - | Enum.flat_map(state.tracks, fn track -> - {:udp, rtp_port, rtcp_port} = track.transport - - [ - child({:udp_source, make_ref()}, %Membrane.UDP.Source{local_port_no: rtp_port}) - |> via_in(Pad.ref(:rtp_input, make_ref())) - |> get_child(:rtp_session), - child({:udp_source, make_ref()}, %Membrane.UDP.Source{local_port_no: rtcp_port}) - |> via_in(Pad.ref(:rtp_input, make_ref())) - |> get_child(:rtp_session) - ] - end) + Enum.flat_map(state.tracks, fn track -> + {:udp, rtp_port, rtcp_port} = track.transport + + [ + child({:udp_source, rtp_port}, %Membrane.UDP.Source{local_port_no: rtp_port}) + |> child({:rtp_demuxer, track.control_path}, Membrane.RTP.Demuxer), + child({:udp_source, rtcp_port}, %Membrane.UDP.Source{local_port_no: rtcp_port}) + |> child({:rtcp_demuxer, track.control_path}, Membrane.RTP.Demuxer) + ] + end) ] end end - @spec get_rtp_depayloader(ConnectionManager.track()) :: module() | nil - defp get_rtp_depayloader(%{rtpmap: %{encoding: "H264"}}), do: Membrane.RTP.H264.Depayloader - defp get_rtp_depayloader(%{rtpmap: %{encoding: "H265"}}), do: Membrane.RTP.H265.Depayloader - defp get_rtp_depayloader(%{rtpmap: %{encoding: "opus"}}), do: Membrane.RTP.Opus.Depayloader + @spec depayloader(ChildrenSpec.builder(), ConnectionManager.track()) :: ChildrenSpec.builder() + defp depayloader(builder, track) do + depayloader_definition = + case track do + %{rtpmap: %{encoding: "H264"}} -> + Membrane.RTP.H264.Depayloader + + %{rtpmap: %{encoding: "H265"}} -> + Membrane.RTP.H265.Depayloader + + %{rtpmap: %{encoding: "opus"}} -> + Membrane.RTP.Opus.Depayloader + + %{type: :audio, rtpmap: %{encoding: "mpeg4-generic"}} -> + mode = + case track.fmtp do + %{mode: :AAC_hbr} -> :hbr + %{mode: :AAC_lbr} -> :lbr + end - defp get_rtp_depayloader(%{type: :audio, rtpmap: %{encoding: "mpeg4-generic"}} = track) do - mode = - case track.fmtp do - %{mode: :AAC_hbr} -> :hbr - %{mode: :AAC_lbr} -> :lbr + %Membrane.RTP.AAC.Depayloader{mode: mode} + + %{rtpmap: %{encoding: _other}} -> + nil end - %Membrane.RTP.AAC.Depayloader{mode: mode} + if depayloader_definition != nil do + child(builder, {:depayloader, make_ref()}, depayloader_definition) + else + builder + end end - defp get_rtp_depayloader(%{rtpmap: %{encoding: _other}}), do: nil - @spec parser(ChildrenSpec.builder(), ConnectionManager.track()) :: ChildrenSpec.builder() defp parser(link_builder, %{rtpmap: %{encoding: "H264"}} = track) do sps = track.fmtp.sprop_parameter_sets && track.fmtp.sprop_parameter_sets.sps diff --git a/lib/membrane_rtsp_plugin/source/connection_manager.ex b/lib/membrane_rtsp_plugin/source/connection_manager.ex index 64c9610..13d3151 100644 --- a/lib/membrane_rtsp_plugin/source/connection_manager.ex +++ b/lib/membrane_rtsp_plugin/source/connection_manager.ex @@ -52,7 +52,7 @@ defmodule Membrane.RTSP.Source.ConnectionManager do case RTSP.play(state.rtsp_session) do {:ok, %{status: 200}} -> - %{state | keep_alive_timer: start_keep_alive_timer(state)} + %{state | keep_alive_timer: start_keep_alive_timer(state), play_request_sent: true} _error -> handle_rtsp_error(:play_rtsp_failed, state) diff --git a/mix.exs b/mix.exs index 982d732..de8aed8 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.RTSP.Plugin.Mixfile do use Mix.Project - @version "0.6.1" + @version "0.7.0" @github_url "https://github.com/gBillal/membrane_rtsp_plugin" def project do diff --git a/test/membrane_rtsp_plugin/source_test.exs b/test/membrane_rtsp_plugin/source_test.exs index ec838d4..310f32b 100644 --- a/test/membrane_rtsp_plugin/source_test.exs +++ b/test/membrane_rtsp_plugin/source_test.exs @@ -34,20 +34,22 @@ defmodule Membrane.RTSP.SourceTest do end @impl true - def handle_child_notification({:new_track, ssrc, track}, _element, _ctx, state) do - file_name = - case track.rtpmap.encoding do - "H264" -> "out.h264" - "H265" -> "out.hevc" - "plain" -> "out.txt" - end - + def handle_child_notification({:set_up_tracks, tracks}, _element, _ctx, state) do spec = - get_child(:source) - |> via_out(Pad.ref(:output, ssrc)) - |> child({:sink, ssrc}, %Membrane.File.Sink{ - location: Path.join(state.dest_folder, file_name) - }) + Enum.map(tracks, fn track -> + file_name = + case track.rtpmap.encoding do + "H264" -> "out.h264" + "H265" -> "out.hevc" + "plain" -> "out.txt" + end + + get_child(:source) + |> via_out(Pad.ref(:output, track.control_path)) + |> child({:sink, track.control_path}, %Membrane.File.Sink{ + location: Path.join(state.dest_folder, file_name) + }) + end) {[spec: spec], state} end @@ -93,24 +95,6 @@ defmodule Membrane.RTSP.SourceTest do %{type: :application, rtpmap: %{encoding: "plain"}} ] = Enum.sort_by(tracks, fn %{rtpmap: %{encoding: encoding}} -> encoding end) - assert_pipeline_notified( - pid, - :source, - {:new_track, _ssrc, %{type: :video, rtpmap: %{encoding: "H264"}}} - ) - - assert_pipeline_notified( - pid, - :source, - {:new_track, _ssrc, %{type: :video, rtpmap: %{encoding: "H265"}}} - ) - - assert_pipeline_notified( - pid, - :source, - {:new_track, _ssrc, %{type: :application, rtpmap: %{encoding: "plain"}}} - ) - assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000) assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000) assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000) @@ -143,18 +127,6 @@ defmodule Membrane.RTSP.SourceTest do pid = Membrane.Testing.Pipeline.start_link_supervised!(options) - assert_pipeline_notified( - pid, - :source, - {:set_up_tracks, [%{type: :application, rtpmap: %{encoding: "plain"}}]} - ) - - assert_pipeline_notified( - pid, - :source, - {:new_track, _ssrc, %{type: :application, rtpmap: %{encoding: "plain"}}} - ) - assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000) :ok = Membrane.Testing.Pipeline.terminate(pid) @@ -192,24 +164,6 @@ defmodule Membrane.RTSP.SourceTest do %{type: :application, rtpmap: %{encoding: "plain"}} ] = Enum.sort_by(tracks, fn %{rtpmap: %{encoding: encoding}} -> encoding end) - assert_pipeline_notified( - pid, - :source, - {:new_track, _ssrc, %{type: :video, rtpmap: %{encoding: "H264"}}} - ) - - assert_pipeline_notified( - pid, - :source, - {:new_track, _ssrc, %{type: :video, rtpmap: %{encoding: "H265"}}} - ) - - assert_pipeline_notified( - pid, - :source, - {:new_track, _ssrc, %{type: :application, rtpmap: %{encoding: "plain"}}} - ) - assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000) assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000) assert_end_of_stream(pid, {:sink, _ref}, :input, 5_000)