diff --git a/README.md b/README.md index 64e2eb9a..351e89b1 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ The package can be installed by adding `membrane_mp4_plugin` to your list of dep ```elixir defp deps do [ - {:membrane_mp4_plugin, "~> 0.32.0"} + {:membrane_mp4_plugin, "~> 0.33.0"} ] end ``` diff --git a/lib/membrane_mp4/muxer/cmaf.ex b/lib/membrane_mp4/muxer/cmaf.ex index 52a7dd65..0224bd2b 100644 --- a/lib/membrane_mp4/muxer/cmaf.ex +++ b/lib/membrane_mp4/muxer/cmaf.ex @@ -187,7 +187,9 @@ defmodule Membrane.MP4.Muxer.CMAF do pads_registration_order: [], sample_queues: %{}, finish_current_segment?: false, - video_pad: nil + video_pad: nil, + all_input_pads_ready?: false, + buffers_awaiting_init: [] }) |> set_chunk_duration_range() @@ -309,15 +311,26 @@ defmodule Membrane.MP4.Muxer.CMAF do if are_all_group_pads_ready?(pad, ctx, state) do stream_format = generate_output_stream_format(output_pad, state) + old_input_pads_ready? = state.all_input_pads_ready? + + state = update_input_pads_ready(pad, ctx, state) + + {actions, state} = + if old_input_pads_ready? != state.all_input_pads_ready? do + replay_init_buffers(ctx, state) + else + {[], state} + end + cond do is_nil(ctx.pads[output_pad].stream_format) -> - {[stream_format: {output_pad, stream_format}], state} + {[{:stream_format, {output_pad, stream_format}} | actions], state} stream_format != ctx.pads[output_pad].stream_format -> - {[], SegmentHelper.put_awaiting_stream_format(pad, stream_format, state)} + {actions, SegmentHelper.put_awaiting_stream_format(pad, stream_format, state)} true -> - {[], state} + {actions, state} end else {[], state} @@ -325,7 +338,8 @@ defmodule Membrane.MP4.Muxer.CMAF do end @impl true - def handle_buffer(Pad.ref(:input, _id) = pad, sample, ctx, state) do + def handle_buffer(Pad.ref(:input, _id) = pad, sample, ctx, state) + when state.all_input_pads_ready? do use Numbers, overload_operators: true, comparison: true # In case DTS is not set, use PTS. This is the case for audio tracks or H264 originated @@ -335,7 +349,7 @@ defmodule Membrane.MP4.Muxer.CMAF do {sample, state} = state - |> maybe_init_segment_base_timestamp(pad, sample) + |> maybe_init_segment_timestamps(pad, sample) |> process_buffer_awaiting_duration(pad, sample) state = SegmentHelper.update_awaiting_stream_format(state, pad) @@ -359,6 +373,11 @@ defmodule Membrane.MP4.Muxer.CMAF do end end + @impl true + def handle_buffer(pad, sample, _ctx, state) do + {[], %{state | buffers_awaiting_init: [{pad, sample} | state.buffers_awaiting_init]}} + end + @impl true def handle_event(_pad, %__MODULE__.RequestMediaFinalization{}, _ctx, state) do {[], %{state | finish_current_segment?: true}} @@ -438,9 +457,12 @@ defmodule Membrane.MP4.Muxer.CMAF do track_data = %{ id: track_id, track: nil, - # base timestamp of the current segment, initialized with DTS of the first buffer + # decoding timestamp of the current segment, initialized with DTS of the first buffer + # and then incremented by duration of every produced segment + segment_decoding_timestamp: nil, + # presentation timestamp of the current segment, initialized with PTS of the first buffer # and then incremented by duration of every produced segment - segment_base_timestamp: nil, + segment_presentation_timestamp: nil, end_timestamp: 0, buffer_awaiting_duration: nil, chunks_duration: Membrane.Time.seconds(0) @@ -587,7 +609,7 @@ defmodule Membrane.MP4.Muxer.CMAF do sequence_number: state.seq_nums[output_pad], timescale: timescale, base_timestamp: - track_data.segment_base_timestamp + track_data.segment_presentation_timestamp |> Helper.timescalify(timescale) |> Ratio.trunc(), unscaled_duration: duration, @@ -603,7 +625,12 @@ defmodule Membrane.MP4.Muxer.CMAF do state = tracks_data |> Enum.reduce(state, fn %{unscaled_duration: duration, pad: pad}, state -> - update_in(state, [:pad_to_track_data, pad, :segment_base_timestamp], &(&1 + duration)) + state + |> update_in([:pad_to_track_data, pad, :segment_decoding_timestamp], &(&1 + duration)) + |> update_in( + [:pad_to_track_data, pad, :segment_presentation_timestamp], + &(&1 + duration) + ) end) |> update_in([:seq_nums, output_pad], &(&1 + 1)) @@ -712,16 +739,40 @@ defmodule Membrane.MP4.Muxer.CMAF do end end - defp maybe_init_segment_base_timestamp(state, pad, sample) do + defp maybe_init_segment_timestamps(state, pad, sample) do case state do - %{pad_to_track_data: %{^pad => %{segment_base_timestamp: nil}}} -> - put_in(state, [:pad_to_track_data, pad, :segment_base_timestamp], sample.dts) + %{pad_to_track_data: %{^pad => %{segment_decoding_timestamp: nil}}} -> + update_in(state, [:pad_to_track_data, pad], fn data -> + Map.merge(data, %{ + segment_decoding_timestamp: sample.dts, + segment_presentation_timestamp: sample.pts + }) + end) _else -> state end end + defp update_input_pads_ready(pad, ctx, state) do + all_input_pads_ready? = + Enum.all?(ctx.pads, fn + {^pad, _data} -> true + {Pad.ref(:output, _id), _data} -> true + {Pad.ref(:input, _id), data} -> data.stream_format != nil + end) + + %{state | all_input_pads_ready?: all_input_pads_ready?} + end + + defp replay_init_buffers(ctx, state) do + {buffers, state} = Map.pop!(state, :buffers_awaiting_init) + + Enum.flat_map_reduce(buffers, state, fn {pad, buffer}, state -> + handle_buffer(pad, buffer, ctx, state) + end) + end + @min_chunk_duration Membrane.Time.milliseconds(50) defp set_chunk_duration_range( %{ diff --git a/lib/membrane_mp4/muxer/cmaf/segment_helper.ex b/lib/membrane_mp4/muxer/cmaf/segment_helper.ex index 7edb16ec..820d0541 100644 --- a/lib/membrane_mp4/muxer/cmaf/segment_helper.ex +++ b/lib/membrane_mp4/muxer/cmaf/segment_helper.ex @@ -120,7 +120,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do end defp push_video_segment(state, queue, pad, sample) do - base_timestamp = max_segment_base_timestamp(state) + base_timestamp = max_segment_decoding_timestamp(state) queue = if state.finish_current_segment? do @@ -144,7 +144,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do end defp push_audio_segment(state, queue, pad, sample) do - base_timestamp = max_segment_base_timestamp(state) + base_timestamp = max_segment_decoding_timestamp(state) {video_pad, video_queue} = Enum.find(state.sample_queues, {nil, nil}, fn {_pad, queue} -> @@ -190,7 +190,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do total_collected_durations = Map.fetch!(state.pad_to_track_data, pad).chunks_duration + collected_duration - base_timestamp = state.pad_to_track_data[pad].segment_base_timestamp + base_timestamp = state.pad_to_track_data[pad].segment_decoding_timestamp queue = cond do @@ -262,7 +262,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do if video_queue do SamplesQueue.force_push(queue, sample) else - base_timestamp = max_segment_base_timestamp(state) + base_timestamp = max_segment_decoding_timestamp(state) SamplesQueue.plain_push_until_target(queue, sample, base_timestamp) end @@ -288,7 +288,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do defp update_queue_for(pad, queue, state), do: put_in(state, [:sample_queues, pad], queue) defp collect_samples_for_video_track(pad, queue, state) do - end_timestamp = SamplesQueue.last_collected_dts(queue) + end_timestamp = SamplesQueue.collectable_end_timestamp(queue) state = update_queue_for(pad, queue, state) if tracks_ready_for_collection?(state, end_timestamp) do @@ -311,7 +311,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do end defp collect_samples_for_audio_track(pad, queue, state) do - end_timestamp = SamplesQueue.last_collected_dts(queue) + end_timestamp = SamplesQueue.collectable_end_timestamp(queue) state = update_queue_for(pad, queue, state) if tracks_ready_for_collection?(state, end_timestamp) do @@ -338,7 +338,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do defp tracks_ready_for_collection?(state, end_timestamp) do Enum.all?(state.sample_queues, fn {_pad, queue} -> - SamplesQueue.last_collected_dts(queue) >= end_timestamp + SamplesQueue.collectable_end_timestamp(queue) >= end_timestamp end) end @@ -393,11 +393,11 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do maybe_return_segment(segment, reset_chunks_duration(state)) end - defp max_segment_base_timestamp(state) do + defp max_segment_decoding_timestamp(state) do state.pad_to_track_data - |> Enum.reject(fn {_key, track_data} -> is_nil(track_data.segment_base_timestamp) end) + |> Enum.reject(fn {_key, track_data} -> is_nil(track_data.segment_decoding_timestamp) end) |> Enum.map(fn {_key, track_data} -> - Ratio.to_float(track_data.segment_base_timestamp) + Ratio.to_float(track_data.segment_decoding_timestamp) end) |> Enum.max() end diff --git a/lib/membrane_mp4/muxer/cmaf/track_samples_queue.ex b/lib/membrane_mp4/muxer/cmaf/track_samples_queue.ex index a4833279..ac99fb3b 100644 --- a/lib/membrane_mp4/muxer/cmaf/track_samples_queue.ex +++ b/lib/membrane_mp4/muxer/cmaf/track_samples_queue.ex @@ -280,24 +280,31 @@ defmodule Membrane.MP4.Muxer.CMAF.TrackSamplesQueue do end @doc """ - Returns dts of the latest sample that is eligible for collection. + Returns the end timestamp for latest sample that is eligible for collection. In case of collectable state it is the last sample that has been put to queue, otherwise it is the last sample that will be in return from 'collect/1'. """ - @spec last_collected_dts(t()) :: integer() - def last_collected_dts(%__MODULE__{ + @spec collectable_end_timestamp(t()) :: integer() + def collectable_end_timestamp(%__MODULE__{ collectable?: false, target_samples: target_samples, excess_samples: excess_samples - }), - do: latest_collected_dts(excess_samples) || latest_collected_dts(target_samples) || -1 + }) do + sample = List.first(excess_samples) || List.first(target_samples) + + if sample do + sample.dts + sample.metadata.duration + else + -1 + end + end - def last_collected_dts(%__MODULE__{collectable?: true, target_samples: target_samples}), - do: latest_collected_dts(List.last(target_samples, []) |> List.wrap()) || -1 + def collectable_end_timestamp(%__MODULE__{collectable?: true, target_samples: target_samples}) do + sample = List.last(target_samples) - defp latest_collected_dts([]), do: nil - defp latest_collected_dts([sample | _rest]), do: Ratio.to_float(sample.dts) + sample.dts + sample.metadata.duration + end @doc """ Returns the most recenlty pushed sample. diff --git a/mix.exs b/mix.exs index 5b21f690..1e35442a 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.MP4.Plugin.MixProject do use Mix.Project - @version "0.32.0" + @version "0.33.0" @github_url "https://github.com/membraneframework/membrane_mp4_plugin" def project do diff --git a/test/fixtures/cmaf/muxed_audio_video/segment_1.m4s b/test/fixtures/cmaf/muxed_audio_video/segment_1.m4s index f160f25d..9f510a55 100644 Binary files a/test/fixtures/cmaf/muxed_audio_video/segment_1.m4s and b/test/fixtures/cmaf/muxed_audio_video/segment_1.m4s differ diff --git a/test/fixtures/cmaf/muxed_audio_video/segment_2.m4s b/test/fixtures/cmaf/muxed_audio_video/segment_2.m4s index 275f11f1..e5e1bcc0 100644 Binary files a/test/fixtures/cmaf/muxed_audio_video/segment_2.m4s and b/test/fixtures/cmaf/muxed_audio_video/segment_2.m4s differ diff --git a/test/fixtures/cmaf/ref_video_hevc_segment1.m4s b/test/fixtures/cmaf/ref_video_hevc_segment1.m4s index 4f42157a..7872eff1 100644 Binary files a/test/fixtures/cmaf/ref_video_hevc_segment1.m4s and b/test/fixtures/cmaf/ref_video_hevc_segment1.m4s differ diff --git a/test/fixtures/cmaf/ref_video_hevc_segment2.m4s b/test/fixtures/cmaf/ref_video_hevc_segment2.m4s index 10fc747b..8f15af0a 100644 Binary files a/test/fixtures/cmaf/ref_video_hevc_segment2.m4s and b/test/fixtures/cmaf/ref_video_hevc_segment2.m4s differ diff --git a/test/fixtures/cmaf/ref_video_segment1.m4s b/test/fixtures/cmaf/ref_video_segment1.m4s index 1727dd1f..29470379 100644 Binary files a/test/fixtures/cmaf/ref_video_segment1.m4s and b/test/fixtures/cmaf/ref_video_segment1.m4s differ diff --git a/test/fixtures/cmaf/ref_video_segment2.m4s b/test/fixtures/cmaf/ref_video_segment2.m4s index 02b5a4f8..4acf2102 100644 Binary files a/test/fixtures/cmaf/ref_video_segment2.m4s and b/test/fixtures/cmaf/ref_video_segment2.m4s differ diff --git a/test/membrane_mp4/muxer/cmaf/segment_helper_test.exs b/test/membrane_mp4/muxer/cmaf/segment_helper_test.exs index 230a30a1..59a4f042 100644 --- a/test/membrane_mp4/muxer/cmaf/segment_helper_test.exs +++ b/test/membrane_mp4/muxer/cmaf/segment_helper_test.exs @@ -16,8 +16,18 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelperTest do input_to_output_pad: %{audio: :output, video: :output}, input_groups: %{output: [:audio, :video]}, pad_to_track_data: %{ - audio: %{segment_base_timestamp: 0, chunks_duration: 0, buffer_awaiting_duration: nil}, - video: %{segment_base_timestamp: 0, chunks_duration: 0, buffer_awaiting_duration: nil} + audio: %{ + segment_decoding_timestamp: 0, + segment_presentation_timestamp: 0, + chunks_duration: 0, + buffer_awaiting_duration: nil + }, + video: %{ + segment_decoding_timestamp: 0, + segment_presentation_timestamp: 0, + chunks_duration: 0, + buffer_awaiting_duration: nil + } }, sample_queues: %{ audio: %Queue{track_with_keyframes?: false, duration_range: chunk_duration_range},