diff --git a/lib/membrane_mp4/container.ex b/lib/membrane_mp4/container.ex index 24f19dd..917d216 100644 --- a/lib/membrane_mp4/container.ex +++ b/lib/membrane_mp4/container.ex @@ -18,7 +18,10 @@ defmodule Membrane.MP4.Container do {:box, box_name_t} | {:field, field_name_t} | {:data, bitstring} - | {:reason, :box_header | {:box_size, header: pos_integer, actual: pos_integer}} + | {:reason, + :box_header + | {:box_size, header: pos_integer, actual: pos_integer} + | {:non_empty_leftover, binary}} ] @type serialize_error_context_t :: [{:box, box_name_t} | {:field, field_name_t}] diff --git a/lib/membrane_mp4/container/parse_helper.ex b/lib/membrane_mp4/container/parse_helper.ex index 35afd9a..02fb9e2 100644 --- a/lib/membrane_mp4/container/parse_helper.ex +++ b/lib/membrane_mp4/container/parse_helper.ex @@ -26,7 +26,8 @@ defmodule Membrane.MP4.Container.ParseHelper do try: {:ok, {fields, rest}, context} <- parse_fields(content, box_schema.fields, context), try: - {:ok, children, <<>>, context} <- parse_boxes(rest, box_schema.children, context, []) do + {:ok, children, rest, context} <- parse_boxes(rest, box_schema.children, context, []), + leftover: <<>> <- rest do box = %{fields: fields, children: children, size: content_size, header_size: header_size} parse_boxes(data, schema, context, [{name, box} | acc]) else @@ -38,19 +39,33 @@ defmodule Membrane.MP4.Container.ParseHelper do box = %{content: content, size: content_size, header_size: header_size} parse_boxes(data, schema, context, [{name, box} | acc]) + leftover: leftover -> + {:error, [box: name, reason: {:non_empty_leftover, leftover}]} + try: {:error, context} -> {:error, [box: name] ++ context} end end - defp parse_fields(data, [], context) do - {:ok, {%{}, data}, context} + defp parse_fields(data, fields, context) do + do_parse_fields(data, fields, context, %{}) end - defp parse_fields(data, [{name, type} | fields], context) do - with {:ok, {term, rest}, context} <- parse_field(data, {name, type}, context), - {:ok, {terms, rest}, context} <- parse_fields(rest, fields, context) do - {:ok, {Map.put(terms, name, term), rest}, context} + defp do_parse_fields(data, [], context, acc) do + {:ok, {acc, data}, context} + end + + defp do_parse_fields(data, [{name, type} | fields], context, acc) do + case parse_field(data, {name, type}, context) do + {:ok, {term, rest}, context} -> + acc = Map.put(acc, name, term) + do_parse_fields(rest, fields, context, acc) + + {:ok, :ignore, context} -> + do_parse_fields(data, fields, context, acc) + + {:error, context} -> + {:error, context} end end @@ -63,27 +78,11 @@ defmodule Membrane.MP4.Container.ParseHelper do end end - defp parse_field(data, {name, {type, store: context_name, when: {key, [mask: mask]}}}, context) do - context_object = Map.get(context, key, 0) - - if (mask &&& context_object) == mask do - parse_field(data, {name, {type, store: context_name}}, context) - else - {:ok, {[], data}, context} - end - end - - defp parse_field( - data, - {name, {type, store: context_name, when: {key, [value: value]}}}, - context - ) do - context_object = Map.get(context, key, 0) - - if context_object == value do + defp parse_field(data, {name, {type, store: context_name, when: when_clause}}, context) do + if handle_when(when_clause, context) do parse_field(data, {name, {type, store: context_name}}, context) else - {:ok, {[], data}, context} + {:ok, :ignore, context} end end @@ -94,23 +93,11 @@ defmodule Membrane.MP4.Container.ParseHelper do {:ok, result, context} end - defp parse_field(data, {name, {type, when: {key, [mask: mask]}}}, context) do - context_object = Map.get(context, key, 0) - - if (mask &&& context_object) == mask do - parse_field(data, {name, type}, context) - else - {:ok, {[], data}, context} - end - end - - defp parse_field(data, {name, {type, when: {key, [value: value]}}}, context) do - context_object = Map.get(context, key, 0) - - if context_object == value do + defp parse_field(data, {name, {type, when: when_clause}}, context) do + if handle_when(when_clause, context) do parse_field(data, {name, type}, context) else - {:ok, {[], data}, context} + {:ok, :ignore, context} end end @@ -123,15 +110,21 @@ defmodule Membrane.MP4.Container.ParseHelper do defp parse_field(data, {name, {:int, size}}, context) do case data do - <> -> {:ok, {int, rest}, context} - _unknown_format -> parse_field_error(data, name) + <> -> + {:ok, {int, rest}, context} + + _unknown_format -> + parse_field_error(data, name) end end defp parse_field(data, {name, {:uint, size}}, context) do case data do - <> -> {:ok, {uint, rest}, context} - _unknown_format -> parse_field_error(data, name) + <> -> + {:ok, {uint, rest}, context} + + _unknown_format -> + parse_field_error(data, name) end end @@ -188,4 +181,16 @@ defmodule Membrane.MP4.Container.ParseHelper do defp parse_field_error(_data, name, context) do {:error, [field: name] ++ context} end + + defp handle_when({key, condition}, context) do + with {:ok, value} <- Map.fetch(context, key) do + case condition do + [value: cond_value] -> value == cond_value + [mask: mask] -> (mask &&& value) == mask + end + else + :error -> + raise "MP4 schema field #{key} not found in context" + end + end end diff --git a/lib/membrane_mp4/container/schema.ex b/lib/membrane_mp4/container/schema.ex index 5cd20cd..28e0fec 100644 --- a/lib/membrane_mp4/container/schema.ex +++ b/lib/membrane_mp4/container/schema.ex @@ -313,7 +313,7 @@ defmodule Membrane.MP4.Container.Schema do fields: @full_box ++ [ - sample_size: :uint32, + sample_size: {:uint32, store: :sample_size}, sample_count: :uint32, entry_list: { {:list, @@ -377,35 +377,28 @@ defmodule Membrane.MP4.Container.Schema do ] ], sidx: [ - version: 1, + version: 0, fields: @full_box ++ [ reference_id: :uint32, timescale: :uint32, - earliest_presentation_time: :uint64, - first_offset: :uint64, + earliest_presentation_time: {:uint32, when: {:version, value: 0}}, + earliest_presentation_time: {:uint64, when: {:version, value: 1}}, + first_offset: {:uint32, when: {:version, value: 0}}, + first_offset: {:uint64, when: {:version, value: 1}}, reserved: <<0::16-integer>>, reference_count: :uint16, - # TODO: make a list once list length is supported - # reference_list: [ - # [ - # reference_type: :bin1, - # referenced_size: :uint31, - # subsegment_duration: :uint32, - # starts_with_sap: :bin1, - # sap_type: :uint3, - # sap_delta_time: :uint28 - # ], - # length: :reference_count - # ] - reference_type: :bin1, - # from the beginning of moof to the end - referenced_size: :uint31, - subsegment_duration: :uint32, - starts_with_sap: :bin1, - sap_type: :uint3, - sap_delta_time: :uint28 + reference_list: + {:list, + [ + reference_type: :bin1, + referenced_size: :uint31, + subsegment_duration: :uint32, + starts_with_sap: :bin1, + sap_type: :uint3, + sap_delta_time: :uint28 + ]} ] ], moof: [ @@ -424,9 +417,10 @@ defmodule Membrane.MP4.Container.Schema do @full_box ++ [ track_id: :uint32, - default_sample_duration: :uint32, - default_sample_size: :uint32, - default_sample_flags: :uint32 + base_data_offset: {:uint64, when: {:fo_flags, mask: 0x00001}}, + default_sample_duration: {:uint32, when: {:fo_flags, mask: 0x000008}}, + default_sample_size: {:uint32, when: {:fo_flags, mask: 0x000010}}, + default_sample_flags: {:uint32, when: {:fo_flags, mask: 0x000020}} ] ], tfdt: [ @@ -434,7 +428,8 @@ defmodule Membrane.MP4.Container.Schema do fields: @full_box ++ [ - base_media_decode_time: :uint64 + base_media_decode_time: {:uint32, when: {:version, value: 0}}, + base_media_decode_time: {:uint64, when: {:version, value: 1}} ] ], trun: [ @@ -443,15 +438,35 @@ defmodule Membrane.MP4.Container.Schema do @full_box ++ [ sample_count: :uint32, - data_offset: :int32, + data_offset: {:int32, when: {:fo_flags, mask: 0x000001}}, + first_sample_flags: {:bin32, when: {:fo_flags, mask: 0x000004}}, + samples: + {:list, + [ + sample_duration: {:uint32, when: {:fo_flags, mask: 0x000100}}, + sample_size: {:uint32, when: {:fo_flags, mask: 0x000200}}, + sample_flags: {:bin32, when: {:fo_flags, mask: 0x000400}}, + sample_composition_offset: + {:uint32, when: {:fo_flags, mask: 0x000800}} + ]} + ] + ], + trun: [ + version: 1, + fields: + @full_box ++ + [ + sample_count: :uint32, + data_offset: {:int32, when: {:fo_flags, mask: 0x000001}}, + first_sample_flags: {:bin32, when: {:fo_flags, mask: 0x000004}}, samples: {:list, [ - sample_duration: :uint32, - sample_size: :uint32, - sample_flags: :bin32, + sample_duration: {:uint32, when: {:fo_flags, mask: 0x000100}}, + sample_size: {:uint32, when: {:fo_flags, mask: 0x000200}}, + sample_flags: {:bin32, when: {:fo_flags, mask: 0x000400}}, sample_composition_offset: - {:uint32, when: {:fo_flags, mask: 0x800}} + {:int32, when: {:fo_flags, mask: 0x000800}} ]} ] ] diff --git a/lib/membrane_mp4/demuxer/cmaf.ex b/lib/membrane_mp4/demuxer/cmaf.ex new file mode 100644 index 0000000..87c92fe --- /dev/null +++ b/lib/membrane_mp4/demuxer/cmaf.ex @@ -0,0 +1,458 @@ +defmodule Membrane.MP4.Demuxer.CMAF do + @moduledoc """ + A demuxer capable of demuxing streams packed in CMAF container. + """ + use Membrane.Filter + + alias Membrane.{MP4, RemoteStream} + alias Membrane.MP4.Container + alias Membrane.MP4.Demuxer.CMAF.SamplesInfo, as: SamplesInfo + + def_input_pad :input, + accepted_format: + %RemoteStream{type: :bytestream, content_format: content_format} + when content_format in [nil, MP4], + flow_control: :auto + + def_output_pad :output, + accepted_format: + any_of( + %Membrane.AAC{config: {:esds, _esds}}, + %Membrane.H264{ + stream_structure: {_avc, _dcr}, + alignment: :au + }, + %Membrane.H265{ + stream_structure: {_hevc, _dcr}, + alignment: :au + }, + %Membrane.Opus{self_delimiting?: false} + ), + availability: :on_request, + options: [ + kind: [ + spec: :video | :audio | nil, + default: nil, + description: """ + Specifies, what kind of data can be handled by a pad. + """ + ] + ] + + @typedoc """ + Notification sent when the tracks are identified in the MP4. + + Upon receiving the notification, `Pad.ref(:output, track_id)` pads should be linked + for all the `track_id` in the list. + The `content` field contains the stream format which is contained in the track. + """ + @type new_tracks_t() :: + {:new_tracks, [{track_id :: integer(), content :: struct()}]} + + @impl true + def handle_init(_ctx, _options) do + state = %{ + unprocessed_boxes: [], + unprocessed_binary: <<>>, + samples_info: nil, + track_to_pad_map: nil, + all_pads_connected?: false, + buffered_actions: [], + fsm_state: :reading_cmaf_header, + track_notifications_sent?: false, + last_timescales: %{}, + how_many_segment_bytes_read: 0, + tracks_info: nil, + tracks_notification_sent?: false + } + + {[], state} + end + + @impl true + def handle_stream_format(:input, _stream_format, _ctx, state) do + {[], state} + end + + @impl true + def handle_buffer(:input, buffer, ctx, state) do + {new_boxes, rest} = Container.parse!(state.unprocessed_binary <> buffer.payload) + + state = %{ + state + | unprocessed_boxes: state.unprocessed_boxes ++ new_boxes, + unprocessed_binary: rest + } + + handle_boxes(ctx, state) + end + + defp handle_boxes(_ctx, %{unprocessed_boxes: []} = state) do + {[], state} + end + + defp handle_boxes(ctx, state) do + [{first_box_name, first_box} | rest_of_boxes] = state.unprocessed_boxes + {this_box_actions, state} = do_handle_box(ctx, first_box_name, first_box, state) + {actions, state} = handle_boxes(ctx, %{state | unprocessed_boxes: rest_of_boxes}) + {this_box_actions ++ actions, state} + end + + defp do_handle_box(ctx, box_name, box, %{fsm_state: :reading_cmaf_header} = state) do + case box_name do + :ftyp -> + {[], state} + + :free -> + {[], state} + + :moov -> + tracks_info = SamplesInfo.read_moov(box) |> reject_unsupported_tracks_info() + track_to_pad_map = match_tracks_with_pads(ctx, tracks_info) + + state = %{ + state + | track_to_pad_map: track_to_pad_map, + fsm_state: :reading_fragment_header, + tracks_info: tracks_info + } + + state = %{state | all_pads_connected?: all_pads_connected?(ctx, state)} + + stream_format_actions = get_stream_format(state) + + if state.all_pads_connected? do + {stream_format_actions, state} + else + {[pause_auto_demand: :input] ++ get_track_notifications(state), + %{ + state + | buffered_actions: state.buffered_actions ++ stream_format_actions, + track_notifications_sent?: true + }} + end + + _other -> + raise """ + Demuxer entered unexpected state. + Demuxer's finite state machine's state: #{inspect(state.fsm_state)} + Encountered box type: #{inspect(box_name)} + """ + end + end + + defp do_handle_box(_ctx, box_name, box, %{fsm_state: :reading_fragment_header} = state) do + case box_name do + :sidx -> + last_timescales = + Map.put(state.last_timescales, box.fields.reference_id, box.fields.timescale) + + {[], %{state | last_timescales: last_timescales}} + + :styp -> + {[], state} + + :moof -> + samples_info = SamplesInfo.get_samples_info(box) + + {[], + %{ + state + | samples_info: samples_info, + fsm_state: :reading_fragment_data, + how_many_segment_bytes_read: box.size + box.header_size + }} + + _other -> + raise """ + Demuxer entered unexpected state. + Demuxer's finite state machine's state: #{inspect(state.fsm_state)} + Encountered box type: #{inspect(box_name)} + """ + end + end + + defp do_handle_box(_ctx, box_name, box, %{fsm_state: :reading_fragment_data} = state) do + case box_name do + :mdat -> + state = Map.update!(state, :how_many_segment_bytes_read, &(&1 + box.header_size)) + {actions, state} = read_mdat(box, state) + + new_fsm_state = + if state.samples_info == [], do: :reading_fragment_header, else: :reading_fragment_data + + {actions, %{state | fsm_state: new_fsm_state}} + + _other -> + raise """ + Demuxer entered unexpected state. + Demuxer's finite state machine's state: #{inspect(state.fsm_state)} + Encountered box type: #{inspect(box_name)} + """ + end + end + + defp read_mdat(mdat_box, state) do + {this_mdat_samples, rest_of_samples_info} = + Enum.split_while( + state.samples_info, + &(&1.offset - state.how_many_segment_bytes_read < byte_size(mdat_box.content)) + ) + + actions = + Enum.map(this_mdat_samples, fn sample -> + payload = + mdat_box.content + |> :erlang.binary_part(sample.offset - state.how_many_segment_bytes_read, sample.size) + + dts = + Ratio.new(sample.ts, state.last_timescales[sample.track_id]) |> Membrane.Time.seconds() + + pts = + Ratio.new(sample.ts + sample.composition_offset, state.last_timescales[sample.track_id]) + |> Membrane.Time.seconds() + + {:buffer, + {Pad.ref(:output, state.track_to_pad_map[sample.track_id]), + %Membrane.Buffer{payload: payload, pts: pts, dts: dts}}} + end) + + state = %{state | samples_info: rest_of_samples_info} + + if state.all_pads_connected? do + {actions, state} + else + {[], %{state | buffered_actions: state.buffered_actions ++ actions}} + end + end + + defp match_tracks_with_pads(ctx, tracks_info) do + output_pads_data = + ctx.pads + |> Map.values() + |> Enum.filter(&(&1.direction == :output)) + + if length(output_pads_data) not in [0, map_size(tracks_info)] do + raise_pads_not_matching_codecs_error!(ctx, tracks_info) + end + + track_to_pad_map = + case output_pads_data do + [] -> + tracks_info + |> Map.new(fn {track_id, _table} -> {track_id, track_id} end) + + [pad_data] -> + {track_id, track_format} = Enum.at(tracks_info, 0) + + if pad_data.options.kind not in [ + nil, + track_format_to_kind(track_format) + ] do + raise_pads_not_matching_codecs_error!(ctx, tracks_info) + end + + %{track_id => pad_data_to_pad_id(pad_data)} + + _many -> + kind_to_pads_data = output_pads_data |> Enum.group_by(& &1.options.kind) + + kind_to_tracks = + tracks_info + |> Enum.group_by( + fn {_track_id, track_format} -> track_format_to_kind(track_format) end, + fn {track_id, _track_format} -> track_id end + ) + + raise? = + Enum.any?(kind_to_pads_data, fn {kind, pads} -> + length(pads) != length(kind_to_tracks[kind]) + end) + + if raise?, do: raise_pads_not_matching_codecs_error!(ctx, tracks_info) + + kind_to_tracks + |> Enum.flat_map(fn {kind, tracks} -> + pad_refs = kind_to_pads_data[kind] |> Enum.map(&pad_data_to_pad_id/1) + Enum.zip(tracks, pad_refs) + end) + |> Map.new() + end + + track_to_pad_map + end + + defp pad_data_to_pad_id(%{ref: Pad.ref(_name, id)}), do: id + + @spec raise_pads_not_matching_codecs_error!(map(), map()) :: no_return() + defp raise_pads_not_matching_codecs_error!(ctx, tracks_info) do + pads_kinds = + ctx.pads + |> Enum.flat_map(fn + {:input, _pad_data} -> [] + {_pad_ref, %{options: %{kind: kind}}} -> [kind] + end) + + tracks_codecs = + tracks_info + |> Enum.map(fn {_track, track_format} -> track_format.__struct__ end) + + raise """ + Pads kinds don't match with tracks codecs. Pads kinds are #{inspect(pads_kinds)}. \ + Tracks codecs are #{inspect(tracks_codecs)} + """ + end + + defp track_format_to_kind(%Membrane.H264{}), do: :video + defp track_format_to_kind(%Membrane.H265{}), do: :video + defp track_format_to_kind(%Membrane.AAC{}), do: :audio + defp track_format_to_kind(%Membrane.Opus{}), do: :audio + + defp get_track_notifications(state) do + new_tracks = + state.tracks_info + |> Enum.map(fn {track_id, track_format} -> + pad_id = state.track_to_pad_map[track_id] + {pad_id, track_format} + end) + + [{:notify_parent, {:new_tracks, new_tracks}}] + end + + defp get_stream_format(state) do + state.tracks_info + |> Enum.map(fn {track_id, track_format} -> + pad_id = state.track_to_pad_map[track_id] + {:stream_format, {Pad.ref(:output, pad_id), track_format}} + end) + end + + @impl true + def handle_pad_added(:input, _ctx, state) do + {[], state} + end + + def handle_pad_added(_pad, _ctx, %{all_pads_connected?: true}) do + raise "All tracks have corresponding pad already connected" + end + + def handle_pad_added(Pad.ref(:output, _track_id) = pad_ref, ctx, state) do + state = + case ctx.playback do + :stopped -> + state + + :playing when state.track_notifications_sent? -> + state + + :playing -> + raise """ + Pads can be linked either before #{inspect(__MODULE__)} enters :playing playback or after it \ + sends {:new_tracks, ...} notification + """ + end + + :ok = validate_pad_kind!(pad_ref, ctx.pad_options.kind, ctx, state) + all_pads_connected? = all_pads_connected?(ctx, state) + + {actions, state} = + if all_pads_connected? do + {actions, state} = flush_samples(state) + {actions ++ [resume_auto_demand: :input], state} + else + {[], state} + end + + state = %{state | all_pads_connected?: all_pads_connected?} + {actions, state} + end + + defp validate_pad_kind!(pad_ref, pad_kind, ctx, state) do + allowed_kinds = [nil, :audio, :video] + + if pad_kind not in allowed_kinds do + raise """ + Pad #{inspect(pad_ref)} has :kind option set to #{inspect(pad_kind)}, while it has te be one of \ + #{[:audio, :video] |> inspect()} or be unspecified. + """ + end + + if not state.track_notifications_sent? and + Enum.count(ctx.pads, &match?({Pad.ref(:output, _id), %{options: %{kind: nil}}}, &1)) > 1 do + raise """ + If pads are linked before :new_tracks notifications and there are more then one of them, pad option \ + :kind has to be specyfied. + """ + end + + if state.track_notifications_sent? do + Pad.ref(:output, pad_id) = pad_ref + + related_track = + state.track_to_pad_map + |> Map.keys() + |> Enum.find(&(state.track_to_pad_map[&1] == pad_id)) + + if related_track == nil do + raise """ + Pad #{inspect(pad_ref)} doesn't have a related track. If you link pads after #{inspect(__MODULE__)} \ + sent the track notification, you have to restrict yourself to the pad occuring in this notification. \ + Tracks, that occured in this notification are: #{Map.keys(state.track_to_pad_map) |> inspect()} + """ + end + + track_kind = + state.tracks_info[related_track] + |> track_format_to_kind() + + if pad_kind != nil and pad_kind != track_kind do + raise """ + Pad option :kind must match with the kind of the related track or be equal nil, but pad #{inspect(pad_ref)} \ + kind is #{inspect(pad_kind)}, while the related track kind is #{inspect(track_kind)} + """ + end + end + + :ok + end + + @impl true + def handle_end_of_stream(:input, _ctx, %{all_pads_connected?: false} = state) do + {[], %{state | buffered_actions: state.buffered_actions ++ get_end_of_stream_actions(state)}} + end + + def handle_end_of_stream(:input, _ctx, %{all_pads_connected?: true} = state) do + {get_end_of_stream_actions(state), state} + end + + defp all_pads_connected?(_ctx, %{tracks_info: nil}), do: false + + defp all_pads_connected?(ctx, state) do + count_of_supported_tracks = + state.tracks_info + |> Enum.count() + + pads = + ctx.pads + |> Enum.flat_map(fn + {Pad.ref(:output, pad_id), _data} -> [pad_id] + _pad -> [] + end) + + count_of_supported_tracks == length(pads) + end + + defp flush_samples(state) do + {state.buffered_actions, %{state | buffered_actions: []}} + end + + defp get_end_of_stream_actions(state) do + Enum.map(state.tracks_info, fn {track_id, _track_format} -> + {:end_of_stream, Pad.ref(:output, state.track_to_pad_map[track_id])} + end) + end + + defp reject_unsupported_tracks_info(tracks_info) do + Map.reject(tracks_info, fn {_track_id, track_format} -> track_format == nil end) + end +end diff --git a/lib/membrane_mp4/demuxer/cmaf/samples_info.ex b/lib/membrane_mp4/demuxer/cmaf/samples_info.ex new file mode 100644 index 0000000..d372ab1 --- /dev/null +++ b/lib/membrane_mp4/demuxer/cmaf/samples_info.ex @@ -0,0 +1,93 @@ +defmodule Membrane.MP4.Demuxer.CMAF.SamplesInfo do + @moduledoc false + + alias Membrane.MP4.MovieBox.SampleTableBox + + @type sample_description :: %{ + duration: non_neg_integer(), + ts: non_neg_integer(), + size: non_neg_integer(), + composition_offset: non_neg_integer(), + offset: non_neg_integer(), + track_id: non_neg_integer() + } + + @spec read_moov(moov_box :: map()) :: %{non_neg_integer() => struct()} + def read_moov(%{children: boxes}) do + tracks = + boxes + |> Enum.filter(fn {type, _content} -> type == :trak end) + |> Enum.into(%{}, fn {:trak, %{children: boxes}} -> + {boxes[:tkhd].fields.track_id, boxes} + end) + + Map.new(tracks, fn {track_id, boxes} -> + sample_table = + SampleTableBox.unpack( + boxes[:mdia].children[:minf].children[:stbl], + boxes[:mdia].children[:mdhd].fields.timescale + ) + + {track_id, sample_table.sample_description} + end) + end + + @spec get_samples_info(moof_box :: map()) :: [sample_description()] + def get_samples_info(%{children: boxes}) do + boxes + |> Enum.filter(fn {type, _content} -> type == :traf end) + |> Enum.map(fn {:traf, box} -> box end) + |> Enum.flat_map(&handle_traf/1) + |> Enum.sort_by(& &1.offset) + end + + defp handle_traf(traf_box) do + track_description = %{ + track_id: traf_box.children[:tfhd].fields.track_id, + base_data_offset: traf_box.children[:tfhd].fields[:base_data_offset] || 0, + default_sample_duration: traf_box.children[:tfhd].fields[:default_sample_duration] || 0, + default_sample_size: traf_box.children[:tfhd].fields[:default_sample_size], + base_media_decode_time: traf_box.children[:tfdt].fields.base_media_decode_time || 0 + } + + {samples, _ts_acc} = + Enum.filter(traf_box.children, fn {box_name, _box} -> box_name == :trun end) + |> Enum.flat_map_reduce(track_description.base_media_decode_time, fn {:trun, trun_box}, + ts_acc -> + {samples, {_size_acc, new_ts_acc}} = + handle_trun( + trun_box, + ts_acc, + track_description + ) + + {samples, new_ts_acc} + end) + + samples + end + + defp handle_trun( + trun_box, + ts_acc, + track_description + ) do + Enum.map_reduce( + trun_box.fields.samples, + {track_description.base_data_offset + trun_box.fields.data_offset, ts_acc}, + fn sample, {size_acc, ts_acc} -> + size = sample[:sample_size] || track_description.default_sample_size + duration = sample[:sample_duration] || track_description.default_sample_duration + + {%{ + duration: duration, + ts: ts_acc, + size: size, + composition_offset: sample[:composition_offset] || 0, + offset: size_acc, + track_id: track_description.track_id + }, {size_acc + size, ts_acc + duration}} + end + ) + end +end diff --git a/lib/membrane_mp4/movie_box/sample_table_box.ex b/lib/membrane_mp4/movie_box/sample_table_box.ex index af0f032..cd5bf67 100644 --- a/lib/membrane_mp4/movie_box/sample_table_box.ex +++ b/lib/membrane_mp4/movie_box/sample_table_box.ex @@ -12,7 +12,6 @@ defmodule Membrane.MP4.MovieBox.SampleTableBox do sample_deltas = assemble_sample_deltas(table) maybe_sample_sync = maybe_sample_sync(table) sample_to_chunk = assemble_sample_to_chunk(table) - sample_sizes = assemble_sample_sizes(table) chunk_offsets = assemble_chunk_offsets(table) [ @@ -47,15 +46,7 @@ defmodule Membrane.MP4.MovieBox.SampleTableBox do entry_list: sample_to_chunk } }, - stsz: %{ - fields: %{ - version: 0, - flags: 0, - sample_size: 0, - sample_count: table.sample_count, - entry_list: sample_sizes - } - }, + stsz: assemble_stsz(table), stco: %{ fields: %{ version: 0, @@ -256,12 +247,27 @@ defmodule Membrane.MP4.MovieBox.SampleTableBox do } ) - defp assemble_sample_sizes(%{sample_sizes: sample_sizes}), - do: Enum.map(sample_sizes, &%{entry_size: &1}) - defp assemble_chunk_offsets(%{chunk_offsets: chunk_offsets}), do: Enum.map(chunk_offsets, &%{chunk_offset: &1}) + defp assemble_stsz(%{sample_sizes: sample_sizes, sample_count: sample_count}) do + fields = + if sample_sizes != [] and Enum.all?(sample_sizes) == hd(sample_sizes) do + %{sample_size: hd(sample_sizes), entry_list: []} + else + %{sample_size: 0, entry_list: Enum.map(sample_sizes, &%{entry_size: &1})} + end + + %{ + fields: + Map.merge(fields, %{ + version: 0, + flags: 0, + sample_count: sample_count + }) + } + end + @spec unpack(%{children: Container.t(), fields: map()}, timescale :: pos_integer()) :: SampleTable.t() def unpack(%{children: boxes}, timescale) do @@ -293,12 +299,12 @@ defmodule Membrane.MP4.MovieBox.SampleTableBox do offsets |> Enum.map(fn %{chunk_offset: offset} -> offset end) end - defp unpack_sample_sizes(%{fields: %{entry_list: [], sample_count: 1, sample_size: sample_size}}) do - [sample_size] + defp unpack_sample_sizes(%{fields: %{sample_size: 0, entry_list: sizes}}) do + Enum.map(sizes, fn %{entry_size: size} -> size end) end - defp unpack_sample_sizes(%{fields: %{entry_list: sizes}}) do - sizes |> Enum.map(fn %{entry_size: size} -> size end) + defp unpack_sample_sizes(%{fields: %{sample_size: sample_size, sample_count: sample_count}}) do + Bunch.Enum.repeated(sample_size, sample_count) end defp unpack_sample_description(%{children: [{avc, %{children: boxes, fields: fields}}]}) diff --git a/lib/membrane_mp4/segment_index_box.ex b/lib/membrane_mp4/segment_index_box.ex index 75e29d2..df53974 100644 --- a/lib/membrane_mp4/segment_index_box.ex +++ b/lib/membrane_mp4/segment_index_box.ex @@ -22,18 +22,22 @@ defmodule Membrane.MP4.SegmentIndexBox do sidx: %{ children: [], fields: %{ + reference_id: config.id, + timescale: config.timescale, earliest_presentation_time: config.base_timestamp, first_offset: 0, flags: 0, reference_count: 1, - reference_id: config.id, - reference_type: <<0::size(1)>>, - referenced_size: config.referenced_size, - sap_delta_time: 0, - sap_type: 0, - starts_with_sap: <<1::size(1)>>, - subsegment_duration: config.duration, - timescale: config.timescale, + reference_list: [ + %{ + reference_type: <<0::size(1)>>, + referenced_size: config.referenced_size, + subsegment_duration: config.duration, + starts_with_sap: <<1::size(1)>>, + sap_type: 0, + sap_delta_time: 0 + } + ], version: 1 } } diff --git a/test/fixtures/cmaf/ref_audio.aac b/test/fixtures/cmaf/ref_audio.aac new file mode 100644 index 0000000..a69cdde Binary files /dev/null and b/test/fixtures/cmaf/ref_audio.aac differ diff --git a/test/fixtures/cmaf/ref_video.h264 b/test/fixtures/cmaf/ref_video.h264 new file mode 100644 index 0000000..3dd47e5 Binary files /dev/null and b/test/fixtures/cmaf/ref_video.h264 differ diff --git a/test/membrane_mp4/demuxer/cmaf/demuxer_test.exs b/test/membrane_mp4/demuxer/cmaf/demuxer_test.exs new file mode 100644 index 0000000..41b0682 --- /dev/null +++ b/test/membrane_mp4/demuxer/cmaf/demuxer_test.exs @@ -0,0 +1,177 @@ +defmodule Membrane.MP4.Demuxer.CMAF.DemuxerTest do + use ExUnit.Case, async: true + + import Membrane.ChildrenSpec + import Membrane.Testing.Assertions + + require Membrane.RCPipeline, as: RCPipeline + require Membrane.Pad, as: Pad + + alias Membrane.MP4.Demuxer.MultiFileSource + alias Membrane.RCMessage + alias Membrane.Testing.Pipeline + + # Fixtures used in demuxer tests below were generated with `chunk_duration` option set to `Membrane.Time.seconds(1)`. + describe "CMAF demuxer" do + @tag :tmp_dir + test "demuxes fragmented MP4 with just audio track", %{tmp_dir: dir} do + input_paths = get_files("test/fixtures/cmaf/", ["ref_audio_header.mp4", "ref_audio_*.m4s"]) + audio_output_path = Path.join(dir, "out.aac") + + pipeline = + start_testing_pipeline!( + input_paths: input_paths, + audio_output_file: audio_output_path, + audio_pad_ref: Pad.ref(:output, 1) + ) + + assert_end_of_stream(pipeline, :audio_sink) + assert :ok == Pipeline.terminate(pipeline) + + assert_files_equal(audio_output_path, "test/fixtures/cmaf/ref_audio.aac") + end + + @tag :sometag + @tag :tmp_dir + test "demuxes fragmented MP4 with just video track", %{tmp_dir: dir} do + video_output_path = Path.join(dir, "out.h264") + + pipeline = + start_testing_pipeline!( + input_paths: + get_files("test/fixtures/cmaf", ["ref_video_header.mp4", "ref_video_segment*.m4s"]), + video_output_file: video_output_path, + video_pad_ref: Pad.ref(:output, 1) + ) + + assert_end_of_stream(pipeline, :video_sink) + assert :ok == Pipeline.terminate(pipeline) + + assert_files_equal(video_output_path, "test/fixtures/cmaf/ref_video.h264") + end + + @tag :tmp_dir + test "demuxes fragmented MP4 with interleaved audio and video samples", %{tmp_dir: dir} do + input_paths = + get_files("test/fixtures/cmaf/muxed_audio_video/", ["header.mp4", "segment_*.m4s"]) + + video_output_path = Path.join(dir, "out.h264") + audio_output_path = Path.join(dir, "out.aac") + + pipeline = + start_testing_pipeline!( + input_paths: input_paths, + video_output_file: video_output_path, + audio_output_file: audio_output_path, + video_pad_ref: Pad.ref(:output, 1), + audio_pad_ref: Pad.ref(:output, 2) + ) + + assert_end_of_stream(pipeline, :video_sink) + assert_end_of_stream(pipeline, :audio_sink) + assert :ok == Pipeline.terminate(pipeline) + + assert_files_equal(video_output_path, "test/fixtures/in_video.h264") + assert_files_equal(audio_output_path, "test/fixtures/in_audio.aac") + end + + @tag :tmp_dir + test "resolves tracks from fragmented MP4 and allows to link output pads when tracks are resolved", + %{tmp_dir: dir} do + input_paths = + get_files("test/fixtures/cmaf/muxed_audio_video/", ["header.mp4", "segment_*.m4s"]) + + spec = + child(:file, %MultiFileSource{ + paths: input_paths + }) + |> child(:demuxer, Membrane.MP4.Demuxer.CMAF) + + pipeline = RCPipeline.start_link!() + RCPipeline.exec_actions(pipeline, spec: spec) + RCPipeline.subscribe(pipeline, %RCMessage.Notification{}) + RCPipeline.subscribe(pipeline, %RCMessage.EndOfStream{}) + + assert_receive %RCMessage.Notification{ + element: :demuxer, + data: {:new_tracks, [{1, _payload}, {2, _payload2}]}, + from: _ + }, + 2000 + + video_output_path = Path.join(dir, "out.h264") + audio_output_path = Path.join(dir, "out.aac") + + structure = [ + get_child(:demuxer) + |> via_out(Pad.ref(:output, 1)) + |> child(Membrane.AAC.Parser) + |> child(:audio_sink, %Membrane.File.Sink{location: audio_output_path}), + get_child(:demuxer) + |> via_out(Pad.ref(:output, 2)) + |> child(%Membrane.H264.Parser{output_stream_structure: :annexb}) + |> child(:video_sink, %Membrane.File.Sink{location: video_output_path}) + ] + + RCPipeline.exec_actions(pipeline, spec: structure) + assert_receive %RCMessage.EndOfStream{element: :demuxer, pad: :input}, 2000 + assert_receive %RCMessage.EndOfStream{element: :audio_sink, pad: :input}, 2000 + assert_receive %RCMessage.EndOfStream{element: :video_sink, pad: :input}, 2000 + RCPipeline.terminate(pipeline) + + assert_files_equal(video_output_path, "test/fixtures/in_video.h264") + assert_files_equal(audio_output_path, "test/fixtures/in_audio.aac") + end + end + + defp start_testing_pipeline!(opts) do + input_spec = [ + child(:file, %MultiFileSource{paths: opts[:input_paths]}) + |> child(:demuxer, Membrane.MP4.Demuxer.CMAF) + ] + + video_spec = + if opts[:video_output_file] do + [ + get_child(:demuxer) + |> via_out(opts[:video_pad_ref], options: [kind: :video]) + |> child(%Membrane.H264.Parser{output_stream_structure: :annexb}) + |> child(:video_sink, %Membrane.File.Sink{location: opts[:video_output_file]}) + ] + else + [] + end + + audio_spec = + if opts[:audio_output_file] do + [ + get_child(:demuxer) + |> via_out(opts[:audio_pad_ref], options: [kind: :audio]) + |> child(:parser, Membrane.AAC.Parser) + |> child(:audio_sink, %Membrane.File.Sink{location: opts[:audio_output_file]}) + ] + else + [] + end + + spec = input_spec ++ video_spec ++ audio_spec + Pipeline.start_link_supervised!(spec: spec) + end + + defp assert_files_equal(file_a, file_b) do + assert {:ok, a} = File.read(file_a) + assert {:ok, b} = File.read(file_b) + assert a == b + end + + defp get_files(directory_path, regexes) do + directory_path + |> Path.expand() + |> (fn path -> + Enum.map(regexes, &Path.join([path, "/", &1])) + end).() + |> Enum.map(&Path.wildcard/1) + |> List.flatten() + |> Enum.filter(&File.regular?(&1)) + end +end diff --git a/test/support/demuxer/multi_file_source.ex b/test/support/demuxer/multi_file_source.ex new file mode 100644 index 0000000..78aa330 --- /dev/null +++ b/test/support/demuxer/multi_file_source.ex @@ -0,0 +1,39 @@ +defmodule Membrane.MP4.Demuxer.MultiFileSource do + @moduledoc false + use Membrane.Source + + def_output_pad :output, accepted_format: _any, flow_control: :manual, demand_unit: :bytes + + def_options paths: [spec: [Path.t()]] + + @impl true + def handle_init(_ctx, opts) do + {[], %{paths: opts.paths, binary: nil}} + end + + @impl true + def handle_setup(_ctx, state) do + binary = Enum.map_join(state.paths, &File.read!/1) + {[], %{state | binary: binary}} + end + + @impl true + def handle_playing(_ctx, state) do + {[stream_format: {:output, %Membrane.RemoteStream{type: :bytestream}}], state} + end + + @impl true + def handle_demand(:output, demand_size, :bytes, ctx, state) do + case state.binary do + <> -> + {[buffer: {:output, %Membrane.Buffer{payload: first}}], %{state | binary: rest}} + + other -> + final_buffers = + if other == <<>>, do: [], else: [buffer: {:output, %Membrane.Buffer{payload: other}}] + + maybe_eos = if ctx.pads.output.end_of_stream?, do: [], else: [end_of_stream: :output] + {final_buffers ++ maybe_eos, %{state | binary: <<>>}} + end + end +end