From 6b758d46f6e915cf7fa4ee83de9786e7a56d0022 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Thu, 2 Jan 2025 16:10:43 +0100 Subject: [PATCH 1/2] Implement initial auto demand WiP --- .../core/element/auto_flow_controller.ex | 49 ++++++++++++------- 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/lib/membrane/core/element/auto_flow_controller.ex b/lib/membrane/core/element/auto_flow_controller.ex index 36c6fc3e1..02c8a8850 100644 --- a/lib/membrane/core/element/auto_flow_controller.ex +++ b/lib/membrane/core/element/auto_flow_controller.ex @@ -18,6 +18,7 @@ defmodule Membrane.Core.Element.AutoFlowController do require Membrane.Pad, as: Pad @empty_map_set MapSet.new() + @initial_auto_demand 20 # Description of the auto flow control queueing mechanism @@ -177,23 +178,25 @@ defmodule Membrane.Core.Element.AutoFlowController do end defp do_auto_adjust_atomic_demand(pad_data, state) when is_input_auto_pad_data(pad_data) do - if increase_atomic_demand?(pad_data, state) do - %{ - ref: ref, - auto_demand_size: auto_demand_size, - demand: demand, - atomic_demand: atomic_demand, - stalker_metrics: stalker_metrics - } = pad_data + case atomic_demand_target_value(pad_data, state) do + target_value when is_number(target_value) -> + %{ + ref: ref, + demand: demand, + atomic_demand: atomic_demand, + stalker_metrics: stalker_metrics + } = pad_data - diff = auto_demand_size - demand - :ok = AtomicDemand.increase(atomic_demand, diff) + diff = target_value - demand + :ok = AtomicDemand.increase(atomic_demand, diff) - :atomics.put(stalker_metrics.demand, 1, auto_demand_size) + :atomics.put(stalker_metrics.demand, 1, target_value) - PadModel.set_data!(state, ref, :demand, auto_demand_size) - else - state + state + |> PadModel.set_data!(ref, :demand, target_value) + + :pass -> + state end end @@ -201,11 +204,21 @@ defmodule Membrane.Core.Element.AutoFlowController do raise "#{__MODULE__}.auto_adjust_atomic_demand/2 can be called only for auto input pads, while #{inspect(ref)} is not such a pad." end - defp increase_atomic_demand?(pad_data, state) do - state.effective_flow_control == :pull and - not pad_data.auto_demand_paused? and + defp atomic_demand_target_value(pad_data, state) do + cond do + state.effective_flow_control == :push or pad_data.auto_demand_paused? -> + :pass + pad_data.demand < pad_data.auto_demand_size / 2 and - state.satisfied_auto_output_pads == @empty_map_set + state.satisfied_auto_output_pads == @empty_map_set -> + pad_data.auto_demand_size + + not pad_data.start_of_stream? and pad_data.demand == 0 -> + pad_data.auto_demand_size |> min(@initial_auto_demand) + + true -> + :pass + end end @spec pop_queues_and_bump_demand(State.t()) :: State.t() From 386eeaa34adf05983fac41bd1b53d1a7ae47892d Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Thu, 2 Jan 2025 16:13:36 +0100 Subject: [PATCH 2/2] Mix format --- lib/membrane/core/element/auto_flow_controller.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/membrane/core/element/auto_flow_controller.ex b/lib/membrane/core/element/auto_flow_controller.ex index 02c8a8850..e58669378 100644 --- a/lib/membrane/core/element/auto_flow_controller.ex +++ b/lib/membrane/core/element/auto_flow_controller.ex @@ -210,7 +210,7 @@ defmodule Membrane.Core.Element.AutoFlowController do :pass pad_data.demand < pad_data.auto_demand_size / 2 and - state.satisfied_auto_output_pads == @empty_map_set -> + state.satisfied_auto_output_pads == @empty_map_set -> pad_data.auto_demand_size not pad_data.start_of_stream? and pad_data.demand == 0 ->