Skip to content

Commit

Permalink
Implement CR suggestions WiP
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Dec 16, 2024
1 parent cc9039d commit d364df9
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 29 deletions.
34 changes: 18 additions & 16 deletions lib/membrane/core/element/diamond_detection_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ defmodule Membrane.Core.Element.DiamondDetectionController do

# This algorithm takes the directed graph made by all elements within a single pipeline and
# finds some diamond-subgraphs where all the edges (links) work in the :pull mode,
# it means in :manual flow control or in :auto flow control if the effective flow control
# is set to :pull.
# that is they're either in the :manual flow control or in the :auto flow control and the
# effective flow control is set to :pull.

# These diamonds can be dangerous when used with pull flow control, e.g. let's consider
# a pipeline that contains:
# * MP4 demuxer that has two output pads
# * MKV muxer that is linked to both of them
# and let's assume that the MP4 container that is consumed by the MP4 demuxer is unbalanced.
# and let's assume that the MP4 file that is consumed by the MP4 demuxer is unbalanced
# (audio and video streams are not interleaved, for example audio comes first and then video)
# If the MKV muxer has pads working in pull mode, then demand on one pad will be satisfied,
# but on the other won't, because the MP4 container is unbalanced. Then, if MP4 demuxer
# but on the other won't, because the source MP4 file is unbalanced. Then, if the MP4 demuxer
# has pads in auto flow control and its effective flow control is set to :pull, it won't
# demand on the input, because one of the pads output with :auto flow control doesn't
# have positive demand, so the whole pipeline will get stuck and won't process more data.
Expand All @@ -30,14 +31,14 @@ defmodule Membrane.Core.Element.DiamondDetectionController do

# Let's notice that:
# * a new diamond can be created only after linking a new spec
# * if the new spec caused some new diamond to occur, this diamond will contain some of
# the links spawned in this spec
# * if the new spec created a new diamond, this diamond will contain some of
# the links spawned in this spec
# If the diamond contains a link, it must also contain an element whose output pad
# is part of this link.

# After the spec status is set to :done, the parent component that returned the spec will trigger
# all elements whose output pads have been linked in this spec. The reference of the trigger
# is always set to the spec reference.
# After the spec status is set to :done, the parent component that returned the spec will
# triggerall elements whose output pads have been linked in this spec. The reference of the
# trigger is always set to the spec reference.

# If the element is triggered with a specific reference for the first time, it does two
# things:
Expand Down Expand Up @@ -152,7 +153,7 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
diamond_detecton_path = [new_path_vertex | diamond_detecton_path]

cond do
not is_map_key(state.diamond_detection.ref_to_path, diamond_detection_ref) ->
not is_map_key(state.diamond_detection_state.ref_to_path, diamond_detection_ref) ->
:ok = forward_diamond_detection(diamond_detection_ref, diamond_detecton_path, state)

:ok =
Expand All @@ -170,13 +171,13 @@ defmodule Membrane.Core.Element.DiamondDetectionController do

have_common_prefix?(
diamond_detecton_path,
state.diamond_detection.ref_to_path[diamond_detection_ref]
state.diamond_detection_state.ref_to_path[diamond_detection_ref]
) ->
state

true ->
old_diamond_detection_path =
state.diamond_detection.ref_to_path[diamond_detection_ref]
state.diamond_detection_state.ref_to_path[diamond_detection_ref]
|> remove_component_path_prefix()

:ok =
Expand Down Expand Up @@ -244,7 +245,7 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
@spec start_diamond_detection_trigger(reference(), State.t()) :: State.t()
defp start_diamond_detection_trigger(spec_ref, state) do
if map_size(state.pads_data) < 2 or
MapSet.member?(state.diamond_detection.trigger_refs, spec_ref) do
MapSet.member?(state.diamond_detection_state.trigger_refs, spec_ref) do
state
else
do_handle_diamond_detection_trigger(spec_ref, state)
Expand All @@ -254,7 +255,7 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
@spec handle_diamond_detection_trigger(reference(), State.t()) :: State.t()
defp handle_diamond_detection_trigger(trigger_ref, %State{} = state) do
if state.type == :endpoint or
MapSet.member?(state.diamond_detection.trigger_refs, trigger_ref),
MapSet.member?(state.diamond_detection_state.trigger_refs, trigger_ref),
do: state,
else: do_handle_diamond_detection_trigger(trigger_ref, state)
end
Expand All @@ -278,7 +279,8 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
else: state
end

defp postpone_diamond_detection(%State{} = state) when state.diamond_detection.postponed? do
defp postpone_diamond_detection(%State{} = state)
when state.diamond_detection_state.postponed? do
state
end

Expand Down Expand Up @@ -313,7 +315,7 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
end

defp get_component_path(state) do
case state.diamond_detection.serialized_component_path do
case state.diamond_detection_state.serialized_component_path do
nil ->
# adding @component_path_prefix to component path causes that component path
# always has more than 64 bytes, so it won't be copied during sending a message
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule Membrane.Core.Element.DiamondDetectionController.DiamondDatectionState do
@moduledoc false

alias Membrane.Core.Element.DiamondDetectionController.PathInGraph

defstruct [
:serialized_component_path,
ref_to_path: %{},
trigger_refs: MapSet.new(),
postponed?: false
]

@type t :: %__MODULE__{
serialized_component_path: String.t() | nil,
ref_to_path: %{optional(reference()) => PathInGraph.t()},
trigger_refs: MapSet.t(reference()),
postponed?: boolean()
}
end
16 changes: 3 additions & 13 deletions lib/membrane/core/element/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Membrane.Core.Element.State do

alias Membrane.{Clock, Element, Pad, Sync}
alias Membrane.Core.Child.PadModel
alias Membrane.Core.Element.DiamondDetectionController.PathInGraph
alias Membrane.Core.Element.DiamondDetectionController.DiamondDatectionState
alias Membrane.Core.Element.EffectiveFlowController
alias Membrane.Core.Timer

Expand Down Expand Up @@ -48,12 +48,7 @@ defmodule Membrane.Core.Element.State do
satisfied_auto_output_pads: MapSet.t(),
awaiting_auto_input_pads: MapSet.t(),
resume_delayed_demands_loop_in_mailbox?: boolean(),
diamond_detection: %{
serialized_component_path: String.t() | nil,
ref_to_path: %{optional(reference()) => PathInGraph.t()},
trigger_refs: MapSet.t(reference()),
postponed?: boolean()
}
diamond_detection_state: DiamondDatectionState.t()
}

# READ THIS BEFORE ADDING NEW FIELD!!!
Expand Down Expand Up @@ -86,12 +81,7 @@ defmodule Membrane.Core.Element.State do
handle_demand_loop_counter: 0,
pads_to_snapshot: MapSet.new(),
playback_queue: [],
diamond_detection: %{
serialized_component_path: nil,
ref_to_path: %{},
trigger_refs: MapSet.new(),
postponed?: false
},
diamond_detection_state: %DiamondDatectionState{},
pads_data: %{},
satisfied_auto_output_pads: MapSet.new(),
awaiting_auto_input_pads: MapSet.new(),
Expand Down

0 comments on commit d364df9

Please sign in to comment.