From fc60864cb71b9cee407084705f8e9c6c18913fb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Per=C5=BCy=C5=82o?= Date: Fri, 23 Feb 2024 15:39:56 +0100 Subject: [PATCH] Add waiting for all input pads before assembling first segment --- lib/membrane_mp4/muxer/cmaf.ex | 59 +++++++++++++++++++++++++++++++--- 1 file changed, 54 insertions(+), 5 deletions(-) diff --git a/lib/membrane_mp4/muxer/cmaf.ex b/lib/membrane_mp4/muxer/cmaf.ex index a7164c6..da4e70e 100644 --- a/lib/membrane_mp4/muxer/cmaf.ex +++ b/lib/membrane_mp4/muxer/cmaf.ex @@ -187,7 +187,9 @@ defmodule Membrane.MP4.Muxer.CMAF do pads_registration_order: [], sample_queues: %{}, finish_current_segment?: false, - video_pad: nil + video_pad: nil, + all_input_pads_ready?: false, + buffers_awaiting_init: [] }) |> set_chunk_duration_range() @@ -309,15 +311,26 @@ defmodule Membrane.MP4.Muxer.CMAF do if are_all_group_pads_ready?(pad, ctx, state) do stream_format = generate_output_stream_format(output_pad, state) + old_input_pads_ready? = state.all_input_pads_ready? + + state = update_input_pads_ready(pad, ctx, state) + + {actions, state} = + if old_input_pads_ready? != state.all_input_pads_ready? do + replay_init_buffers(ctx, state) + else + {[], state} + end + cond do is_nil(ctx.pads[output_pad].stream_format) -> - {[stream_format: {output_pad, stream_format}], state} + {[{:stream_format, {output_pad, stream_format}} | actions], state} stream_format != ctx.pads[output_pad].stream_format -> - {[], SegmentHelper.put_awaiting_stream_format(pad, stream_format, state)} + {actions, SegmentHelper.put_awaiting_stream_format(pad, stream_format, state)} true -> - {[], state} + {actions, state} end else {[], state} @@ -325,7 +338,8 @@ defmodule Membrane.MP4.Muxer.CMAF do end @impl true - def handle_buffer(Pad.ref(:input, _id) = pad, sample, ctx, state) do + def handle_buffer(Pad.ref(:input, _id) = pad, sample, ctx, state) + when state.all_input_pads_ready? do use Numbers, overload_operators: true, comparison: true # In case DTS is not set, use PTS. This is the case for audio tracks or H264 originated @@ -359,6 +373,11 @@ defmodule Membrane.MP4.Muxer.CMAF do end end + @impl true + def handle_buffer(pad, sample, _ctx, state) do + {[], %{state | buffers_awaiting_init: [{pad, sample} | state.buffers_awaiting_init]}} + end + @impl true def handle_event(_pad, %__MODULE__.RequestMediaFinalization{}, _ctx, state) do {[], %{state | finish_current_segment?: true}} @@ -735,6 +754,36 @@ defmodule Membrane.MP4.Muxer.CMAF do end end + defp update_input_pads_ready(pad, ctx, state) do + all_input_pads_ready? = + Enum.all?(ctx.pads, fn + {^pad, _data} -> true + {Pad.ref(:output, _id), _data} -> true + {Pad.ref(:input, _id), data} -> data.stream_format != nil + end) + + %{state | all_input_pads_ready?: all_input_pads_ready?} + end + + defp replay_init_buffers(ctx, state) do + {buffers, state} = Map.pop!(state, :buffers_awaiting_init) + + buffers + |> Enum.reduce({[], state}, fn {pad, buffer}, {actions_batch, state} -> + {actions, state} = handle_buffer(pad, buffer, ctx, state) + + {[actions | actions_batch], state} + end) + |> then(fn {actions_batch, state} -> + actions = + actions_batch + |> Enum.reverse() + |> List.flatten() + + {actions, state} + end) + end + @min_chunk_duration Membrane.Time.milliseconds(50) defp set_chunk_duration_range( %{