Skip to content

Commit fb10564

Browse files
authored
Refactor demand mechanism (#783)
* Comment out handling_action? flag * Add new delayed demands loop tests * supplying_demand? -> delay_demands? * Delete unncessary flags, rename some modules * Introduce ManualFlowController * Introduce AutoFlowController * Format * InputQueue -> ManualFlowController.InputQueue * Bump default input queue size to 100 * Fix tests * Remove unnecessary comments * Remove leftovers * Clean up aliases * format * Implement comments from CR * Fix test performance * Fix performance test * Implement suggestion from CR
1 parent ef91c4d commit fb10564

29 files changed

+310
-222
lines changed

.circleci/config.yml

+3-2
Original file line numberDiff line numberDiff line change
@@ -152,13 +152,14 @@ jobs:
152152
steps:
153153
- attach_workspace:
154154
at: .
155+
- run: cd app; git add -A; git reset --hard; cd ..
155156
- run: cp -r benchmark/ ~/benchmark_backup/
156157
- run: cp mix.exs ~/benchmark_backup/
157158
- run: docker pull membraneframeworklabs/docker_membrane
158-
- run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, run benchmark/run.exs /root/results/feature_branch_results
159+
- run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, compile --force, run benchmark/run.exs /root/results/feature_branch_results
159160
- run: git checkout -f master
160161
- run: cp ~/benchmark_backup/mix.exs ~/app
161-
- run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -v ~/benchmark_backup/benchmark:/root/app/benchmark -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, run benchmark/run.exs /root/results/master_results
162+
- run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -v ~/benchmark_backup/benchmark:/root/app/benchmark -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, compile --force, run benchmark/run.exs /root/results/master_results
162163
- run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -v ~/benchmark_backup/benchmark:/root/app/benchmark -w /root/app membraneframeworklabs/docker_membrane mix run benchmark/compare.exs /root/results/feature_branch_results /root/results/master_results
163164
- run:
164165
command: rm ~/results/*

lib/membrane/children_spec.ex

+1-1
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ defmodule Membrane.ChildrenSpec do
514514
Membrane won't send smaller demand than `minimal demand`, to reduce demands' overhead. However, the user will always receive
515515
as many buffers, as demanded, all excess buffers will be queued internally.
516516
Used only for pads working in `:manual` flow control mode. See `t:Membrane.Pad.flow_control/0`
517-
for more info. Defaults to `#{Membrane.Core.Element.InputQueue.default_min_demand_factor()}` (the default may change in the future).
517+
for more info. Defaults to `#{Membrane.Core.Element.ManualFlowController.InputQueue.default_min_demand_factor()}` (the default may change in the future).
518518
- `auto_demand_size` - Size of automatically generated demands. Used only for pads working in `:auto` flow control mode.
519519
See `t:Membrane.Pad.flow_control/0` for more info.
520520
- `throttling_factor` - an integer specifying how frequently should a sender update the number of buffers in the `Toilet`. Defaults to 1,

lib/membrane/core/bin.ex

-2
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,6 @@ defmodule Membrane.Core.Bin do
7070

7171
if node do
7272
result = :rpc.call(node, GenServer, :start, [__MODULE__, options])
73-
74-
# TODO: use an atomic way of linking once https://github.com/erlang/otp/issues/6375 is solved
7573
with {:start_link, {:ok, pid}} <- {method, result}, do: Process.link(pid)
7674
result
7775
else

lib/membrane/core/bin/state.ex

-2
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ defmodule Membrane.Core.Bin.State do
4545
terminating?: boolean(),
4646
resource_guard: Membrane.ResourceGuard.t(),
4747
setup_incomplete?: boolean(),
48-
handling_action?: boolean(),
4948
stalker: Membrane.Core.Stalker.t()
5049
}
5150

@@ -73,7 +72,6 @@ defmodule Membrane.Core.Bin.State do
7372
initialized?: false,
7473
terminating?: false,
7574
setup_incomplete?: false,
76-
handling_action?: false,
7775
stalker: nil,
7876
resource_guard: nil,
7977
subprocess_supervisor: nil,

lib/membrane/core/callback_handler.ex

-21
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ defmodule Membrane.Core.CallbackHandler do
88
use Bunch
99

1010
alias Membrane.CallbackError
11-
alias Membrane.Core.Component
1211

1312
require Membrane.Logger
1413

@@ -189,16 +188,6 @@ defmodule Membrane.Core.CallbackHandler do
189188
reraise e, __STACKTRACE__
190189
end
191190

192-
was_handling_action? = state.handling_action?
193-
state = %{state | handling_action?: true}
194-
195-
# Updating :supplying_demand? flag value here is a temporal fix.
196-
# Setting it to `true` while handling actions causes postponing calls
197-
# of handle_redemand/2 and supply_demand/2 until a moment, when all
198-
# actions returned from the callback are handled
199-
was_supplying_demand? = Map.get(state, :supplying_demand?, false)
200-
state = if Component.is_element?(state), do: %{state | supplying_demand?: true}, else: state
201-
202191
state =
203192
Enum.reduce(actions, state, fn action, state ->
204193
try do
@@ -213,16 +202,6 @@ defmodule Membrane.Core.CallbackHandler do
213202
end
214203
end)
215204

216-
state =
217-
if was_handling_action?,
218-
do: state,
219-
else: %{state | handling_action?: false}
220-
221-
state =
222-
if Component.is_element?(state) and not was_supplying_demand?,
223-
do: %{state | supplying_demand?: false},
224-
else: state
225-
226205
handler_module.handle_end_of_actions(state)
227206
end
228207
end

lib/membrane/core/child/pad_model.ex

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ defmodule Membrane.Core.Child.PadModel do
77

88
alias Membrane.Core.Child
99
alias Membrane.Core.Element.EffectiveFlowController
10+
alias Membrane.Core.Element.ManualFlowController.InputQueue
1011
alias Membrane.{Pad, UnknownPadError}
1112

1213
@type bin_pad_data :: %Membrane.Bin.PadData{
@@ -39,7 +40,7 @@ defmodule Membrane.Core.Child.PadModel do
3940
pid: pid,
4041
other_ref: Pad.ref(),
4142
sticky_messages: [Membrane.Event.t()],
42-
input_queue: Membrane.Core.Element.InputQueue.t() | nil,
43+
input_queue: InputQueue.t() | nil,
4344
options: %{optional(atom) => any},
4445
auto_demand_size: pos_integer() | nil,
4546
sticky_events: [Membrane.Event.t()],

lib/membrane/core/element.ex

+3-5
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ defmodule Membrane.Core.Element do
2424
alias Membrane.Core.Element.{
2525
BufferController,
2626
DemandController,
27-
DemandHandler,
2827
EffectiveFlowController,
2928
EventController,
3029
LifecycleController,
30+
ManualFlowController,
3131
PadController,
3232
State,
3333
StreamFormatController
@@ -85,8 +85,6 @@ defmodule Membrane.Core.Element do
8585
# rpc if necessary
8686
if node do
8787
result = :rpc.call(node, GenServer, :start, [__MODULE__, options])
88-
89-
# TODO: use an atomic way of linking once https://github.com/erlang/otp/issues/6375 is solved
9088
with {:start_link, {:ok, pid}} <- {method, result}, do: Process.link(pid)
9189
result
9290
else
@@ -211,13 +209,13 @@ defmodule Membrane.Core.Element do
211209
end
212210

213211
defp do_handle_info(Message.new(:resume_delayed_demands_loop), state) do
214-
state = DemandHandler.resume_delayed_demands_loop(state)
212+
state = ManualFlowController.resume_delayed_demands_loop(state)
215213
{:noreply, state}
216214
end
217215

218216
defp do_handle_info(Message.new(:buffer, buffers, _opts) = msg, state) do
219217
pad_ref = Message.for_pad(msg)
220-
state = BufferController.handle_buffer(pad_ref, buffers, state)
218+
state = BufferController.handle_incoming_buffers(pad_ref, buffers, state)
221219
{:noreply, state}
222220
end
223221

lib/membrane/core/element/action_handler.ex

+30-34
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ defmodule Membrane.Core.Element.ActionHandler do
2222
}
2323

2424
alias Membrane.Core.Element.{
25+
AutoFlowController,
2526
DemandController,
26-
DemandHandler,
27+
ManualFlowController,
2728
State,
2829
StreamFormatController
2930
}
3031

31-
alias Membrane.Core.Element.DemandController.AutoFlowUtils
3232
alias Membrane.Core.{Events, TimerController}
3333
alias Membrane.Element.Action
3434

@@ -50,34 +50,29 @@ defmodule Membrane.Core.Element.ActionHandler do
5050
# Fixed order of handling demand of manual and auto pads would lead to
5151
# favoring manual pads over auto pads (or vice versa), especially after
5252
# introducting auto flow queues.
53-
manual_demands_first? = Enum.random([1, 2]) == 1
5453

55-
state =
56-
if manual_demands_first?,
57-
do: maybe_handle_delayed_demands(state),
58-
else: state
59-
60-
state = maybe_handle_pads_to_snapshot(state)
61-
62-
state =
63-
if manual_demands_first?,
64-
do: state,
65-
else: maybe_handle_delayed_demands(state)
66-
67-
state
54+
if Enum.random([true, false]) do
55+
state
56+
|> handle_pads_to_snapshot()
57+
|> maybe_handle_delayed_demands()
58+
else
59+
state
60+
|> maybe_handle_delayed_demands()
61+
|> handle_pads_to_snapshot()
62+
end
6863
end
6964

7065
defp maybe_handle_delayed_demands(state) do
71-
with %{supplying_demand?: false} <- state do
72-
DemandHandler.handle_delayed_demands(state)
66+
with %{delay_demands?: false} <- state do
67+
ManualFlowController.handle_delayed_demands(state)
7368
end
7469
end
7570

76-
defp maybe_handle_pads_to_snapshot(state) do
77-
with %{handling_action?: false} <- state do
78-
Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2)
79-
|> Map.put(:pads_to_snapshot, MapSet.new())
80-
end
71+
defp handle_pads_to_snapshot(state) do
72+
state.pads_to_snapshot
73+
|> Enum.shuffle()
74+
|> Enum.reduce(state, &DemandController.snapshot_atomic_demand/2)
75+
|> Map.put(:pads_to_snapshot, MapSet.new())
8176
end
8277

8378
@impl CallbackHandler
@@ -178,13 +173,13 @@ defmodule Membrane.Core.Element.ActionHandler do
178173
@impl CallbackHandler
179174
def handle_action({:pause_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
180175
when type in [:sink, :filter, :endpoint] do
181-
DemandController.AutoFlowUtils.pause_demands(in_ref, state)
176+
AutoFlowController.pause_demands(in_ref, state)
182177
end
183178

184179
@impl CallbackHandler
185180
def handle_action({:resume_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
186181
when type in [:sink, :filter, :endpoint] do
187-
DemandController.AutoFlowUtils.resume_demands(in_ref, state)
182+
AutoFlowController.resume_demands(in_ref, state)
188183
end
189184

190185
@impl CallbackHandler
@@ -239,7 +234,7 @@ defmodule Membrane.Core.Element.ActionHandler do
239234
%State{type: type} = state
240235
)
241236
when is_pad_ref(pad_ref) and is_demand_size(size) and type in [:sink, :filter, :endpoint] do
242-
supply_demand(pad_ref, size, state)
237+
delay_supplying_demand(pad_ref, size, state)
243238
end
244239

245240
@impl CallbackHandler
@@ -396,26 +391,27 @@ defmodule Membrane.Core.Element.ActionHandler do
396391
end
397392
end
398393

399-
@spec supply_demand(
394+
@spec delay_supplying_demand(
400395
Pad.ref(),
401396
Action.demand_size(),
402397
State.t()
403398
) :: State.t()
404-
defp supply_demand(pad_ref, 0, state) do
399+
defp delay_supplying_demand(pad_ref, 0, state) do
405400
Membrane.Logger.debug_verbose("Ignoring demand of size of 0 on pad #{inspect(pad_ref)}")
406401
state
407402
end
408403

409-
defp supply_demand(pad_ref, size, _state)
404+
defp delay_supplying_demand(pad_ref, size, _state)
410405
when is_integer(size) and size < 0 do
411406
raise ElementError,
412407
"Tried to request a negative demand of size #{inspect(size)} on pad #{inspect(pad_ref)}"
413408
end
414409

415-
defp supply_demand(pad_ref, size, state) do
410+
defp delay_supplying_demand(pad_ref, size, state) do
416411
with %{direction: :input, flow_control: :manual} <-
417412
PadModel.get_data!(state, pad_ref) do
418-
DemandHandler.supply_demand(pad_ref, size, state)
413+
state = ManualFlowController.update_demand(pad_ref, size, state)
414+
ManualFlowController.delay_supplying_demand(pad_ref, state)
419415
else
420416
%{direction: :output} ->
421417
raise PadDirectionError, action: :demand, direction: :output, pad: pad_ref
@@ -435,7 +431,7 @@ defmodule Membrane.Core.Element.ActionHandler do
435431
when type in [:source, :filter, :endpoint] do
436432
with %{direction: :output, flow_control: :manual} <-
437433
PadModel.get_data!(state, pad_ref) do
438-
DemandHandler.handle_redemand(pad_ref, state)
434+
ManualFlowController.delay_redemand(pad_ref, state)
439435
else
440436
%{direction: :input} ->
441437
raise ElementError, "Tried to make a redemand on input pad #{inspect(pad_ref)}"
@@ -471,10 +467,10 @@ defmodule Membrane.Core.Element.ActionHandler do
471467
@spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t()
472468
defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do
473469
with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do
474-
DemandHandler.remove_pad_from_delayed_demands(pad_ref, state)
470+
ManualFlowController.remove_pad_from_delayed_demands(pad_ref, state)
475471
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
476472
|> PadModel.set_data!(pad_ref, :end_of_stream?, true)
477-
|> AutoFlowUtils.pop_queues_and_bump_demand()
473+
|> AutoFlowController.pop_queues_and_bump_demand()
478474
else
479475
%{direction: :input} ->
480476
raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref

lib/membrane/core/element/demand_controller/auto_flow_utils.ex renamed to lib/membrane/core/element/auto_flow_controller.ex

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
1+
defmodule Membrane.Core.Element.AutoFlowController do
22
@moduledoc false
33

44
alias Membrane.Buffer

lib/membrane/core/element/buffer_controller.ex

+22-16
Original file line numberDiff line numberDiff line change
@@ -11,27 +11,28 @@ defmodule Membrane.Core.Element.BufferController do
1111

1212
alias Membrane.Core.Element.{
1313
ActionHandler,
14+
AutoFlowController,
1415
CallbackContext,
15-
DemandHandler,
1616
EventController,
17-
InputQueue,
17+
ManualFlowController,
1818
PlaybackQueue,
1919
State
2020
}
2121

22-
alias Membrane.Core.Element.DemandController.AutoFlowUtils
22+
alias Membrane.Core.Element.ManualFlowController.InputQueue
23+
2324
alias Membrane.Core.Telemetry
2425

2526
require Membrane.Core.Child.PadModel
2627
require Membrane.Core.Telemetry
2728

2829
@doc """
2930
Handles incoming buffer: either stores it in InputQueue, or executes element's
30-
callback. Also calls `Membrane.Core.Element.DemandHandler.supply_demand/2`
31+
callback. Also calls `Membrane.Core.Element.ManualFlowController.supply_demand/2`
3132
to check if there are any unsupplied demands.
3233
"""
33-
@spec handle_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t()
34-
def handle_buffer(pad_ref, buffers, state) do
34+
@spec handle_incoming_buffers(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t()
35+
def handle_incoming_buffers(pad_ref, buffers, state) do
3536
withl pad: {:ok, data} <- PadModel.get_data(state, pad_ref),
3637
playback: %State{playback: :playing} <- state do
3738
%{
@@ -49,48 +50,53 @@ defmodule Membrane.Core.Element.BufferController do
4950
EventController.handle_start_of_stream(pad_ref, state)
5051
end
5152

52-
do_handle_buffer(pad_ref, data, buffers, state)
53+
do_handle_incoming_buffers(pad_ref, data, buffers, state)
5354
else
5455
pad: {:error, :unknown_pad} ->
5556
# We've got a buffer from already unlinked pad
5657
state
5758

5859
playback: _playback ->
59-
PlaybackQueue.store(&handle_buffer(pad_ref, buffers, &1), state)
60+
PlaybackQueue.store(&handle_incoming_buffers(pad_ref, buffers, &1), state)
6061
end
6162
end
6263

63-
@spec do_handle_buffer(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) ::
64+
@spec do_handle_incoming_buffers(
65+
Pad.ref(),
66+
PadModel.pad_data(),
67+
[Buffer.t()] | Buffer.t(),
68+
State.t()
69+
) ::
6470
State.t()
65-
defp do_handle_buffer(pad_ref, %{flow_control: :auto} = data, buffers, state) do
71+
defp do_handle_incoming_buffers(pad_ref, %{flow_control: :auto} = data, buffers, state) do
6672
%{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data
6773
buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers)
6874

6975
state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)
7076
:atomics.put(stalker_metrics.demand, 1, demand - buf_size)
7177

7278
if state.effective_flow_control == :pull and MapSet.size(state.satisfied_auto_output_pads) > 0 do
73-
AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state)
79+
AutoFlowController.store_buffers_in_queue(pad_ref, buffers, state)
7480
else
7581
state = exec_buffer_callback(pad_ref, buffers, state)
76-
AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
82+
AutoFlowController.auto_adjust_atomic_demand(pad_ref, state)
7783
end
7884
end
7985

80-
defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do
86+
defp do_handle_incoming_buffers(pad_ref, %{flow_control: :manual} = data, buffers, state) do
8187
%{input_queue: old_input_queue} = data
8288

8389
input_queue = InputQueue.store(old_input_queue, buffers)
8490
state = PadModel.set_data!(state, pad_ref, :input_queue, input_queue)
8591

86-
if old_input_queue |> InputQueue.empty?() do
87-
DemandHandler.supply_demand(pad_ref, state)
92+
if InputQueue.empty?(old_input_queue) do
93+
ManualFlowController.supply_demand(pad_ref, state)
8894
else
8995
state
9096
end
9197
end
9298

93-
defp do_handle_buffer(pad_ref, %{flow_control: :push}, buffers, state) do
99+
defp do_handle_incoming_buffers(pad_ref, %{flow_control: :push}, buffers, state) do
94100
exec_buffer_callback(pad_ref, buffers, state)
95101
end
96102

0 commit comments

Comments
 (0)