Skip to content

Commit d30a2b9

Browse files
committed
Fix forwarding filter test
1 parent 35eecce commit d30a2b9

File tree

2 files changed

+52
-23
lines changed

2 files changed

+52
-23
lines changed

lib/membrane/forwarding_filter.ex

+25-8
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,26 @@ defmodule Membrane.ForwardingFilter do
1111
availability: :on_request,
1212
max_instances: 1
1313

14-
def_options notify_on_stream_format?: [default: false], notify_on_event?: [default: false]
14+
def_options notify_on_stream_format?: [
15+
description: """
16+
If `true`, #{inspect(__MODULE__)} will send `{:stream_format, pad_ref, stream_format}` \
17+
notification to parent on every stream format arrival.
18+
19+
Defaults to `false`.
20+
""",
21+
spec: boolean(),
22+
default: false
23+
],
24+
notify_on_event?: [
25+
description: """
26+
If `true`, #{inspect(__MODULE__)} will send `{:event, pad_ref, event}` \
27+
notification to parent on every event arrival.
28+
29+
Defaults to `false`.
30+
""",
31+
spec: boolean(),
32+
default: false
33+
]
1534

1635
defguardp flowing?(ctx, state)
1736
when ctx.playback == :playing and state.input != nil and state.output != nil
@@ -66,11 +85,7 @@ defmodule Membrane.ForwardingFilter do
6685
state |> Map.update!(:queue, &[{action, pad, item} | &1])
6786
end
6887

69-
defp flush_queue_if_flowing(ctx, state) do
70-
if flowing?(ctx, state), do: flush_queue(ctx, state), else: {[], state}
71-
end
72-
73-
defp flush_queue(ctx, state) when flowing?(ctx, state) do
88+
defp flush_queue_if_flowing(ctx, state) when flowing?(ctx, state) do
7489
actions =
7590
state.queue
7691
|> Enum.reverse()
@@ -82,14 +97,16 @@ defmodule Membrane.ForwardingFilter do
8297
{actions, %{state | queue: nil}}
8398
end
8499

100+
defp flush_queue_if_flowing(_ctx, state), do: {[], state}
101+
85102
defp opposite_pad(Pad.ref(:input, _ref), state), do: state.output
86103
defp opposite_pad(Pad.ref(:output, _ref), state), do: state.input
87104

88105
defp maybe_notify_parent(:event, pad, event, %{notify_on_event?: true}),
89-
do: [notiy_parent: {:event, pad, event}]
106+
do: [notify_parent: {:event, pad, event}]
90107

91108
defp maybe_notify_parent(:stream_format, pad, format, %{notify_on_stream_format?: true}),
92-
do: [notiy_parent: {:stream_format, pad, format}]
109+
do: [notify_parent: {:stream_format, pad, format}]
93110

94111
defp maybe_notify_parent(_type, _pad, _item, _state), do: []
95112
end

test/membrane/integration/forwarding_filter_test.exs

+27-15
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ defmodule Membrane.Integration.ForwardingFilterTest do
44
import Membrane.ChildrenSpec
55
import Membrane.Testing.Assertions
66

7+
require Membrane.Pad, as: Pad
8+
79
alias Membrane.Buffer
810
alias Membrane.ForwardingFilter
911
alias Membrane.Testing
@@ -12,21 +14,16 @@ defmodule Membrane.Integration.ForwardingFilterTest do
1214
defstruct [:field]
1315
end
1416

15-
defmodule Event do
16-
@derive Membrane.EventProtocol
17-
defstruct [:field]
18-
end
19-
2017
defmodule Source do
2118
use Membrane.Source
2219
def_output_pad :output, accepted_format: _any, flow_control: :push
2320

2421
@impl true
25-
def handle_parent_notifications({action, item}, _ctx, state),
22+
def handle_parent_notification({action, item}, _ctx, state),
2623
do: {[{action, {:output, item}}], state}
2724
end
2825

29-
test "ForwardingFilter buffers data until output pad is linked" do
26+
test "Membrane.ForwardingFilter buffers data until output pad is linked" do
3027
pipeline =
3128
Testing.Pipeline.start_link_supervised!(
3229
spec:
@@ -42,25 +39,40 @@ defmodule Membrane.Integration.ForwardingFilterTest do
4239
data
4340
|> Enum.each(fn {type, item} ->
4441
Testing.Pipeline.notify_child(pipeline, :source, {type, item})
45-
assert_pipeline_notified(pipeline, :filter, {^type, :input, ^item})
42+
43+
if type in [:stream_format, :event] do
44+
assert_pipeline_notified(pipeline, :filter, {^type, Pad.ref(:input, _id), ^item})
45+
end
46+
end)
47+
48+
spec = get_child(:filter) |> child(:sink, Testing.Sink)
49+
Testing.Pipeline.execute_actions(pipeline, spec: spec)
50+
51+
data
52+
|> Enum.each(fn
53+
{:buffer, item} -> assert_sink_buffer(pipeline, :sink, ^item)
54+
{:event, item} -> assert_sink_event(pipeline, :sink, ^item)
55+
{:stream_format, item} -> assert_sink_stream_format(pipeline, :sink, ^item)
4656
end)
4757

48-
Testing.Pipeline.execute_actions(
49-
spec:
50-
get_child(:filter)
51-
|> child(:sink, Testing.Sink)
52-
)
58+
data = generate_data(100, [:stream_format, :buffer, :event], 200)
5359

5460
data
5561
|> Enum.each(fn {type, item} ->
62+
Testing.Pipeline.notify_child(pipeline, :source, {type, item})
63+
64+
if type in [:stream_format, :event] do
65+
assert_pipeline_notified(pipeline, :filter, {^type, Pad.ref(:input, _id), ^item})
66+
end
67+
5668
case type do
5769
:buffer -> assert_sink_buffer(pipeline, :sink, ^item)
5870
:event -> assert_sink_event(pipeline, :sink, ^item)
5971
:stream_format -> assert_sink_stream_format(pipeline, :sink, ^item)
6072
end
6173
end)
6274

63-
data = generate_data(100, [:stream_format, :buffer, :event], 200)
75+
Testing.Pipeline.terminate(pipeline)
6476
end
6577

6678
defp generate_data(number, types, pts_offset \\ 0) do
@@ -69,7 +81,7 @@ defmodule Membrane.Integration.ForwardingFilterTest do
6981
|> Enum.map(fn i ->
7082
case Enum.random(types) do
7183
:stream_format -> {:stream_format, %Format{field: i}}
72-
:event -> {:event, %Event{field: i}}
84+
:event -> {:event, %Testing.Event{}}
7385
:buffer -> {:buffer, %Buffer{pts: i + pts_offset, payload: <<>>}}
7486
end
7587
end)

0 commit comments

Comments
 (0)