Skip to content

Commit

Permalink
Add new test to ForwardingFilteR
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Nov 5, 2024
1 parent d30a2b9 commit c24e1c8
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 7 deletions.
42 changes: 35 additions & 7 deletions lib/membrane/forwarding_filter.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
defmodule Membrane.ForwardingFilter do
@moduledoc """
Membrane Filter with input and output dynamic pads, that forwards incoming data to the opposite
side than the one from which it came.
If pad from one side is not linked yet, the data will be buffered until both pads are linked.
"""

use Membrane.Filter

def_input_pad :input,
Expand All @@ -23,8 +30,8 @@ defmodule Membrane.ForwardingFilter do
],
notify_on_event?: [
description: """
If `true`, #{inspect(__MODULE__)} will send `{:event, pad_ref, event}` \
notification to parent on every event arrival.
If `true`, #{inspect(__MODULE__)} will send `{:event, pad_ref, event}` notification to \
parent on every event arrival.
Defaults to `false`.
""",
Expand All @@ -35,24 +42,27 @@ defmodule Membrane.ForwardingFilter do
defguardp flowing?(ctx, state)
when ctx.playback == :playing and state.input != nil and state.output != nil

defguardp input_demand_should_be_paused?(ctx, state)
when ctx.playback == :playing and state.input != nil and state.output == nil

@impl true
def handle_init(_ctx, opts) do
state =
opts
|> Map.from_struct()
|> Map.merge(%{input: nil, output: nil, queue: []})
|> Map.merge(%{input: nil, output: nil, queue: [], input_demand_paused?: false})

{[], state}
end

@impl true
def handle_pad_added(Pad.ref(direction, _ref) = pad, ctx, state) do
state = state |> Map.put(direction, pad)
flush_queue_if_flowing(ctx, state)
handle_flowing_state_changed(ctx, state)
end

@impl true
def handle_playing(ctx, state), do: flush_queue_if_flowing(ctx, state)
def handle_playing(ctx, state), do: handle_flowing_state_changed(ctx, state)

[handle_buffer: :buffer, handle_event: :event, handle_stream_format: :stream_format]
|> Enum.map(fn {callback, action} ->
Expand Down Expand Up @@ -85,7 +95,25 @@ defmodule Membrane.ForwardingFilter do
state |> Map.update!(:queue, &[{action, pad, item} | &1])
end

defp flush_queue_if_flowing(ctx, state) when flowing?(ctx, state) do
defp handle_flowing_state_changed(ctx, state) do
{pause_or_resume_demand, state} = manage_input_demand(ctx, state)
{flush_queue_actions, state} = maybe_flush_queue(ctx, state)
{pause_or_resume_demand ++ flush_queue_actions, state}
end

defp manage_input_demand(ctx, %{input_demand_paused?: true} = state)
when not input_demand_should_be_paused?(ctx, state) do
{[resume_auto_demand: state.input], %{state | input_demand_paused?: false}}
end

defp manage_input_demand(ctx, %{input_demand_paused?: false} = state)
when input_demand_should_be_paused?(ctx, state) do
{[pause_auto_demand: state.input], %{state | input_demand_paused?: true}}
end

defp manage_input_demand(_ctx, state), do: {[], state}

defp maybe_flush_queue(ctx, state) when flowing?(ctx, state) do
actions =
state.queue
|> Enum.reverse()
Expand All @@ -97,7 +125,7 @@ defmodule Membrane.ForwardingFilter do
{actions, %{state | queue: nil}}
end

defp flush_queue_if_flowing(_ctx, state), do: {[], state}
defp maybe_flush_queue(_ctx, state), do: {[], state}

defp opposite_pad(Pad.ref(:input, _ref), state), do: state.output
defp opposite_pad(Pad.ref(:output, _ref), state), do: state.input
Expand Down
33 changes: 33 additions & 0 deletions test/membrane/integration/forwarding_filter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,39 @@ defmodule Membrane.Integration.ForwardingFilterTest do
Testing.Pipeline.terminate(pipeline)
end

test "Membrane.ForwardingFilter pauses input demand if output pad is not linked" do
atomics_ref = :atomics.new(1, [])
:atomics.put(atomics_ref, 1, 0)

generator = fn _initial_state, _demand_size ->
:atomics.add(atomics_ref, 1, 1)
buffer = %Membrane.Buffer{payload: <<>>}
{[buffer: {:output, buffer}, redemand: :output], nil}
end

auto_demand_size = 20

spec =
child(:source, %Testing.Source{output: {nil, generator}})
|> via_in(:input, auto_demand_size: auto_demand_size)
|> child(:forwarding_filter, ForwardingFilter)

pipeline = Testing.Pipeline.start_link_supervised!(spec: spec)

Process.sleep(500)

assert :atomics.get(atomics_ref, 1) == auto_demand_size

spec = get_child(:forwarding_filter) |> child(:sink, Testing.Sink)
Testing.Pipeline.execute_actions(pipeline, spec: spec)

Process.sleep(500)

assert :atomics.get(atomics_ref, 1) > auto_demand_size + 100

Testing.Pipeline.terminate(pipeline)
end

defp generate_data(number, types, pts_offset \\ 0) do
data =
1..(number - 1)
Expand Down

0 comments on commit c24e1c8

Please sign in to comment.