Skip to content

Commit

Permalink
Incorporate Funnel
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Dec 18, 2024
1 parent 8505648 commit 6c26812
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 0 deletions.
65 changes: 65 additions & 0 deletions lib/membrane/funnel.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
defmodule Membrane.Funnel do
@moduledoc """
Element that can be used for collecting data from multiple inputs and sending it through one
output. When a new input connects in the `:playing` state, the funnel sends
`Membrane.Funnel.NewInputEvent` via output.
"""
use Membrane.Filter

alias Membrane.Funnel

def_input_pad :input, accepted_format: _any, flow_control: :auto, availability: :on_request
def_output_pad :output, accepted_format: _any, flow_control: :auto

def_options end_of_stream: [spec: :on_last_pad | :on_first_pad | :never, default: :on_last_pad]

@impl true
def handle_init(_ctx, opts) do
{[], %{end_of_stream: opts.end_of_stream}}
end

@impl true
def handle_buffer(Pad.ref(:input, _id), buffer, _ctx, state) do
{[buffer: {:output, buffer}], state}
end

@impl true
def handle_pad_added(Pad.ref(:input, _id), %{playback_state: :playing}, state) do
{[event: {:output, %Funnel.NewInputEvent{}}], state}
end

@impl true
def handle_pad_added(Pad.ref(:input, _id), _ctx, state) do
{[], state}
end

@impl true
def handle_end_of_stream(Pad.ref(:input, _id), _ctx, %{end_of_stream: :never} = state) do
{[], state}
end

@impl true
def handle_end_of_stream(Pad.ref(:input, _id), ctx, %{end_of_stream: :on_first_pad} = state) do
if ctx.pads.output.end_of_stream? do
{[], state}
else
{[end_of_stream: :output], state}
end
end

@impl true
def handle_end_of_stream(Pad.ref(:input, _id), ctx, %{end_of_stream: :on_last_pad} = state) do
if ctx |> inputs_data() |> Enum.all?(& &1.end_of_stream?) do
{[end_of_stream: :output], state}
else
{[], state}
end
end

defp inputs_data(ctx) do
Enum.flat_map(ctx.pads, fn
{Pad.ref(:input, _id), data} -> [data]
_output -> []
end)
end
end
9 changes: 9 additions & 0 deletions lib/membrane/funnel/new_input_event.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule Membrane.Funnel.NewInputEvent do
@moduledoc """
Event sent each time new element is linked (via funnel input pad) after playing pipeline.
"""
@derive Membrane.EventProtocol

@type t :: %__MODULE__{}
defstruct []
end

0 comments on commit 6c26812

Please sign in to comment.