diff --git a/lib/membrane_rtmp_plugin/rtmp/source/bin.ex b/lib/membrane_rtmp_plugin/rtmp/source/bin.ex index 3acb2aa8..0950282d 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/bin.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/bin.ex @@ -24,11 +24,11 @@ defmodule Membrane.RTMP.SourceBin do def_output_pad :video, accepted_format: H264, - availability: :always + availability: :on_request def_output_pad :audio, accepted_format: AAC, - availability: :always + availability: :on_request def_options socket: [ spec: :gen_tcp.socket() | :ssl.sslsocket(), @@ -57,34 +57,101 @@ defmodule Membrane.RTMP.SourceBin do @impl true def handle_init(_ctx, %__MODULE__{} = opts) do - structure = [ + spec = [ child(:src, %RTMP.Source{ socket: opts.socket, validator: opts.validator, use_ssl?: opts.use_ssl? }) - |> child(:demuxer, Membrane.FLV.Demuxer), - # - child(:audio_parser, %Membrane.AAC.Parser{ - out_encapsulation: :none - }), - child(:video_parser, Membrane.H264.Parser), - # - get_child(:demuxer) - |> via_out(Pad.ref(:audio, 0)) - |> get_child(:audio_parser) - |> bin_output(:audio), - # - get_child(:demuxer) - |> via_out(Pad.ref(:video, 0)) - |> get_child(:video_parser) - |> bin_output(:video) + |> child(:demuxer, Membrane.FLV.Demuxer) ] - {[spec: structure], %{}} + state = %{ + demuxer_audio_pad_ref: nil, + demuxer_video_pad_ref: nil + } + + {[spec: spec], state} + end + + @impl true + def handle_pad_added(Pad.ref(:audio, _ref) = pad, _ctx, state) do + spec = + if state.demuxer_audio_pad_ref != nil do + [ + get_child(:demuxer) + |> via_out(state.demuxer_audio_pad_ref) + |> child(:audio_parser, %Membrane.AAC.Parser{ + out_encapsulation: :none + }) + |> bin_output(pad) + ] + else + [ + child(:funnel_audio, Membrane.Funnel) + |> bin_output(pad) + ] + end + + {[spec: spec], state} + end + + def handle_pad_added(Pad.ref(:video, _ref) = pad, _ctx, state) do + spec = + if state.demuxer_video_pad_ref != nil do + [ + get_child(:demuxer) + |> via_out(state.demuxer_video_pad_ref) + |> child(:video_parser, Membrane.H264.Parser) + |> bin_output(pad) + ] + else + [ + child(:funnel_video, Membrane.Funnel) + |> bin_output(pad) + ] + end + + {[spec: spec], state} end @impl true + def handle_child_notification({:new_stream, pad_ref, :AAC}, :demuxer, ctx, state) do + audio_pad_ref = get_pad(:audio, ctx) + + if audio_pad_ref != nil do + {[ + spec: [ + get_child(:demuxer) + |> via_out(pad_ref) + |> child(:audio_parser, %Membrane.AAC.Parser{ + out_encapsulation: :none + }) + |> get_child(:funnel_audio) + ] + ], state} + else + {[], %{state | demuxer_audio_pad_ref: pad_ref}} + end + end + + def handle_child_notification({:new_stream, pad_ref, :H264}, :demuxer, ctx, state) do + video_pad_ref = get_pad(:video, ctx) + + if video_pad_ref != nil do + {[ + spec: [ + get_child(:demuxer) + |> via_out(pad_ref) + |> child(:video_parser, Membrane.H264.Parser) + |> get_child(:funnel_video) + ] + ], state} + else + {[], %{state | demuxer_video_pad_ref: pad_ref}} + end + end + def handle_child_notification( {type, _socket, _pid} = notification, :src, @@ -128,4 +195,10 @@ defmodule Membrane.RTMP.SourceBin do def secure_pass_control(socket, source) do :ssl.controlling_process(socket, source) end + + defp get_pad(name, ctx) do + ctx.pads + |> Map.keys() + |> Enum.find(fn pad_ref -> Pad.name_by_ref(pad_ref) == name end) + end end diff --git a/mix.exs b/mix.exs index 010e186c..1402b85d 100644 --- a/mix.exs +++ b/mix.exs @@ -46,6 +46,7 @@ defmodule Membrane.RTMP.Mixfile do {:membrane_aac_plugin, "~> 0.18.1"}, {:membrane_flv_plugin, "~> 0.12.0"}, {:membrane_file_plugin, "~> 0.17.0"}, + {:membrane_funnel_plugin, "~> 0.9.0"}, # testing {:membrane_hackney_plugin, "~> 0.11.0", only: :test}, {:ffmpex, "~> 0.10.0", only: :test}, diff --git a/mix.lock b/mix.lock index 0ecb91af..8cf2f288 100644 --- a/mix.lock +++ b/mix.lock @@ -27,8 +27,10 @@ "membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"}, "membrane_aac_plugin": {:hex, :membrane_aac_plugin, "0.18.1", "30433bffd4d5d773f79448dd9afd55d77338721688f09a89b20d742a68cc2c3d", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "8fd048c47d5d2949eb557e19f43f62d534d3af5096187f1a1a3a1694d14b772c"}, "membrane_core": {:hex, :membrane_core, "1.0.1", "08aa546c0d131c66f8b906b3dfb2b8f2749b56859f6fc52bd3ac846b944b3baa", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a35ed68561bdf0a2dbb2f994333be78cf4e1c4d734e4cd927d77d92049bb1273"}, + "membrane_fake_plugin": {:hex, :membrane_fake_plugin, "0.11.0", "3a2d26f15ad4940a4d44cee3354dff38fa9a39963e9b2dcb49802e150ff9a9dc", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "7c6b6a06eaa4e820d1e4836510ddb4bcb386c8918d0b37542a21caf6b87cbe72"}, "membrane_file_plugin": {:hex, :membrane_file_plugin, "0.17.0", "e855a848e84eaed537b41fd4436712038fc5518059eadc8609c83cd2d819653a", [:mix], [{:logger_backends, "~> 1.0", [hex: :logger_backends, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "9c3653ca9f13bb409b36257d6094798d4625c739ab7a4035c12308622eb16e0b"}, "membrane_flv_plugin": {:hex, :membrane_flv_plugin, "0.12.0", "d715ad405af86dcaf4b2f479e34088e1f6738c7280366828e1066b39d2aa493a", [:mix], [{:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}], "hexpm", "a317872d6d394e550c7bfd8979f12a3a1cc1e89b547d75360321025b403d3279"}, + "membrane_funnel_plugin": {:hex, :membrane_funnel_plugin, "0.9.0", "9cfe09e44d65751f7d9d8d3c42e14797f7be69e793ac112ea63cd224af70a7bf", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "988790aca59d453a6115109f050699f7f45a2eb6a7f8dc5c96392760cddead54"}, "membrane_h264_format": {:hex, :membrane_h264_format, "0.6.1", "44836cd9de0abe989b146df1e114507787efc0cf0da2368f17a10c47b4e0738c", [:mix], [], "hexpm", "4b79be56465a876d2eac2c3af99e115374bbdc03eb1dea4f696ee9a8033cd4b0"}, "membrane_h265_format": {:hex, :membrane_h265_format, "0.2.0", "1903c072cf7b0980c4d0c117ab61a2cd33e88782b696290de29570a7fab34819", [:mix], [], "hexpm", "6df418bdf242c0d9f7dbf2e5aea4c2d182e34ac9ad5a8b8cef2610c290002e83"}, "membrane_h26x_plugin": {:hex, :membrane_h26x_plugin, "0.10.0", "0a7c6b9a7678e8c111b22b5417465ac31cf6e598cff6a53ab53a9c379bdfa1ef", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}], "hexpm", "e9cde8c8995ace9fc26355037cbcc780f1727a3f63d36c21b52232fd29d0ad40"},