diff --git a/lib/membrane/forwarding_filter.ex b/lib/membrane/forwarding_filter.ex index 7b9d691f9..08108ff1c 100644 --- a/lib/membrane/forwarding_filter.ex +++ b/lib/membrane/forwarding_filter.ex @@ -1,4 +1,11 @@ defmodule Membrane.ForwardingFilter do + @moduledoc """ + Membrane Filter with input and output dynamic pads, that forwards incoming data to the opposite + side than the one from which it came. + + If pad from one side is not linked yet, the data will be buffered until both pads are linked. + """ + use Membrane.Filter def_input_pad :input, @@ -23,8 +30,8 @@ defmodule Membrane.ForwardingFilter do ], notify_on_event?: [ description: """ - If `true`, #{inspect(__MODULE__)} will send `{:event, pad_ref, event}` \ - notification to parent on every event arrival. + If `true`, #{inspect(__MODULE__)} will send `{:event, pad_ref, event}` notification to \ + parent on every event arrival. Defaults to `false`. """, @@ -35,12 +42,15 @@ defmodule Membrane.ForwardingFilter do defguardp flowing?(ctx, state) when ctx.playback == :playing and state.input != nil and state.output != nil + defguardp input_demand_should_be_paused?(ctx, state) + when ctx.playback == :playing and state.input != nil and state.output == nil + @impl true def handle_init(_ctx, opts) do state = opts |> Map.from_struct() - |> Map.merge(%{input: nil, output: nil, queue: []}) + |> Map.merge(%{input: nil, output: nil, queue: [], input_demand_paused?: false}) {[], state} end @@ -48,11 +58,11 @@ defmodule Membrane.ForwardingFilter do @impl true def handle_pad_added(Pad.ref(direction, _ref) = pad, ctx, state) do state = state |> Map.put(direction, pad) - flush_queue_if_flowing(ctx, state) + handle_flowing_state_changed(ctx, state) end @impl true - def handle_playing(ctx, state), do: flush_queue_if_flowing(ctx, state) + def handle_playing(ctx, state), do: handle_flowing_state_changed(ctx, state) [handle_buffer: :buffer, handle_event: :event, handle_stream_format: :stream_format] |> Enum.map(fn {callback, action} -> @@ -85,7 +95,25 @@ defmodule Membrane.ForwardingFilter do state |> Map.update!(:queue, &[{action, pad, item} | &1]) end - defp flush_queue_if_flowing(ctx, state) when flowing?(ctx, state) do + defp handle_flowing_state_changed(ctx, state) do + {pause_or_resume_demand, state} = manage_input_demand(ctx, state) + {flush_queue_actions, state} = maybe_flush_queue(ctx, state) + {pause_or_resume_demand ++ flush_queue_actions, state} + end + + defp manage_input_demand(ctx, %{input_demand_paused?: true} = state) + when not input_demand_should_be_paused?(ctx, state) do + {[resume_auto_demand: state.input], %{state | input_demand_paused?: false}} + end + + defp manage_input_demand(ctx, %{input_demand_paused?: false} = state) + when input_demand_should_be_paused?(ctx, state) do + {[pause_auto_demand: state.input], %{state | input_demand_paused?: true}} + end + + defp manage_input_demand(_ctx, state), do: {[], state} + + defp maybe_flush_queue(ctx, state) when flowing?(ctx, state) do actions = state.queue |> Enum.reverse() @@ -97,7 +125,7 @@ defmodule Membrane.ForwardingFilter do {actions, %{state | queue: nil}} end - defp flush_queue_if_flowing(_ctx, state), do: {[], state} + defp maybe_flush_queue(_ctx, state), do: {[], state} defp opposite_pad(Pad.ref(:input, _ref), state), do: state.output defp opposite_pad(Pad.ref(:output, _ref), state), do: state.input diff --git a/test/membrane/integration/forwarding_filter_test.exs b/test/membrane/integration/forwarding_filter_test.exs index a2cacf840..2a930e559 100644 --- a/test/membrane/integration/forwarding_filter_test.exs +++ b/test/membrane/integration/forwarding_filter_test.exs @@ -75,6 +75,39 @@ defmodule Membrane.Integration.ForwardingFilterTest do Testing.Pipeline.terminate(pipeline) end + test "Membrane.ForwardingFilter pauses input demand if output pad is not linked" do + atomics_ref = :atomics.new(1, []) + :atomics.put(atomics_ref, 1, 0) + + generator = fn _initial_state, _demand_size -> + :atomics.add(atomics_ref, 1, 1) + buffer = %Membrane.Buffer{payload: <<>>} + {[buffer: {:output, buffer}, redemand: :output], nil} + end + + auto_demand_size = 20 + + spec = + child(:source, %Testing.Source{output: {nil, generator}}) + |> via_in(:input, auto_demand_size: auto_demand_size) + |> child(:forwarding_filter, ForwardingFilter) + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + + Process.sleep(500) + + assert :atomics.get(atomics_ref, 1) == auto_demand_size + + spec = get_child(:forwarding_filter) |> child(:sink, Testing.Sink) + Testing.Pipeline.execute_actions(pipeline, spec: spec) + + Process.sleep(500) + + assert :atomics.get(atomics_ref, 1) > auto_demand_size + 100 + + Testing.Pipeline.terminate(pipeline) + end + defp generate_data(number, types, pts_offset \\ 0) do data = 1..(number - 1)