Skip to content

Commit 814f10b

Browse files
committed
Fix linking timeout mechanism
1 parent e4cfff8 commit 814f10b

File tree

6 files changed

+28
-42
lines changed

6 files changed

+28
-42
lines changed

lib/membrane/bin/pad_data.ex

+2-3
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@ defmodule Membrane.Bin.PadData do
2727
link_id: private_field,
2828
endpoint: private_field,
2929
linked?: private_field,
30-
response_received?: private_field,
31-
linking_timeout_ref: private_field
30+
response_received?: private_field
3231
}
3332

3433
@enforce_keys [
@@ -44,5 +43,5 @@ defmodule Membrane.Bin.PadData do
4443
:spec_ref
4544
]
4645

47-
defstruct @enforce_keys ++ [:linking_timeout_ref]
46+
defstruct @enforce_keys
4847
end

lib/membrane/core/bin.ex

+2-2
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,8 @@ defmodule Membrane.Core.Bin do
209209
{:noreply, state}
210210
end
211211

212-
defp do_handle_info(Message.new(:linking_timeout, [pad_ref, timeout_ref]), state) do
213-
state = PadController.handle_linking_timeout(pad_ref, timeout_ref, state)
212+
defp do_handle_info(Message.new(:linking_timeout, pad_ref), state) do
213+
state = PadController.handle_linking_timeout(pad_ref, state)
214214
{:noreply, state}
215215
end
216216

lib/membrane/core/bin/pad_controller.ex

+13-21
Original file line numberDiff line numberDiff line change
@@ -71,19 +71,7 @@ defmodule Membrane.Core.Bin.PadController do
7171

7272
state = PadModel.update_data!(state, pad_ref, &%{&1 | link_id: link_id, options: pad_options})
7373

74-
state =
75-
if PadModel.get_data!(state, pad_ref, :endpoint) == nil do
76-
# If there's no endpoint associated to the pad, no internal link to the pad
77-
# has been requested in the bin yet
78-
79-
linking_timeout_ref = make_ref()
80-
message = Message.new(:linking_timeout, [pad_ref, linking_timeout_ref])
81-
Process.send_after(self(), message, 5000)
82-
83-
PadModel.set_data!(state, pad_ref, :linking_timeout_ref, linking_timeout_ref)
84-
else
85-
state
86-
end
74+
_ref = Process.send_after(self(), Message.new(:linking_timeout, pad_ref), 5000)
8775

8876
maybe_handle_pad_added(pad_ref, state)
8977
end
@@ -108,15 +96,19 @@ defmodule Membrane.Core.Bin.PadController do
10896
end
10997
end
11098

111-
@spec handle_linking_timeout(Pad.ref(), reference(), State.t()) :: State.t()
112-
def handle_linking_timeout(pad_ref, timeout_ref, state) do
113-
map_set_item = {pad_ref, timeout_ref}
99+
@spec handle_linking_timeout(Pad.ref(), State.t()) :: State.t() | no_return()
100+
def handle_linking_timeout(pad_ref, state) do
101+
case Map.fetch(state.linking_timeout_counters, pad_ref) do
102+
{:ok, 1} ->
103+
Map.update!(state, :linking_timeout_counters, &Map.delete(&1, pad_ref))
114104

115-
if MapSet.member?(state.initialized_internal_pads, map_set_item) do
116-
Map.update!(state, :initialized_internal_pads, &MapSet.delete(&1, map_set_item))
117-
else
118-
raise Membrane.LinkError,
119-
"Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{PadModel.get_data(state, pad_ref) |> inspect(pretty: true)}"
105+
{:ok, counter} when counter > 1 ->
106+
put_in(state.linking_timeout_counters[pad_ref], counter - 1)
107+
108+
_else ->
109+
raise Membrane.LinkError, """
110+
Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{PadModel.get_data(state, pad_ref) |> inspect(pretty: true)}
111+
"""
120112
end
121113
end
122114

lib/membrane/core/bin/state.ex

+2-2
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ defmodule Membrane.Core.Bin.State do
4545
terminating?: boolean(),
4646
resource_guard: Membrane.ResourceGuard.t(),
4747
setup_incomplete?: boolean(),
48-
initialized_internal_pads: MapSet.t(Pad.ref()),
48+
linking_timeout_counters: %{optional(Pad.ref()) => integer()},
4949
stalker: Membrane.Core.Stalker.t()
5050
}
5151

@@ -77,6 +77,6 @@ defmodule Membrane.Core.Bin.State do
7777
resource_guard: nil,
7878
subprocess_supervisor: nil,
7979
children_log_metadata: [],
80-
initialized_internal_pads: MapSet.new(),
80+
linking_timeout_counters: %{},
8181
pads_data: nil
8282
end

lib/membrane/core/parent/child_life_controller.ex

+8-14
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ defmodule Membrane.Core.Parent.ChildLifeController do
1717
alias Membrane.Pad
1818
alias Membrane.ParentError
1919

20-
require Membrane.Core.Child.PadModel, as: PadModel
2120
require Membrane.Core.Component
2221
require Membrane.Core.Message, as: Message
2322
require Membrane.Logger
@@ -313,22 +312,17 @@ defmodule Membrane.Core.Parent.ChildLifeController do
313312
defp do_proceed_spec_startup(spec_ref, %{status: :created} = spec_data, state) do
314313
state =
315314
with %Bin.State{} <- state do
316-
initialized_internal_pads =
315+
linking_timeout_counters =
317316
spec_data.links_ids
318317
|> Enum.map(&Map.fetch!(state.links, &1))
319318
|> Enum.flat_map(&[&1.from, &1.to])
320-
|> Enum.flat_map(fn %{child: child, pad_ref: pad_ref} ->
321-
with {Membrane.Bin, :itself} <- child,
322-
{:ok, timeout_ref} when timeout_ref != nil <-
323-
PadModel.get_data(state, pad_ref, :linking_timeout_ref) do
324-
[{pad_ref, timeout_ref}]
325-
else
326-
_other -> []
327-
end
328-
end)
329-
|> Enum.reduce(state.initialized_internal_pads, &MapSet.put(&2, &1))
330-
331-
%{state | initialized_internal_pads: initialized_internal_pads}
319+
|> Enum.filter(&(&1.child == {Membrane.Bin, :itself}))
320+
|> Enum.reduce(
321+
state.linking_timeout_counters,
322+
&Map.update(&2, &1.pad_ref, 1, fn i -> i + 1 end)
323+
)
324+
325+
%{state | linking_timeout_counters: linking_timeout_counters}
332326
end
333327

334328
do_proceed_spec_startup(spec_ref, %{spec_data | status: :initializing}, state)

test/membrane/integration/linking_test.exs

+1
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,7 @@ defmodule Membrane.Integration.LinkingTest do
353353
Membrane.Pipeline.terminate(pipeline)
354354
end
355355

356+
@tag :xd
356357
test "Bin should crash if it doesn't link internally within timeout" do
357358
defmodule NoInternalLinkBin do
358359
use Membrane.Bin

0 commit comments

Comments
 (0)