@@ -8,11 +8,10 @@ defmodule Membrane.Core.Bin.PadController do
8
8
alias Membrane . { Core , LinkError , Pad }
9
9
alias Membrane.Core.Bin . { ActionHandler , CallbackContext , State }
10
10
alias Membrane.Core . { CallbackHandler , Child , Message }
11
- alias Membrane.Core.Child.PadModel
12
11
alias Membrane.Core.Element.StreamFormatController
13
12
alias Membrane.Core.Parent . { ChildLifeController , Link , SpecificationParser }
14
13
15
- require Membrane.Core.Child.PadModel
14
+ require Membrane.Core.Child.PadModel , as: PadModel
16
15
require Membrane.Core.Message
17
16
require Membrane.Logger
18
17
require Membrane.Pad
@@ -50,8 +49,7 @@ defmodule Membrane.Core.Bin.PadController do
50
49
state =
51
50
case PadModel . get_data ( state , pad_ref ) do
52
51
{ :error , :unknown_pad } ->
53
- init_pad_data ( pad_ref , pad_info , state )
54
- |> Map . update! ( :pad_refs , & [ pad_ref | & 1 ] )
52
+ init_pad_data ( pad_ref , state )
55
53
56
54
# This case is for pads that were instantiated before the external link request,
57
55
# that is in the internal link request (see `handle_internal_link_request/4`).
@@ -69,9 +67,17 @@ defmodule Membrane.Core.Bin.PadController do
69
67
state
70
68
end
71
69
72
- state = PadModel . update_data! ( state , pad_ref , & % { & 1 | link_id: link_id , options: pad_options } )
70
+ linking_timeout_id = make_ref ( )
73
71
74
- _ref = Process . send_after ( self ( ) , Message . new ( :linking_timeout , pad_ref ) , 5000 )
72
+ state =
73
+ PadModel . update_data! (
74
+ state ,
75
+ pad_ref ,
76
+ & % { & 1 | link_id: link_id , linking_timeout_id: linking_timeout_id , options: pad_options }
77
+ )
78
+
79
+ message = Message . new ( :linking_timeout , [ pad_ref , linking_timeout_id ] )
80
+ _ref = Process . send_after ( self ( ) , message , 5000 )
75
81
76
82
maybe_handle_pad_added ( pad_ref , state )
77
83
end
@@ -96,20 +102,16 @@ defmodule Membrane.Core.Bin.PadController do
96
102
end
97
103
end
98
104
99
- @ spec handle_linking_timeout ( Pad . ref ( ) , State . t ( ) ) :: State . t ( ) | no_return ( )
100
- def handle_linking_timeout ( pad_ref , state ) do
101
- case Map . fetch ( state . linking_timeout_counters , pad_ref ) do
102
- { :ok , 1 } ->
103
- Map . update! ( state , :linking_timeout_counters , & Map . delete ( & 1 , pad_ref ) )
104
-
105
- { :ok , counter } when counter > 1 ->
106
- put_in ( state . linking_timeout_counters [ pad_ref ] , counter - 1 )
107
-
108
- _else ->
109
- raise Membrane.LinkError , """
110
- Bin pad #{ inspect ( pad_ref ) } wasn't linked internally within timeout. Pad data: #{ PadModel . get_data ( state , pad_ref ) |> inspect ( pretty: true ) }
111
- """
105
+ @ spec handle_linking_timeout ( Pad . ref ( ) , reference ( ) , State . t ( ) ) :: :ok | no_return ( )
106
+ def handle_linking_timeout ( pad_ref , linking_timeout_id , state ) do
107
+ with { :ok , pad_data } <- PadModel . get_data ( state , pad_ref ) ,
108
+ % { linking_timeout_id: ^ linking_timeout_id , linked_in_spec?: false } <- pad_data do
109
+ raise Membrane.LinkError , """
110
+ Bin pad #{ inspect ( pad_ref ) } wasn't linked internally within timeout. Pad data: #{ PadModel . get_data ( state , pad_ref ) |> inspect ( pretty: true ) }
111
+ """
112
112
end
113
+
114
+ :ok
113
115
end
114
116
115
117
@ doc """
@@ -137,7 +139,7 @@ defmodule Membrane.Core.Bin.PadController do
137
139
138
140
# Static pads can be linked internally before the external link request
139
141
pad_info . availability == :always ->
140
- init_pad_data ( pad_ref , pad_info , state )
142
+ init_pad_data ( pad_ref , state )
141
143
142
144
true ->
143
145
raise LinkError , "Dynamic pads must be firstly linked externally, then internally"
@@ -282,7 +284,6 @@ defmodule Membrane.Core.Bin.PadController do
282
284
with { :ok , % { availability: :on_request } } <- PadModel . get_data ( state , pad_ref ) do
283
285
{ pad_data , state } =
284
286
maybe_handle_pad_removed ( pad_ref , state )
285
- |> Map . update! ( :pad_refs , & List . delete ( & 1 , pad_ref ) )
286
287
|> PadModel . pop_data! ( pad_ref )
287
288
288
289
if pad_data . endpoint do
@@ -349,9 +350,15 @@ defmodule Membrane.Core.Bin.PadController do
349
350
end
350
351
end
351
352
352
- defp init_pad_data ( pad_ref , pad_info , state ) do
353
+ @ spec init_pad_data ( Pad . ref ( ) , State . t ( ) ) :: State . t ( )
354
+ def init_pad_data ( pad_ref , state ) do
355
+ if PadModel . assert_instance ( state , pad_ref ) == :ok do
356
+ raise "Cannot init pad data for pad #{ inspect ( pad_ref ) } , because it already exists"
357
+ end
358
+
353
359
pad_data =
354
- pad_info
360
+ state . pads_info
361
+ |> Map . get ( Pad . name_by_ref ( pad_ref ) )
355
362
|> Map . delete ( :accepted_formats_str )
356
363
|> Map . merge ( % {
357
364
ref: pad_ref ,
@@ -360,10 +367,12 @@ defmodule Membrane.Core.Bin.PadController do
360
367
linked?: false ,
361
368
response_received?: false ,
362
369
spec_ref: nil ,
363
- options: nil
370
+ options: nil ,
371
+ linking_timeout_id: nil ,
372
+ linked_in_spec?: false
364
373
} )
365
374
|> then ( & struct! ( Membrane.Bin.PadData , & 1 ) )
366
375
367
- put_in ( state , [ : pads_data, pad_ref ] , pad_data )
376
+ put_in ( state . pads_data [ pad_ref ] , pad_data )
368
377
end
369
378
end
0 commit comments