From 6c268121908c42d73e1fe9e6f9390ed7f00e8f01 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 18 Dec 2024 14:06:00 +0100 Subject: [PATCH] Incorporate Funnel --- lib/membrane/funnel.ex | 65 ++++++++++++++++++++++++++ lib/membrane/funnel/new_input_event.ex | 9 ++++ 2 files changed, 74 insertions(+) create mode 100644 lib/membrane/funnel.ex create mode 100644 lib/membrane/funnel/new_input_event.ex diff --git a/lib/membrane/funnel.ex b/lib/membrane/funnel.ex new file mode 100644 index 000000000..bc90e6e78 --- /dev/null +++ b/lib/membrane/funnel.ex @@ -0,0 +1,65 @@ +defmodule Membrane.Funnel do + @moduledoc """ + Element that can be used for collecting data from multiple inputs and sending it through one + output. When a new input connects in the `:playing` state, the funnel sends + `Membrane.Funnel.NewInputEvent` via output. + """ + use Membrane.Filter + + alias Membrane.Funnel + + def_input_pad :input, accepted_format: _any, flow_control: :auto, availability: :on_request + def_output_pad :output, accepted_format: _any, flow_control: :auto + + def_options end_of_stream: [spec: :on_last_pad | :on_first_pad | :never, default: :on_last_pad] + + @impl true + def handle_init(_ctx, opts) do + {[], %{end_of_stream: opts.end_of_stream}} + end + + @impl true + def handle_buffer(Pad.ref(:input, _id), buffer, _ctx, state) do + {[buffer: {:output, buffer}], state} + end + + @impl true + def handle_pad_added(Pad.ref(:input, _id), %{playback_state: :playing}, state) do + {[event: {:output, %Funnel.NewInputEvent{}}], state} + end + + @impl true + def handle_pad_added(Pad.ref(:input, _id), _ctx, state) do + {[], state} + end + + @impl true + def handle_end_of_stream(Pad.ref(:input, _id), _ctx, %{end_of_stream: :never} = state) do + {[], state} + end + + @impl true + def handle_end_of_stream(Pad.ref(:input, _id), ctx, %{end_of_stream: :on_first_pad} = state) do + if ctx.pads.output.end_of_stream? do + {[], state} + else + {[end_of_stream: :output], state} + end + end + + @impl true + def handle_end_of_stream(Pad.ref(:input, _id), ctx, %{end_of_stream: :on_last_pad} = state) do + if ctx |> inputs_data() |> Enum.all?(& &1.end_of_stream?) do + {[end_of_stream: :output], state} + else + {[], state} + end + end + + defp inputs_data(ctx) do + Enum.flat_map(ctx.pads, fn + {Pad.ref(:input, _id), data} -> [data] + _output -> [] + end) + end +end diff --git a/lib/membrane/funnel/new_input_event.ex b/lib/membrane/funnel/new_input_event.ex new file mode 100644 index 000000000..22f6455e0 --- /dev/null +++ b/lib/membrane/funnel/new_input_event.ex @@ -0,0 +1,9 @@ +defmodule Membrane.Funnel.NewInputEvent do + @moduledoc """ + Event sent each time new element is linked (via funnel input pad) after playing pipeline. + """ + @derive Membrane.EventProtocol + + @type t :: %__MODULE__{} + defstruct [] +end