diff --git a/lib/membrane/core/element/diamond_detection_controller.ex b/lib/membrane/core/element/diamond_detection_controller.ex index b5315f714..9f9256b65 100644 --- a/lib/membrane/core/element/diamond_detection_controller.ex +++ b/lib/membrane/core/element/diamond_detection_controller.ex @@ -10,16 +10,17 @@ defmodule Membrane.Core.Element.DiamondDetectionController do # This algorithm takes the directed graph made by all elements within a single pipeline and # finds some diamond-subgraphs where all the edges (links) work in the :pull mode, - # it means in :manual flow control or in :auto flow control if the effective flow control - # is set to :pull. + # that is they're either in the :manual flow control or in the :auto flow control and the + # effective flow control is set to :pull. # These diamonds can be dangerous when used with pull flow control, e.g. let's consider # a pipeline that contains: # * MP4 demuxer that has two output pads # * MKV muxer that is linked to both of them - # and let's assume that the MP4 container that is consumed by the MP4 demuxer is unbalanced. + # and let's assume that the MP4 file that is consumed by the MP4 demuxer is unbalanced + # (audio and video streams are not interleaved, for example audio comes first and then video) # If the MKV muxer has pads working in pull mode, then demand on one pad will be satisfied, - # but on the other won't, because the MP4 container is unbalanced. Then, if MP4 demuxer + # but on the other won't, because the source MP4 file is unbalanced. Then, if the MP4 demuxer # has pads in auto flow control and its effective flow control is set to :pull, it won't # demand on the input, because one of the pads output with :auto flow control doesn't # have positive demand, so the whole pipeline will get stuck and won't process more data. @@ -30,14 +31,14 @@ defmodule Membrane.Core.Element.DiamondDetectionController do # Let's notice that: # * a new diamond can be created only after linking a new spec - # * if the new spec caused some new diamond to occur, this diamond will contain some of - # the links spawned in this spec + # * if the new spec created a new diamond, this diamond will contain some of + # the links spawned in this spec # If the diamond contains a link, it must also contain an element whose output pad # is part of this link. - # After the spec status is set to :done, the parent component that returned the spec will trigger - # all elements whose output pads have been linked in this spec. The reference of the trigger - # is always set to the spec reference. + # After the spec status is set to :done, the parent component that returned the spec will + # triggerall elements whose output pads have been linked in this spec. The reference of the + # trigger is always set to the spec reference. # If the element is triggered with a specific reference for the first time, it does two # things: @@ -152,7 +153,7 @@ defmodule Membrane.Core.Element.DiamondDetectionController do diamond_detecton_path = [new_path_vertex | diamond_detecton_path] cond do - not is_map_key(state.diamond_detection.ref_to_path, diamond_detection_ref) -> + not is_map_key(state.diamond_detection_state.ref_to_path, diamond_detection_ref) -> :ok = forward_diamond_detection(diamond_detection_ref, diamond_detecton_path, state) :ok = @@ -170,13 +171,13 @@ defmodule Membrane.Core.Element.DiamondDetectionController do have_common_prefix?( diamond_detecton_path, - state.diamond_detection.ref_to_path[diamond_detection_ref] + state.diamond_detection_state.ref_to_path[diamond_detection_ref] ) -> state true -> old_diamond_detection_path = - state.diamond_detection.ref_to_path[diamond_detection_ref] + state.diamond_detection_state.ref_to_path[diamond_detection_ref] |> remove_component_path_prefix() :ok = @@ -244,7 +245,7 @@ defmodule Membrane.Core.Element.DiamondDetectionController do @spec start_diamond_detection_trigger(reference(), State.t()) :: State.t() defp start_diamond_detection_trigger(spec_ref, state) do if map_size(state.pads_data) < 2 or - MapSet.member?(state.diamond_detection.trigger_refs, spec_ref) do + MapSet.member?(state.diamond_detection_state.trigger_refs, spec_ref) do state else do_handle_diamond_detection_trigger(spec_ref, state) @@ -254,7 +255,7 @@ defmodule Membrane.Core.Element.DiamondDetectionController do @spec handle_diamond_detection_trigger(reference(), State.t()) :: State.t() defp handle_diamond_detection_trigger(trigger_ref, %State{} = state) do if state.type == :endpoint or - MapSet.member?(state.diamond_detection.trigger_refs, trigger_ref), + MapSet.member?(state.diamond_detection_state.trigger_refs, trigger_ref), do: state, else: do_handle_diamond_detection_trigger(trigger_ref, state) end @@ -278,7 +279,8 @@ defmodule Membrane.Core.Element.DiamondDetectionController do else: state end - defp postpone_diamond_detection(%State{} = state) when state.diamond_detection.postponed? do + defp postpone_diamond_detection(%State{} = state) + when state.diamond_detection_state.postponed? do state end @@ -313,7 +315,7 @@ defmodule Membrane.Core.Element.DiamondDetectionController do end defp get_component_path(state) do - case state.diamond_detection.serialized_component_path do + case state.diamond_detection_state.serialized_component_path do nil -> # adding @component_path_prefix to component path causes that component path # always has more than 64 bytes, so it won't be copied during sending a message diff --git a/lib/membrane/core/element/diamond_detection_controller/diamond_detection_state.ex b/lib/membrane/core/element/diamond_detection_controller/diamond_detection_state.ex new file mode 100644 index 000000000..09d4f5e22 --- /dev/null +++ b/lib/membrane/core/element/diamond_detection_controller/diamond_detection_state.ex @@ -0,0 +1,19 @@ +defmodule Membrane.Core.Element.DiamondDetectionController.DiamondDatectionState do + @moduledoc false + + alias Membrane.Core.Element.DiamondDetectionController.PathInGraph + + defstruct [ + :serialized_component_path, + ref_to_path: %{}, + trigger_refs: MapSet.new(), + postponed?: false + ] + + @type t :: %__MODULE__{ + serialized_component_path: String.t() | nil, + ref_to_path: %{optional(reference()) => PathInGraph.t()}, + trigger_refs: MapSet.t(reference()), + postponed?: boolean() + } +end diff --git a/lib/membrane/core/element/state.ex b/lib/membrane/core/element/state.ex index a20daef2b..1c4d2d898 100644 --- a/lib/membrane/core/element/state.ex +++ b/lib/membrane/core/element/state.ex @@ -9,7 +9,7 @@ defmodule Membrane.Core.Element.State do alias Membrane.{Clock, Element, Pad, Sync} alias Membrane.Core.Child.PadModel - alias Membrane.Core.Element.DiamondDetectionController.PathInGraph + alias Membrane.Core.Element.DiamondDetectionController.DiamondDatectionState alias Membrane.Core.Element.EffectiveFlowController alias Membrane.Core.Timer @@ -48,12 +48,7 @@ defmodule Membrane.Core.Element.State do satisfied_auto_output_pads: MapSet.t(), awaiting_auto_input_pads: MapSet.t(), resume_delayed_demands_loop_in_mailbox?: boolean(), - diamond_detection: %{ - serialized_component_path: String.t() | nil, - ref_to_path: %{optional(reference()) => PathInGraph.t()}, - trigger_refs: MapSet.t(reference()), - postponed?: boolean() - } + diamond_detection_state: DiamondDatectionState.t() } # READ THIS BEFORE ADDING NEW FIELD!!! @@ -86,12 +81,7 @@ defmodule Membrane.Core.Element.State do handle_demand_loop_counter: 0, pads_to_snapshot: MapSet.new(), playback_queue: [], - diamond_detection: %{ - serialized_component_path: nil, - ref_to_path: %{}, - trigger_refs: MapSet.new(), - postponed?: false - }, + diamond_detection_state: %DiamondDatectionState{}, pads_data: %{}, satisfied_auto_output_pads: MapSet.new(), awaiting_auto_input_pads: MapSet.new(),