Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement forwarding filter #904

Merged
merged 7 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 1.2.0
* Add `:max_instances` option for dynamic pads. [#876](https://github.com/membraneframework/membrane_core/pull/876)
* Implement `Membrane.ForwardingFilter`. [#904](https://github.com/membraneframework/membrane_core/pull/904)

## 1.1.2
* Add new callback `handle_child_terminated/3` along with new assertions. [#894](https://github.com/membraneframework/membrane_core/pull/894)
Expand Down
140 changes: 140 additions & 0 deletions lib/membrane/forwarding_filter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
defmodule Membrane.ForwardingFilter do
@moduledoc """
Membrane Filter with input and output dynamic pads, that forwards incoming data to the opposite
side than the one from which it came.

If pad from one side is not linked yet, the data will be buffered until both pads are linked.
"""

use Membrane.Filter

def_input_pad :input,
accepted_format: _any,
availability: :on_request,
max_instances: 1

def_output_pad :output,
accepted_format: _any,
availability: :on_request,
max_instances: 1

def_options notify_on_stream_format?: [
description: """
If `true`, #{inspect(__MODULE__)} will send `{:stream_format, pad_ref, stream_format}` \
notification to parent on every stream format arrival.

Defaults to `false`.
""",
spec: boolean(),
default: false
],
notify_on_event?: [
description: """
If `true`, #{inspect(__MODULE__)} will send `{:event, pad_ref, event}` notification to \
parent on every event arrival.

Defaults to `false`.
""",
spec: boolean(),
default: false
]

defguardp flowing?(ctx, state)
when ctx.playback == :playing and state.input != nil and state.output != nil

defguardp input_demand_should_be_paused?(ctx, state)
when ctx.playback == :playing and state.input != nil and state.output == nil

@impl true
def handle_init(_ctx, opts) do
state =
opts
|> Map.from_struct()
|> Map.merge(%{input: nil, output: nil, queue: [], input_demand_paused?: false})

{[], state}
end

@impl true
def handle_pad_added(Pad.ref(direction, _ref) = pad, ctx, state) do
state = state |> Map.put(direction, pad)
handle_flowing_state_changed(ctx, state)
end

@impl true
def handle_playing(ctx, state), do: handle_flowing_state_changed(ctx, state)

[handle_buffer: :buffer, handle_event: :event, handle_stream_format: :stream_format]
|> Enum.map(fn {callback, action} ->
@impl true
def unquote(callback)(pad, item, ctx, state) when flowing?(ctx, state) do
actions = [{unquote(action), {opposite_pad(pad, state), item}}]
actions = actions ++ maybe_notify_parent(unquote(action), pad, item, state)
{actions, state}
end

@impl true
def unquote(callback)(pad, item, _ctx, state) do
state = unquote(action) |> store_in_queue(pad, item, state)
actions = unquote(action) |> maybe_notify_parent(pad, item, state)
{actions, state}
end
end)

@impl true
def handle_end_of_stream(pad, ctx, state) when flowing?(ctx, state) do
{[end_of_stream: opposite_pad(pad, state)], state}
end

@impl true
def handle_end_of_stream(pad, _ctx, state) do
{[], store_in_queue(:end_of_stream, pad, nil, state)}
end

defp store_in_queue(action, pad, item, state) do
state |> Map.update!(:queue, &[{action, pad, item} | &1])
end

defp handle_flowing_state_changed(ctx, state) do
{pause_or_resume_demand, state} = manage_input_demand(ctx, state)
{flush_queue_actions, state} = maybe_flush_queue(ctx, state)
{pause_or_resume_demand ++ flush_queue_actions, state}
end

defp manage_input_demand(ctx, %{input_demand_paused?: true} = state)
when not input_demand_should_be_paused?(ctx, state) do
{[resume_auto_demand: state.input], %{state | input_demand_paused?: false}}
end

defp manage_input_demand(ctx, %{input_demand_paused?: false} = state)
when input_demand_should_be_paused?(ctx, state) do
{[pause_auto_demand: state.input], %{state | input_demand_paused?: true}}
end

defp manage_input_demand(_ctx, state), do: {[], state}

defp maybe_flush_queue(ctx, state) when flowing?(ctx, state) do
actions =
state.queue
|> Enum.reverse()
|> Enum.map(fn
{:end_of_stream, _input, nil} -> {:end_of_stream, state.output}
{action, pad, item} -> {action, {opposite_pad(pad, state), item}}
end)

{actions, %{state | queue: nil}}
end

defp maybe_flush_queue(_ctx, state), do: {[], state}

defp opposite_pad(Pad.ref(:input, _ref), state), do: state.output
defp opposite_pad(Pad.ref(:output, _ref), state), do: state.input

defp maybe_notify_parent(:event, pad, event, %{notify_on_event?: true}),
do: [notify_parent: {:event, pad, event}]

defp maybe_notify_parent(:stream_format, pad, format, %{notify_on_stream_format?: true}),
do: [notify_parent: {:stream_format, pad, format}]

defp maybe_notify_parent(_type, _pad, _item, _state), do: []
end
124 changes: 124 additions & 0 deletions test/membrane/integration/forwarding_filter_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
defmodule Membrane.Integration.ForwardingFilterTest do
use ExUnit.Case, async: true

import Membrane.ChildrenSpec
import Membrane.Testing.Assertions

alias Membrane.Buffer
alias Membrane.ForwardingFilter
alias Membrane.Testing

require Membrane.Pad, as: Pad

defmodule Format do
defstruct [:field]
end

defmodule Source do
use Membrane.Source
def_output_pad :output, accepted_format: _any, flow_control: :push

@impl true
def handle_parent_notification({action, item}, _ctx, state),
do: {[{action, {:output, item}}], state}
end

test "Membrane.ForwardingFilter buffers data until output pad is linked" do
pipeline =
Testing.Pipeline.start_link_supervised!(
spec:
child(:source, Source)
|> child(:filter, %ForwardingFilter{
notify_on_event?: true,
notify_on_stream_format?: true
})
)

data = generate_data(100, [:stream_format, :buffer, :event])

data
|> Enum.each(fn {type, item} ->
Testing.Pipeline.notify_child(pipeline, :source, {type, item})

if type in [:stream_format, :event] do
assert_pipeline_notified(pipeline, :filter, {^type, Pad.ref(:input, _id), ^item})
end
end)

spec = get_child(:filter) |> child(:sink, Testing.Sink)
Testing.Pipeline.execute_actions(pipeline, spec: spec)

data
|> Enum.each(fn
{:buffer, item} -> assert_sink_buffer(pipeline, :sink, ^item)
{:event, item} -> assert_sink_event(pipeline, :sink, ^item)
{:stream_format, item} -> assert_sink_stream_format(pipeline, :sink, ^item)
end)

data = generate_data(100, [:stream_format, :buffer, :event], 200)

data
|> Enum.each(fn {type, item} ->
Testing.Pipeline.notify_child(pipeline, :source, {type, item})

if type in [:stream_format, :event] do
assert_pipeline_notified(pipeline, :filter, {^type, Pad.ref(:input, _id), ^item})
end

case type do
:buffer -> assert_sink_buffer(pipeline, :sink, ^item)
:event -> assert_sink_event(pipeline, :sink, ^item)
:stream_format -> assert_sink_stream_format(pipeline, :sink, ^item)
end
end)

Testing.Pipeline.terminate(pipeline)
end

test "Membrane.ForwardingFilter pauses input demand if output pad is not linked" do
atomics_ref = :atomics.new(1, [])
:atomics.put(atomics_ref, 1, 0)

generator = fn _initial_state, _demand_size ->
:atomics.add(atomics_ref, 1, 1)
buffer = %Membrane.Buffer{payload: <<>>}
{[buffer: {:output, buffer}, redemand: :output], nil}
end

auto_demand_size = 20

spec =
child(:source, %Testing.Source{output: {nil, generator}})
|> via_in(:input, auto_demand_size: auto_demand_size)
|> child(:forwarding_filter, ForwardingFilter)

pipeline = Testing.Pipeline.start_link_supervised!(spec: spec)

Process.sleep(500)

assert :atomics.get(atomics_ref, 1) == auto_demand_size

spec = get_child(:forwarding_filter) |> child(:sink, Testing.Sink)
Testing.Pipeline.execute_actions(pipeline, spec: spec)

Process.sleep(500)

assert :atomics.get(atomics_ref, 1) > auto_demand_size + 100

Testing.Pipeline.terminate(pipeline)
end

defp generate_data(number, types, pts_offset \\ 0) do
data =
1..(number - 1)
|> Enum.map(fn i ->
case Enum.random(types) do
:stream_format -> {:stream_format, %Format{field: i}}
:event -> {:event, %Testing.Event{}}
:buffer -> {:buffer, %Buffer{pts: i + pts_offset, payload: <<>>}}
end
end)

[stream_format: %Format{field: 0}] ++ data
end
end
Loading