diff --git a/CHANGELOG.md b/CHANGELOG.md index ae762f4dd..17c52dbc3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,8 @@ # Changelog ## 1.1.2 - * Remove 'failed to insert a metric' stalker warning [#849](https://github.com/membraneframework/membrane_core/pull/849) + * Add new callback `handle_child_terminated/3` along with new assertions. [#894](https://github.com/membraneframework/membrane_core/pull/894) + * Remove 'failed to insert a metric' stalker warning. [#849](https://github.com/membraneframework/membrane_core/pull/849) ## 1.1.1 * Fix 'table identifier does not refer to an existing ETS table' error when inserting metrics into the observability ETS. [#835](https://github.com/membraneframework/membrane_core/pull/835) diff --git a/lib/membrane/bin.ex b/lib/membrane/bin.ex index 50bae12f9..eb3630ba0 100644 --- a/lib/membrane/bin.ex +++ b/lib/membrane/bin.ex @@ -201,6 +201,20 @@ defmodule Membrane.Bin do state ) :: callback_return + @doc """ + Callback invoked after a child terminates. + + Terminated child won't be present in the context of this callback. It is allowed to spawn a new + child with the same name. + + By default, it does nothing. + """ + @callback handle_child_terminated( + child :: Child.name(), + context :: CallbackContext.t(), + state + ) :: callback_return + @doc """ Callback invoked upon each timer tick. A timer can be started with `t:Membrane.Bin.Action.start_timer/0` action. @@ -248,7 +262,8 @@ defmodule Membrane.Bin do handle_tick: 3, handle_crash_group_down: 3, handle_terminate_request: 2, - handle_child_pad_removed: 4 + handle_child_pad_removed: 4, + handle_child_terminated: 3 @doc PadsSpecs.def_pad_docs(:input, :bin) defmacro def_input_pad(name, spec) do @@ -405,6 +420,9 @@ defmodule Membrane.Bin do @impl true def handle_terminate_request(_ctx, state), do: {[terminate: :normal], state} + @impl true + def handle_child_terminated(_child, _ctx, state), do: {[], state} + defoverridable handle_init: 2, handle_pad_added: 3, handle_pad_removed: 3, @@ -418,7 +436,8 @@ defmodule Membrane.Bin do handle_child_notification: 4, handle_parent_notification: 3, handle_crash_group_down: 3, - handle_terminate_request: 2 + handle_terminate_request: 2, + handle_child_terminated: 3 end end diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index dd585bf97..c9936ba8d 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -724,10 +724,14 @@ defmodule Membrane.Core.Parent.ChildLifeController do CrashGroupUtils.handle_crash_group_member_death(child_name, crash_group, reason, state) |> ChildrenModel.delete_child(child_name) + state = exec_handle_child_terminated(child_name, state) + {:ok, state} else :error when reason == :normal -> - {:ok, ChildrenModel.delete_child(state, child_name)} + state = ChildrenModel.delete_child(state, child_name) + state = exec_handle_child_terminated(child_name, state) + {:ok, state} :error when reason == {:shutdown, :membrane_crash_group_kill} -> raise Membrane.PipelineError, @@ -771,4 +775,14 @@ defmodule Membrane.Core.Parent.ChildLifeController do state = %{state | pending_specs: Map.merge(state.pending_specs, related_specs)} related_specs |> Map.keys() |> Enum.reduce(state, &proceed_spec_startup/2) end + + defp exec_handle_child_terminated(child_name, state) do + CallbackHandler.exec_and_handle_callback( + :handle_child_terminated, + Component.action_handler(state), + %{context: &Component.context_from_state/1}, + [child_name], + state + ) + end end diff --git a/lib/membrane/pipeline.ex b/lib/membrane/pipeline.ex index 8d32a709d..2ae71cb7f 100644 --- a/lib/membrane/pipeline.ex +++ b/lib/membrane/pipeline.ex @@ -242,6 +242,20 @@ defmodule Membrane.Pipeline do state ) :: {[Action.common_actions()], state()} + @doc """ + Callback invoked after a child terminates. + + Terminated child won't be present in the context of this callback. It is allowed to spawn a new child + with the same name. + + By default, it does nothing. + """ + @callback handle_child_terminated( + child :: Child.name(), + context :: CallbackContext.t(), + state + ) :: callback_return + @doc """ Callback invoked upon each timer tick. A timer can be started with `Membrane.Pipeline.Action.start_timer` action. @@ -291,7 +305,8 @@ defmodule Membrane.Pipeline do handle_crash_group_down: 3, handle_call: 3, handle_terminate_request: 2, - handle_child_pad_removed: 4 + handle_child_pad_removed: 4, + handle_child_terminated: 3 @doc """ Starts the pipeline based on the given module and links it to the current process. @@ -542,6 +557,9 @@ defmodule Membrane.Pipeline do @impl true def handle_child_setup_completed(_child, _ctx, state), do: {[], state} + @impl true + def handle_child_terminated(_child, _ctx, state), do: {[], state} + @impl true def handle_child_playing(_child, _ctx, state), do: {[], state} @@ -575,7 +593,8 @@ defmodule Membrane.Pipeline do handle_child_notification: 4, handle_crash_group_down: 3, handle_call: 3, - handle_terminate_request: 2 + handle_terminate_request: 2, + handle_child_terminated: 3 end end end diff --git a/lib/membrane/testing/assertions.ex b/lib/membrane/testing/assertions.ex index 496f2bf0f..4acd4ec17 100644 --- a/lib/membrane/testing/assertions.ex +++ b/lib/membrane/testing/assertions.ex @@ -143,7 +143,7 @@ defmodule Membrane.Testing.Assertions do end @doc """ - Refutes that a crash group within pipeline won't be down within the `timeout` period specified in + Asserts that a crash group within pipeline won't be down within the `timeout` period specified in milliseconds. Usage example: @@ -541,4 +541,57 @@ defmodule Membrane.Testing.Assertions do timeout ) end + + [:child_setup_completed, :child_playing, :child_terminated] + |> Enum.map(fn action -> + callback = :"handle_#{action}" + assertion = :"assert_#{action}" + refution = :"refute_#{action}" + + @doc """ + Asserts that `Membrane.Testing.Pipeline` executed or will execute callback `#{callback}/3` + for a specific child within the `timeout` period specified in milliseconds. + """ + defmacro unquote(assertion)(pipeline, child, timeout \\ @default_timeout) do + callback = unquote(callback) + + quote do + child_name_value = unquote(child) + + unquote( + assert_receive_from_pipeline( + pipeline, + {callback, + quote do + ^child_name_value + end}, + timeout + ) + ) + end + end + + @doc """ + Asserts that `Membrane.Testing.Pipeline` won't execute callback `#{callback}/3` for + a specific child within the `timeout` period specified in milliseconds. + """ + defmacro unquote(refution)(pipeline, child, timeout \\ @default_timeout) do + callback = unquote(callback) + + quote do + child_name_value = unquote(child) + + unquote( + refute_receive_from_pipeline( + pipeline, + {callback, + quote do + ^child_name_value + end}, + timeout + ) + ) + end + end + end) end diff --git a/lib/membrane/testing/pipeline.ex b/lib/membrane/testing/pipeline.ex index e74793d9c..1b25dd4c9 100644 --- a/lib/membrane/testing/pipeline.ex +++ b/lib/membrane/testing/pipeline.ex @@ -408,7 +408,11 @@ defmodule Membrane.Testing.Pipeline do {custom_actions, Map.put(state, :custom_pipeline_state, custom_state)} end - [:handle_child_setup_completed, :handle_child_playing] + [ + :handle_child_setup_completed, + :handle_child_playing, + :handle_child_terminated + ] |> Enum.map(fn callback -> @impl true def unquote(callback)(child, ctx, %State{} = state) do @@ -419,6 +423,8 @@ defmodule Membrane.Testing.Pipeline do state ) + :ok = notify_test_process(state.test_process, {unquote(callback), child}) + {custom_actions, Map.put(state, :custom_pipeline_state, custom_state)} end end) diff --git a/mix.exs b/mix.exs index b266feecd..c1fab8be5 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.Mixfile do use Mix.Project - @version "1.1.1" + @version "1.1.2" @source_ref "v#{@version}" def project do diff --git a/test/membrane/integration/callbacks_test.exs b/test/membrane/integration/callbacks_test.exs new file mode 100644 index 000000000..bb6a14a35 --- /dev/null +++ b/test/membrane/integration/callbacks_test.exs @@ -0,0 +1,39 @@ +defmodule Membrane.Integration.CallbacksTest do + use ExUnit.Case, async: true + + import Membrane.Testing.Assertions + import Membrane.ChildrenSpec + + alias Membrane.Testing + + defmodule PadlessElement do + use Membrane.Endpoint + end + + defmodule PadlessElementPipeline do + use Membrane.Pipeline + alias Membrane.Integration.CallbacksTest.PadlessElement + + @impl true + def handle_child_terminated(child_name, ctx, state) do + assert not is_map_key(ctx.children, child_name) + {[spec: child(child_name, PadlessElement)], state} + end + end + + test "handle_child_terminated" do + pipeline = Testing.Pipeline.start_link_supervised!(module: PadlessElementPipeline) + + Testing.Pipeline.execute_actions(pipeline, spec: child(:element, PadlessElement)) + first_pid = Testing.Pipeline.get_child_pid!(pipeline, :element) + refute_child_terminated(pipeline, :element, 500) + + Testing.Pipeline.execute_actions(pipeline, remove_children: :element) + assert_child_terminated(pipeline, :element) + second_pid = Testing.Pipeline.get_child_pid!(pipeline, :element) + + assert first_pid != second_pid + + Testing.Pipeline.terminate(pipeline) + end +end diff --git a/test/membrane/integration/defer_setup_test.exs b/test/membrane/integration/defer_setup_test.exs index f058cb013..cc6d1a206 100644 --- a/test/membrane/integration/defer_setup_test.exs +++ b/test/membrane/integration/defer_setup_test.exs @@ -112,7 +112,7 @@ defmodule Membrane.Integration.DeferSetupTest do assert_child_playing(pipeline, bin) end - assert_grandhild_playing(pipeline, :bin_1, :bin_a) + assert_grandchild_playing(pipeline, :bin_1, :bin_a) for bin <- [:bin_b, :bin_c] do refute_grandchild_playing(pipeline, :bin_1, bin) @@ -127,7 +127,7 @@ defmodule Membrane.Integration.DeferSetupTest do complete_grandchild_setup(pipeline, :bin_1, :bin_c) for bin <- [:bin_b, :bin_c] do - assert_grandhild_playing(pipeline, :bin_1, bin) + assert_grandchild_playing(pipeline, :bin_1, bin) end monitor_ref = Process.monitor(pipeline) @@ -145,18 +145,10 @@ defmodule Membrane.Integration.DeferSetupTest do Pipeline.execute_actions(pipeline, notify_child: {child, {:complete_setup, grandchild}}) end - defp assert_child_playing(pipeline, child) do - assert_pipeline_notified(pipeline, child, :handle_playing) - end - - defp assert_grandhild_playing(pipeline, child, grandchild) do + defp assert_grandchild_playing(pipeline, child, grandchild) do assert_pipeline_notified(pipeline, child, {^grandchild, :handle_playing}) end - defp refute_child_playing(pipeline, child) do - refute_pipeline_notified(pipeline, child, :handle_playing) - end - defp refute_grandchild_playing(pipeline, child, grandchild) do refute_pipeline_notified(pipeline, child, {^grandchild, :handle_playing}) end