diff --git a/test/membrane/integration/auto_demands_test.exs b/test/membrane/integration/auto_demands_test.exs index 18bdd7b71..e6747fff4 100644 --- a/test/membrane/integration/auto_demands_test.exs +++ b/test/membrane/integration/auto_demands_test.exs @@ -6,7 +6,7 @@ defmodule Membrane.Integration.AutoDemandsTest do alias Membrane.Testing.{Pipeline, Sink, Source} - defmodule AutoDemandFilter do + defmodule ExponentialAutoFilter do use Membrane.Filter def_input_pad :input, accepted_format: _any @@ -35,6 +35,26 @@ defmodule Membrane.Integration.AutoDemandsTest do end end + defmodule NotifyingAutoFilter do + use Membrane.Fitler + + def_input_pad :input, accepted_format: _any, availability: :on_request + def_output_pad :output, accepted_format: _any + + @impl true + def handle_buffer(pad, buffer, _ctx, state) do + actions = [ + notify_parent: {:handling_buffer, pad, buffer}, + buffer: {:output, buffer} + ] + + {actions, state} + end + + @impl true + def handle_end_of_stream(_ctx, state), do: {[], state} + end + defmodule AutoDemandTee do use Membrane.Filter @@ -64,7 +84,7 @@ defmodule Membrane.Integration.AutoDemandsTest do :down -> {mult_payloads, payloads} end - filter = %AutoDemandFilter{factor: factor, direction: direction} + filter = %ExponentialAutoFilter{factor: factor, direction: direction} pipeline = Pipeline.start_link_supervised!( @@ -202,7 +222,7 @@ defmodule Membrane.Integration.AutoDemandsTest do Pipeline.start_link_supervised!( spec: child(:source, PushSource) - |> child(:filter, AutoDemandFilter) + |> child(:filter, ExponentialAutoFilter) |> child(:sink, Sink) ) @@ -230,7 +250,7 @@ defmodule Membrane.Integration.AutoDemandsTest do Pipeline.start_supervised!( spec: child(:source, PushSource) - |> child(:filter, AutoDemandFilter) + |> child(:filter, ExponentialAutoFilter) |> child(:sink, %Sink{autodemand: false}) ) @@ -246,6 +266,34 @@ defmodule Membrane.Integration.AutoDemandsTest do ) end + test "auto flow queue" do + # Testing.Source fed with such a buffer generator will produce buffers with incremenal + # sequence of numbers as payloads + buffer_generator = + fn counter, _size -> + actions = [ + buffer: {:output, %Buffer{payload: counter}}, + redemand: :output + ] + + {actions, counter + 1} + end + + source_definiton = %Source{output: {0, buffer_generator}} + + pipeline = + Pipeline.start_supervised!( + spec: [ + child({:source, 0}, source_definiton) + |> child(:filter, NotifyingAutoFilter) + |> child(:sink, %Sink{autodemand: false}), + child({:source, 1}, source_definiton) + |> get_child(:filter) + ] + ) + + end + defp reduce_link(link, enum, fun) do Enum.reduce(enum, link, &fun.(&2, &1)) end