|
| 1 | +defmodule Membrane.Integration.ForwardingFilterTest do |
| 2 | + use ExUnit.Case, async: true |
| 3 | + |
| 4 | + import Membrane.ChildrenSpec |
| 5 | + import Membrane.Testing.Assertions |
| 6 | + |
| 7 | + alias Membrane.Buffer |
| 8 | + alias Membrane.ForwardingFilter |
| 9 | + alias Membrane.Testing |
| 10 | + |
| 11 | + defmodule Format do |
| 12 | + defstruct [:field] |
| 13 | + end |
| 14 | + |
| 15 | + defmodule Event do |
| 16 | + @derive Membrane.EventProtocol |
| 17 | + defstruct [:field] |
| 18 | + end |
| 19 | + |
| 20 | + defmodule Source do |
| 21 | + use Membrane.Source |
| 22 | + def_output_pad :output, accepted_format: _any, flow_control: :push |
| 23 | + |
| 24 | + @impl true |
| 25 | + def handle_parent_notifications({action, item}, _ctx, state), |
| 26 | + do: {[{action, {:output, item}}], state} |
| 27 | + end |
| 28 | + |
| 29 | + test "ForwardingFilter buffers data until output pad is linked" do |
| 30 | + pipeline = |
| 31 | + Testing.Pipeline.start_link_supervised!( |
| 32 | + spec: |
| 33 | + child(:source, Source) |
| 34 | + |> child(:filter, %ForwardingFilter{ |
| 35 | + notify_on_event?: true, |
| 36 | + notify_on_stream_format?: true |
| 37 | + }) |
| 38 | + ) |
| 39 | + |
| 40 | + data = generate_data(100, [:stream_format, :buffer, :event]) |
| 41 | + |
| 42 | + data |
| 43 | + |> Enum.each(fn {type, item} -> |
| 44 | + Testing.Pipeline.notify_child(pipeline, :source, {type, item}) |
| 45 | + assert_pipeline_notified(pipeline, :filter, {^type, :input, ^item}) |
| 46 | + end) |
| 47 | + |
| 48 | + Testing.Pipeline.execute_actions( |
| 49 | + spec: |
| 50 | + get_child(:filter) |
| 51 | + |> child(:sink, Testing.Sink) |
| 52 | + ) |
| 53 | + |
| 54 | + data |
| 55 | + |> Enum.each(fn {type, item} -> |
| 56 | + case type do |
| 57 | + :buffer -> assert_sink_buffer(pipeline, :sink, ^item) |
| 58 | + :event -> assert_sink_event(pipeline, :sink, ^item) |
| 59 | + :stream_format -> assert_sink_stream_format(pipeline, :sink, ^item) |
| 60 | + end |
| 61 | + end) |
| 62 | + |
| 63 | + data = generate_data(100, [:stream_format, :buffer, :event], 200) |
| 64 | + end |
| 65 | + |
| 66 | + defp generate_data(number, types, pts_offset \\ 0) do |
| 67 | + data = |
| 68 | + 1..(number - 1) |
| 69 | + |> Enum.map(fn i -> |
| 70 | + case Enum.random(types) do |
| 71 | + :stream_format -> {:stream_format, %Format{field: i}} |
| 72 | + :event -> {:event, %Event{field: i}} |
| 73 | + :buffer -> {:buffer, %Buffer{pts: i + pts_offset, payload: <<>>}} |
| 74 | + end |
| 75 | + end) |
| 76 | + |
| 77 | + [stream_format: %Format{field: 0}] ++ data |
| 78 | + end |
| 79 | +end |
0 commit comments