Skip to content

Commit c3bb2fb

Browse files
committed
Modify 5s linking timeout constraint
1 parent eae44ae commit c3bb2fb

File tree

5 files changed

+60
-28
lines changed

5 files changed

+60
-28
lines changed

lib/membrane/bin/pad_data.ex

+3-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ defmodule Membrane.Bin.PadData do
2727
link_id: private_field,
2828
endpoint: private_field,
2929
linked?: private_field,
30-
response_received?: private_field
30+
response_received?: private_field,
31+
linking_timeout_ref: private_field
3132
}
3233

3334
@enforce_keys [
@@ -43,5 +44,5 @@ defmodule Membrane.Bin.PadData do
4344
:spec_ref
4445
]
4546

46-
defstruct @enforce_keys
47+
defstruct @enforce_keys ++ [:linking_timeout_ref]
4748
end

lib/membrane/core/bin.ex

+2-2
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,8 @@ defmodule Membrane.Core.Bin do
209209
{:noreply, state}
210210
end
211211

212-
defp do_handle_info(Message.new(:linking_timeout, pad_ref), state) do
213-
PadController.handle_linking_timeout(pad_ref, state)
212+
defp do_handle_info(Message.new(:linking_timeout, [pad_ref, timeout_ref]), state) do
213+
PadController.handle_linking_timeout(pad_ref, timeout_ref, state)
214214
{:noreply, state}
215215
end
216216

lib/membrane/core/bin/pad_controller.ex

+25-19
Original file line numberDiff line numberDiff line change
@@ -70,16 +70,22 @@ defmodule Membrane.Core.Bin.PadController do
7070
end
7171

7272
state = PadModel.update_data!(state, pad_ref, &%{&1 | link_id: link_id, options: pad_options})
73-
state = maybe_handle_pad_added(pad_ref, state)
7473

75-
unless PadModel.get_data!(state, pad_ref, :endpoint) do
76-
# If there's no endpoint associated to the pad, no internal link to the pad
77-
# has been requested in the bin yet
78-
_ref = Process.send_after(self(), Message.new(:linking_timeout, pad_ref), 5000)
79-
:ok
80-
end
74+
state =
75+
if PadModel.get_data!(state, pad_ref, :endpoint) == nil do
76+
# If there's no endpoint associated to the pad, no internal link to the pad
77+
# has been requested in the bin yet
8178

82-
state
79+
linking_timeout_ref = make_ref()
80+
message = Message.new(:linking_timeout, [pad_ref, linking_timeout_ref])
81+
Process.send_after(self(), message, 5000)
82+
83+
PadModel.set_data!(state, pad_ref, :linking_timeout_ref, linking_timeout_ref)
84+
else
85+
state
86+
end
87+
88+
maybe_handle_pad_added(pad_ref, state)
8389
end
8490

8591
@spec remove_pad(Pad.ref(), State.t()) :: State.t()
@@ -102,15 +108,15 @@ defmodule Membrane.Core.Bin.PadController do
102108
end
103109
end
104110

105-
@spec handle_linking_timeout(Pad.ref(), State.t()) :: :ok | no_return()
106-
def handle_linking_timeout(pad_ref, state) do
107-
case PadModel.get_data(state, pad_ref) do
108-
{:ok, %{endpoint: nil} = pad_data} ->
109-
raise Membrane.LinkError,
110-
"Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{inspect(pad_data, pretty: true)}"
111+
@spec handle_linking_timeout(Pad.ref(), reference(), State.t()) :: :ok | no_return()
112+
def handle_linking_timeout(pad_ref, timeout_ref, state) do
113+
map_set_item = {pad_ref, timeout_ref}
111114

112-
_other ->
113-
:ok
115+
if MapSet.member?(state.initialized_internal_pads, map_set_item) do
116+
Map.update!(state, :initialized_internal_pads, &MapSet.delete(&1, map_set_item))
117+
else
118+
raise Membrane.LinkError,
119+
"Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{PadModel.get_data(state, pad_ref) |> inspect(pretty: true)}"
114120
end
115121
end
116122

@@ -316,8 +322,8 @@ defmodule Membrane.Core.Bin.PadController do
316322
end
317323

318324
@spec maybe_handle_pad_added(Pad.ref(), Core.Bin.State.t()) :: Core.Bin.State.t()
319-
defp maybe_handle_pad_added(ref, state) do
320-
%{options: pad_opts, availability: availability} = PadModel.get_data!(state, ref)
325+
defp maybe_handle_pad_added(pad_ref, state) do
326+
%{options: pad_opts, availability: availability} = PadModel.get_data!(state, pad_ref)
321327

322328
if Pad.availability_mode(availability) == :dynamic do
323329
context = &CallbackContext.from_state(&1, pad_options: pad_opts)
@@ -326,7 +332,7 @@ defmodule Membrane.Core.Bin.PadController do
326332
:handle_pad_added,
327333
ActionHandler,
328334
%{context: context},
329-
[ref],
335+
[pad_ref],
330336
state
331337
)
332338
else

lib/membrane/core/bin/state.ex

+2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ defmodule Membrane.Core.Bin.State do
4545
terminating?: boolean(),
4646
resource_guard: Membrane.ResourceGuard.t(),
4747
setup_incomplete?: boolean(),
48+
initialized_internal_pads: MapSet.t(Pad.ref()),
4849
stalker: Membrane.Core.Stalker.t()
4950
}
5051

@@ -76,5 +77,6 @@ defmodule Membrane.Core.Bin.State do
7677
resource_guard: nil,
7778
subprocess_supervisor: nil,
7879
children_log_metadata: [],
80+
initialized_internal_pads: MapSet.new(),
7981
pads_data: nil
8082
end

lib/membrane/core/parent/child_life_controller.ex

+28-5
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
1717
alias Membrane.Pad
1818
alias Membrane.ParentError
1919

20+
require Membrane.Core.Child.PadModel, as: PadModel
2021
require Membrane.Core.Component
2122
require Membrane.Core.Message, as: Message
2223
require Membrane.Logger
@@ -154,7 +155,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
154155

155156
state =
156157
put_in(state, [:pending_specs, spec_ref], %{
157-
status: :initializing,
158+
status: :created,
158159
children_names: MapSet.new(all_children_names),
159160
links_ids: Enum.map(links, & &1.id),
160161
dependent_specs: dependent_specs,
@@ -309,14 +310,36 @@ defmodule Membrane.Core.Parent.ChildLifeController do
309310
end
310311
end
311312

313+
defp do_proceed_spec_startup(spec_ref, %{status: :created} = spec_data, state) do
314+
state =
315+
with %Bin.State{} <- state do
316+
initialized_internal_pads =
317+
spec_data.links_ids
318+
|> Enum.map(&Map.fetch!(state.links, &1))
319+
|> Enum.flat_map(&[&1.from, &1.to])
320+
|> Enum.flat_map(fn %{child: child, pad_ref: pad_ref} ->
321+
with {Membrane.Bin, :itself} <- child,
322+
{:ok, timeout_ref} when timeout_ref != nil <-
323+
PadModel.get_data(state, pad_ref, :linking_timeout_ref) do
324+
[{pad_ref, timeout_ref}]
325+
else
326+
_other -> []
327+
end
328+
end)
329+
|> Enum.reduce(state.initialized_internal_pads, &MapSet.put(&2, &1))
330+
331+
%{state | initialized_internal_pads: initialized_internal_pads}
332+
end
333+
334+
do_proceed_spec_startup(spec_ref, %{spec_data | status: :initializing}, state)
335+
end
336+
312337
defp do_proceed_spec_startup(spec_ref, %{status: :initializing} = spec_data, state) do
313338
Membrane.Logger.debug(
314339
"Proceeding spec #{inspect(spec_ref)} startup: initializing, dependent specs: #{inspect(MapSet.to_list(spec_data.dependent_specs))}"
315340
)
316341

317-
%{children: children} = state
318-
319-
if Enum.all?(spec_data.children_names, &Map.fetch!(children, &1).initialized?) and
342+
if Enum.all?(spec_data.children_names, &Map.fetch!(state.children, &1).initialized?) and
320343
Enum.empty?(spec_data.dependent_specs) do
321344
Membrane.Logger.debug("Spec #{inspect(spec_ref)} status changed to initialized")
322345
do_proceed_spec_startup(spec_ref, %{spec_data | status: :initialized}, state)
@@ -351,7 +374,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
351374
end
352375

353376
defp do_proceed_spec_startup(spec_ref, %{status: :linking_internally} = spec_data, state) do
354-
if Enum.empty?(spec_data.awaiting_responses) do
377+
if MapSet.size(spec_data.awaiting_responses) == 0 do
355378
state =
356379
spec_data.links_ids
357380
|> Enum.map(&Map.fetch!(state.links, &1))

0 commit comments

Comments
 (0)