Skip to content

Commit

Permalink
Merge pull request #6 from membraneframework/auto-demand
Browse files Browse the repository at this point in the history
auto demand
  • Loading branch information
mat-hek authored Feb 7, 2022
2 parents b1d6dcf + 3378c91 commit 73f4c38
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,50 +1,36 @@
defmodule Membrane.Funnel do
@moduledoc """
Element that can be used for collecting data from multiple inputs and sending it through one
output.
output. When a new input connects in the `:playing` state, the funnel sends
`Membrane.Funnel.NewInputEvent` via output.
"""
use Membrane.Filter

alias Membrane.Funnel

def_input_pad :input, demand_unit: :buffers, caps: :any, availability: :on_request
def_output_pad :output, caps: :any
def_input_pad :input, demand_mode: :auto, caps: :any, availability: :on_request
def_output_pad :output, caps: :any, demand_mode: :auto

def_options end_of_stream: [spec: :on_last_pad | :never, default: :on_last_pad]

@impl true
def handle_init(opts) do
{:ok, %{end_of_stream: opts.end_of_stream, demands: []}}
end

@impl true
def handle_demand(:output, _size, :buffers, _ctx, state) do
{{:ok, state.demands}, state}
{:ok, %{end_of_stream: opts.end_of_stream}}
end

@impl true
def handle_process(Pad.ref(:input, _id), buffer, _ctx, state) do
{{:ok, buffer: {:output, buffer}, redemand: :output}, state}
end

@impl true
def handle_pad_added(Pad.ref(:input, _id) = pad, %{playback_state: :playing}, state) do
demands = [{:demand, {pad, 1}} | state.demands]

{{:ok, [demand: {pad, 1}, event: {:output, %Funnel.NewInputEvent{}}]},
%{state | demands: demands}}
{{:ok, buffer: {:output, buffer}}, state}
end

@impl true
def handle_pad_added(Pad.ref(:input, _id) = pad, _ctx, state) do
demands = [{:demand, {pad, 1}} | state.demands]
{:ok, %{state | demands: demands}}
def handle_pad_added(Pad.ref(:input, _id), %{playback_state: :playing}, state) do
{{:ok, event: {:output, %Funnel.NewInputEvent{}}}, state}
end

@impl true
def handle_pad_removed(Pad.ref(:input, _id) = pad, _ctx, state) do
demands = List.delete(state.demands, {:demand, {pad, 1}})
{:ok, %{state | demands: demands}}
def handle_pad_added(Pad.ref(:input, _id), _ctx, state) do
{:ok, state}
end

@impl true
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ defmodule Membrane.Funnel.Plugin.Mixfile do

defp deps do
[
{:membrane_core, "~> 0.8.0"},
{:membrane_core, github: "membraneframework/membrane_core", tag: "v0.9.0-rc.0"},
{:ex_doc, "~> 0.23", only: :dev, runtime: false},
{:dialyxir, "~> 1.0.0", only: :dev, runtime: false},
{:credo, "~> 1.5", only: :dev, runtime: false}
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
"makeup_elixir": {:hex, :makeup_elixir, "0.15.2", "dc72dfe17eb240552857465cc00cce390960d9a0c055c4ccd38b70629227e97c", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "fd23ae48d09b32eff49d4ced2b43c9f086d402ee4fd4fcb2d7fad97fa8823e75"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"membrane_core": {:hex, :membrane_core, "0.8.1", "33df0e4c76c05a2be57042893b05516886462baefdf95b7299e4d2f5db32dea3", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 2.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "5574db35f08de1e95648f9c3acce38c5f318216c5a5ae5e3f253f7edad8fdc1b"},
"membrane_core": {:git, "https://github.com/membraneframework/membrane_core.git", "5f5dd761bd2dd980b0b891cb83f6633f893a7c4e", [tag: "v0.9.0-rc.0"]},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.0", "b44d75e2a6542dcb6acf5d71c32c74ca88960421b6874777f79153bbbbd7dccc", [:mix], [], "hexpm", "52b2871a7515a5ac49b00f214e4165a40724cf99798d8e4a65e4fd64ebd002c1"},
"numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"},
"qex": {:hex, :qex, "0.5.0", "5a3a9becf67d4006377c4c247ffdaaa8ae5b3634a0caadb788dc24d6125068f4", [:mix], [], "hexpm", "4ad6f6421163cd8204509a119a5c9813cbb969cfb8d802a9dc49b968bffbac2a"},
Expand Down
File renamed without changes.

0 comments on commit 73f4c38

Please sign in to comment.