From 503f1c625a9b958b63802a91709b0f47f12ff588 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 30 Oct 2024 14:04:42 +0100 Subject: [PATCH 1/7] Implement ForwardingFilter --- lib/membrane/forwarding_filter.ex | 84 +++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 lib/membrane/forwarding_filter.ex diff --git a/lib/membrane/forwarding_filter.ex b/lib/membrane/forwarding_filter.ex new file mode 100644 index 000000000..4dda1e223 --- /dev/null +++ b/lib/membrane/forwarding_filter.ex @@ -0,0 +1,84 @@ +defmodule Membrane.ForwardingFilter do + use Membrane.Filter + + def_input_pad :input, + accepted_format: _any, + availability: :on_request, + max_instances: 1 + + def_output_pad :output, + accepted_format: _any, + availability: :on_request, + max_instances: 1 + + def_options notify_on_stream_format?: [default: false], notify_on_event?: [default: false] + + defguardp flowing?(ctx, state) when ctx.playback == :playing and state.input != nil and state.output != nil + + @impl true + def handle_init(_ctx, opts) do + state = + opts + |> Map.from_struct() + |> Map.merge(%{input: nil, output: nil, queue: []}) + + {[], state} + end + + @impl true + def handle_pad_added(Pad.ref(direction, _ref) = pad, ctx, state) do + state = state |> Map.put(direction, pad) + flush_queue_if_flowing(ctx, state) + end + + @impl true + def handle_playing(ctx, state), do: flush_queue_if_flowing(ctx, state) + + [handle_buffer: :buffer, handle_event: :event, handle_stream_format: :stream_format] + |> Enum.map(fn {callback, action} -> + @impl true + def unquote(callback)(pad, item, ctx, state) when flowing?(ctx, state) do + actions = [{unquote(action), {opposite_pad(pad, state), item}}] + {actions, state} + end + + @impl true + def unquote(callback)(pad, item, _ctx, state) do + state = unquote(action) |> store_in_queue(pad, item, state) + {[], state} + end + end) + + @impl true + def handle_end_of_stream(pad, ctx, state) when flowing?(ctx, state) do + {[end_of_stream: opposite_pad(pad, state)], state} + end + + @impl true + def handle_end_of_stream(pad, _ctx, state) do + {[], store_in_queue(:end_of_stream, pad, nil, state)} + end + + defp store_in_queue(action, pad, item, state) do + state |> Map.update!(:queue, &[{action, pad, item} | &1]) + end + + defp flush_queue_if_flowing(ctx, state) do + if flowing?(ctx, state), do: flush_queue(ctx, state), else: {[], state} + end + + defp flush_queue(ctx, state) when flowing?(ctx, state) do + actions = + state.queue + |> Enum.reverse() + |> Enum.map(fn + {:end_of_stream, _input, nil} -> {:end_of_stream, state.output} + {action, pad, item} -> {action, {opposite_pad(pad, state), item}} + end) + + {actions, %{state | queue: nil}} + end + + defp opposite_pad(Pad.ref(:input, _ref), state), do: state.output + defp opposite_pad(Pad.ref(:output, _ref), state), do: state.input +end From 35eecce8918a43a5b345dbb69fd652dcaf1b9298 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 30 Oct 2024 16:35:41 +0100 Subject: [PATCH 2/7] Add test wip --- lib/membrane/forwarding_filter.ex | 15 +++- .../integration/forwarding_filter_test.exs | 79 +++++++++++++++++++ 2 files changed, 92 insertions(+), 2 deletions(-) create mode 100644 test/membrane/integration/forwarding_filter_test.exs diff --git a/lib/membrane/forwarding_filter.ex b/lib/membrane/forwarding_filter.ex index 4dda1e223..88f5c6a0b 100644 --- a/lib/membrane/forwarding_filter.ex +++ b/lib/membrane/forwarding_filter.ex @@ -13,7 +13,8 @@ defmodule Membrane.ForwardingFilter do def_options notify_on_stream_format?: [default: false], notify_on_event?: [default: false] - defguardp flowing?(ctx, state) when ctx.playback == :playing and state.input != nil and state.output != nil + defguardp flowing?(ctx, state) + when ctx.playback == :playing and state.input != nil and state.output != nil @impl true def handle_init(_ctx, opts) do @@ -39,13 +40,15 @@ defmodule Membrane.ForwardingFilter do @impl true def unquote(callback)(pad, item, ctx, state) when flowing?(ctx, state) do actions = [{unquote(action), {opposite_pad(pad, state), item}}] + actions = actions ++ maybe_notify_parent(unquote(action), pad, item, state) {actions, state} end @impl true def unquote(callback)(pad, item, _ctx, state) do state = unquote(action) |> store_in_queue(pad, item, state) - {[], state} + actions = unquote(action) |> maybe_notify_parent(pad, item, state) + {actions, state} end end) @@ -81,4 +84,12 @@ defmodule Membrane.ForwardingFilter do defp opposite_pad(Pad.ref(:input, _ref), state), do: state.output defp opposite_pad(Pad.ref(:output, _ref), state), do: state.input + + defp maybe_notify_parent(:event, pad, event, %{notify_on_event?: true}), + do: [notiy_parent: {:event, pad, event}] + + defp maybe_notify_parent(:stream_format, pad, format, %{notify_on_stream_format?: true}), + do: [notiy_parent: {:stream_format, pad, format}] + + defp maybe_notify_parent(_type, _pad, _item, _state), do: [] end diff --git a/test/membrane/integration/forwarding_filter_test.exs b/test/membrane/integration/forwarding_filter_test.exs new file mode 100644 index 000000000..3f60e39f1 --- /dev/null +++ b/test/membrane/integration/forwarding_filter_test.exs @@ -0,0 +1,79 @@ +defmodule Membrane.Integration.ForwardingFilterTest do + use ExUnit.Case, async: true + + import Membrane.ChildrenSpec + import Membrane.Testing.Assertions + + alias Membrane.Buffer + alias Membrane.ForwardingFilter + alias Membrane.Testing + + defmodule Format do + defstruct [:field] + end + + defmodule Event do + @derive Membrane.EventProtocol + defstruct [:field] + end + + defmodule Source do + use Membrane.Source + def_output_pad :output, accepted_format: _any, flow_control: :push + + @impl true + def handle_parent_notifications({action, item}, _ctx, state), + do: {[{action, {:output, item}}], state} + end + + test "ForwardingFilter buffers data until output pad is linked" do + pipeline = + Testing.Pipeline.start_link_supervised!( + spec: + child(:source, Source) + |> child(:filter, %ForwardingFilter{ + notify_on_event?: true, + notify_on_stream_format?: true + }) + ) + + data = generate_data(100, [:stream_format, :buffer, :event]) + + data + |> Enum.each(fn {type, item} -> + Testing.Pipeline.notify_child(pipeline, :source, {type, item}) + assert_pipeline_notified(pipeline, :filter, {^type, :input, ^item}) + end) + + Testing.Pipeline.execute_actions( + spec: + get_child(:filter) + |> child(:sink, Testing.Sink) + ) + + data + |> Enum.each(fn {type, item} -> + case type do + :buffer -> assert_sink_buffer(pipeline, :sink, ^item) + :event -> assert_sink_event(pipeline, :sink, ^item) + :stream_format -> assert_sink_stream_format(pipeline, :sink, ^item) + end + end) + + data = generate_data(100, [:stream_format, :buffer, :event], 200) + end + + defp generate_data(number, types, pts_offset \\ 0) do + data = + 1..(number - 1) + |> Enum.map(fn i -> + case Enum.random(types) do + :stream_format -> {:stream_format, %Format{field: i}} + :event -> {:event, %Event{field: i}} + :buffer -> {:buffer, %Buffer{pts: i + pts_offset, payload: <<>>}} + end + end) + + [stream_format: %Format{field: 0}] ++ data + end +end From d30a2b9438e14881846d99ae45eb78a59cddfce2 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 5 Nov 2024 14:36:58 +0100 Subject: [PATCH 3/7] Fix forwarding filter test --- lib/membrane/forwarding_filter.ex | 33 +++++++++++---- .../integration/forwarding_filter_test.exs | 42 ++++++++++++------- 2 files changed, 52 insertions(+), 23 deletions(-) diff --git a/lib/membrane/forwarding_filter.ex b/lib/membrane/forwarding_filter.ex index 88f5c6a0b..7b9d691f9 100644 --- a/lib/membrane/forwarding_filter.ex +++ b/lib/membrane/forwarding_filter.ex @@ -11,7 +11,26 @@ defmodule Membrane.ForwardingFilter do availability: :on_request, max_instances: 1 - def_options notify_on_stream_format?: [default: false], notify_on_event?: [default: false] + def_options notify_on_stream_format?: [ + description: """ + If `true`, #{inspect(__MODULE__)} will send `{:stream_format, pad_ref, stream_format}` \ + notification to parent on every stream format arrival. + + Defaults to `false`. + """, + spec: boolean(), + default: false + ], + notify_on_event?: [ + description: """ + If `true`, #{inspect(__MODULE__)} will send `{:event, pad_ref, event}` \ + notification to parent on every event arrival. + + Defaults to `false`. + """, + spec: boolean(), + default: false + ] defguardp flowing?(ctx, state) when ctx.playback == :playing and state.input != nil and state.output != nil @@ -66,11 +85,7 @@ defmodule Membrane.ForwardingFilter do state |> Map.update!(:queue, &[{action, pad, item} | &1]) end - defp flush_queue_if_flowing(ctx, state) do - if flowing?(ctx, state), do: flush_queue(ctx, state), else: {[], state} - end - - defp flush_queue(ctx, state) when flowing?(ctx, state) do + defp flush_queue_if_flowing(ctx, state) when flowing?(ctx, state) do actions = state.queue |> Enum.reverse() @@ -82,14 +97,16 @@ defmodule Membrane.ForwardingFilter do {actions, %{state | queue: nil}} end + defp flush_queue_if_flowing(_ctx, state), do: {[], state} + defp opposite_pad(Pad.ref(:input, _ref), state), do: state.output defp opposite_pad(Pad.ref(:output, _ref), state), do: state.input defp maybe_notify_parent(:event, pad, event, %{notify_on_event?: true}), - do: [notiy_parent: {:event, pad, event}] + do: [notify_parent: {:event, pad, event}] defp maybe_notify_parent(:stream_format, pad, format, %{notify_on_stream_format?: true}), - do: [notiy_parent: {:stream_format, pad, format}] + do: [notify_parent: {:stream_format, pad, format}] defp maybe_notify_parent(_type, _pad, _item, _state), do: [] end diff --git a/test/membrane/integration/forwarding_filter_test.exs b/test/membrane/integration/forwarding_filter_test.exs index 3f60e39f1..a2cacf840 100644 --- a/test/membrane/integration/forwarding_filter_test.exs +++ b/test/membrane/integration/forwarding_filter_test.exs @@ -4,6 +4,8 @@ defmodule Membrane.Integration.ForwardingFilterTest do import Membrane.ChildrenSpec import Membrane.Testing.Assertions + require Membrane.Pad, as: Pad + alias Membrane.Buffer alias Membrane.ForwardingFilter alias Membrane.Testing @@ -12,21 +14,16 @@ defmodule Membrane.Integration.ForwardingFilterTest do defstruct [:field] end - defmodule Event do - @derive Membrane.EventProtocol - defstruct [:field] - end - defmodule Source do use Membrane.Source def_output_pad :output, accepted_format: _any, flow_control: :push @impl true - def handle_parent_notifications({action, item}, _ctx, state), + def handle_parent_notification({action, item}, _ctx, state), do: {[{action, {:output, item}}], state} end - test "ForwardingFilter buffers data until output pad is linked" do + test "Membrane.ForwardingFilter buffers data until output pad is linked" do pipeline = Testing.Pipeline.start_link_supervised!( spec: @@ -42,17 +39,32 @@ defmodule Membrane.Integration.ForwardingFilterTest do data |> Enum.each(fn {type, item} -> Testing.Pipeline.notify_child(pipeline, :source, {type, item}) - assert_pipeline_notified(pipeline, :filter, {^type, :input, ^item}) + + if type in [:stream_format, :event] do + assert_pipeline_notified(pipeline, :filter, {^type, Pad.ref(:input, _id), ^item}) + end + end) + + spec = get_child(:filter) |> child(:sink, Testing.Sink) + Testing.Pipeline.execute_actions(pipeline, spec: spec) + + data + |> Enum.each(fn + {:buffer, item} -> assert_sink_buffer(pipeline, :sink, ^item) + {:event, item} -> assert_sink_event(pipeline, :sink, ^item) + {:stream_format, item} -> assert_sink_stream_format(pipeline, :sink, ^item) end) - Testing.Pipeline.execute_actions( - spec: - get_child(:filter) - |> child(:sink, Testing.Sink) - ) + data = generate_data(100, [:stream_format, :buffer, :event], 200) data |> Enum.each(fn {type, item} -> + Testing.Pipeline.notify_child(pipeline, :source, {type, item}) + + if type in [:stream_format, :event] do + assert_pipeline_notified(pipeline, :filter, {^type, Pad.ref(:input, _id), ^item}) + end + case type do :buffer -> assert_sink_buffer(pipeline, :sink, ^item) :event -> assert_sink_event(pipeline, :sink, ^item) @@ -60,7 +72,7 @@ defmodule Membrane.Integration.ForwardingFilterTest do end end) - data = generate_data(100, [:stream_format, :buffer, :event], 200) + Testing.Pipeline.terminate(pipeline) end defp generate_data(number, types, pts_offset \\ 0) do @@ -69,7 +81,7 @@ defmodule Membrane.Integration.ForwardingFilterTest do |> Enum.map(fn i -> case Enum.random(types) do :stream_format -> {:stream_format, %Format{field: i}} - :event -> {:event, %Event{field: i}} + :event -> {:event, %Testing.Event{}} :buffer -> {:buffer, %Buffer{pts: i + pts_offset, payload: <<>>}} end end) From c24e1c884732cba0ea045c7c28808684677a146f Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 5 Nov 2024 16:00:32 +0100 Subject: [PATCH 4/7] Add new test to ForwardingFilteR --- lib/membrane/forwarding_filter.ex | 42 +++++++++++++++---- .../integration/forwarding_filter_test.exs | 33 +++++++++++++++ 2 files changed, 68 insertions(+), 7 deletions(-) diff --git a/lib/membrane/forwarding_filter.ex b/lib/membrane/forwarding_filter.ex index 7b9d691f9..08108ff1c 100644 --- a/lib/membrane/forwarding_filter.ex +++ b/lib/membrane/forwarding_filter.ex @@ -1,4 +1,11 @@ defmodule Membrane.ForwardingFilter do + @moduledoc """ + Membrane Filter with input and output dynamic pads, that forwards incoming data to the opposite + side than the one from which it came. + + If pad from one side is not linked yet, the data will be buffered until both pads are linked. + """ + use Membrane.Filter def_input_pad :input, @@ -23,8 +30,8 @@ defmodule Membrane.ForwardingFilter do ], notify_on_event?: [ description: """ - If `true`, #{inspect(__MODULE__)} will send `{:event, pad_ref, event}` \ - notification to parent on every event arrival. + If `true`, #{inspect(__MODULE__)} will send `{:event, pad_ref, event}` notification to \ + parent on every event arrival. Defaults to `false`. """, @@ -35,12 +42,15 @@ defmodule Membrane.ForwardingFilter do defguardp flowing?(ctx, state) when ctx.playback == :playing and state.input != nil and state.output != nil + defguardp input_demand_should_be_paused?(ctx, state) + when ctx.playback == :playing and state.input != nil and state.output == nil + @impl true def handle_init(_ctx, opts) do state = opts |> Map.from_struct() - |> Map.merge(%{input: nil, output: nil, queue: []}) + |> Map.merge(%{input: nil, output: nil, queue: [], input_demand_paused?: false}) {[], state} end @@ -48,11 +58,11 @@ defmodule Membrane.ForwardingFilter do @impl true def handle_pad_added(Pad.ref(direction, _ref) = pad, ctx, state) do state = state |> Map.put(direction, pad) - flush_queue_if_flowing(ctx, state) + handle_flowing_state_changed(ctx, state) end @impl true - def handle_playing(ctx, state), do: flush_queue_if_flowing(ctx, state) + def handle_playing(ctx, state), do: handle_flowing_state_changed(ctx, state) [handle_buffer: :buffer, handle_event: :event, handle_stream_format: :stream_format] |> Enum.map(fn {callback, action} -> @@ -85,7 +95,25 @@ defmodule Membrane.ForwardingFilter do state |> Map.update!(:queue, &[{action, pad, item} | &1]) end - defp flush_queue_if_flowing(ctx, state) when flowing?(ctx, state) do + defp handle_flowing_state_changed(ctx, state) do + {pause_or_resume_demand, state} = manage_input_demand(ctx, state) + {flush_queue_actions, state} = maybe_flush_queue(ctx, state) + {pause_or_resume_demand ++ flush_queue_actions, state} + end + + defp manage_input_demand(ctx, %{input_demand_paused?: true} = state) + when not input_demand_should_be_paused?(ctx, state) do + {[resume_auto_demand: state.input], %{state | input_demand_paused?: false}} + end + + defp manage_input_demand(ctx, %{input_demand_paused?: false} = state) + when input_demand_should_be_paused?(ctx, state) do + {[pause_auto_demand: state.input], %{state | input_demand_paused?: true}} + end + + defp manage_input_demand(_ctx, state), do: {[], state} + + defp maybe_flush_queue(ctx, state) when flowing?(ctx, state) do actions = state.queue |> Enum.reverse() @@ -97,7 +125,7 @@ defmodule Membrane.ForwardingFilter do {actions, %{state | queue: nil}} end - defp flush_queue_if_flowing(_ctx, state), do: {[], state} + defp maybe_flush_queue(_ctx, state), do: {[], state} defp opposite_pad(Pad.ref(:input, _ref), state), do: state.output defp opposite_pad(Pad.ref(:output, _ref), state), do: state.input diff --git a/test/membrane/integration/forwarding_filter_test.exs b/test/membrane/integration/forwarding_filter_test.exs index a2cacf840..2a930e559 100644 --- a/test/membrane/integration/forwarding_filter_test.exs +++ b/test/membrane/integration/forwarding_filter_test.exs @@ -75,6 +75,39 @@ defmodule Membrane.Integration.ForwardingFilterTest do Testing.Pipeline.terminate(pipeline) end + test "Membrane.ForwardingFilter pauses input demand if output pad is not linked" do + atomics_ref = :atomics.new(1, []) + :atomics.put(atomics_ref, 1, 0) + + generator = fn _initial_state, _demand_size -> + :atomics.add(atomics_ref, 1, 1) + buffer = %Membrane.Buffer{payload: <<>>} + {[buffer: {:output, buffer}, redemand: :output], nil} + end + + auto_demand_size = 20 + + spec = + child(:source, %Testing.Source{output: {nil, generator}}) + |> via_in(:input, auto_demand_size: auto_demand_size) + |> child(:forwarding_filter, ForwardingFilter) + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + + Process.sleep(500) + + assert :atomics.get(atomics_ref, 1) == auto_demand_size + + spec = get_child(:forwarding_filter) |> child(:sink, Testing.Sink) + Testing.Pipeline.execute_actions(pipeline, spec: spec) + + Process.sleep(500) + + assert :atomics.get(atomics_ref, 1) > auto_demand_size + 100 + + Testing.Pipeline.terminate(pipeline) + end + defp generate_data(number, types, pts_offset \\ 0) do data = 1..(number - 1) From 09e18c8e88a942c7230b6bde49efb02b164cc34c Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 5 Nov 2024 16:01:56 +0100 Subject: [PATCH 5/7] Fix credo --- test/membrane/integration/forwarding_filter_test.exs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/membrane/integration/forwarding_filter_test.exs b/test/membrane/integration/forwarding_filter_test.exs index 2a930e559..0ab10c223 100644 --- a/test/membrane/integration/forwarding_filter_test.exs +++ b/test/membrane/integration/forwarding_filter_test.exs @@ -4,12 +4,12 @@ defmodule Membrane.Integration.ForwardingFilterTest do import Membrane.ChildrenSpec import Membrane.Testing.Assertions - require Membrane.Pad, as: Pad - alias Membrane.Buffer alias Membrane.ForwardingFilter alias Membrane.Testing + require Membrane.Pad, as: Pad + defmodule Format do defstruct [:field] end From 380f17008c299d0008d0072ca6435b52f498eca5 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 5 Nov 2024 16:03:17 +0100 Subject: [PATCH 6/7] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c68edaab2..95f8c125c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## 1.2.0 * Add `:max_instances` option for dynamic pads. [#876](https://github.com/membraneframework/membrane_core/pull/876) + * Implement `Membrane.ForwardingFilter`. [#904](https://github.com/membraneframework/membrane_core/pull/904) ## 1.1.2 * Add new callback `handle_child_terminated/3` along with new assertions. [#894](https://github.com/membraneframework/membrane_core/pull/894) From 8a3af851bd6665acd0544062744f16f6064edfa5 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 5 Nov 2024 16:08:01 +0100 Subject: [PATCH 7/7] Mix format --- lib/membrane/forwarding_filter.ex | 2 +- test/membrane/integration/forwarding_filter_test.exs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/membrane/forwarding_filter.ex b/lib/membrane/forwarding_filter.ex index 08108ff1c..e7fc680a2 100644 --- a/lib/membrane/forwarding_filter.ex +++ b/lib/membrane/forwarding_filter.ex @@ -101,7 +101,7 @@ defmodule Membrane.ForwardingFilter do {pause_or_resume_demand ++ flush_queue_actions, state} end - defp manage_input_demand(ctx, %{input_demand_paused?: true} = state) + defp manage_input_demand(ctx, %{input_demand_paused?: true} = state) when not input_demand_should_be_paused?(ctx, state) do {[resume_auto_demand: state.input], %{state | input_demand_paused?: false}} end diff --git a/test/membrane/integration/forwarding_filter_test.exs b/test/membrane/integration/forwarding_filter_test.exs index 0ab10c223..607a419c7 100644 --- a/test/membrane/integration/forwarding_filter_test.exs +++ b/test/membrane/integration/forwarding_filter_test.exs @@ -92,7 +92,7 @@ defmodule Membrane.Integration.ForwardingFilterTest do |> via_in(:input, auto_demand_size: auto_demand_size) |> child(:forwarding_filter, ForwardingFilter) - pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) Process.sleep(500)