diff --git a/CHANGELOG.md b/CHANGELOG.md index 04d410f1e..6ae09419e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## 1.2.0 * Add `:max_instances` option for dynamic pads. [#876](https://github.com/membraneframework/membrane_core/pull/876) * Implement `Membrane.Connector`. [#904](https://github.com/membraneframework/membrane_core/pull/904) + * Incorporate `Membrane.Funnel`, `Membrane.Tee` and `Membane.Fake.Sink`. [#922](https://github.com/membraneframework/membrane_core/issues/922) ## 1.1.2 * Add new callback `handle_child_terminated/3` along with new assertions. [#894](https://github.com/membraneframework/membrane_core/pull/894) diff --git a/lib/membrane/fake_sink.ex b/lib/membrane/fake_sink.ex new file mode 100644 index 000000000..add0ba38b --- /dev/null +++ b/lib/membrane/fake_sink.ex @@ -0,0 +1,12 @@ +defmodule Membrane.Fake.Sink do + @moduledoc """ + Membrane Sink that ignores incoming data. + """ + + use Membrane.Sink + + def_input_pad :input, accepted_format: _any + + @impl true + def handle_buffer(:input, _buffer, _ctx, state), do: {[], state} +end diff --git a/lib/membrane/funnel.ex b/lib/membrane/funnel.ex new file mode 100644 index 000000000..ecb11a3db --- /dev/null +++ b/lib/membrane/funnel.ex @@ -0,0 +1,66 @@ +defmodule Membrane.Funnel do + @moduledoc """ + Element that can be used for collecting data from multiple inputs and sending it through one + output. + + When a new input connects in the `:playing` state, the funnel sends + `#{inspect(__MODULE__)}.NewInputEvent` via output. + """ + + use Membrane.Filter + + def_input_pad :input, accepted_format: _any, flow_control: :auto, availability: :on_request + def_output_pad :output, accepted_format: _any, flow_control: :auto + + def_options end_of_stream: [spec: :on_last_pad | :on_first_pad | :never, default: :on_last_pad] + + @impl true + def handle_init(_ctx, opts) do + {[], %{end_of_stream: opts.end_of_stream}} + end + + @impl true + def handle_buffer(Pad.ref(:input, _id), buffer, _ctx, state) do + {[buffer: {:output, buffer}], state} + end + + @impl true + def handle_pad_added(Pad.ref(:input, _id), %{playback_state: :playing}, state) do + {[event: {:output, %__MODULE__.NewInputEvent{}}], state} + end + + @impl true + def handle_pad_added(Pad.ref(:input, _id), _ctx, state) do + {[], state} + end + + @impl true + def handle_end_of_stream(Pad.ref(:input, _id), _ctx, %{end_of_stream: :never} = state) do + {[], state} + end + + @impl true + def handle_end_of_stream(Pad.ref(:input, _id), ctx, %{end_of_stream: :on_first_pad} = state) do + if ctx.pads.output.end_of_stream? do + {[], state} + else + {[end_of_stream: :output], state} + end + end + + @impl true + def handle_end_of_stream(Pad.ref(:input, _id), ctx, %{end_of_stream: :on_last_pad} = state) do + if ctx |> inputs_data() |> Enum.all?(& &1.end_of_stream?) do + {[end_of_stream: :output], state} + else + {[], state} + end + end + + defp inputs_data(ctx) do + Enum.flat_map(ctx.pads, fn + {Pad.ref(:input, _id), data} -> [data] + _output -> [] + end) + end +end diff --git a/lib/membrane/funnel/new_input_event.ex b/lib/membrane/funnel/new_input_event.ex new file mode 100644 index 000000000..22f6455e0 --- /dev/null +++ b/lib/membrane/funnel/new_input_event.ex @@ -0,0 +1,9 @@ +defmodule Membrane.Funnel.NewInputEvent do + @moduledoc """ + Event sent each time new element is linked (via funnel input pad) after playing pipeline. + """ + @derive Membrane.EventProtocol + + @type t :: %__MODULE__{} + defstruct [] +end diff --git a/lib/membrane/tee.ex b/lib/membrane/tee.ex new file mode 100644 index 000000000..b227edcd8 --- /dev/null +++ b/lib/membrane/tee.ex @@ -0,0 +1,75 @@ +defmodule Membrane.Tee do + @moduledoc """ + Element for forwarding buffers to at least one output pad + + It has one input pad `:input` and 2 output pads: + * `:output` - is a dynamic pad which is always available and works in pull mode + * `:push_output` - is a dynamic pad that can be linked to any number of elements (including 0) and works + in push mode + + The `:output` pads dictate the speed of processing data and any element (or elements) connected to + `:push_output` pad will receive the same data as all `:output` instances. + """ + use Membrane.Filter, flow_control_hints?: false + + require Membrane.Logger + + def_input_pad :input, + availability: :always, + flow_control: :auto, + accepted_format: _any + + def_output_pad :output, + availability: :on_request, + flow_control: :auto, + accepted_format: _any + + def_output_pad :push_output, + availability: :on_request, + flow_control: :push, + accepted_format: _any + + @impl true + def handle_init(_ctx, _opts) do + {[], %{stream_format: nil}} + end + + @impl true + def handle_playing(ctx, state) do + if map_size(ctx.pads) < 2 do + Membrane.Logger.debug(""" + #{inspect(__MODULE__)} enters :playing playback without any output (:output or :push_output) \ + pads linked. + """) + end + + {[], state} + end + + @impl true + def handle_stream_format(:input, stream_format, _ctx, state) do + {[forward: stream_format], %{state | stream_format: stream_format}} + end + + @impl true + def handle_pad_added(Pad.ref(name, _ref) = output_pad, ctx, state) + when name in [:output, :push_output] do + maybe_stream_format = + case state.stream_format do + nil -> [] + stream_format -> [stream_format: {output_pad, stream_format}] + end + + maybe_eos = + if ctx.pads.input.end_of_stream?, + do: [end_of_stream: output_pad], + else: [] + + {maybe_stream_format ++ maybe_eos, state} + end + + @impl true + def handle_buffer(:input, buffer, _ctx, state) do + {[forward: buffer], state} + end +end diff --git a/test/membrane/integration/effective_flow_control_resolution_test.exs b/test/membrane/integration/effective_flow_control_resolution_test.exs index 894207fef..ca95853ed 100644 --- a/test/membrane/integration/effective_flow_control_resolution_test.exs +++ b/test/membrane/integration/effective_flow_control_resolution_test.exs @@ -7,7 +7,7 @@ defmodule Membrane.Integration.EffectiveFlowControlResolutionTest do alias Membrane.Testing defmodule AutoFilter do - use Membrane.Filter + use Membrane.Filter, flow_control_hints?: false def_input_pad :input, availability: :on_request, accepted_format: _any def_output_pad :output, availability: :on_request, accepted_format: _any diff --git a/test/membrane/integration/funnel_test.exs b/test/membrane/integration/funnel_test.exs new file mode 100644 index 000000000..fb9ca5739 --- /dev/null +++ b/test/membrane/integration/funnel_test.exs @@ -0,0 +1,33 @@ +defmodule Membrane.Integration.FunnelTest do + use ExUnit.Case + + import Membrane.Testing.Assertions + + alias Membrane.{Buffer, Funnel, Testing} + + test "Collects multiple inputs" do + import Membrane.ChildrenSpec + data = 1..10 + + {:ok, _supervisor_pid, pipeline} = + Testing.Pipeline.start_link( + spec: [ + child(:funnel, Funnel), + child(:src1, %Testing.Source{output: data}) |> get_child(:funnel), + child(:src2, %Testing.Source{output: data}) |> get_child(:funnel), + get_child(:funnel) |> child(:sink, Testing.Sink) + ] + ) + + data + |> Enum.flat_map(&[&1, &1]) + |> Enum.each(fn payload -> + assert_sink_buffer(pipeline, :sink, %Buffer{payload: ^payload}) + end) + + assert_end_of_stream(pipeline, :sink) + refute_sink_buffer(pipeline, :sink, _buffer, 0) + + Membrane.Pipeline.terminate(pipeline) + end +end diff --git a/test/membrane/integration/tee_test.exs b/test/membrane/integration/tee_test.exs new file mode 100644 index 000000000..1c98ea342 --- /dev/null +++ b/test/membrane/integration/tee_test.exs @@ -0,0 +1,55 @@ +defmodule Membrane.Integration.TeeTest do + @moduledoc false + use ExUnit.Case, async: true + use Bunch + + import Membrane.ChildrenSpec + import Membrane.Testing.Assertions + + alias Membrane.Buffer + alias Membrane.Testing.{Pipeline, Sink, Source} + + test "forwards input to three all outputs" do + range = 1..100 + sinks = [:sink1, :sink2, :sink3, :sink_4] + + spec = + [ + child(:src, %Source{output: range}) + |> child(:tee, Membrane.Tee) + ] ++ + for sink <- sinks do + pad = if sink in [:sink1, :sink2], do: :output, else: :push_output + + get_child(:tee) + |> via_out(pad) + |> child(sink, %Sink{}) + end + + pipeline = Pipeline.start_link_supervised!(spec: spec) + + for sink <- sinks do + assert_end_of_stream(pipeline, ^sink, :input) + end + + for element <- range, sink <- sinks do + assert_sink_buffer(pipeline, sink, %Buffer{payload: ^element}) + end + + for {pad, sink} <- [push_output: :sink5, output: :sink6] do + spec = + get_child(:tee) + |> via_out(pad) + |> child(sink, %Sink{}) + + Pipeline.execute_actions(pipeline, spec: spec) + end + + for sink <- [:sink5, :sink6] do + assert_sink_stream_format(pipeline, sink, %Membrane.RemoteStream{}) + assert_end_of_stream(pipeline, ^sink, :input) + end + + Pipeline.terminate(pipeline) + end +end