From 34570166ef95fea18292c2827a110903e5e22184 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 17 Jan 2024 14:22:47 +0100 Subject: [PATCH 01/22] wip --- .../actions_handling_order_test.exs | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 test/membrane/integration/actions_handling_order_test.exs diff --git a/test/membrane/integration/actions_handling_order_test.exs b/test/membrane/integration/actions_handling_order_test.exs new file mode 100644 index 000000000..9b67ebbb4 --- /dev/null +++ b/test/membrane/integration/actions_handling_order_test.exs @@ -0,0 +1,40 @@ +defmodule Membrane.Integration.ActionsHandlingOrderTest do + use ExUnit.Case, async: true + + import Membrane.ChildrenSpec + import Membrane.Testing.Assertions + + defmodule DelayedPlayingPipeline do + use Membrane.Pipeline + + @impl true + def handle_setup(_ctx, state) do + self() |> send(:wake_up_buddy) + {[setup: :incomplete, start_timer: {:one, Membrane.Time.second()}], state} + end + + @impl true + def handle_info(:wake_up_buddy, _ctx, state) do + {[setup: :complete, timer_interval: {:one, Membrane.Time.seconds(10)}], state} + end + + @impl true + def handle_playing(_ctx, state) do + {[timer_interval: {:one, Membrane.Time.seconds(5)}], state} + end + + @impl true + def handle_tick(:one, ctx, state) do + # IO.inspect(ctx, limit: :infinity) + IO.puts("TICK") + {[], state} + end + end + + # no to to co trzeba zrobic: dopisz pipeline podobny do powyzszego, ktory w handle info rzuci bufor, i w handle playing rzuci bufor + # na moje oko to zachowanie powyzej powinno przejsc, tak, ze najpierw bedzie wyslany bufor z playing zamiast z info, ale no + # to powinno sie wyalac wg mnie. + + # ale to powyzej nie ejst jakies turbo istotne, najpierw zrob tak zeby tick powyzej trwal co 5 sekund + # i przekmin dlaczego zmiany lukasza cokolwiek naprawily +end From 69d8a0328dcba6d8be6b39c9761b324865ffc7e8 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Thu, 18 Jan 2024 17:33:33 +0100 Subject: [PATCH 02/22] Write tests wip --- .../actions_handling_order_test.exs | 145 ++++++++++++++++-- 1 file changed, 131 insertions(+), 14 deletions(-) diff --git a/test/membrane/integration/actions_handling_order_test.exs b/test/membrane/integration/actions_handling_order_test.exs index 9b67ebbb4..e7270e789 100644 --- a/test/membrane/integration/actions_handling_order_test.exs +++ b/test/membrane/integration/actions_handling_order_test.exs @@ -4,37 +4,154 @@ defmodule Membrane.Integration.ActionsHandlingOrderTest do import Membrane.ChildrenSpec import Membrane.Testing.Assertions - defmodule DelayedPlayingPipeline do + alias Membrane.Testing + + defmodule TickingPipeline do + use Membrane.Pipeline + + @tick_time Membrane.Time.milliseconds(100) + + @impl true + def handle_init(_ctx, test_process: test_process), + do: {[], %{ticked?: false, test_process: test_process}} + + @impl true + def handle_setup(_ctx, state) do + {[setup: :incomplete, start_timer: {:one, @tick_time}], state} + end + + @impl true + def handle_playing(_ctx, state) do + {[timer_interval: {:one, @tick_time}], state} + end + + @impl true + def handle_tick(:one, ctx, %{ticked?: false} = state) do + {[setup: :complete, timer_interval: {:one, :no_interval}], %{state | ticked?: true}} + end + + @impl true + def handle_tick(:one, _ctx, state) do + send(state.test_process, :ticked_two_times) + {[timer_interval: {:one, :no_interval}], state} + end + end + + defmodule NotifyingPipeline do use Membrane.Pipeline + alias Membrane.Integration.ActionsHandlingOrderTest.NotifyingPipelineChild + + @impl true + def handle_init(_ctx, _opts) do + spec = child(:child, NotifyingPipelineChild) + {[spec: spec], %{}} + end + @impl true def handle_setup(_ctx, state) do - self() |> send(:wake_up_buddy) - {[setup: :incomplete, start_timer: {:one, Membrane.Time.second()}], state} + self() |> send(:time_to_play) + {[setup: :incomplete], state} end @impl true - def handle_info(:wake_up_buddy, _ctx, state) do - {[setup: :complete, timer_interval: {:one, Membrane.Time.seconds(10)}], state} + def handle_info(:time_to_play, _ctx, state) do + {[setup: :complete, notify_child: {:child, :first_notification}], state} end @impl true def handle_playing(_ctx, state) do - {[timer_interval: {:one, Membrane.Time.seconds(5)}], state} + {[notify_child: {:child, :second_notification}], state} end @impl true - def handle_tick(:one, ctx, state) do - # IO.inspect(ctx, limit: :infinity) - IO.puts("TICK") + def handle_info({:get_notifications, test_process}, _ctx, state) do + actions = [notify_child: {:child, :get_notifications}] + state = Map.put(state, :test_process, test_process) + + {actions, state} + end + + @impl true + def handle_child_notification(notifications, :child, _ctx, state) do + send(state.test_process, {:notifications, notifications}) {[], state} end end - # no to to co trzeba zrobic: dopisz pipeline podobny do powyzszego, ktory w handle info rzuci bufor, i w handle playing rzuci bufor - # na moje oko to zachowanie powyzej powinno przejsc, tak, ze najpierw bedzie wyslany bufor z playing zamiast z info, ale no - # to powinno sie wyalac wg mnie. + defmodule NotifyingPipelineChild do + use Membrane.Filter + + @impl true + def handle_init(_ctx, _opts), do: {[], %{}} + + @impl true + def handle_parent_notification(:get_notifications, _ctx, state) do + {[notify_parent: state.notifications], state} + end - # ale to powyzej nie ejst jakies turbo istotne, najpierw zrob tak zeby tick powyzej trwal co 5 sekund - # i przekmin dlaczego zmiany lukasza cokolwiek naprawily + @impl true + def handle_parent_notification(notification, _ctx, state) do + state = Map.update(state, :notifications, [notification], &(&1 ++ [notification])) + {[], state} + end + end + + defmodule TickingSink do + use Membrane.Sink + + @tick_time Membrane.Time.milliseconds(100) + + def_input_pad :input, flow_control: :manual, demand_unit: :buffers, accepted_format: _any + + @impl true + def handle_init(_ctx, _opts), do: {[], %{ticked?: false}} + + @impl true + def handle_parent_notification(:start_timer, _ctx, state) do + {[start_timer: {:timer, @tick_time}], state} + end + + @impl true + def handle_tick(:timer, _ctx, %{ticekd?: false} = state) do + actions = [ + demand: {:input, 1}, + timer_interval: {:timer, :no_interval} + ] + + {actions, %{state | ticked?: true}} + end + + @impl true + def handle_tick(:timer, _ctx, %{ticked?: true} = state) do + {[notify_parent: :second_tick], state} + end + + @impl true + def handle_buffer(:input, _buffer, _ctx, state) do + {[timer_interval: {:timer, @tick_time}], state} + end + end + + test "order of handling :tick action" do + {:ok, _supervisor, pipeline} = + Membrane.Pipeline.start_link(TickingPipeline, test_process: self()) + + assert_receive :ticked_two_times + + Membrane.Pipeline.terminate(pipeline) + end + + test "order of handling :notify_child action" do + {:ok, _supervisor, pipeline} = Membrane.Pipeline.start_link(NotifyingPipeline) + + # time for pipeline to play + Process.sleep(500) + + send(pipeline, {:get_notifications, self()}) + + assert_receive {:notifications, [:first_notification, :second_notification]} + + Membrane.Pipeline.terminate(pipeline) + end end From ebf574cc9dfdade5ee0d2e2112abc83e5a6fb36f Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Thu, 18 Jan 2024 17:41:50 +0100 Subject: [PATCH 03/22] Write tests wip --- .../actions_handling_order_test.exs | 29 +++++++++++++++---- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/test/membrane/integration/actions_handling_order_test.exs b/test/membrane/integration/actions_handling_order_test.exs index e7270e789..599b10a80 100644 --- a/test/membrane/integration/actions_handling_order_test.exs +++ b/test/membrane/integration/actions_handling_order_test.exs @@ -26,7 +26,7 @@ defmodule Membrane.Integration.ActionsHandlingOrderTest do end @impl true - def handle_tick(:one, ctx, %{ticked?: false} = state) do + def handle_tick(:one, _ctx, %{ticked?: false} = state) do {[setup: :complete, timer_interval: {:one, :no_interval}], %{state | ticked?: true}} end @@ -55,13 +55,13 @@ defmodule Membrane.Integration.ActionsHandlingOrderTest do end @impl true - def handle_info(:time_to_play, _ctx, state) do - {[setup: :complete, notify_child: {:child, :first_notification}], state} + def handle_playing(_ctx, state) do + {[notify_child: {:child, :second_notification}], state} end @impl true - def handle_playing(_ctx, state) do - {[notify_child: {:child, :second_notification}], state} + def handle_info(:time_to_play, _ctx, state) do + {[setup: :complete, notify_child: {:child, :first_notification}], state} end @impl true @@ -113,7 +113,7 @@ defmodule Membrane.Integration.ActionsHandlingOrderTest do end @impl true - def handle_tick(:timer, _ctx, %{ticekd?: false} = state) do + def handle_tick(:timer, _ctx, %{ticked?: false} = state) do actions = [ demand: {:input, 1}, timer_interval: {:timer, :no_interval} @@ -154,4 +154,21 @@ defmodule Membrane.Integration.ActionsHandlingOrderTest do Membrane.Pipeline.terminate(pipeline) end + + test "order of handling :timer_interval and :demand actions" do + spec = + child(:source, %Testing.Source{output: [<<>>]}) + |> child(:sink, TickingSink) + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + + # time for pipeline to play + Process.sleep(500) + + Testing.Pipeline.message_child(pipeline, :sink, :start_timer) + + assert_pipeline_notified(pipeline, :sink, :second_tick) + + Testing.Pipeline.terminate(pipeline) + end end From ff72c7b40085d8f6af68a12d463eb77f81feec57 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 23 Jan 2024 16:59:05 +0100 Subject: [PATCH 04/22] Revert "Fix timer running late (#685)" This reverts commit 168f57e12bf3280cda82b1116796151d4b1cb486. --- lib/membrane/core/element.ex | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 8bd3d91c7..5901c9048 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -256,20 +256,7 @@ defmodule Membrane.Core.Element do end defp do_handle_info(Message.new(:timer_tick, timer_id), state) do - # Guarding the `TimerController.handle_tick/2` invocation is - # required since there might be a case in which `handle_tick` - # callback's implementation returns demand action. - # In this scenario, without this guard, there would a possibility that - # the `handle_buffer` would be called immediately, returning - # some action that would affect the timer and the original state - # of the timer, set with actions returned from `handle_tick`, - # would be overwritten with that action. - # - # For more information see: https://github.com/membraneframework/membrane_core/issues/670 - state = %{state | supplying_demand?: true} state = TimerController.handle_tick(timer_id, state) - state = %{state | supplying_demand?: false} - state = Membrane.Core.Element.DemandHandler.handle_delayed_demands(state) {:noreply, state} end From ed062814c68fedd660966497c317dc0c8cb2f650 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Mon, 29 Jan 2024 15:24:15 +0100 Subject: [PATCH 05/22] Fix actions handling order bug related to Pipeline.handle_playing --- lib/membrane/core/lifecycle_controller.ex | 10 +++++++--- lib/membrane/core/pipeline/action_handler.ex | 9 +++++++++ lib/membrane/core/pipeline/state.ex | 6 ++++-- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/lib/membrane/core/lifecycle_controller.ex b/lib/membrane/core/lifecycle_controller.ex index cbfc310cd..1c1d62387 100644 --- a/lib/membrane/core/lifecycle_controller.ex +++ b/lib/membrane/core/lifecycle_controller.ex @@ -14,12 +14,16 @@ defmodule Membrane.Core.LifecycleController do def handle_setup_operation(operation, state) do :ok = assert_operation_allowed!(operation, state.setup_incomplete?) - case operation do - :incomplete -> + cond do + operation == :incomplete -> Membrane.Logger.debug("Component deferred initialization") %{state | setup_incomplete?: true} - :complete -> + Component.is_pipeline?(state) -> + # complete_setup/1 will be called in Membrane.Core.Pipeline.ActionHandler.handle_end_of_actions/1 + %{state | awaiting_setup_completition?: true} + + Component.is_child?(state) -> complete_setup(state) end end diff --git a/lib/membrane/core/pipeline/action_handler.ex b/lib/membrane/core/pipeline/action_handler.ex index 92065ed43..7fc2faca7 100644 --- a/lib/membrane/core/pipeline/action_handler.ex +++ b/lib/membrane/core/pipeline/action_handler.ex @@ -103,4 +103,13 @@ defmodule Membrane.Core.Pipeline.ActionHandler do def handle_action(action, _callback, _params, _state) do raise ActionError, action: action, reason: {:unknown_action, Membrane.Pipeline.Action} end + + @impl CallbackHandler + def handle_end_of_actions(state) when state.awaiting_setup_completition? do + %{state | awaiting_setup_completition?: false} + |> Membrane.Core.LifecycleController.complete_setup() + end + + @impl CallbackHandler + def handle_end_of_actions(state), do: state end diff --git a/lib/membrane/core/pipeline/state.ex b/lib/membrane/core/pipeline/state.ex index 59ecaded7..6f644947b 100644 --- a/lib/membrane/core/pipeline/state.ex +++ b/lib/membrane/core/pipeline/state.ex @@ -34,7 +34,8 @@ defmodule Membrane.Core.Pipeline.State do setup_incomplete?: boolean(), handling_action?: boolean(), stalker: Membrane.Core.Stalker.t(), - subprocess_supervisor: pid() + subprocess_supervisor: pid(), + awaiting_setup_completition?: boolean() } # READ THIS BEFORE ADDING NEW FIELD!!! @@ -58,5 +59,6 @@ defmodule Membrane.Core.Pipeline.State do handling_action?: false, stalker: nil, resource_guard: nil, - subprocess_supervisor: nil + subprocess_supervisor: nil, + awaiting_setup_completition?: false end From 0e4063842b23fba099d42aee116253a4b843ad22 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Mon, 29 Jan 2024 15:30:34 +0100 Subject: [PATCH 06/22] Add assertion on value passed with :setup action --- lib/membrane/core/lifecycle_controller.ex | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/membrane/core/lifecycle_controller.ex b/lib/membrane/core/lifecycle_controller.ex index 1c1d62387..5e4e03f72 100644 --- a/lib/membrane/core/lifecycle_controller.ex +++ b/lib/membrane/core/lifecycle_controller.ex @@ -56,5 +56,13 @@ defmodule Membrane.Core.LifecycleController do """ end + defp assert_operation_allowed!(operation, _status) + when operation not in [:incomplete, :complete] do + raise SetupError, """ + Action {:setup, #{inspect(operation)}} was returned, but second element in the tuple must + be :complete or :incomplete + """ + end + defp assert_operation_allowed!(_operation, _status), do: :ok end From 990172b1d74f7e2a4c9af312b53bbeb8bb3ce5a5 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Mon, 29 Jan 2024 17:05:06 +0100 Subject: [PATCH 07/22] WIP Fix bug in executing handle_buffer while handling actions from previous callback --- lib/membrane/core/callback_handler.ex | 9 ++++++++ lib/membrane/core/element/action_handler.ex | 23 +++++++++++++------ lib/membrane/core/element/demand_handler.ex | 9 ++++++++ lib/membrane/core/element/event_controller.ex | 2 ++ .../integration/toilet_forwarding_test.exs | 3 ++- 5 files changed, 38 insertions(+), 8 deletions(-) diff --git a/lib/membrane/core/callback_handler.ex b/lib/membrane/core/callback_handler.ex index 30cb8d3bc..f300aafd4 100644 --- a/lib/membrane/core/callback_handler.ex +++ b/lib/membrane/core/callback_handler.ex @@ -7,6 +7,7 @@ defmodule Membrane.Core.CallbackHandler do use Bunch + alias Membrane.Core.Component alias Membrane.CallbackError require Membrane.Logger @@ -191,6 +192,9 @@ defmodule Membrane.Core.CallbackHandler do was_handling_action? = state.handling_action? state = %{state | handling_action?: true} + was_supplying_demand? = Map.get(state, :supplying_demand?, false) + state = if Component.is_element?(state), do: %{state | supplying_demand?: true}, else: state + state = Enum.reduce(actions, state, fn action, state -> try do @@ -210,6 +214,11 @@ defmodule Membrane.Core.CallbackHandler do do: state, else: %{state | handling_action?: false} + state = + if Component.is_element?(state) and not was_supplying_demand?, + do: %{state | supplying_demand?: false}, + else: state + handler_module.handle_end_of_actions(state) end end diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 0812367b6..b4c9deed7 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -47,13 +47,20 @@ defmodule Membrane.Core.Element.ActionHandler do defguardp is_demand_size(size) when is_integer(size) or is_function(size) @impl CallbackHandler - def handle_end_of_actions(state) when not state.handling_action? do - Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2) - |> Map.put(:pads_to_snapshot, MapSet.new()) - end + def handle_end_of_actions(state) do + state = + with %{handling_action?: false} <- state do + Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2) + |> Map.put(:pads_to_snapshot, MapSet.new()) + end - @impl CallbackHandler - def handle_end_of_actions(state), do: state + state = + with %{supplying_demand?: false} <- state do + DemandHandler.handle_delayed_demands(state) + end + + state + end @impl CallbackHandler def handle_action({action, _}, :handle_init, _params, _state) @@ -467,7 +474,9 @@ defmodule Membrane.Core.Element.ActionHandler do defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do state = PadController.remove_pad_associations(pad_ref, state) - PadModel.set_data!(state, pad_ref, :end_of_stream?, true) + + DemandHandler.remove_pad_from_delayed_demands(pad_ref, state) + |> PadModel.set_data!(pad_ref, :end_of_stream?, true) else %{direction: :input} -> raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref diff --git a/lib/membrane/core/element/demand_handler.ex b/lib/membrane/core/element/demand_handler.ex index 74c6ce05a..1d22d7eb9 100644 --- a/lib/membrane/core/element/demand_handler.ex +++ b/lib/membrane/core/element/demand_handler.ex @@ -146,6 +146,15 @@ defmodule Membrane.Core.Element.DemandHandler do end end + @spec remove_pad_from_delayed_demands(Pad.ref(), State.t()) :: State.t() + def remove_pad_from_delayed_demands(pad_ref, state) do + Map.update!(state, :delayed_demands, fn delayed_demands_set -> + delayed_demands_set + |> MapSet.delete({pad_ref, :supply}) + |> MapSet.delete({pad_ref, :redemand}) + end) + end + @spec handle_input_queue_output( Pad.ref(), [InputQueue.output_value()], diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index 377b70a30..6d9bf1c2f 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -12,6 +12,7 @@ defmodule Membrane.Core.Element.EventController do alias Membrane.Core.Element.{ ActionHandler, CallbackContext, + DemandHandler, InputQueue, PadController, PlaybackQueue, @@ -98,6 +99,7 @@ defmodule Membrane.Core.Element.EventController do Membrane.Logger.debug("Received end of stream on pad #{inspect(pad_ref)}") state = PadModel.set_data!(state, pad_ref, :end_of_stream?, true) + state = DemandHandler.remove_pad_from_delayed_demands(pad_ref, state) state = PadController.remove_pad_associations(pad_ref, state) %{ diff --git a/test/membrane/integration/toilet_forwarding_test.exs b/test/membrane/integration/toilet_forwarding_test.exs index 1f240dfff..37097add8 100644 --- a/test/membrane/integration/toilet_forwarding_test.exs +++ b/test/membrane/integration/toilet_forwarding_test.exs @@ -234,7 +234,8 @@ defmodule Membrane.Integration.ToiletForwardingTest do ) for i <- 1..3000 do - assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{payload: <>}) + assert_sink_buffer(pipeline, :sink, buffer) + assert %Membrane.Buffer{payload: <>} = buffer assert buff_idx == i end From d6a016942281033622ea9f651944bc6576c783c5 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 30 Jan 2024 13:29:58 +0100 Subject: [PATCH 08/22] Fix tests wip --- lib/membrane/core/element/action_handler.ex | 33 +++++++++++++++---- .../core/element/event_controller_test.exs | 5 ++- .../element/lifecycle_controller_test.exs | 2 ++ .../core/element/pad_controller_test.exs | 3 ++ .../element/stream_format_controller_test.exs | 5 +-- 5 files changed, 38 insertions(+), 10 deletions(-) diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index b4c9deed7..b42dd7257 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -48,20 +48,39 @@ defmodule Membrane.Core.Element.ActionHandler do @impl CallbackHandler def handle_end_of_actions(state) do + # Fixed order of handling demand of manual and auto pads would lead to + # favoring manual pads over auto pads (or vice versa), especially after + # introducting auto flow queues. + manual_demands_first? = Enum.random([1, 2]) == 1 + state = - with %{handling_action?: false} <- state do - Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2) - |> Map.put(:pads_to_snapshot, MapSet.new()) - end + if manual_demands_first?, + do: maybe_handle_delayed_demands(state), + else: state + + state = maybe_handle_pads_to_snapshot(state) state = - with %{supplying_demand?: false} <- state do - DemandHandler.handle_delayed_demands(state) - end + if manual_demands_first?, + do: state, + else: maybe_handle_delayed_demands(state) state end + defp maybe_handle_delayed_demands(state) do + with %{supplying_demand?: false} <- state do + DemandHandler.handle_delayed_demands(state) + end + end + + defp maybe_handle_pads_to_snapshot(state) do + with %{handling_action?: false} <- state do + Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2) + |> Map.put(:pads_to_snapshot, MapSet.new()) + end + end + @impl CallbackHandler def handle_action({action, _}, :handle_init, _params, _state) when action not in [:latency, :notify_parent] do diff --git a/test/membrane/core/element/event_controller_test.exs b/test/membrane/core/element/event_controller_test.exs index 198d13e4e..65ea9bf5a 100644 --- a/test/membrane/core/element/event_controller_test.exs +++ b/test/membrane/core/element/event_controller_test.exs @@ -43,7 +43,7 @@ defmodule Membrane.Core.Element.EventControllerTest do }) state = - struct(State, + struct!(State, module: MockEventHandlingElement, name: :test_name, type: :filter, @@ -51,7 +51,10 @@ defmodule Membrane.Core.Element.EventControllerTest do parent_pid: self(), synchronization: %{clock: nil, parent_clock: nil, stream_sync: nil}, handling_action?: false, + supplying_demand?: false, pads_to_snapshot: MapSet.new(), + delayed_demands: MapSet.new(), + handle_demand_loop_counter: 0, pads_data: %{ input: struct(Membrane.Element.PadData, diff --git a/test/membrane/core/element/lifecycle_controller_test.exs b/test/membrane/core/element/lifecycle_controller_test.exs index e2d68faa2..8e40c1bd3 100644 --- a/test/membrane/core/element/lifecycle_controller_test.exs +++ b/test/membrane/core/element/lifecycle_controller_test.exs @@ -50,7 +50,9 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do parent_pid: self(), synchronization: %{clock: nil, parent_clock: nil}, handling_action?: false, + supplying_demand?: false, pads_to_snapshot: MapSet.new(), + delayed_demands: MapSet.new(), pads_data: %{ input: struct(Membrane.Element.PadData, diff --git a/test/membrane/core/element/pad_controller_test.exs b/test/membrane/core/element/pad_controller_test.exs index ad17195e1..c9c00e8a4 100644 --- a/test/membrane/core/element/pad_controller_test.exs +++ b/test/membrane/core/element/pad_controller_test.exs @@ -19,7 +19,10 @@ defmodule Membrane.Core.Element.PadControllerTest do name: name, module: elem_module, callback_depth_counter: 0, + handling_action?: false, + supplying_demand?: false, pads_to_snapshot: MapSet.new(), + delayed_demands: MapSet.new(), parent_pid: self(), internal_state: %{}, synchronization: %{clock: nil, parent_clock: nil}, diff --git a/test/membrane/core/element/stream_format_controller_test.exs b/test/membrane/core/element/stream_format_controller_test.exs index 309313887..f1c7dbe13 100644 --- a/test/membrane/core/element/stream_format_controller_test.exs +++ b/test/membrane/core/element/stream_format_controller_test.exs @@ -35,15 +35,16 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do }) state = - struct(State, + struct!(State, module: Filter, name: :test_name, - parent: self(), type: :filter, playback: :playing, synchronization: %{clock: nil, parent_clock: nil}, handling_action?: false, + supplying_demand?: false, pads_to_snapshot: MapSet.new(), + handle_demand_loop_counter: 0, pads_data: %{ input: struct(Membrane.Element.PadData, From 9c567e64aade27ba47ce4978473b60f3bbbb581b Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 31 Jan 2024 18:58:10 +0100 Subject: [PATCH 09/22] Fix CI --- lib/membrane/core/callback_handler.ex | 2 +- test/membrane/core/element/stream_format_controller_test.exs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/membrane/core/callback_handler.ex b/lib/membrane/core/callback_handler.ex index f300aafd4..4b8a0c9f6 100644 --- a/lib/membrane/core/callback_handler.ex +++ b/lib/membrane/core/callback_handler.ex @@ -7,8 +7,8 @@ defmodule Membrane.Core.CallbackHandler do use Bunch - alias Membrane.Core.Component alias Membrane.CallbackError + alias Membrane.Core.Component require Membrane.Logger diff --git a/test/membrane/core/element/stream_format_controller_test.exs b/test/membrane/core/element/stream_format_controller_test.exs index f1c7dbe13..766ae89db 100644 --- a/test/membrane/core/element/stream_format_controller_test.exs +++ b/test/membrane/core/element/stream_format_controller_test.exs @@ -44,6 +44,7 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do handling_action?: false, supplying_demand?: false, pads_to_snapshot: MapSet.new(), + delayed_demands: MapSet.new(), handle_demand_loop_counter: 0, pads_data: %{ input: From 9efb412b12da3f7d37e35d58054ad8d40ef4dbcc Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 31 Jan 2024 19:08:34 +0100 Subject: [PATCH 10/22] Update changelog --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 000523f6c..4f1cb8e71 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,8 @@ * Fix clock selection [#626](https://github.com/membraneframework/membrane_core/pull/626) * Log messages in the default handle_info implementation [#680](https://github.com/membraneframework/membrane_core/pull/680) * Fix typespecs in Membrane.UtilitySupervisor [#681](https://github.com/membraneframework/membrane_core/pull/681) - * Improve callback return types and group actions types [#702](https://github.com/membraneframework/membrane_core/pull/702) + * Improve callback return types and group actions types [#702](https://github.com/membraneframework/membrane_core/pull/702) + * Fix bug in the order of handling actions from callbacks [#708](https://github.com/membraneframework/membrane_core/pull/708) ## 1.0.0 * Introduce `:remove_link` action in pipelines and bins. From 47e97c925966405730df4487e128e503950010ad Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Fri, 2 Feb 2024 15:12:53 +0100 Subject: [PATCH 11/22] Stopt calling handle_spec_started in between handling actions --- lib/membrane/core/bin/action_handler.ex | 15 ++++++++++- lib/membrane/core/bin/state.ex | 2 ++ .../core/parent/child_life_controller.ex | 18 +++++++++++-- lib/membrane/core/pipeline/action_handler.ex | 25 +++++++++++++------ lib/membrane/core/pipeline/state.ex | 2 ++ test/membrane/core/pipeline_test.exs | 2 ++ 6 files changed, 53 insertions(+), 11 deletions(-) diff --git a/lib/membrane/core/bin/action_handler.ex b/lib/membrane/core/bin/action_handler.ex index a4888c529..bd507ca41 100644 --- a/lib/membrane/core/bin/action_handler.ex +++ b/lib/membrane/core/bin/action_handler.ex @@ -6,6 +6,7 @@ defmodule Membrane.Core.Bin.ActionHandler do alias Membrane.Core alias Membrane.Core.Bin.State alias Membrane.Core.{Message, Parent, TimerController} + alias Membrane.Core.Parent.ChildLifeController require Membrane.Logger require Message @@ -34,7 +35,14 @@ defmodule Membrane.Core.Bin.ActionHandler do end @impl CallbackHandler - def handle_action({:spec, spec}, _cb, _params, state) do + def handle_action({:spec, spec}, cb, _params, state) do + if cb == :handle_spec_started do + Membrane.Logger.warning(""" + Action :spec was returned from handle_spec_started/3 callback. It is suggested not to do this, + because it might lead to infinite loof of handle_spec_started/3 executions. + """) + end + Parent.ChildLifeController.handle_spec(spec, state) end @@ -108,4 +116,9 @@ defmodule Membrane.Core.Bin.ActionHandler do def handle_action(action, _callback, _params, _state) do raise ActionError, action: action, reason: {:unknown_action, Membrane.Bin.Action} end + + @impl CallbackHandler + def handle_end_of_actions(state) do + ChildLifeController.trigger_specs(state) + end end diff --git a/lib/membrane/core/bin/state.ex b/lib/membrane/core/bin/state.ex index 9ac285763..eacdd9b8d 100644 --- a/lib/membrane/core/bin/state.ex +++ b/lib/membrane/core/bin/state.ex @@ -40,6 +40,7 @@ defmodule Membrane.Core.Bin.State do }, children_log_metadata: Keyword.t(), pending_specs: ChildLifeController.pending_specs(), + specs_to_trigger: [ChildLifeController.spec_ref()], playback: Membrane.Playback.t(), initialized?: boolean(), terminating?: boolean(), @@ -69,6 +70,7 @@ defmodule Membrane.Core.Bin.State do links: %{}, crash_groups: %{}, pending_specs: %{}, + specs_to_trigger: [], synchronization: nil, initialized?: false, terminating?: false, diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index 794c5ea64..6624e5644 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -71,6 +71,12 @@ defmodule Membrane.Core.Parent.ChildLifeController do log_metadata: [] } + # todo: handle documentation + @spec handle_spec(ChildrenSpec.t(), Parent.state()) :: Parent.state() + def handle_spec(spec, state) do + Map.update!(state, :specs_to_trigger, &[spec | &1]) + end + @doc """ Handles `Membrane.ChildrenSpec` returned with `spec` action. @@ -99,8 +105,16 @@ defmodule Membrane.Core.Parent.ChildLifeController do - Cleanup spec: remove it from `pending_specs` and all other specs' `dependent_specs` and try proceeding startup for all other pending specs that depended on the spec. """ - @spec handle_spec(ChildrenSpec.t(), Parent.state()) :: Parent.state() | no_return() - def handle_spec(spec, state) do + @spec trigger_specs(Parent.state()) :: Parent.state() | no_return() + def trigger_specs(state) do + specs_to_trigger = state.specs_to_trigger + state = %{state | specs_to_trigger: []} + + List.foldr(specs_to_trigger, state, &do_trigger_spec/2) + end + + @spec do_trigger_spec(ChildrenSpec.t(), Parent.state()) :: Parent.state() | no_return() + defp do_trigger_spec(spec, state) do spec_ref = make_ref() canonical_spec = make_canonical(spec) diff --git a/lib/membrane/core/pipeline/action_handler.ex b/lib/membrane/core/pipeline/action_handler.ex index 7fc2faca7..bc3ec14a0 100644 --- a/lib/membrane/core/pipeline/action_handler.ex +++ b/lib/membrane/core/pipeline/action_handler.ex @@ -5,9 +5,11 @@ defmodule Membrane.Core.Pipeline.ActionHandler do alias Membrane.ActionError alias Membrane.Core alias Membrane.Core.{Parent, TimerController} - alias Membrane.Core.Parent.LifecycleController + alias Membrane.Core.Parent.{ChildLifeController, LifecycleController} alias Membrane.Core.Pipeline.State + require Membrane.Logger + @impl CallbackHandler def handle_action({:spec, args}, _cb, _params, %State{terminating?: true}) do raise Membrane.ParentError, @@ -32,7 +34,14 @@ defmodule Membrane.Core.Pipeline.ActionHandler do end @impl CallbackHandler - def handle_action({:spec, spec}, _cb, _params, state) do + def handle_action({:spec, spec}, cb, _params, state) do + if cb == :handle_spec_started do + Membrane.Logger.warning(""" + Action :spec was returned from handle_spec_started/3 callback. It is suggested not to do this, + because it might lead to infinite loof of handle_spec_started/3 executions. + """) + end + Parent.ChildLifeController.handle_spec(spec, state) end @@ -105,11 +114,11 @@ defmodule Membrane.Core.Pipeline.ActionHandler do end @impl CallbackHandler - def handle_end_of_actions(state) when state.awaiting_setup_completition? do - %{state | awaiting_setup_completition?: false} - |> Membrane.Core.LifecycleController.complete_setup() + def handle_end_of_actions(state) do + with %{awaiting_setup_completition?: true} <- state do + %{state | awaiting_setup_completition?: false} + |> Membrane.Core.LifecycleController.complete_setup() + end + |> ChildLifeController.trigger_specs() end - - @impl CallbackHandler - def handle_end_of_actions(state), do: state end diff --git a/lib/membrane/core/pipeline/state.ex b/lib/membrane/core/pipeline/state.ex index 6f644947b..6fb5b745f 100644 --- a/lib/membrane/core/pipeline/state.ex +++ b/lib/membrane/core/pipeline/state.ex @@ -20,6 +20,7 @@ defmodule Membrane.Core.Pipeline.State do links: %{Link.id() => Link.t()}, crash_groups: %{CrashGroup.name() => CrashGroup.t()}, pending_specs: ChildLifeController.pending_specs(), + specs_to_trigger: [ChildLifeController.spec_ref()], synchronization: %{ timers: %{Timer.id() => Timer.t()}, clock_provider: %{ @@ -52,6 +53,7 @@ defmodule Membrane.Core.Pipeline.State do links: %{}, crash_groups: %{}, pending_specs: %{}, + specs_to_trigger: [], synchronization: nil, initialized?: false, terminating?: false, diff --git a/test/membrane/core/pipeline_test.exs b/test/membrane/core/pipeline_test.exs index 5ef8f2f7d..d59fc2b2c 100644 --- a/test/membrane/core/pipeline_test.exs +++ b/test/membrane/core/pipeline_test.exs @@ -79,6 +79,7 @@ defmodule Membrane.Core.PipelineTest do [], state ) + |> ActionHandler.handle_end_of_actions() end end @@ -92,6 +93,7 @@ defmodule Membrane.Core.PipelineTest do [], state ) + |> ActionHandler.handle_end_of_actions() end end end From bbec5ef44835c45d811656d22491ed19d892573b Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Fri, 2 Feb 2024 16:13:49 +0100 Subject: [PATCH 12/22] Make demands test more strict --- test/membrane/integration/demands_test.exs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/membrane/integration/demands_test.exs b/test/membrane/integration/demands_test.exs index ea9880c56..bd5da5b11 100644 --- a/test/membrane/integration/demands_test.exs +++ b/test/membrane/integration/demands_test.exs @@ -15,7 +15,8 @@ defmodule Membrane.Integration.DemandsTest do defp assert_buffers_received(range, pid) do Enum.each(range, fn i -> - assert_sink_buffer(pid, :sink, %Buffer{payload: <<^i::16>> <> <<255>>}) + assert_sink_buffer(pid, :sink, buffer) + assert %Buffer{payload: <<^i::16>> <> <<255>>} = buffer end) end From e6b8acb938d8db68d5b93f484a4ff2eb4aaa86c7 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 6 Feb 2024 17:34:27 +0100 Subject: [PATCH 13/22] Add dots to changelog --- CHANGELOG.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f1cb8e71..56f16c2ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,11 +2,11 @@ ## 1.0.1 * Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614) - * Fix clock selection [#626](https://github.com/membraneframework/membrane_core/pull/626) - * Log messages in the default handle_info implementation [#680](https://github.com/membraneframework/membrane_core/pull/680) - * Fix typespecs in Membrane.UtilitySupervisor [#681](https://github.com/membraneframework/membrane_core/pull/681) - * Improve callback return types and group actions types [#702](https://github.com/membraneframework/membrane_core/pull/702) - * Fix bug in the order of handling actions from callbacks [#708](https://github.com/membraneframework/membrane_core/pull/708) + * Fix clock selection. [#626](https://github.com/membraneframework/membrane_core/pull/626) + * Log messages in the default handle_info implementation. [#680](https://github.com/membraneframework/membrane_core/pull/680) + * Fix typespecs in Membrane.UtilitySupervisor. [#681](https://github.com/membraneframework/membrane_core/pull/681) + * Improve callback return types and group actions types. [#702](https://github.com/membraneframework/membrane_core/pull/702) + * Fix bug in the order of handling actions from callbacks. [#708](https://github.com/membraneframework/membrane_core/pull/708) ## 1.0.0 * Introduce `:remove_link` action in pipelines and bins. From 989764f26c13a3b711e272e6ca41023cc76ab0f0 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Thu, 8 Feb 2024 10:35:25 +0100 Subject: [PATCH 14/22] Fix double tick bug --- lib/membrane/core/timer.ex | 21 +++++++++-- lib/membrane/core/timer_controller.ex | 7 ++++ .../actions_handling_order_test.exs | 37 ++++++++++++++----- 3 files changed, 51 insertions(+), 14 deletions(-) diff --git a/lib/membrane/core/timer.ex b/lib/membrane/core/timer.ex index 5a23c957c..bd4eab23f 100644 --- a/lib/membrane/core/timer.ex +++ b/lib/membrane/core/timer.ex @@ -15,11 +15,13 @@ defmodule Membrane.Core.Timer do clock: Clock.t(), next_tick_time: Time.t(), ratio: Clock.ratio(), - timer_ref: reference() | nil + timer_ref: reference() | nil, + awaiting_message?: boolean() } @enforce_keys [:interval, :clock, :init_time, :id] - defstruct @enforce_keys ++ [next_tick_time: 0, ratio: Ratio.new(1), timer_ref: nil] + defstruct @enforce_keys ++ + [next_tick_time: 0, ratio: Ratio.new(1), timer_ref: nil, awaiting_message?: false] @spec start(id, interval, Clock.t()) :: t def start(id, interval, clock) do @@ -42,8 +44,14 @@ defmodule Membrane.Core.Timer do %__MODULE__{timer | ratio: ratio} end + @spec handle_message_arrived(t) :: t + def handle_message_arrived(%__MODULE__{awaiting_message?: true} = timer) do + %{timer | awaiting_message?: false} + end + @spec tick(t) :: t - def tick(%__MODULE__{interval: :no_interval} = timer) do + def tick(%__MODULE__{} = timer) + when timer.awaiting_message? or timer.interval == :no_interval do timer end @@ -67,7 +75,12 @@ defmodule Membrane.Core.Timer do timer_ref = Process.send_after(self(), Message.new(:timer_tick, id), beam_next_tick_time, abs: true) - %__MODULE__{timer | next_tick_time: next_tick_time |> Ratio.floor(), timer_ref: timer_ref} + %__MODULE__{ + timer + | next_tick_time: next_tick_time |> Ratio.floor(), + timer_ref: timer_ref, + awaiting_message?: true + } end @spec set_interval(t, interval) :: t diff --git a/lib/membrane/core/timer_controller.ex b/lib/membrane/core/timer_controller.ex index 85e154fcb..a5c5faa64 100644 --- a/lib/membrane/core/timer_controller.ex +++ b/lib/membrane/core/timer_controller.ex @@ -58,6 +58,13 @@ defmodule Membrane.Core.TimerController do @spec handle_tick(Timer.id(), Component.state()) :: Component.state() def handle_tick(timer_id, state) when is_timer_present(timer_id, state) do + state = + update_in( + state, + [:synchronization, :timers, timer_id], + &Timer.handle_message_arrived/1 + ) + state = CallbackHandler.exec_and_handle_callback( :handle_tick, diff --git a/test/membrane/integration/actions_handling_order_test.exs b/test/membrane/integration/actions_handling_order_test.exs index 599b10a80..f54ac0d49 100644 --- a/test/membrane/integration/actions_handling_order_test.exs +++ b/test/membrane/integration/actions_handling_order_test.exs @@ -100,36 +100,47 @@ defmodule Membrane.Integration.ActionsHandlingOrderTest do defmodule TickingSink do use Membrane.Sink - @tick_time Membrane.Time.milliseconds(100) + @short_tick_time Membrane.Time.milliseconds(100) + @long_tick_time Membrane.Time.seconds(2) def_input_pad :input, flow_control: :manual, demand_unit: :buffers, accepted_format: _any @impl true - def handle_init(_ctx, _opts), do: {[], %{ticked?: false}} + def handle_init(_ctx, _opts), do: {[], %{tick_counter: 0}} @impl true def handle_parent_notification(:start_timer, _ctx, state) do - {[start_timer: {:timer, @tick_time}], state} + {[start_timer: {:timer, @short_tick_time}], state} end @impl true - def handle_tick(:timer, _ctx, %{ticked?: false} = state) do + def handle_tick(:timer, _ctx, %{tick_counter: 0} = state) do actions = [ demand: {:input, 1}, timer_interval: {:timer, :no_interval} ] - {actions, %{state | ticked?: true}} + {actions, %{state | tick_counter: 1}} + end + + @impl true + def handle_tick(:timer, _ctx, %{tick_counter: 1} = state) do + actions = [ + notify_parent: :second_tick, + timer_interval: {:timer, @long_tick_time} + ] + + {actions, %{state | tick_counter: 2}} end @impl true - def handle_tick(:timer, _ctx, %{ticked?: true} = state) do - {[notify_parent: :second_tick], state} + def handle_tick(:timer, _ctx, %{tick_counter: 2} = state) do + {[notify_parent: :third_tick], %{state | tick_counter: 3}} end @impl true def handle_buffer(:input, _buffer, _ctx, state) do - {[timer_interval: {:timer, @tick_time}], state} + {[timer_interval: {:timer, @short_tick_time}], state} end end @@ -155,7 +166,7 @@ defmodule Membrane.Integration.ActionsHandlingOrderTest do Membrane.Pipeline.terminate(pipeline) end - test "order of handling :timer_interval and :demand actions" do + test ":demand and :timer_interval actions don't interact with each other" do spec = child(:source, %Testing.Source{output: [<<>>]}) |> child(:sink, TickingSink) @@ -163,12 +174,18 @@ defmodule Membrane.Integration.ActionsHandlingOrderTest do pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) # time for pipeline to play - Process.sleep(500) + Process.sleep(100) Testing.Pipeline.message_child(pipeline, :sink, :start_timer) assert_pipeline_notified(pipeline, :sink, :second_tick) + # third tick should arrive after two seconds, not ealier + refute_pipeline_notified(pipeline, :sink, :third_tick, 1_500) + assert_pipeline_notified(pipeline, :sink, :third_tick) + + assert Testing.Pipeline.get_child_pid!(pipeline, :source) |> Process.alive?() + Testing.Pipeline.terminate(pipeline) end end From 1076db250296e56615e3e26317b1ca753ca64257 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Mon, 12 Feb 2024 18:43:06 +0100 Subject: [PATCH 15/22] wip --- lib/membrane/bin.ex | 7 +++--- lib/membrane/core/bin/action_handler.ex | 5 ---- lib/membrane/core/bin/state.ex | 2 -- lib/membrane/core/callback_handler.ex | 4 +++ lib/membrane/core/element/action_handler.ex | 25 ------------------- lib/membrane/core/parent.ex | 11 ++++++++ .../core/parent/child_life_controller.ex | 18 ++----------- .../child_life_controller/startup_utils.ex | 21 ++++++++++------ lib/membrane/core/pipeline/action_handler.ex | 1 - lib/membrane/core/pipeline/state.ex | 2 -- lib/membrane/pipeline.ex | 7 +++--- 11 files changed, 38 insertions(+), 65 deletions(-) diff --git a/lib/membrane/bin.ex b/lib/membrane/bin.ex index d13151fd4..e5c0c89ef 100644 --- a/lib/membrane/bin.ex +++ b/lib/membrane/bin.ex @@ -309,6 +309,7 @@ defmodule Membrane.Bin do alias unquote(__MODULE__) @behaviour unquote(__MODULE__) @before_compile unquote(__MODULE__) + @after_compile Membrane.Core.Parent unquote(bring_spec) unquote(bring_pad) @@ -354,8 +355,8 @@ defmodule Membrane.Bin do {[], state} end - @impl true - def handle_spec_started(new_children, _ctx, state), do: {[], state} + # @impl true + # def handle_spec_started(new_children, _ctx, state), do: {[], state} @impl true def handle_element_start_of_stream(_element, _pad, _ctx, state), do: {[], state} @@ -381,7 +382,7 @@ defmodule Membrane.Bin do handle_setup: 2, handle_playing: 2, handle_info: 3, - handle_spec_started: 3, + # handle_spec_started: 3, handle_element_start_of_stream: 4, handle_element_end_of_stream: 4, handle_child_notification: 4, diff --git a/lib/membrane/core/bin/action_handler.ex b/lib/membrane/core/bin/action_handler.ex index bd507ca41..912a3bc0d 100644 --- a/lib/membrane/core/bin/action_handler.ex +++ b/lib/membrane/core/bin/action_handler.ex @@ -116,9 +116,4 @@ defmodule Membrane.Core.Bin.ActionHandler do def handle_action(action, _callback, _params, _state) do raise ActionError, action: action, reason: {:unknown_action, Membrane.Bin.Action} end - - @impl CallbackHandler - def handle_end_of_actions(state) do - ChildLifeController.trigger_specs(state) - end end diff --git a/lib/membrane/core/bin/state.ex b/lib/membrane/core/bin/state.ex index eacdd9b8d..9ac285763 100644 --- a/lib/membrane/core/bin/state.ex +++ b/lib/membrane/core/bin/state.ex @@ -40,7 +40,6 @@ defmodule Membrane.Core.Bin.State do }, children_log_metadata: Keyword.t(), pending_specs: ChildLifeController.pending_specs(), - specs_to_trigger: [ChildLifeController.spec_ref()], playback: Membrane.Playback.t(), initialized?: boolean(), terminating?: boolean(), @@ -70,7 +69,6 @@ defmodule Membrane.Core.Bin.State do links: %{}, crash_groups: %{}, pending_specs: %{}, - specs_to_trigger: [], synchronization: nil, initialized?: false, terminating?: false, diff --git a/lib/membrane/core/callback_handler.ex b/lib/membrane/core/callback_handler.ex index 4b8a0c9f6..91729aef0 100644 --- a/lib/membrane/core/callback_handler.ex +++ b/lib/membrane/core/callback_handler.ex @@ -192,6 +192,10 @@ defmodule Membrane.Core.CallbackHandler do was_handling_action? = state.handling_action? state = %{state | handling_action?: true} + # Updating :supplying_demand? flag value here is a temporal fix. + # Setting it to `true` while handling actions causes postponing calls + # of handle_redemand/2 and supply_demand/2 until a moment, when all + # actions returned from the callback are handled was_supplying_demand? = Map.get(state, :supplying_demand?, false) state = if Component.is_element?(state), do: %{state | supplying_demand?: true}, else: state diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index b42dd7257..4d336f2c0 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -40,7 +40,6 @@ defmodule Membrane.Core.Element.ActionHandler do @impl CallbackHandler def transform_actions(actions, callback, _handler_params, state) do actions = join_buffers(actions) - ensure_nothing_after_redemand(actions, callback, state) {actions, state} end @@ -306,30 +305,6 @@ defmodule Membrane.Core.Element.ActionHandler do ) end - defp ensure_nothing_after_redemand(actions, callback, state) do - {redemands, actions_after_redemands} = - actions - |> Enum.drop_while(fn - {:redemand, _args} -> false - _other_action -> true - end) - |> Enum.split_while(fn - {:redemand, _args} -> true - _other_action -> false - end) - - case {redemands, actions_after_redemands} do - {_redemands, []} -> - :ok - - {[redemand | _redemands], _actions_after_redemands} -> - raise ActionError, - reason: :actions_after_redemand, - action: redemand, - callback: {state.module, callback} - end - end - @spec send_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t() defp send_buffer(_pad_ref, [], state) do state diff --git a/lib/membrane/core/parent.ex b/lib/membrane/core/parent.ex index 4f0fb5f8a..5aee8bad4 100644 --- a/lib/membrane/core/parent.ex +++ b/lib/membrane/core/parent.ex @@ -2,4 +2,15 @@ defmodule Membrane.Core.Parent do @moduledoc false @type state :: Membrane.Core.Bin.State.t() | Membrane.Core.Pipeline.State.t() + + @warn_whitelist [Membrane.Testing.Pipeline] + + @spec __after_compile__(Macro.Env.t(), binary()) :: any() + def __after_compile__(env, _bytecode) do + if env.module not in @warn_whitelist and Module.defines?(__MODULE__, {:handle_spec_started, 3}, :def) do + IO.warn(""" + Callback handle_spec_started/3 has been deprecated since :membrane_core v1.0.1, but it is implemented in #{inspect(__MODULE__)} + """) + end + end end diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index 6624e5644..794c5ea64 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -71,12 +71,6 @@ defmodule Membrane.Core.Parent.ChildLifeController do log_metadata: [] } - # todo: handle documentation - @spec handle_spec(ChildrenSpec.t(), Parent.state()) :: Parent.state() - def handle_spec(spec, state) do - Map.update!(state, :specs_to_trigger, &[spec | &1]) - end - @doc """ Handles `Membrane.ChildrenSpec` returned with `spec` action. @@ -105,16 +99,8 @@ defmodule Membrane.Core.Parent.ChildLifeController do - Cleanup spec: remove it from `pending_specs` and all other specs' `dependent_specs` and try proceeding startup for all other pending specs that depended on the spec. """ - @spec trigger_specs(Parent.state()) :: Parent.state() | no_return() - def trigger_specs(state) do - specs_to_trigger = state.specs_to_trigger - state = %{state | specs_to_trigger: []} - - List.foldr(specs_to_trigger, state, &do_trigger_spec/2) - end - - @spec do_trigger_spec(ChildrenSpec.t(), Parent.state()) :: Parent.state() | no_return() - defp do_trigger_spec(spec, state) do + @spec handle_spec(ChildrenSpec.t(), Parent.state()) :: Parent.state() | no_return() + def handle_spec(spec, state) do spec_ref = make_ref() canonical_spec = make_canonical(spec) diff --git a/lib/membrane/core/parent/child_life_controller/startup_utils.ex b/lib/membrane/core/parent/child_life_controller/startup_utils.ex index a4d4d8884..70cc96de8 100644 --- a/lib/membrane/core/parent/child_life_controller/startup_utils.ex +++ b/lib/membrane/core/parent/child_life_controller/startup_utils.ex @@ -104,15 +104,20 @@ defmodule Membrane.Core.Parent.ChildLifeController.StartupUtils do @spec exec_handle_spec_started([Membrane.Child.name()], Parent.state()) :: Parent.state() def exec_handle_spec_started(children_names, state) do - action_handler = Component.action_handler(state) - - CallbackHandler.exec_and_handle_callback( - :handle_spec_started, - action_handler, - %{context: &Component.context_from_state/1}, - [children_names], + # handle_spec_started/3 callback is deprecated, so we don't require its implementation + if function_exported?(state.module, :handle_spec_started, 3) do + action_handler = Component.action_handler(state) + + CallbackHandler.exec_and_handle_callback( + :handle_spec_started, + action_handler, + %{context: &Component.context_from_state/1}, + [children_names], + state + ) + else state - ) + end end @spec check_if_children_names_and_children_groups_ids_are_unique( diff --git a/lib/membrane/core/pipeline/action_handler.ex b/lib/membrane/core/pipeline/action_handler.ex index bc3ec14a0..f31378171 100644 --- a/lib/membrane/core/pipeline/action_handler.ex +++ b/lib/membrane/core/pipeline/action_handler.ex @@ -119,6 +119,5 @@ defmodule Membrane.Core.Pipeline.ActionHandler do %{state | awaiting_setup_completition?: false} |> Membrane.Core.LifecycleController.complete_setup() end - |> ChildLifeController.trigger_specs() end end diff --git a/lib/membrane/core/pipeline/state.ex b/lib/membrane/core/pipeline/state.ex index 6fb5b745f..6f644947b 100644 --- a/lib/membrane/core/pipeline/state.ex +++ b/lib/membrane/core/pipeline/state.ex @@ -20,7 +20,6 @@ defmodule Membrane.Core.Pipeline.State do links: %{Link.id() => Link.t()}, crash_groups: %{CrashGroup.name() => CrashGroup.t()}, pending_specs: ChildLifeController.pending_specs(), - specs_to_trigger: [ChildLifeController.spec_ref()], synchronization: %{ timers: %{Timer.id() => Timer.t()}, clock_provider: %{ @@ -53,7 +52,6 @@ defmodule Membrane.Core.Pipeline.State do links: %{}, crash_groups: %{}, pending_specs: %{}, - specs_to_trigger: [], synchronization: nil, initialized?: false, terminating?: false, diff --git a/lib/membrane/pipeline.ex b/lib/membrane/pipeline.ex index f78be3d60..7bea5af43 100644 --- a/lib/membrane/pipeline.ex +++ b/lib/membrane/pipeline.ex @@ -459,6 +459,7 @@ defmodule Membrane.Pipeline do alias unquote(__MODULE__) require Membrane.Logger @behaviour unquote(__MODULE__) + @after_compile Membrane.Core.Parent unquote(bring_spec) unquote(bring_pad) @@ -502,8 +503,8 @@ defmodule Membrane.Pipeline do {[], state} end - @impl true - def handle_spec_started(new_children, _ctx, state), do: {[], state} + # @impl true + # def handle_spec_started(new_children, _ctx, state), do: {[], state} @impl true def handle_element_start_of_stream(_element, _pad, _ctx, state), do: {[], state} @@ -528,7 +529,7 @@ defmodule Membrane.Pipeline do handle_setup: 2, handle_playing: 2, handle_info: 3, - handle_spec_started: 3, + # handle_spec_started: 3, handle_element_start_of_stream: 4, handle_element_end_of_stream: 4, handle_child_notification: 4, From 37bc27c972ee40dbd81145e6d2720db72f4db269 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 13 Feb 2024 12:08:40 +0100 Subject: [PATCH 16/22] Deprecate handle_spec_started/3 --- lib/membrane/bin.ex | 6 ++- lib/membrane/core/parent.ex | 23 ++++++--- lib/membrane/pipeline.ex | 6 ++- lib/membrane/testing/pipeline.ex | 7 ++- .../core/element/action_handler_test.exs | 11 ----- test/membrane/integration/linking_test.exs | 48 +++++-------------- 6 files changed, 43 insertions(+), 58 deletions(-) diff --git a/lib/membrane/bin.ex b/lib/membrane/bin.ex index e5c0c89ef..5d4937787 100644 --- a/lib/membrane/bin.ex +++ b/lib/membrane/bin.ex @@ -309,7 +309,9 @@ defmodule Membrane.Bin do alias unquote(__MODULE__) @behaviour unquote(__MODULE__) @before_compile unquote(__MODULE__) - @after_compile Membrane.Core.Parent + + require Membrane.Core.Parent + Membrane.Core.Parent.bring_after_compile_check() unquote(bring_spec) unquote(bring_pad) @@ -382,7 +384,7 @@ defmodule Membrane.Bin do handle_setup: 2, handle_playing: 2, handle_info: 3, - # handle_spec_started: 3, + # handle_spec_started: 3, handle_element_start_of_stream: 4, handle_element_end_of_stream: 4, handle_child_notification: 4, diff --git a/lib/membrane/core/parent.ex b/lib/membrane/core/parent.ex index 5aee8bad4..f0b6e3f36 100644 --- a/lib/membrane/core/parent.ex +++ b/lib/membrane/core/parent.ex @@ -3,14 +3,23 @@ defmodule Membrane.Core.Parent do @type state :: Membrane.Core.Bin.State.t() | Membrane.Core.Pipeline.State.t() - @warn_whitelist [Membrane.Testing.Pipeline] + defmacro bring_after_compile_check() do + quote do + @after_compile {__MODULE__, :__membrane_check_deprecated_functions__} - @spec __after_compile__(Macro.Env.t(), binary()) :: any() - def __after_compile__(env, _bytecode) do - if env.module not in @warn_whitelist and Module.defines?(__MODULE__, {:handle_spec_started, 3}, :def) do - IO.warn(""" - Callback handle_spec_started/3 has been deprecated since :membrane_core v1.0.1, but it is implemented in #{inspect(__MODULE__)} - """) + def __membrane_check_deprecated_functions__(env, _bytecode) do + modules_whitelist = [Membrane.Testing.Pipeline] + + if env.module not in modules_whitelist and + Module.defines?(env.module, {:handle_spec_started, 3}, :def) do + warn_message = """ + Callback handle_spec_started/3 has been deprecated since \ + :membrane_core v1.0.1, but it is implemented in #{inspect(env.module)} + """ + + IO.warn(warn_message, []) + end + end end end end diff --git a/lib/membrane/pipeline.ex b/lib/membrane/pipeline.ex index 7bea5af43..c39d6f7d0 100644 --- a/lib/membrane/pipeline.ex +++ b/lib/membrane/pipeline.ex @@ -459,7 +459,9 @@ defmodule Membrane.Pipeline do alias unquote(__MODULE__) require Membrane.Logger @behaviour unquote(__MODULE__) - @after_compile Membrane.Core.Parent + + require Membrane.Core.Parent + Membrane.Core.Parent.bring_after_compile_check() unquote(bring_spec) unquote(bring_pad) @@ -529,7 +531,7 @@ defmodule Membrane.Pipeline do handle_setup: 2, handle_playing: 2, handle_info: 3, - # handle_spec_started: 3, + # handle_spec_started: 3, handle_element_start_of_stream: 4, handle_element_end_of_stream: 4, handle_child_notification: 4, diff --git a/lib/membrane/testing/pipeline.ex b/lib/membrane/testing/pipeline.ex index 7d4084ef5..73becaa8c 100644 --- a/lib/membrane/testing/pipeline.ex +++ b/lib/membrane/testing/pipeline.ex @@ -522,7 +522,12 @@ defmodule Membrane.Testing.Pipeline do do: {[], nil} defp eval_injected_module_callback(callback, args, state) do - apply(state.module, callback, args ++ [state.custom_pipeline_state]) + if callback != :handle_spec_started or + function_exported?(state.module, :handle_spec_started, 3) do + apply(state.module, callback, args ++ [state.custom_pipeline_state]) + else + {[], state.custom_pipeline_state} + end end defp notify_test_process(test_process, message) do diff --git a/test/membrane/core/element/action_handler_test.exs b/test/membrane/core/element/action_handler_test.exs index 4c505230d..3c721fc99 100644 --- a/test/membrane/core/element/action_handler_test.exs +++ b/test/membrane/core/element/action_handler_test.exs @@ -540,16 +540,5 @@ defmodule Membrane.Core.Element.ActionHandlerTest do end ) end - - test "when :redemand is not the last action", %{state: state} do - assert_raise ActionError, ~r/redemand.*last/i, fn -> - @module.transform_actions( - [redemand: :output, notify_parent: :a, notify_parent: :b], - :handle_other, - %{}, - state - ) - end - end end end diff --git a/test/membrane/integration/linking_test.exs b/test/membrane/integration/linking_test.exs index b23cde06c..a2a9e5081 100644 --- a/test/membrane/integration/linking_test.exs +++ b/test/membrane/integration/linking_test.exs @@ -110,12 +110,6 @@ defmodule Membrane.Integration.LinkingTest do def handle_info(_msg, _ctx, state) do {[], state} end - - @impl true - def handle_spec_started(_children, _ctx, state) do - send(state.testing_pid, :spec_started) - {[], state} - end end setup do @@ -139,12 +133,13 @@ defmodule Membrane.Integration.LinkingTest do ] send(pipeline, {:start_spec, %{spec: spec}}) - assert_receive(:spec_started) assert_sink_buffer(pipeline, :sink, %Buffer{payload: ~c"a"}) assert_sink_buffer(pipeline, :sink, %Buffer{payload: ~c"b"}) assert_sink_buffer(pipeline, :sink, %Buffer{payload: ~c"c"}) send(pipeline, {:remove_children, :sink}) assert_pipeline_notified(pipeline, :bin, :handle_pad_removed) + + Membrane.Pipeline.terminate(pipeline) end test "and element crashes, bin forwards the unlink message to child", %{pipeline: pipeline} do @@ -166,10 +161,7 @@ defmodule Membrane.Integration.LinkingTest do ] send(pipeline, {:start_spec, %{spec: bin_spec}}) - assert_receive(:spec_started) - send(pipeline, {:start_spec, %{spec: sink_spec}}) - assert_receive(:spec_started) sink_pid = get_child_pid(:sink, pipeline) bin_pid = get_child_pid(:bin, pipeline) @@ -188,6 +180,8 @@ defmodule Membrane.Integration.LinkingTest do match?(%Membrane.PadError{}, error) assert error.message =~ ~r/static.*pad.*unlink/u + + Membrane.Pipeline.terminate(pipeline) end end @@ -208,13 +202,12 @@ defmodule Membrane.Integration.LinkingTest do ] send(pipeline, {:start_spec, %{spec: spec}}) - send(pipeline, {:kill, [:sink]}) - assert_receive(:spec_started) - assert_pipeline_crash_group_down(pipeline, :group_1) assert_pipeline_crash_group_down(pipeline, :group_2) + + Membrane.Pipeline.terminate(pipeline) end test "element shouldn't crash when its neighbor connected via dynamic pad crashes", %{ @@ -237,12 +230,12 @@ defmodule Membrane.Integration.LinkingTest do spec = [spec_1, spec_2, links_spec] send(pipeline, {:start_spec, %{spec: spec}}) - assert_receive(:spec_started) - send(pipeline, {:kill, [:sink]}) refute_pipeline_crash_group_down(pipeline, :group_1) assert_pipeline_crash_group_down(pipeline, :group_2) + + Membrane.Pipeline.terminate(pipeline) end test "element shouldn't crash when its neighbor connected via dynamic pad crashes and the crash groups are set within nested spec", @@ -264,31 +257,12 @@ defmodule Membrane.Integration.LinkingTest do |> get_child(:sink) send(pipeline, {:start_spec, %{spec: [spec, links_spec]}}) - assert_receive(:spec_started) - send(pipeline, {:kill, [:sink]}) refute_pipeline_crash_group_down(pipeline, :group_1) assert_pipeline_crash_group_down(pipeline, :group_2) - end - - test "pipeline playback should change successfully after spec with links has been returned", - %{pipeline: pipeline} do - bin_spec = { - child(:bin, %Bin{child: %Testing.Source{output: [~c"a", ~c"b", ~c"c"]}}), - group: :group_1, crash_group_mode: :temporary - } - - sink_spec = { - child(:sink, Testing.Sink), - group: :group_1, crash_group_mode: :temporary - } - links_spec = get_child(:bin) |> get_child(:sink) - - spec = [bin_spec, sink_spec, links_spec] - send(pipeline, {:start_spec, %{spec: spec}}) - assert_receive(:spec_started) + Membrane.Pipeline.terminate(pipeline) end defmodule SlowSetupSink do @@ -349,6 +323,8 @@ defmodule Membrane.Integration.LinkingTest do {element, pad} end) + + Membrane.Pipeline.terminate(pipeline) end test "Elements and bins can be spawned, linked and removed" do @@ -507,6 +483,8 @@ defmodule Membrane.Integration.LinkingTest do refute_link_removed(pipeline, i) end end + + Membrane.Pipeline.terminate(pipeline) end describe "Spec shouldn't wait on links with" do From f4402ad7ffbc6d3cb065b0aba0c3624ce9b72e7d Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 13 Feb 2024 12:11:35 +0100 Subject: [PATCH 17/22] Remove unused aliases --- lib/membrane/core/bin/action_handler.ex | 1 - lib/membrane/core/element/action_handler.ex | 2 +- lib/membrane/core/pipeline/action_handler.ex | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/membrane/core/bin/action_handler.ex b/lib/membrane/core/bin/action_handler.ex index 912a3bc0d..b37011b8a 100644 --- a/lib/membrane/core/bin/action_handler.ex +++ b/lib/membrane/core/bin/action_handler.ex @@ -6,7 +6,6 @@ defmodule Membrane.Core.Bin.ActionHandler do alias Membrane.Core alias Membrane.Core.Bin.State alias Membrane.Core.{Message, Parent, TimerController} - alias Membrane.Core.Parent.ChildLifeController require Membrane.Logger require Message diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 4d336f2c0..b4c1021a5 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -38,7 +38,7 @@ defmodule Membrane.Core.Element.ActionHandler do require Membrane.Logger @impl CallbackHandler - def transform_actions(actions, callback, _handler_params, state) do + def transform_actions(actions, _callback, _handler_params, state) do actions = join_buffers(actions) {actions, state} end diff --git a/lib/membrane/core/pipeline/action_handler.ex b/lib/membrane/core/pipeline/action_handler.ex index f31378171..f9249e101 100644 --- a/lib/membrane/core/pipeline/action_handler.ex +++ b/lib/membrane/core/pipeline/action_handler.ex @@ -5,7 +5,7 @@ defmodule Membrane.Core.Pipeline.ActionHandler do alias Membrane.ActionError alias Membrane.Core alias Membrane.Core.{Parent, TimerController} - alias Membrane.Core.Parent.{ChildLifeController, LifecycleController} + alias Membrane.Core.Parent.LifecycleController alias Membrane.Core.Pipeline.State require Membrane.Logger From 2fa7aba8de8786eac5882289bdf8534a03835050 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 13 Feb 2024 12:14:11 +0100 Subject: [PATCH 18/22] Remove unnecessary warning --- lib/membrane/core/bin/action_handler.ex | 9 +-------- lib/membrane/core/pipeline/action_handler.ex | 9 +-------- 2 files changed, 2 insertions(+), 16 deletions(-) diff --git a/lib/membrane/core/bin/action_handler.ex b/lib/membrane/core/bin/action_handler.ex index b37011b8a..a4888c529 100644 --- a/lib/membrane/core/bin/action_handler.ex +++ b/lib/membrane/core/bin/action_handler.ex @@ -34,14 +34,7 @@ defmodule Membrane.Core.Bin.ActionHandler do end @impl CallbackHandler - def handle_action({:spec, spec}, cb, _params, state) do - if cb == :handle_spec_started do - Membrane.Logger.warning(""" - Action :spec was returned from handle_spec_started/3 callback. It is suggested not to do this, - because it might lead to infinite loof of handle_spec_started/3 executions. - """) - end - + def handle_action({:spec, spec}, _cb, _params, state) do Parent.ChildLifeController.handle_spec(spec, state) end diff --git a/lib/membrane/core/pipeline/action_handler.ex b/lib/membrane/core/pipeline/action_handler.ex index f9249e101..382a4f5a8 100644 --- a/lib/membrane/core/pipeline/action_handler.ex +++ b/lib/membrane/core/pipeline/action_handler.ex @@ -34,14 +34,7 @@ defmodule Membrane.Core.Pipeline.ActionHandler do end @impl CallbackHandler - def handle_action({:spec, spec}, cb, _params, state) do - if cb == :handle_spec_started do - Membrane.Logger.warning(""" - Action :spec was returned from handle_spec_started/3 callback. It is suggested not to do this, - because it might lead to infinite loof of handle_spec_started/3 executions. - """) - end - + def handle_action({:spec, spec}, _cb, _params, state) do Parent.ChildLifeController.handle_spec(spec, state) end From c498f75b0eeeda8fa36b64b73e57f1c3ebdbc757 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 13 Feb 2024 12:17:10 +0100 Subject: [PATCH 19/22] Bump version to 1.0.1 --- CHANGELOG.md | 1 + README.md | 2 +- mix.exs | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 56f16c2ac..8931e4963 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ * Fix typespecs in Membrane.UtilitySupervisor. [#681](https://github.com/membraneframework/membrane_core/pull/681) * Improve callback return types and group actions types. [#702](https://github.com/membraneframework/membrane_core/pull/702) * Fix bug in the order of handling actions from callbacks. [#708](https://github.com/membraneframework/membrane_core/pull/708) + * Deprecate `handle_spec_started/3` callback in Bins and Pipelines. [#708](https://github.com/membraneframework/membrane_core/pull/708) ## 1.0.0 * Introduce `:remove_link` action in pipelines and bins. diff --git a/README.md b/README.md index a7d250fc5..60a1ee876 100644 --- a/README.md +++ b/README.md @@ -96,7 +96,7 @@ Apart from plugins, Membrane has stream formats, which live in `membrane_X_forma The API for creating pipelines (and custom elements too) is provided by [membrane_core](https://github.com/membraneframework/membrane_core). To install it, add the following line to your `deps` in `mix.exs` and run `mix deps.get` ```elixir -{:membrane_core, "~> 1.0.0"} +{:membrane_core, "~> 1.0"} ``` **Standalone libraries** diff --git a/mix.exs b/mix.exs index 4c96bd981..ac1df050b 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.Mixfile do use Mix.Project - @version "1.0.0" + @version "1.0.1" @source_ref "v#{@version}" def project do From 9438d456811f16dfef4d6e51a45cb122df2078ec Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 13 Feb 2024 12:20:03 +0100 Subject: [PATCH 20/22] Remove leftovers --- lib/membrane/bin.ex | 3 --- lib/membrane/pipeline.ex | 3 --- 2 files changed, 6 deletions(-) diff --git a/lib/membrane/bin.ex b/lib/membrane/bin.ex index 5d4937787..ca2983bb1 100644 --- a/lib/membrane/bin.ex +++ b/lib/membrane/bin.ex @@ -357,9 +357,6 @@ defmodule Membrane.Bin do {[], state} end - # @impl true - # def handle_spec_started(new_children, _ctx, state), do: {[], state} - @impl true def handle_element_start_of_stream(_element, _pad, _ctx, state), do: {[], state} diff --git a/lib/membrane/pipeline.ex b/lib/membrane/pipeline.ex index c39d6f7d0..9222c2b43 100644 --- a/lib/membrane/pipeline.ex +++ b/lib/membrane/pipeline.ex @@ -505,9 +505,6 @@ defmodule Membrane.Pipeline do {[], state} end - # @impl true - # def handle_spec_started(new_children, _ctx, state), do: {[], state} - @impl true def handle_element_start_of_stream(_element, _pad, _ctx, state), do: {[], state} From eda3f64def6299beabbdb4fe9b4cfb4c47cc0068 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 13 Feb 2024 14:24:29 +0100 Subject: [PATCH 21/22] Fix docs --- lib/membrane/bin.ex | 5 +++-- lib/membrane/core/parent.ex | 3 +++ lib/membrane/pipeline.ex | 5 +++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/membrane/bin.ex b/lib/membrane/bin.ex index ca2983bb1..4eb430bde 100644 --- a/lib/membrane/bin.ex +++ b/lib/membrane/bin.ex @@ -167,9 +167,11 @@ defmodule Membrane.Bin do ) :: callback_return @doc """ + This callback is deprecated since v1.0.1. + Callback invoked when children of `Membrane.ChildrenSpec` are started. - By default, it does nothing. + It is invoked, only if pipeline module contains its definition. Otherwise, nothing happens. """ @callback handle_spec_started( children :: [Child.name()], @@ -381,7 +383,6 @@ defmodule Membrane.Bin do handle_setup: 2, handle_playing: 2, handle_info: 3, - # handle_spec_started: 3, handle_element_start_of_stream: 4, handle_element_end_of_stream: 4, handle_child_notification: 4, diff --git a/lib/membrane/core/parent.ex b/lib/membrane/core/parent.ex index f0b6e3f36..a4059ee08 100644 --- a/lib/membrane/core/parent.ex +++ b/lib/membrane/core/parent.ex @@ -7,6 +7,7 @@ defmodule Membrane.Core.Parent do quote do @after_compile {__MODULE__, :__membrane_check_deprecated_functions__} + @spec __membrane_check_deprecated_functions__(Macro.Env.t(), binary) :: :ok def __membrane_check_deprecated_functions__(env, _bytecode) do modules_whitelist = [Membrane.Testing.Pipeline] @@ -19,6 +20,8 @@ defmodule Membrane.Core.Parent do IO.warn(warn_message, []) end + + :ok end end end diff --git a/lib/membrane/pipeline.ex b/lib/membrane/pipeline.ex index 9222c2b43..99758ef19 100644 --- a/lib/membrane/pipeline.ex +++ b/lib/membrane/pipeline.ex @@ -204,9 +204,11 @@ defmodule Membrane.Pipeline do ) :: {[Action.common_actions()], state()} @doc """ + This callback is deprecated since v1.0.1. + Callback invoked when children of `Membrane.ChildrenSpec` are started. - By default, it does nothing. + It is invoked, only if pipeline module contains its definition. Otherwise, nothing happens. """ @callback handle_spec_started( children :: [Child.name()], @@ -528,7 +530,6 @@ defmodule Membrane.Pipeline do handle_setup: 2, handle_playing: 2, handle_info: 3, - # handle_spec_started: 3, handle_element_start_of_stream: 4, handle_element_end_of_stream: 4, handle_child_notification: 4, From 2f785098f5eb91b528de06340ff2fbe13bf1cf70 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 14 Feb 2024 12:14:44 +0100 Subject: [PATCH 22/22] Implement suggestions from CR, bump version to 1.1.0-rc --- lib/membrane/bin.ex | 6 ++---- lib/membrane/core/parent.ex | 30 ++++++++++++------------------ lib/membrane/pipeline.ex | 6 ++---- mix.exs | 2 +- 4 files changed, 17 insertions(+), 27 deletions(-) diff --git a/lib/membrane/bin.ex b/lib/membrane/bin.ex index 4eb430bde..b2968073e 100644 --- a/lib/membrane/bin.ex +++ b/lib/membrane/bin.ex @@ -167,7 +167,7 @@ defmodule Membrane.Bin do ) :: callback_return @doc """ - This callback is deprecated since v1.0.1. + This callback is deprecated since v1.1.0-rc0 Callback invoked when children of `Membrane.ChildrenSpec` are started. @@ -311,9 +311,7 @@ defmodule Membrane.Bin do alias unquote(__MODULE__) @behaviour unquote(__MODULE__) @before_compile unquote(__MODULE__) - - require Membrane.Core.Parent - Membrane.Core.Parent.bring_after_compile_check() + @after_compile {Membrane.Core.Parent, :check_deprecated_callbacks} unquote(bring_spec) unquote(bring_pad) diff --git a/lib/membrane/core/parent.ex b/lib/membrane/core/parent.ex index a4059ee08..f4471b697 100644 --- a/lib/membrane/core/parent.ex +++ b/lib/membrane/core/parent.ex @@ -3,26 +3,20 @@ defmodule Membrane.Core.Parent do @type state :: Membrane.Core.Bin.State.t() | Membrane.Core.Pipeline.State.t() - defmacro bring_after_compile_check() do - quote do - @after_compile {__MODULE__, :__membrane_check_deprecated_functions__} + @spec check_deprecated_callbacks(Macro.Env.t(), binary) :: :ok + def check_deprecated_callbacks(env, _bytecode) do + modules_whitelist = [Membrane.Testing.Pipeline] - @spec __membrane_check_deprecated_functions__(Macro.Env.t(), binary) :: :ok - def __membrane_check_deprecated_functions__(env, _bytecode) do - modules_whitelist = [Membrane.Testing.Pipeline] + if env.module not in modules_whitelist and + Module.defines?(env.module, {:handle_spec_started, 3}, :def) do + warn_message = """ + Callback handle_spec_started/3 has been deprecated since \ + :membrane_core v1.1.0-rc0, but it is implemented in #{inspect(env.module)} + """ - if env.module not in modules_whitelist and - Module.defines?(env.module, {:handle_spec_started, 3}, :def) do - warn_message = """ - Callback handle_spec_started/3 has been deprecated since \ - :membrane_core v1.0.1, but it is implemented in #{inspect(env.module)} - """ - - IO.warn(warn_message, []) - end - - :ok - end + IO.warn(warn_message, []) end + + :ok end end diff --git a/lib/membrane/pipeline.ex b/lib/membrane/pipeline.ex index 99758ef19..123328700 100644 --- a/lib/membrane/pipeline.ex +++ b/lib/membrane/pipeline.ex @@ -204,7 +204,7 @@ defmodule Membrane.Pipeline do ) :: {[Action.common_actions()], state()} @doc """ - This callback is deprecated since v1.0.1. + This callback is deprecated since v1.1.0-rc0. Callback invoked when children of `Membrane.ChildrenSpec` are started. @@ -461,9 +461,7 @@ defmodule Membrane.Pipeline do alias unquote(__MODULE__) require Membrane.Logger @behaviour unquote(__MODULE__) - - require Membrane.Core.Parent - Membrane.Core.Parent.bring_after_compile_check() + @after_compile {Membrane.Core.Parent, :check_deprecated_callbacks} unquote(bring_spec) unquote(bring_pad) diff --git a/mix.exs b/mix.exs index ac1df050b..7f7033497 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.Mixfile do use Mix.Project - @version "1.0.1" + @version "1.1.0-rc0" @source_ref "v#{@version}" def project do