diff --git a/lib/membrane/core/element/auto_flow_controller.ex b/lib/membrane/core/element/auto_flow_controller.ex index 36c6fc3e1..e58669378 100644 --- a/lib/membrane/core/element/auto_flow_controller.ex +++ b/lib/membrane/core/element/auto_flow_controller.ex @@ -18,6 +18,7 @@ defmodule Membrane.Core.Element.AutoFlowController do require Membrane.Pad, as: Pad @empty_map_set MapSet.new() + @initial_auto_demand 20 # Description of the auto flow control queueing mechanism @@ -177,23 +178,25 @@ defmodule Membrane.Core.Element.AutoFlowController do end defp do_auto_adjust_atomic_demand(pad_data, state) when is_input_auto_pad_data(pad_data) do - if increase_atomic_demand?(pad_data, state) do - %{ - ref: ref, - auto_demand_size: auto_demand_size, - demand: demand, - atomic_demand: atomic_demand, - stalker_metrics: stalker_metrics - } = pad_data + case atomic_demand_target_value(pad_data, state) do + target_value when is_number(target_value) -> + %{ + ref: ref, + demand: demand, + atomic_demand: atomic_demand, + stalker_metrics: stalker_metrics + } = pad_data - diff = auto_demand_size - demand - :ok = AtomicDemand.increase(atomic_demand, diff) + diff = target_value - demand + :ok = AtomicDemand.increase(atomic_demand, diff) - :atomics.put(stalker_metrics.demand, 1, auto_demand_size) + :atomics.put(stalker_metrics.demand, 1, target_value) - PadModel.set_data!(state, ref, :demand, auto_demand_size) - else - state + state + |> PadModel.set_data!(ref, :demand, target_value) + + :pass -> + state end end @@ -201,11 +204,21 @@ defmodule Membrane.Core.Element.AutoFlowController do raise "#{__MODULE__}.auto_adjust_atomic_demand/2 can be called only for auto input pads, while #{inspect(ref)} is not such a pad." end - defp increase_atomic_demand?(pad_data, state) do - state.effective_flow_control == :pull and - not pad_data.auto_demand_paused? and + defp atomic_demand_target_value(pad_data, state) do + cond do + state.effective_flow_control == :push or pad_data.auto_demand_paused? -> + :pass + pad_data.demand < pad_data.auto_demand_size / 2 and - state.satisfied_auto_output_pads == @empty_map_set + state.satisfied_auto_output_pads == @empty_map_set -> + pad_data.auto_demand_size + + not pad_data.start_of_stream? and pad_data.demand == 0 -> + pad_data.auto_demand_size |> min(@initial_auto_demand) + + true -> + :pass + end end @spec pop_queues_and_bump_demand(State.t()) :: State.t()