diff --git a/README.md b/README.md index 7e1376b..948035a 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ The package can be installed by adding `membrane_mp4_plugin` to your list of dep ```elixir defp deps do [ - {:membrane_mp4_plugin, "~> 0.35.0"} + {:membrane_mp4_plugin, "~> 0.35.1"} ] end ``` diff --git a/examples/demuxer_isom.exs b/examples/demuxer_isom.exs index 81c6ffe..13f0b27 100644 --- a/examples/demuxer_isom.exs +++ b/examples/demuxer_isom.exs @@ -24,11 +24,11 @@ defmodule Example do hackney_opts: [follow_redirect: true] }) |> child(:demuxer, Membrane.MP4.Demuxer.ISOM) - |> via_out(Pad.ref(:output, 1)) + |> via_out(:output, options: [kind: :video]) |> child(:parser_video, %Membrane.H264.Parser{output_stream_structure: :annexb}) |> child(:sink_video, %Membrane.File.Sink{location: @output_video}), get_child(:demuxer) - |> via_out(Pad.ref(:output, 2)) + |> via_out(:output, options: [kind: :audio]) |> child(:audio_parser, %Membrane.AAC.Parser{ out_encapsulation: :ADTS }) diff --git a/lib/membrane_mp4/demuxer/isom.ex b/lib/membrane_mp4/demuxer/isom.ex index 0ebd104..26ea986 100644 --- a/lib/membrane_mp4/demuxer/isom.ex +++ b/lib/membrane_mp4/demuxer/isom.ex @@ -5,7 +5,22 @@ defmodule Membrane.MP4.Demuxer.ISOM do The MP4 must have `fast start` enabled, i.e. the `moov` box must precede the `mdat` box. Once the Demuxer identifies the tracks in the MP4, `t:new_tracks_t/0` notification is sent for each of the tracks. - All the tracks in the MP4 must have a corresponding output pad linked (`Pad.ref(:output, track_id)`). + All pads has to be linked either before `handle_playing/2` callback or after the Element sends `{:new_tracks, ...}` + notification. + + Number of pads has to be equal to the number of demuxed tracks. + + If the demuxed data contains only one track, linked pad doesn't have to specify `:kind` option. + + If there are more than one track and pads are linked before `handle_playing/2`, every pad has to specify `:kind` + option. + + If any of pads isn't linked before `handle_playing/2`, #{inspect(__MODULE__)} will send `{:new_tracks, ...}` + notification to the parent. Otherwise, if any of them is linked before `handle_playing/3`, this notification won't + be sent. + + If pads are linked after the `{:new_tracks, ...}` notfitaction, their references must match MP4 tracks ids + (`Pad.ref(:output, track_id)`). """ use Membrane.Filter @@ -35,7 +50,15 @@ defmodule Membrane.MP4.Demuxer.ISOM do %Membrane.Opus{self_delimiting?: false} ), availability: :on_request, - flow_control: :auto + options: [ + kind: [ + spec: :video | :audio | nil, + default: nil, + description: """ + Specifies, what kind of data can be handled by a pad. + """ + ] + ] def_options optimize_for_non_fast_start?: [ default: false, @@ -82,7 +105,10 @@ defmodule Membrane.MP4.Demuxer.ISOM do boxes_size: 0, mdat_beginning: nil, mdat_size: nil, - mdat_header_size: nil + mdat_header_size: nil, + track_to_pad_id: %{}, + track_notifications_sent?: false, + pads_linked_before_notification?: false } {[], state} @@ -147,7 +173,7 @@ defmodule Membrane.MP4.Demuxer.ISOM do state.partial <> buffer.payload ) - buffers = get_buffer_actions(samples) + buffers = get_buffer_actions(samples, state) {buffers, %{state | samples_info: samples_info, partial: rest}} end @@ -356,22 +382,26 @@ defmodule Membrane.MP4.Demuxer.ISOM do state = %{state | samples_info: samples_info, partial: rest} + state = match_tracks_with_pads(ctx, state) + all_pads_connected? = all_pads_connected?(ctx, state) {buffers, state} = if all_pads_connected? do - {get_buffer_actions(samples), state} + {get_buffer_actions(samples, state), state} else {[], store_samples(state, samples)} end - notifications = get_track_notifications(state) + notifications = maybe_get_track_notifications(state) + stream_format = if all_pads_connected?, do: get_stream_format(state), else: [] state = %{ state - | all_pads_connected?: all_pads_connected? + | all_pads_connected?: all_pads_connected?, + track_notifications_sent?: true } |> update_fsm_state() @@ -385,9 +415,10 @@ defmodule Membrane.MP4.Demuxer.ISOM do end) end - defp get_buffer_actions(samples) do + defp get_buffer_actions(samples, state) do Enum.map(samples, fn {buffer, track_id} -> - {:buffer, {Pad.ref(:output, track_id), buffer}} + pad_id = state.track_to_pad_id[track_id] + {:buffer, {Pad.ref(:output, pad_id), buffer}} end) end @@ -398,12 +429,98 @@ defmodule Membrane.MP4.Demuxer.ISOM do end end - defp get_track_notifications(state) do + defp match_tracks_with_pads(ctx, state) do + sample_tables = state.samples_info.sample_tables + + output_pads_data = + ctx.pads + |> Map.values() + |> Enum.filter(&(&1.direction == :output)) + + if length(output_pads_data) not in [0, map_size(sample_tables)] do + raise_pads_not_matching_codecs_error!(ctx, state) + end + + track_to_pad_id = + case output_pads_data do + [] -> + sample_tables + |> Map.new(fn {track_id, _table} -> {track_id, track_id} end) + + [pad_data] -> + {track_id, table} = Enum.at(sample_tables, 0) + + if pad_data.options.kind not in [ + nil, + sample_description_to_kind(table.sample_description) + ] do + raise_pads_not_matching_codecs_error!(ctx, state) + end + + %{track_id => pad_data_to_pad_id(pad_data)} + + _many -> + kind_to_pads_data = output_pads_data |> Enum.group_by(& &1.options.kind) + + kind_to_tracks = + sample_tables + |> Enum.group_by( + fn {_track_id, table} -> sample_description_to_kind(table.sample_description) end, + fn {track_id, _table} -> track_id end + ) + + raise? = + Enum.any?(kind_to_pads_data, fn {kind, pads} -> + length(pads) != length(kind_to_tracks[kind]) + end) + + if raise?, do: raise_pads_not_matching_codecs_error!(ctx, state) + + kind_to_tracks + |> Enum.flat_map(fn {kind, tracks} -> + pad_refs = kind_to_pads_data[kind] |> Enum.map(&pad_data_to_pad_id/1) + Enum.zip(tracks, pad_refs) + end) + |> Map.new() + end + + %{state | track_to_pad_id: Map.new(track_to_pad_id)} + end + + defp pad_data_to_pad_id(%{ref: Pad.ref(_name, id)}), do: id + + @spec raise_pads_not_matching_codecs_error!(map(), map()) :: no_return() + defp raise_pads_not_matching_codecs_error!(ctx, state) do + pads_kinds = + ctx.pads + |> Enum.flat_map(fn + {:input, _pad_data} -> [] + {_pad_ref, %{options: %{kind: kind}}} -> [kind] + end) + + tracks_codecs = + state.samples_info.sample_tables + |> Enum.map(fn {_track, table} -> table.sample_description.__struct__ end) + + raise """ + Pads kinds don't match with tracks codecs. Pads kinds are #{inspect(pads_kinds)}. \ + Tracks codecs are #{inspect(tracks_codecs)} + """ + end + + defp sample_description_to_kind(%Membrane.H264{}), do: :video + defp sample_description_to_kind(%Membrane.H265{}), do: :video + defp sample_description_to_kind(%Membrane.AAC{}), do: :audio + defp sample_description_to_kind(%Membrane.Opus{}), do: :audio + + defp maybe_get_track_notifications(%{pads_linked_before_notification?: true}), do: [] + + defp maybe_get_track_notifications(%{pads_linked_before_notification?: false} = state) do new_tracks = state.samples_info.sample_tables |> Enum.map(fn {track_id, table} -> - content = table.sample_description - {track_id, content} + pad_id = state.track_to_pad_id[track_id] + {pad_id, table.sample_description} end) [{:notify_parent, {:new_tracks, new_tracks}}] @@ -412,7 +529,8 @@ defmodule Membrane.MP4.Demuxer.ISOM do defp get_stream_format(state) do state.samples_info.sample_tables |> Enum.map(fn {track_id, table} -> - {:stream_format, {Pad.ref(:output, track_id), table.sample_description}} + pad_id = state.track_to_pad_id[track_id] + {:stream_format, {Pad.ref(:output, pad_id), table.sample_description}} end) end @@ -425,7 +543,23 @@ defmodule Membrane.MP4.Demuxer.ISOM do raise "All tracks have corresponding pad already connected" end - def handle_pad_added(Pad.ref(:output, _track_id), ctx, state) do + def handle_pad_added(Pad.ref(:output, _track_id) = pad_ref, ctx, state) do + state = + case ctx.playback do + :stopped -> + %{state | pads_linked_before_notification?: true} + + :playing when state.track_notifications_sent? -> + state + + :playing -> + raise """ + Pads can be linked either before #{inspect(__MODULE__)} enters :playing playback or after it \ + sends {:new_tracks, ...} notification + """ + end + + :ok = validate_pad_kind!(pad_ref, ctx.pad_options.kind, ctx, state) all_pads_connected? = all_pads_connected?(ctx, state) {actions, state} = @@ -444,6 +578,55 @@ defmodule Membrane.MP4.Demuxer.ISOM do {actions, state} end + defp validate_pad_kind!(pad_ref, pad_kind, ctx, state) do + allowed_kinds = [nil, :audio, :video] + + if pad_kind not in allowed_kinds do + raise """ + Pad #{inspect(pad_ref)} has :kind option set to #{inspect(pad_kind)}, while it has te be one of \ + #{[:audio, :video] |> inspect()} or be unspecified. + """ + end + + if not state.track_notifications_sent? and + Enum.count(ctx.pads, &match?({Pad.ref(:output, _id), %{options: %{kind: nil}}}, &1)) > 1 do + raise """ + If pads are linked before :new_tracks notifications and there are more then one of them, pad option \ + :kind has to be specyfied. + """ + end + + if state.track_notifications_sent? do + Pad.ref(:output, pad_id) = pad_ref + + related_track = + state.track_to_pad_id + |> Map.keys() + |> Enum.find(&(state.track_to_pad_id[&1] == pad_id)) + + if related_track == nil do + raise """ + Pad #{inspect(pad_ref)} doesn't have a related track. If you link pads after #{inspect(__MODULE__)} \ + sent the track notification, you have to restrict yourself to the pad occuring in this notification. \ + Tracks, that occured in this notification are: #{Map.keys(state.track_to_pad_id) |> inspect()} + """ + end + + track_kind = + state.samples_info.sample_tables[related_track].sample_description + |> sample_description_to_kind() + + if pad_kind != nil and pad_kind != track_kind do + raise """ + Pad option :kind must match with the kind of the related track or be equal nil, but pad #{inspect(pad_ref)} \ + kind is #{inspect(pad_kind)}, while the related track kind is #{inspect(track_kind)} + """ + end + end + + :ok + end + @impl true def handle_end_of_stream(:input, _ctx, %{all_pads_connected?: false} = state) do {[], %{state | end_of_stream?: true}} @@ -465,12 +648,6 @@ defmodule Membrane.MP4.Demuxer.ISOM do _pad -> [] end) - Enum.each(pads, fn pad -> - if pad not in tracks do - raise "An output pad connected with #{pad} id, however no matching track exists" - end - end) - Range.size(tracks) == length(pads) end @@ -482,7 +659,8 @@ defmodule Membrane.MP4.Demuxer.ISOM do |> Enum.reverse() |> Enum.map(fn {buffer, ^track_id} -> buffer end) - {:buffer, {Pad.ref(:output, track_id), buffers}} + pad_id = state.track_to_pad_id[track_id] + {:buffer, {Pad.ref(:output, pad_id), buffers}} end) state = %{state | buffered_samples: %{}} diff --git a/mix.exs b/mix.exs index 2655188..2c9e42a 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.MP4.Plugin.MixProject do use Mix.Project - @version "0.35.0" + @version "0.35.1" @github_url "https://github.com/membraneframework/membrane_mp4_plugin" def project do diff --git a/mix.lock b/mix.lock index a599f76..9d2cd94 100644 --- a/mix.lock +++ b/mix.lock @@ -6,7 +6,7 @@ "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, "castore": {:hex, :castore, "1.0.5", "9eeebb394cc9a0f3ae56b813459f990abb0a3dedee1be6b27fdb50301930502f", [:mix], [], "hexpm", "8d7c597c3e4a64c395980882d4bca3cebb8d74197c590dc272cfd3b6a6310578"}, "coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"}, - "credo": {:hex, :credo, "1.7.5", "643213503b1c766ec0496d828c90c424471ea54da77c8a168c725686377b9545", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "f799e9b5cd1891577d8c773d245668aa74a2fcd15eb277f51a0131690ebfb3fd"}, + "credo": {:hex, :credo, "1.7.7", "771445037228f763f9b2afd612b6aa2fd8e28432a95dbbc60d8e03ce71ba4446", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8bc87496c9aaacdc3f90f01b7b0582467b69b4bd2441fe8aae3109d843cc2f2e"}, "dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"}, "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, @@ -16,7 +16,7 @@ "finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"}, "heap": {:hex, :heap, "2.0.2", "d98cb178286cfeb5edbcf17785e2d20af73ca57b5a2cf4af584118afbcf917eb", [:mix], [], "hexpm", "ba9ea2fe99eb4bcbd9a8a28eaf71cbcac449ca1d8e71731596aace9028c9d429"}, "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, - "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, + "jason": {:hex, :jason, "1.4.3", "d3f984eeb96fe53b85d20e0b049f03e57d075b5acda3ac8d465c969a2536c17b", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "9a90e868927f7c777689baa16d86f4d0e086d968db5c05d917ccff6d443e58a3"}, "logger_backends": {:hex, :logger_backends, "1.0.0", "09c4fad6202e08cb0fbd37f328282f16539aca380f512523ce9472b28edc6bdf", [:mix], [], "hexpm", "1faceb3e7ec3ef66a8f5746c5afd020e63996df6fd4eb8cdb789e5665ae6c9ce"}, "makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, diff --git a/test/membrane_mp4/demuxer/isom/demuxer_test.exs b/test/membrane_mp4/demuxer/isom/demuxer_test.exs index a442b58..2a6fbb8 100644 --- a/test/membrane_mp4/demuxer/isom/demuxer_test.exs +++ b/test/membrane_mp4/demuxer/isom/demuxer_test.exs @@ -284,28 +284,27 @@ defmodule Membrane.MP4.Demuxer.ISOM.DemuxerTest do end defp start_testing_pipeline!(opts) do - structure = [ + spec = child(:file, %Membrane.File.Source{location: opts[:input_file]}) |> child(:demuxer, Membrane.MP4.Demuxer.ISOM) |> via_out(Pad.ref(:output, 1)) |> child(:sink, %Membrane.File.Sink{location: opts[:output_file]}) - ] - Pipeline.start_link_supervised!(spec: structure) + Pipeline.start_link_supervised!(spec: spec) end defp start_testing_pipeline_with_two_tracks!(opts) do - structure = [ + spec = [ child(:file, %Membrane.File.Source{location: opts[:input_file]}) |> child(:demuxer, Membrane.MP4.Demuxer.ISOM) - |> via_out(Pad.ref(:output, 1)) + |> via_out(:output, options: [kind: :video]) |> child(:video_sink, %Membrane.File.Sink{location: opts[:video_output_file]}), get_child(:demuxer) - |> via_out(Pad.ref(:output, 2)) + |> via_out(Pad.ref(:output, 2), options: [kind: :audio]) |> child(:audio_sink, %Membrane.File.Sink{location: opts[:audio_output_file]}) ] - Pipeline.start_link_supervised!(spec: structure) + Pipeline.start_link_supervised!(spec: spec) end defp start_remote_pipeline!(opts) do diff --git a/test/membrane_mp4/demuxer/isom/integration_test.exs b/test/membrane_mp4/demuxer/isom/integration_test.exs index 717ff17..62a8b75 100644 --- a/test/membrane_mp4/demuxer/isom/integration_test.exs +++ b/test/membrane_mp4/demuxer/isom/integration_test.exs @@ -168,11 +168,11 @@ defmodule Membrane.MP4.Demuxer.ISOM.IntegrationTest do demuxing_spec = [ child(:file, %Membrane.File.Source{location: mp4_path}) |> child(:demuxer, Membrane.MP4.Demuxer.ISOM) - |> via_out(Pad.ref(:output, 1)) + |> via_out(:output, options: [kind: :video]) |> child(:parser_video, %Membrane.H264.Parser{output_stream_structure: :annexb}) |> child(:sink_video, %Membrane.File.Sink{location: out_video_path}), get_child(:demuxer) - |> via_out(Pad.ref(:output, 2)) + |> via_out(Pad.ref(:output, 2), options: [kind: :audio]) |> child(:audio_parser, %Membrane.AAC.Parser{ out_encapsulation: :ADTS })