Skip to content

Commit e6538fa

Browse files
committed
wip
1 parent d66cff4 commit e6538fa

13 files changed

+58
-46
lines changed

lib/membrane/core/bin/state.ex

+2-2
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ defmodule Membrane.Core.Bin.State do
4545
terminating?: boolean(),
4646
resource_guard: Membrane.ResourceGuard.t(),
4747
setup_incomplete?: boolean(),
48-
handling_action?: boolean(),
48+
# handling_action?: boolean(),
4949
stalker: Membrane.Core.Stalker.t()
5050
}
5151

@@ -73,7 +73,7 @@ defmodule Membrane.Core.Bin.State do
7373
initialized?: false,
7474
terminating?: false,
7575
setup_incomplete?: false,
76-
handling_action?: false,
76+
# handling_action?: false,
7777
stalker: nil,
7878
resource_guard: nil,
7979
subprocess_supervisor: nil,

lib/membrane/core/callback_handler.ex

+4-16
Original file line numberDiff line numberDiff line change
@@ -189,15 +189,8 @@ defmodule Membrane.Core.CallbackHandler do
189189
reraise e, __STACKTRACE__
190190
end
191191

192-
was_handling_action? = state.handling_action?
193-
state = %{state | handling_action?: true}
194-
195-
# Updating :supplying_demand? flag value here is a temporal fix.
196-
# Setting it to `true` while handling actions causes postponing calls
197-
# of handle_redemand/2 and supply_demand/2 until a moment, when all
198-
# actions returned from the callback are handled
199-
was_supplying_demand? = Map.get(state, :supplying_demand?, false)
200-
state = if Component.is_element?(state), do: %{state | supplying_demand?: true}, else: state
192+
was_delay_consuming_queues? = Map.get(state, :delay_consuming_queues?, false)
193+
state = if Component.is_element?(state), do: %{state | delay_consuming_queues?: true}, else: state
201194

202195
state =
203196
Enum.reduce(actions, state, fn action, state ->
@@ -214,13 +207,8 @@ defmodule Membrane.Core.CallbackHandler do
214207
end)
215208

216209
state =
217-
if was_handling_action?,
218-
do: state,
219-
else: %{state | handling_action?: false}
220-
221-
state =
222-
if Component.is_element?(state) and not was_supplying_demand?,
223-
do: %{state | supplying_demand?: false},
210+
if Component.is_element?(state) and not was_delay_consuming_queues?,
211+
do: %{state | delay_consuming_queues?: false},
224212
else: state
225213

226214
handler_module.handle_end_of_actions(state)

lib/membrane/core/element.ex

+3-3
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ defmodule Membrane.Core.Element do
139139
name: options.name,
140140
internal_state: nil,
141141
parent_pid: options.parent,
142-
supplying_demand?: false,
142+
delay_consuming_queues?: false,
143143
delayed_demands: MapSet.new(),
144144
handle_demand_loop_counter: 0,
145145
synchronization: %{
@@ -157,7 +157,7 @@ defmodule Membrane.Core.Element do
157157
terminating?: false,
158158
setup_incomplete?: false,
159159
effective_flow_control: :push,
160-
handling_action?: false,
160+
# handling_action?: false,
161161
popping_auto_flow_queue?: false,
162162
pads_to_snapshot: MapSet.new(),
163163
stalker: options.stalker,
@@ -233,7 +233,7 @@ defmodule Membrane.Core.Element do
233233

234234
defp do_handle_info(Message.new(:buffer, buffers, _opts) = msg, state) do
235235
pad_ref = Message.for_pad(msg)
236-
state = BufferController.handle_buffer(pad_ref, buffers, state)
236+
state = BufferController.handle_ingoing_buffer(pad_ref, buffers, state)
237237
{:noreply, state}
238238
end
239239

lib/membrane/core/element/action_handler.ex

+9-2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,13 @@ defmodule Membrane.Core.Element.ActionHandler do
5050
# Fixed order of handling demand of manual and auto pads would lead to
5151
# favoring manual pads over auto pads (or vice versa), especially after
5252
# introducting auto flow queues.
53+
54+
55+
# IO.inspect({state.delay_consuming_queues?, state.handling_action?}, label: "TT FF TF")
56+
with %{delay_consuming_queues?: false, handling_action?: true} <- state do
57+
raise "dupppppaaaaaaa"
58+
end
59+
5360
manual_demands_first? = Enum.random([1, 2]) == 1
5461

5562
state =
@@ -68,13 +75,13 @@ defmodule Membrane.Core.Element.ActionHandler do
6875
end
6976

7077
defp maybe_handle_delayed_demands(state) do
71-
with %{supplying_demand?: false} <- state do
78+
with %{delay_consuming_queues?: false} <- state do
7279
DemandHandler.handle_delayed_demands(state)
7380
end
7481
end
7582

7683
defp maybe_handle_pads_to_snapshot(state) do
77-
with %{handling_action?: false} <- state do
84+
with %{delay_consuming_queues?: false} <- state do
7885
Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2)
7986
|> Map.put(:pads_to_snapshot, MapSet.new())
8087
end

lib/membrane/core/element/buffer_controller.ex

+8-8
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ defmodule Membrane.Core.Element.BufferController do
3030
callback. Also calls `Membrane.Core.Element.DemandHandler.supply_demand/2`
3131
to check if there are any unsupplied demands.
3232
"""
33-
@spec handle_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t()
34-
def handle_buffer(pad_ref, buffers, state) do
33+
@spec handle_ingoing_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t()
34+
def handle_ingoing_buffer(pad_ref, buffers, state) do
3535
withl pad: {:ok, data} <- PadModel.get_data(state, pad_ref),
3636
playback: %State{playback: :playing} <- state do
3737
%{
@@ -49,20 +49,20 @@ defmodule Membrane.Core.Element.BufferController do
4949
EventController.handle_start_of_stream(pad_ref, state)
5050
end
5151

52-
do_handle_buffer(pad_ref, data, buffers, state)
52+
do_handle_ingoing_buffer(pad_ref, data, buffers, state)
5353
else
5454
pad: {:error, :unknown_pad} ->
5555
# We've got a buffer from already unlinked pad
5656
state
5757

5858
playback: _playback ->
59-
PlaybackQueue.store(&handle_buffer(pad_ref, buffers, &1), state)
59+
PlaybackQueue.store(&handle_ingoing_buffer(pad_ref, buffers, &1), state)
6060
end
6161
end
6262

63-
@spec do_handle_buffer(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) ::
63+
@spec do_handle_ingoing_buffer(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) ::
6464
State.t()
65-
defp do_handle_buffer(pad_ref, %{flow_control: :auto} = data, buffers, state) do
65+
defp do_handle_ingoing_buffer(pad_ref, %{flow_control: :auto} = data, buffers, state) do
6666
%{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data
6767
buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers)
6868

@@ -77,7 +77,7 @@ defmodule Membrane.Core.Element.BufferController do
7777
end
7878
end
7979

80-
defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do
80+
defp do_handle_ingoing_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do
8181
%{input_queue: old_input_queue} = data
8282

8383
input_queue = InputQueue.store(old_input_queue, buffers)
@@ -90,7 +90,7 @@ defmodule Membrane.Core.Element.BufferController do
9090
end
9191
end
9292

93-
defp do_handle_buffer(pad_ref, %{flow_control: :push}, buffers, state) do
93+
defp do_handle_ingoing_buffer(pad_ref, %{flow_control: :push}, buffers, state) do
9494
exec_buffer_callback(pad_ref, buffers, state)
9595
end
9696

lib/membrane/core/element/demand_handler.ex

+12-7
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ defmodule Membrane.Core.Element.DemandHandler do
3232
output, `handle_demand` is invoked right away, so that the demand can be synchronously supplied.
3333
"""
3434
@spec handle_redemand(Pad.ref(), State.t()) :: State.t()
35-
def handle_redemand(pad_ref, %State{supplying_demand?: true} = state) do
35+
def handle_redemand(pad_ref, %State{delay_consuming_queues?: true} = state) do
3636
Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :redemand}))
3737
end
3838

@@ -42,9 +42,9 @@ defmodule Membrane.Core.Element.DemandHandler do
4242
end
4343

4444
defp do_handle_redemand(pad_ref, state) do
45-
state = %{state | supplying_demand?: true}
45+
state = %{state | delay_consuming_queues?: true}
4646
state = exec_handle_demand(pad_ref, state)
47-
%{state | supplying_demand?: false}
47+
%{state | delay_consuming_queues?: false}
4848
end
4949

5050
@doc """
@@ -74,7 +74,7 @@ defmodule Membrane.Core.Element.DemandHandler do
7474
end
7575

7676
@spec supply_demand(Pad.ref(), State.t()) :: State.t()
77-
def supply_demand(pad_ref, %State{supplying_demand?: true} = state) do
77+
def supply_demand(pad_ref, %State{delay_consuming_queues?: true} = state) do
7878
Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :supply}))
7979
end
8080

@@ -85,7 +85,7 @@ defmodule Membrane.Core.Element.DemandHandler do
8585

8686
defp do_supply_demand(pad_ref, state) do
8787
# marking is state that actual demand supply has been started (note changing back to false when finished)
88-
state = %State{state | supplying_demand?: true}
88+
state = %State{state | delay_consuming_queues?: true}
8989

9090
pad_data = state |> PadModel.get_data!(pad_ref)
9191

@@ -94,7 +94,7 @@ defmodule Membrane.Core.Element.DemandHandler do
9494

9595
state = PadModel.set_data!(state, pad_ref, :input_queue, new_input_queue)
9696
state = handle_input_queue_output(pad_ref, popped_data, state)
97-
%State{state | supplying_demand?: false}
97+
%State{state | delay_consuming_queues?: false}
9898
end
9999

100100
defp update_demand(pad_ref, size, state) when is_integer(size) do
@@ -120,8 +120,9 @@ defmodule Membrane.Core.Element.DemandHandler do
120120
# one pad are supplied right away while another one is waiting for buffers
121121
# potentially for a long time.
122122

123+
state =
123124
cond do
124-
state.supplying_demand? ->
125+
state.delay_consuming_queues? ->
125126
raise "Cannot handle delayed demands while already supplying demand"
126127

127128
state.handle_demand_loop_counter >= @handle_demand_loop_limit ->
@@ -144,6 +145,10 @@ defmodule Membrane.Core.Element.DemandHandler do
144145
:redemand -> handle_redemand(pad_ref, state)
145146
end
146147
end
148+
149+
Enum.reduce(state.pads_to_snapshot, state, &Membrane.Core.Element.DemandController.snapshot_atomic_demand/2)
150+
|> Map.put(:pads_to_snapshot, MapSet.new())
151+
147152
end
148153

149154
@spec remove_pad_from_delayed_demands(Pad.ref(), State.t()) :: State.t()

lib/membrane/core/element/state.ex

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ defmodule Membrane.Core.Element.State do
2323
pads_info: PadModel.pads_info() | nil,
2424
pads_data: PadModel.pads_data() | nil,
2525
parent_pid: pid,
26-
supplying_demand?: boolean(),
26+
delay_consuming_queues?: boolean(),
2727
delayed_demands: MapSet.t({Pad.ref(), :supply | :redemand}),
2828
handle_demand_loop_counter: non_neg_integer(),
2929
synchronization: %{
@@ -74,7 +74,7 @@ defmodule Membrane.Core.Element.State do
7474
:initialized?,
7575
:terminating?,
7676
:setup_incomplete?,
77-
:supplying_demand?,
77+
:delay_consuming_queues?,
7878
:handling_action?,
7979
:popping_auto_flow_queue?,
8080
:stalker,

test/membrane/core/element/action_handler_test.exs

+2-2
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ defmodule Membrane.Core.Element.ActionHandlerTest do
6060
setup :demand_test_filter
6161

6262
test "delaying demand", %{state: state} do
63-
state = %{state | playback: :playing, supplying_demand?: true}
63+
state = %{state | playback: :playing, delay_consuming_queues?: true}
6464
state = @module.handle_action({:demand, {:input, 10}}, :handle_info, %{}, state)
6565
assert state.pads_data.input.manual_demand_size == 10
6666
assert MapSet.new([{:input, :supply}]) == state.delayed_demands
@@ -489,7 +489,7 @@ defmodule Membrane.Core.Element.ActionHandlerTest do
489489

490490
test "when pad works in auto or manual flow control mode", %{state: state} do
491491
state =
492-
%{state | supplying_demand?: true, playback: :playing}
492+
%{state | delay_consuming_queues?: true, playback: :playing}
493493
|> PadModel.set_data!(:output, :flow_control, :manual)
494494

495495
new_state =

test/membrane/core/element/event_controller_test.exs

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ defmodule Membrane.Core.Element.EventControllerTest do
5151
parent_pid: self(),
5252
synchronization: %{clock: nil, parent_clock: nil, stream_sync: nil},
5353
handling_action?: false,
54-
supplying_demand?: false,
54+
delay_consuming_queues?: false,
5555
pads_to_snapshot: MapSet.new(),
5656
delayed_demands: MapSet.new(),
5757
handle_demand_loop_counter: 0,

test/membrane/core/element/lifecycle_controller_test.exs

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do
5050
parent_pid: self(),
5151
synchronization: %{clock: nil, parent_clock: nil},
5252
handling_action?: false,
53-
supplying_demand?: false,
53+
delay_consuming_queues?: false,
5454
pads_to_snapshot: MapSet.new(),
5555
delayed_demands: MapSet.new(),
5656
pads_data: %{

test/membrane/core/element/pad_controller_test.exs

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ defmodule Membrane.Core.Element.PadControllerTest do
1919
name: name,
2020
module: elem_module,
2121
handling_action?: false,
22-
supplying_demand?: false,
22+
delay_consuming_queues?: false,
2323
pads_to_snapshot: MapSet.new(),
2424
delayed_demands: MapSet.new(),
2525
parent_pid: self(),

test/membrane/core/element/stream_format_controller_test.exs

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do
4242
playback: :playing,
4343
synchronization: %{clock: nil, parent_clock: nil},
4444
handling_action?: false,
45-
supplying_demand?: false,
45+
delay_consuming_queues?: false,
4646
pads_to_snapshot: MapSet.new(),
4747
delayed_demands: MapSet.new(),
4848
handle_demand_loop_counter: 0,

test/membrane/integration/linking_test.exs

+12
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ defmodule Membrane.Integration.LinkingTest do
142142
Membrane.Pipeline.terminate(pipeline)
143143
end
144144

145+
@tag :flaky
145146
test "and element crashes, bin forwards the unlink message to child", %{pipeline: pipeline} do
146147
bin_spec = {
147148
child(:bin, %Bin{
@@ -164,6 +165,17 @@ defmodule Membrane.Integration.LinkingTest do
164165
send(pipeline, {:start_spec, %{spec: sink_spec}})
165166

166167
sink_pid = get_child_pid(:sink, pipeline)
168+
169+
# 1) test when element is connected to a bin and element crashes, bin forwards the unlink message to child (Membrane.Integration.LinkingTest)
170+
# test/membrane/integration/linking_test.exs:145
171+
# ** (KeyError) key :pid not found in: nil
172+
173+
# If you are using the dot syntax, such as map.field, make sure the left-hand side of the dot is a map
174+
# code: source_pid = get_child_pid(:source, bin_pid)
175+
# stacktrace:
176+
# test/membrane/integration/linking_test.exs:609: Membrane.Integration.LinkingTest.get_child_pid/2
177+
# test/membrane/integration/linking_test.exs:168: (test)
178+
167179
bin_pid = get_child_pid(:bin, pipeline)
168180
source_pid = get_child_pid(:source, bin_pid)
169181
source_ref = Process.monitor(source_pid)

0 commit comments

Comments
 (0)