From 5e5f5b39848bd81eb9f43b1363f1e1778c7d5d82 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Thu, 11 Jan 2024 18:19:14 +0100 Subject: [PATCH] Fix tests wip --- lib/membrane/core/element/action_handler.ex | 2 + .../core/element/buffer_controller.ex | 2 +- .../core/element/demand_controller.ex | 16 ++-- .../demand_controller/auto_flow_utils.ex | 4 +- .../core/element/effective_flow_controller.ex | 2 +- lib/membrane/core/element/pad_controller.ex | 5 +- .../core/element/stream_format_controller.ex | 6 ++ .../integration/auto_demands_test.exs | 85 +++++++++---------- 8 files changed, 62 insertions(+), 60 deletions(-) diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 0812367b6..cfd725903 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -133,6 +133,8 @@ defmodule Membrane.Core.Element.ActionHandler do %State{type: type} = state ) when type in [:source, :filter, :endpoint] and is_pad_ref(pad_ref) do + require Membrane.Logger, as: L + L.warning("HANDLING ACTION SEND STREAM FORMAT") send_stream_format(pad_ref, stream_format, state) end diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index cc44b03ad..10b040afc 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -105,7 +105,7 @@ defmodule Membrane.Core.Element.BufferController do @doc """ Executes `handle_buffer` callback. """ - @spec exec_buffer_callback( Pad.ref(), [Buffer.t()], State.t() ) :: State.t() + @spec exec_buffer_callback(Pad.ref(), [Buffer.t()], State.t()) :: State.t() def exec_buffer_callback(pad_ref, buffers, %State{type: :filter} = state) do Telemetry.report_metric("buffer", 1, inspect(pad_ref)) diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index 0d4083042..d061d3979 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -28,13 +28,12 @@ defmodule Membrane.Core.Element.DemandController do if pad_data.direction == :input, do: raise("cannot snapshot atomic counter in input pad") - if state.name == {:filter, 10}, do: IO.puts("snapshot_atomic_demand") + # if state.name == {:filter, 10}, do: IO.puts("snapshot_atomic_demand") # IO.inspect(state) - -# aktualnie bug polega na tym, ze w tescie z tagiem :dupa, -# ze mamy efc push, dodajemy go do satisfied auto output pads -# przechodzimy w pull, ale sciąganie rzeczy z kolejki jest obwarowane if set empty + # aktualnie bug polega na tym, ze w tescie z tagiem :dupa, + # ze mamy efc push, dodajemy go do satisfied auto output pads + # przechodzimy w pull, ale sciąganie rzeczy z kolejki jest obwarowane if set empty do_snapshot_atomic_demand(pad_data, state) else @@ -51,8 +50,7 @@ defmodule Membrane.Core.Element.DemandController do %{flow_control: :auto} = pad_data, %{effective_flow_control: :pull} = state ) do - - # last comment + # last comment # if state.name == {:filter, 10} do # IO.puts("ALA MA KOTA") # AtomicDemand.get(pad_data.atomic_demand) @@ -60,7 +58,6 @@ defmodule Membrane.Core.Element.DemandController do # IO.inspect(state.satisfied_auto_output_pads) # end - if AtomicDemand.get(pad_data.atomic_demand) > 0 do # tutaj powinno mieć miejsce # - usuniecie pada z mapsetu @@ -68,12 +65,9 @@ defmodule Membrane.Core.Element.DemandController do # zwroc uwage, czy gdzies w czyms w stylu handle_outgoing_buffers nie wjedzie ci tutaj jakas nieprzyjemna rekurencja # kolejna rzecz: przerwanie rekurencji moze nastąpić, nawet wtedy, gdy kolejki będą miały w sobie bufory - - state = Map.update!(state, :satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref)) # IO.inspect(state.satisfied_auto_output_pads) - # dobra, wyglada git AutoFlowUtils.pop_queues_and_bump_demand(state) diff --git a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex b/lib/membrane/core/element/demand_controller/auto_flow_utils.ex index c987b7467..ed3955d6c 100644 --- a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex +++ b/lib/membrane/core/element/demand_controller/auto_flow_utils.ex @@ -80,13 +80,13 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do @spec store_event_in_queue(Pad.ref(), Event.t(), State.t()) :: State.t() def store_event_in_queue(pad_ref, event, state) do - queue_item = {:event, event} + queue_item = {:event, event} PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, queue_item)) end @spec store_stream_format_in_queue(Pad.ref(), StreamFormat.t(), State.t()) :: State.t() def store_stream_format_in_queue(pad_ref, stream_format, state) do - queue_item = {:stream_format, stream_format} + queue_item = {:stream_format, stream_format} PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, queue_item)) end diff --git a/lib/membrane/core/element/effective_flow_controller.ex b/lib/membrane/core/element/effective_flow_controller.ex index 17833c741..6e3af11e3 100644 --- a/lib/membrane/core/element/effective_flow_controller.ex +++ b/lib/membrane/core/element/effective_flow_controller.ex @@ -140,7 +140,7 @@ defmodule Membrane.Core.Element.EffectiveFlowController do with %{effective_flow_control: :pull} <- state do state.pads_data |> Map.values() - |> Enum.filter(& &1.direction == :output and &1.flow_control == :auto) + |> Enum.filter(&(&1.direction == :output and &1.flow_control == :auto)) |> Enum.reduce(state, fn pad_data, state -> DemandController.snapshot_atomic_demand(pad_data.ref, state) end) diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index ae3f027dd..29ae73a46 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -496,8 +496,9 @@ defmodule Membrane.Core.Element.PadController do PadModel.update_data!(state, pad, :associated_pads, &List.delete(&1, pad_data.ref)) end) |> PadModel.set_data!(pad_ref, :associated_pads, []) - # |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) - # |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) + + # |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) + # |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) # |> AutoFlowUtils.pop_queues_and_bump_demand() diff --git a/lib/membrane/core/element/stream_format_controller.ex b/lib/membrane/core/element/stream_format_controller.ex index b22637cc7..954a0234e 100644 --- a/lib/membrane/core/element/stream_format_controller.ex +++ b/lib/membrane/core/element/stream_format_controller.ex @@ -94,6 +94,12 @@ defmodule Membrane.Core.Element.StreamFormatController do """ end + require Membrane.Logger, as: L + + L.warning( + "VALIDATING STREAM FORMAT #{inspect({direction, stream_format, params}, pretty: true, limit: :infinity)}" + ) + for {module, pad_name} <- params do unless module.membrane_stream_format_match?(pad_name, stream_format) do pattern_string = diff --git a/test/membrane/integration/auto_demands_test.exs b/test/membrane/integration/auto_demands_test.exs index d51581900..42ff9b3fa 100644 --- a/test/membrane/integration/auto_demands_test.exs +++ b/test/membrane/integration/auto_demands_test.exs @@ -110,6 +110,8 @@ defmodule Membrane.Integration.AutoDemandsTest do assert_end_of_stream(pipeline, :sink) refute_sink_buffer(pipeline, :sink, _buffer, 0) + + Pipeline.terminate(pipeline) end end) @@ -137,9 +139,10 @@ defmodule Membrane.Integration.AutoDemandsTest do end) refute_sink_buffer(pipeline, :left_sink, %{payload: 25_000}) + + Pipeline.terminate(pipeline) end - @tag :asd test "handle removed branch" do pipeline = Pipeline.start_link_supervised!( @@ -231,6 +234,8 @@ defmodule Membrane.Integration.AutoDemandsTest do {:buffer_arrived, %Membrane.Buffer{payload: ^payload}} ) end + + Pipeline.terminate(pipeline) end end) @@ -336,68 +341,62 @@ defmodule Membrane.Integration.AutoDemandsTest do ] ) - # time for NotifyingAutoFilter to enter playing playback - Process.sleep(500) - [pipeline: pipeline] end describe "auto flow queue" do setup :setup_pipeline_with_notifying_auto_filter - @tag :skip + defp receive_processed_buffers(pipeline, limit, acc \\ []) + + defp receive_processed_buffers(_pipeline, limit, acc) when limit <= 0 do + Enum.reverse(acc) + end + + defp receive_processed_buffers(pipeline, limit, acc) do + receive do + {Pipeline, ^pipeline, + {:handle_child_notification, {{:handling_buffer, _pad, buffer}, :filter}}} -> + receive_processed_buffers(pipeline, limit - 1, [buffer | acc]) + after + 500 -> Enum.reverse(acc) + end + end + + @tag :asd test "when there is no demand on the output pad", %{pipeline: pipeline} do - auto_demand_size = 400 + manual_flow_queue_size = 40 assert_pipeline_notified(pipeline, :filter, :playing) - for i <- 1..auto_demand_size, source_idx <- [0, 1] do - expected_buffer = %Buffer{payload: i, metadata: %{creator: {:source, source_idx}}} + buffers = receive_processed_buffers(pipeline, 100) + assert length(buffers) == manual_flow_queue_size - assert_pipeline_notified( - pipeline, - :filter, - {:handling_buffer, _pad, ^expected_buffer} - ) - end + demand = 10_000 + Pipeline.message_child(pipeline, :sink, {:make_demand, demand}) - for _source_idx <- [0, 1] do - refute_pipeline_notified( - pipeline, - :filter, - {:handling_buffer, _pad, %Buffer{}} - ) - end + buffers = receive_processed_buffers(pipeline, 2 * demand) + buffers_number = length(buffers) - Pipeline.message_child(pipeline, :sink, {:make_demand, 2 * auto_demand_size}) + # check if filter processed proper number of buffers + assert demand <= buffers_number + assert buffers_number <= demand + manual_flow_queue_size - for i <- 1..auto_demand_size, source_idx <- [0, 1] do - expected_buffer = %Buffer{payload: i, metadata: %{creator: {:source, source_idx}}} - assert_sink_buffer(pipeline, :sink, ^expected_buffer) - end + # check if filter processed buffers from both sources + buffers_by_creator = Enum.group_by(buffers, & &1.metadata.creator) + assert Enum.count(buffers_by_creator) == 2 - for i <- (auto_demand_size + 1)..(auto_demand_size * 2), source_idx <- [0, 1] do - expected_buffer = %Buffer{payload: i, metadata: %{creator: {:source, source_idx}}} + # check if filter balanced procesesd buffers by their origin + counter_0 = Map.fetch!(buffers_by_creator, {:source, 0}) |> length() + counter_1 = Map.fetch!(buffers_by_creator, {:source, 1}) |> length() - assert_pipeline_notified( - pipeline, - :filter, - {:handling_buffer, _pad, ^expected_buffer} - ) - end - - for _source_idx <- [0, 1] do - refute_pipeline_notified( - pipeline, - :filter, - {:handling_buffer, _pad, %Buffer{}} - ) - end + assert 0.8 < counter_0 / counter_1 + assert 1.2 > counter_0 / counter_1 Pipeline.terminate(pipeline) end - @tag :skip + # @tag :skip test "when an element returns :pause_auto_demand action", %{pipeline: pipeline} do auto_demand_size = 400