Skip to content

Commit

Permalink
Incorporate Funnel, Tee and Fake.Sink (#922)
Browse files Browse the repository at this point in the history
* Incorporate Funnel

* Incorporate Tee
  • Loading branch information
FelonEkonom authored Dec 23, 2024
1 parent 8505648 commit 5d6d7e7
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 1.2.0
* Add `:max_instances` option for dynamic pads. [#876](https://github.com/membraneframework/membrane_core/pull/876)
* Implement `Membrane.Connector`. [#904](https://github.com/membraneframework/membrane_core/pull/904)
* Incorporate `Membrane.Funnel`, `Membrane.Tee` and `Membane.Fake.Sink`. [#922](https://github.com/membraneframework/membrane_core/issues/922)

## 1.1.2
* Add new callback `handle_child_terminated/3` along with new assertions. [#894](https://github.com/membraneframework/membrane_core/pull/894)
Expand Down
12 changes: 12 additions & 0 deletions lib/membrane/fake_sink.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule Membrane.Fake.Sink do
@moduledoc """
Membrane Sink that ignores incoming data.
"""

use Membrane.Sink

def_input_pad :input, accepted_format: _any

@impl true
def handle_buffer(:input, _buffer, _ctx, state), do: {[], state}
end
66 changes: 66 additions & 0 deletions lib/membrane/funnel.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
defmodule Membrane.Funnel do
@moduledoc """
Element that can be used for collecting data from multiple inputs and sending it through one
output.
When a new input connects in the `:playing` state, the funnel sends
`#{inspect(__MODULE__)}.NewInputEvent` via output.
"""

use Membrane.Filter

def_input_pad :input, accepted_format: _any, flow_control: :auto, availability: :on_request
def_output_pad :output, accepted_format: _any, flow_control: :auto

def_options end_of_stream: [spec: :on_last_pad | :on_first_pad | :never, default: :on_last_pad]

@impl true
def handle_init(_ctx, opts) do
{[], %{end_of_stream: opts.end_of_stream}}
end

@impl true
def handle_buffer(Pad.ref(:input, _id), buffer, _ctx, state) do
{[buffer: {:output, buffer}], state}
end

@impl true
def handle_pad_added(Pad.ref(:input, _id), %{playback_state: :playing}, state) do
{[event: {:output, %__MODULE__.NewInputEvent{}}], state}
end

@impl true
def handle_pad_added(Pad.ref(:input, _id), _ctx, state) do
{[], state}
end

@impl true
def handle_end_of_stream(Pad.ref(:input, _id), _ctx, %{end_of_stream: :never} = state) do
{[], state}
end

@impl true
def handle_end_of_stream(Pad.ref(:input, _id), ctx, %{end_of_stream: :on_first_pad} = state) do
if ctx.pads.output.end_of_stream? do
{[], state}
else
{[end_of_stream: :output], state}
end
end

@impl true
def handle_end_of_stream(Pad.ref(:input, _id), ctx, %{end_of_stream: :on_last_pad} = state) do
if ctx |> inputs_data() |> Enum.all?(& &1.end_of_stream?) do
{[end_of_stream: :output], state}
else
{[], state}
end
end

defp inputs_data(ctx) do
Enum.flat_map(ctx.pads, fn
{Pad.ref(:input, _id), data} -> [data]
_output -> []
end)
end
end
9 changes: 9 additions & 0 deletions lib/membrane/funnel/new_input_event.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule Membrane.Funnel.NewInputEvent do
@moduledoc """
Event sent each time new element is linked (via funnel input pad) after playing pipeline.
"""
@derive Membrane.EventProtocol

@type t :: %__MODULE__{}
defstruct []
end
75 changes: 75 additions & 0 deletions lib/membrane/tee.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
defmodule Membrane.Tee do
@moduledoc """
Element for forwarding buffers to at least one output pad
It has one input pad `:input` and 2 output pads:
* `:output` - is a dynamic pad which is always available and works in pull mode
* `:push_output` - is a dynamic pad that can be linked to any number of elements (including 0) and works
in push mode
The `:output` pads dictate the speed of processing data and any element (or elements) connected to
`:push_output` pad will receive the same data as all `:output` instances.
"""
use Membrane.Filter, flow_control_hints?: false

require Membrane.Logger

def_input_pad :input,
availability: :always,
flow_control: :auto,
accepted_format: _any

def_output_pad :output,
availability: :on_request,
flow_control: :auto,
accepted_format: _any

def_output_pad :push_output,
availability: :on_request,
flow_control: :push,
accepted_format: _any

@impl true
def handle_init(_ctx, _opts) do
{[], %{stream_format: nil}}
end

@impl true
def handle_playing(ctx, state) do
if map_size(ctx.pads) < 2 do
Membrane.Logger.debug("""
#{inspect(__MODULE__)} enters :playing playback without any output (:output or :push_output) \
pads linked.
""")
end

{[], state}
end

@impl true
def handle_stream_format(:input, stream_format, _ctx, state) do
{[forward: stream_format], %{state | stream_format: stream_format}}
end

@impl true
def handle_pad_added(Pad.ref(name, _ref) = output_pad, ctx, state)
when name in [:output, :push_output] do
maybe_stream_format =
case state.stream_format do
nil -> []
stream_format -> [stream_format: {output_pad, stream_format}]
end

maybe_eos =
if ctx.pads.input.end_of_stream?,
do: [end_of_stream: output_pad],
else: []

{maybe_stream_format ++ maybe_eos, state}
end

@impl true
def handle_buffer(:input, buffer, _ctx, state) do
{[forward: buffer], state}
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Membrane.Integration.EffectiveFlowControlResolutionTest do
alias Membrane.Testing

defmodule AutoFilter do
use Membrane.Filter
use Membrane.Filter, flow_control_hints?: false

def_input_pad :input, availability: :on_request, accepted_format: _any
def_output_pad :output, availability: :on_request, accepted_format: _any
Expand Down
33 changes: 33 additions & 0 deletions test/membrane/integration/funnel_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule Membrane.Integration.FunnelTest do
use ExUnit.Case

import Membrane.Testing.Assertions

alias Membrane.{Buffer, Funnel, Testing}

test "Collects multiple inputs" do
import Membrane.ChildrenSpec
data = 1..10

{:ok, _supervisor_pid, pipeline} =
Testing.Pipeline.start_link(
spec: [
child(:funnel, Funnel),
child(:src1, %Testing.Source{output: data}) |> get_child(:funnel),
child(:src2, %Testing.Source{output: data}) |> get_child(:funnel),
get_child(:funnel) |> child(:sink, Testing.Sink)
]
)

data
|> Enum.flat_map(&[&1, &1])
|> Enum.each(fn payload ->
assert_sink_buffer(pipeline, :sink, %Buffer{payload: ^payload})
end)

assert_end_of_stream(pipeline, :sink)
refute_sink_buffer(pipeline, :sink, _buffer, 0)

Membrane.Pipeline.terminate(pipeline)
end
end
55 changes: 55 additions & 0 deletions test/membrane/integration/tee_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
defmodule Membrane.Integration.TeeTest do
@moduledoc false
use ExUnit.Case, async: true
use Bunch

import Membrane.ChildrenSpec
import Membrane.Testing.Assertions

alias Membrane.Buffer
alias Membrane.Testing.{Pipeline, Sink, Source}

test "forwards input to three all outputs" do
range = 1..100
sinks = [:sink1, :sink2, :sink3, :sink_4]

spec =
[
child(:src, %Source{output: range})
|> child(:tee, Membrane.Tee)
] ++
for sink <- sinks do
pad = if sink in [:sink1, :sink2], do: :output, else: :push_output

get_child(:tee)
|> via_out(pad)
|> child(sink, %Sink{})
end

pipeline = Pipeline.start_link_supervised!(spec: spec)

for sink <- sinks do
assert_end_of_stream(pipeline, ^sink, :input)
end

for element <- range, sink <- sinks do
assert_sink_buffer(pipeline, sink, %Buffer{payload: ^element})
end

for {pad, sink} <- [push_output: :sink5, output: :sink6] do
spec =
get_child(:tee)
|> via_out(pad)
|> child(sink, %Sink{})

Pipeline.execute_actions(pipeline, spec: spec)
end

for sink <- [:sink5, :sink6] do
assert_sink_stream_format(pipeline, sink, %Membrane.RemoteStream{})
assert_end_of_stream(pipeline, ^sink, :input)
end

Pipeline.terminate(pipeline)
end
end

0 comments on commit 5d6d7e7

Please sign in to comment.