diff --git a/CHANGELOG.md b/CHANGELOG.md index c68edaab2..95f8c125c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## 1.2.0 * Add `:max_instances` option for dynamic pads. [#876](https://github.com/membraneframework/membrane_core/pull/876) + * Implement `Membrane.ForwardingFilter`. [#904](https://github.com/membraneframework/membrane_core/pull/904) ## 1.1.2 * Add new callback `handle_child_terminated/3` along with new assertions. [#894](https://github.com/membraneframework/membrane_core/pull/894) diff --git a/lib/membrane/forwarding_filter.ex b/lib/membrane/forwarding_filter.ex new file mode 100644 index 000000000..e7fc680a2 --- /dev/null +++ b/lib/membrane/forwarding_filter.ex @@ -0,0 +1,140 @@ +defmodule Membrane.ForwardingFilter do + @moduledoc """ + Membrane Filter with input and output dynamic pads, that forwards incoming data to the opposite + side than the one from which it came. + + If pad from one side is not linked yet, the data will be buffered until both pads are linked. + """ + + use Membrane.Filter + + def_input_pad :input, + accepted_format: _any, + availability: :on_request, + max_instances: 1 + + def_output_pad :output, + accepted_format: _any, + availability: :on_request, + max_instances: 1 + + def_options notify_on_stream_format?: [ + description: """ + If `true`, #{inspect(__MODULE__)} will send `{:stream_format, pad_ref, stream_format}` \ + notification to parent on every stream format arrival. + + Defaults to `false`. + """, + spec: boolean(), + default: false + ], + notify_on_event?: [ + description: """ + If `true`, #{inspect(__MODULE__)} will send `{:event, pad_ref, event}` notification to \ + parent on every event arrival. + + Defaults to `false`. + """, + spec: boolean(), + default: false + ] + + defguardp flowing?(ctx, state) + when ctx.playback == :playing and state.input != nil and state.output != nil + + defguardp input_demand_should_be_paused?(ctx, state) + when ctx.playback == :playing and state.input != nil and state.output == nil + + @impl true + def handle_init(_ctx, opts) do + state = + opts + |> Map.from_struct() + |> Map.merge(%{input: nil, output: nil, queue: [], input_demand_paused?: false}) + + {[], state} + end + + @impl true + def handle_pad_added(Pad.ref(direction, _ref) = pad, ctx, state) do + state = state |> Map.put(direction, pad) + handle_flowing_state_changed(ctx, state) + end + + @impl true + def handle_playing(ctx, state), do: handle_flowing_state_changed(ctx, state) + + [handle_buffer: :buffer, handle_event: :event, handle_stream_format: :stream_format] + |> Enum.map(fn {callback, action} -> + @impl true + def unquote(callback)(pad, item, ctx, state) when flowing?(ctx, state) do + actions = [{unquote(action), {opposite_pad(pad, state), item}}] + actions = actions ++ maybe_notify_parent(unquote(action), pad, item, state) + {actions, state} + end + + @impl true + def unquote(callback)(pad, item, _ctx, state) do + state = unquote(action) |> store_in_queue(pad, item, state) + actions = unquote(action) |> maybe_notify_parent(pad, item, state) + {actions, state} + end + end) + + @impl true + def handle_end_of_stream(pad, ctx, state) when flowing?(ctx, state) do + {[end_of_stream: opposite_pad(pad, state)], state} + end + + @impl true + def handle_end_of_stream(pad, _ctx, state) do + {[], store_in_queue(:end_of_stream, pad, nil, state)} + end + + defp store_in_queue(action, pad, item, state) do + state |> Map.update!(:queue, &[{action, pad, item} | &1]) + end + + defp handle_flowing_state_changed(ctx, state) do + {pause_or_resume_demand, state} = manage_input_demand(ctx, state) + {flush_queue_actions, state} = maybe_flush_queue(ctx, state) + {pause_or_resume_demand ++ flush_queue_actions, state} + end + + defp manage_input_demand(ctx, %{input_demand_paused?: true} = state) + when not input_demand_should_be_paused?(ctx, state) do + {[resume_auto_demand: state.input], %{state | input_demand_paused?: false}} + end + + defp manage_input_demand(ctx, %{input_demand_paused?: false} = state) + when input_demand_should_be_paused?(ctx, state) do + {[pause_auto_demand: state.input], %{state | input_demand_paused?: true}} + end + + defp manage_input_demand(_ctx, state), do: {[], state} + + defp maybe_flush_queue(ctx, state) when flowing?(ctx, state) do + actions = + state.queue + |> Enum.reverse() + |> Enum.map(fn + {:end_of_stream, _input, nil} -> {:end_of_stream, state.output} + {action, pad, item} -> {action, {opposite_pad(pad, state), item}} + end) + + {actions, %{state | queue: nil}} + end + + defp maybe_flush_queue(_ctx, state), do: {[], state} + + defp opposite_pad(Pad.ref(:input, _ref), state), do: state.output + defp opposite_pad(Pad.ref(:output, _ref), state), do: state.input + + defp maybe_notify_parent(:event, pad, event, %{notify_on_event?: true}), + do: [notify_parent: {:event, pad, event}] + + defp maybe_notify_parent(:stream_format, pad, format, %{notify_on_stream_format?: true}), + do: [notify_parent: {:stream_format, pad, format}] + + defp maybe_notify_parent(_type, _pad, _item, _state), do: [] +end diff --git a/test/membrane/integration/forwarding_filter_test.exs b/test/membrane/integration/forwarding_filter_test.exs new file mode 100644 index 000000000..607a419c7 --- /dev/null +++ b/test/membrane/integration/forwarding_filter_test.exs @@ -0,0 +1,124 @@ +defmodule Membrane.Integration.ForwardingFilterTest do + use ExUnit.Case, async: true + + import Membrane.ChildrenSpec + import Membrane.Testing.Assertions + + alias Membrane.Buffer + alias Membrane.ForwardingFilter + alias Membrane.Testing + + require Membrane.Pad, as: Pad + + defmodule Format do + defstruct [:field] + end + + defmodule Source do + use Membrane.Source + def_output_pad :output, accepted_format: _any, flow_control: :push + + @impl true + def handle_parent_notification({action, item}, _ctx, state), + do: {[{action, {:output, item}}], state} + end + + test "Membrane.ForwardingFilter buffers data until output pad is linked" do + pipeline = + Testing.Pipeline.start_link_supervised!( + spec: + child(:source, Source) + |> child(:filter, %ForwardingFilter{ + notify_on_event?: true, + notify_on_stream_format?: true + }) + ) + + data = generate_data(100, [:stream_format, :buffer, :event]) + + data + |> Enum.each(fn {type, item} -> + Testing.Pipeline.notify_child(pipeline, :source, {type, item}) + + if type in [:stream_format, :event] do + assert_pipeline_notified(pipeline, :filter, {^type, Pad.ref(:input, _id), ^item}) + end + end) + + spec = get_child(:filter) |> child(:sink, Testing.Sink) + Testing.Pipeline.execute_actions(pipeline, spec: spec) + + data + |> Enum.each(fn + {:buffer, item} -> assert_sink_buffer(pipeline, :sink, ^item) + {:event, item} -> assert_sink_event(pipeline, :sink, ^item) + {:stream_format, item} -> assert_sink_stream_format(pipeline, :sink, ^item) + end) + + data = generate_data(100, [:stream_format, :buffer, :event], 200) + + data + |> Enum.each(fn {type, item} -> + Testing.Pipeline.notify_child(pipeline, :source, {type, item}) + + if type in [:stream_format, :event] do + assert_pipeline_notified(pipeline, :filter, {^type, Pad.ref(:input, _id), ^item}) + end + + case type do + :buffer -> assert_sink_buffer(pipeline, :sink, ^item) + :event -> assert_sink_event(pipeline, :sink, ^item) + :stream_format -> assert_sink_stream_format(pipeline, :sink, ^item) + end + end) + + Testing.Pipeline.terminate(pipeline) + end + + test "Membrane.ForwardingFilter pauses input demand if output pad is not linked" do + atomics_ref = :atomics.new(1, []) + :atomics.put(atomics_ref, 1, 0) + + generator = fn _initial_state, _demand_size -> + :atomics.add(atomics_ref, 1, 1) + buffer = %Membrane.Buffer{payload: <<>>} + {[buffer: {:output, buffer}, redemand: :output], nil} + end + + auto_demand_size = 20 + + spec = + child(:source, %Testing.Source{output: {nil, generator}}) + |> via_in(:input, auto_demand_size: auto_demand_size) + |> child(:forwarding_filter, ForwardingFilter) + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + + Process.sleep(500) + + assert :atomics.get(atomics_ref, 1) == auto_demand_size + + spec = get_child(:forwarding_filter) |> child(:sink, Testing.Sink) + Testing.Pipeline.execute_actions(pipeline, spec: spec) + + Process.sleep(500) + + assert :atomics.get(atomics_ref, 1) > auto_demand_size + 100 + + Testing.Pipeline.terminate(pipeline) + end + + defp generate_data(number, types, pts_offset \\ 0) do + data = + 1..(number - 1) + |> Enum.map(fn i -> + case Enum.random(types) do + :stream_format -> {:stream_format, %Format{field: i}} + :event -> {:event, %Testing.Event{}} + :buffer -> {:buffer, %Buffer{pts: i + pts_offset, payload: <<>>}} + end + end) + + [stream_format: %Format{field: 0}] ++ data + end +end