Skip to content

Commit 0f3e838

Browse files
authored
handle_child_terminated (#894)
* Implement handle_child_terminated, bump version to 1.1.2
1 parent a9b052d commit 0f3e838

File tree

9 files changed

+163
-20
lines changed

9 files changed

+163
-20
lines changed

CHANGELOG.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
# Changelog
22

33
## 1.1.2
4-
* Remove 'failed to insert a metric' stalker warning [#849](https://github.com/membraneframework/membrane_core/pull/849)
4+
* Add new callback `handle_child_terminated/3` along with new assertions. [#894](https://github.com/membraneframework/membrane_core/pull/894)
5+
* Remove 'failed to insert a metric' stalker warning. [#849](https://github.com/membraneframework/membrane_core/pull/849)
56

67
## 1.1.1
78
* Fix 'table identifier does not refer to an existing ETS table' error when inserting metrics into the observability ETS. [#835](https://github.com/membraneframework/membrane_core/pull/835)

lib/membrane/bin.ex

+21-2
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,20 @@ defmodule Membrane.Bin do
201201
state
202202
) :: callback_return
203203

204+
@doc """
205+
Callback invoked after a child terminates.
206+
207+
Terminated child won't be present in the context of this callback. It is allowed to spawn a new
208+
child with the same name.
209+
210+
By default, it does nothing.
211+
"""
212+
@callback handle_child_terminated(
213+
child :: Child.name(),
214+
context :: CallbackContext.t(),
215+
state
216+
) :: callback_return
217+
204218
@doc """
205219
Callback invoked upon each timer tick. A timer can be started with `t:Membrane.Bin.Action.start_timer/0`
206220
action.
@@ -248,7 +262,8 @@ defmodule Membrane.Bin do
248262
handle_tick: 3,
249263
handle_crash_group_down: 3,
250264
handle_terminate_request: 2,
251-
handle_child_pad_removed: 4
265+
handle_child_pad_removed: 4,
266+
handle_child_terminated: 3
252267

253268
@doc PadsSpecs.def_pad_docs(:input, :bin)
254269
defmacro def_input_pad(name, spec) do
@@ -405,6 +420,9 @@ defmodule Membrane.Bin do
405420
@impl true
406421
def handle_terminate_request(_ctx, state), do: {[terminate: :normal], state}
407422

423+
@impl true
424+
def handle_child_terminated(_child, _ctx, state), do: {[], state}
425+
408426
defoverridable handle_init: 2,
409427
handle_pad_added: 3,
410428
handle_pad_removed: 3,
@@ -418,7 +436,8 @@ defmodule Membrane.Bin do
418436
handle_child_notification: 4,
419437
handle_parent_notification: 3,
420438
handle_crash_group_down: 3,
421-
handle_terminate_request: 2
439+
handle_terminate_request: 2,
440+
handle_child_terminated: 3
422441
end
423442
end
424443

lib/membrane/core/parent/child_life_controller.ex

+15-1
Original file line numberDiff line numberDiff line change
@@ -724,10 +724,14 @@ defmodule Membrane.Core.Parent.ChildLifeController do
724724
CrashGroupUtils.handle_crash_group_member_death(child_name, crash_group, reason, state)
725725
|> ChildrenModel.delete_child(child_name)
726726

727+
state = exec_handle_child_terminated(child_name, state)
728+
727729
{:ok, state}
728730
else
729731
:error when reason == :normal ->
730-
{:ok, ChildrenModel.delete_child(state, child_name)}
732+
state = ChildrenModel.delete_child(state, child_name)
733+
state = exec_handle_child_terminated(child_name, state)
734+
{:ok, state}
731735

732736
:error when reason == {:shutdown, :membrane_crash_group_kill} ->
733737
raise Membrane.PipelineError,
@@ -771,4 +775,14 @@ defmodule Membrane.Core.Parent.ChildLifeController do
771775
state = %{state | pending_specs: Map.merge(state.pending_specs, related_specs)}
772776
related_specs |> Map.keys() |> Enum.reduce(state, &proceed_spec_startup/2)
773777
end
778+
779+
defp exec_handle_child_terminated(child_name, state) do
780+
CallbackHandler.exec_and_handle_callback(
781+
:handle_child_terminated,
782+
Component.action_handler(state),
783+
%{context: &Component.context_from_state/1},
784+
[child_name],
785+
state
786+
)
787+
end
774788
end

lib/membrane/pipeline.ex

+21-2
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,20 @@ defmodule Membrane.Pipeline do
242242
state
243243
) :: {[Action.common_actions()], state()}
244244

245+
@doc """
246+
Callback invoked after a child terminates.
247+
248+
Terminated child won't be present in the context of this callback. It is allowed to spawn a new child
249+
with the same name.
250+
251+
By default, it does nothing.
252+
"""
253+
@callback handle_child_terminated(
254+
child :: Child.name(),
255+
context :: CallbackContext.t(),
256+
state
257+
) :: callback_return
258+
245259
@doc """
246260
Callback invoked upon each timer tick. A timer can be started with `Membrane.Pipeline.Action.start_timer`
247261
action.
@@ -291,7 +305,8 @@ defmodule Membrane.Pipeline do
291305
handle_crash_group_down: 3,
292306
handle_call: 3,
293307
handle_terminate_request: 2,
294-
handle_child_pad_removed: 4
308+
handle_child_pad_removed: 4,
309+
handle_child_terminated: 3
295310

296311
@doc """
297312
Starts the pipeline based on the given module and links it to the current process.
@@ -542,6 +557,9 @@ defmodule Membrane.Pipeline do
542557
@impl true
543558
def handle_child_setup_completed(_child, _ctx, state), do: {[], state}
544559

560+
@impl true
561+
def handle_child_terminated(_child, _ctx, state), do: {[], state}
562+
545563
@impl true
546564
def handle_child_playing(_child, _ctx, state), do: {[], state}
547565

@@ -575,7 +593,8 @@ defmodule Membrane.Pipeline do
575593
handle_child_notification: 4,
576594
handle_crash_group_down: 3,
577595
handle_call: 3,
578-
handle_terminate_request: 2
596+
handle_terminate_request: 2,
597+
handle_child_terminated: 3
579598
end
580599
end
581600
end

lib/membrane/testing/assertions.ex

+54-1
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ defmodule Membrane.Testing.Assertions do
143143
end
144144

145145
@doc """
146-
Refutes that a crash group within pipeline won't be down within the `timeout` period specified in
146+
Asserts that a crash group within pipeline won't be down within the `timeout` period specified in
147147
milliseconds.
148148
149149
Usage example:
@@ -541,4 +541,57 @@ defmodule Membrane.Testing.Assertions do
541541
timeout
542542
)
543543
end
544+
545+
[:child_setup_completed, :child_playing, :child_terminated]
546+
|> Enum.map(fn action ->
547+
callback = :"handle_#{action}"
548+
assertion = :"assert_#{action}"
549+
refution = :"refute_#{action}"
550+
551+
@doc """
552+
Asserts that `Membrane.Testing.Pipeline` executed or will execute callback `#{callback}/3`
553+
for a specific child within the `timeout` period specified in milliseconds.
554+
"""
555+
defmacro unquote(assertion)(pipeline, child, timeout \\ @default_timeout) do
556+
callback = unquote(callback)
557+
558+
quote do
559+
child_name_value = unquote(child)
560+
561+
unquote(
562+
assert_receive_from_pipeline(
563+
pipeline,
564+
{callback,
565+
quote do
566+
^child_name_value
567+
end},
568+
timeout
569+
)
570+
)
571+
end
572+
end
573+
574+
@doc """
575+
Asserts that `Membrane.Testing.Pipeline` won't execute callback `#{callback}/3` for
576+
a specific child within the `timeout` period specified in milliseconds.
577+
"""
578+
defmacro unquote(refution)(pipeline, child, timeout \\ @default_timeout) do
579+
callback = unquote(callback)
580+
581+
quote do
582+
child_name_value = unquote(child)
583+
584+
unquote(
585+
refute_receive_from_pipeline(
586+
pipeline,
587+
{callback,
588+
quote do
589+
^child_name_value
590+
end},
591+
timeout
592+
)
593+
)
594+
end
595+
end
596+
end)
544597
end

lib/membrane/testing/pipeline.ex

+7-1
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,11 @@ defmodule Membrane.Testing.Pipeline do
408408
{custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
409409
end
410410

411-
[:handle_child_setup_completed, :handle_child_playing]
411+
[
412+
:handle_child_setup_completed,
413+
:handle_child_playing,
414+
:handle_child_terminated
415+
]
412416
|> Enum.map(fn callback ->
413417
@impl true
414418
def unquote(callback)(child, ctx, %State{} = state) do
@@ -419,6 +423,8 @@ defmodule Membrane.Testing.Pipeline do
419423
state
420424
)
421425

426+
:ok = notify_test_process(state.test_process, {unquote(callback), child})
427+
422428
{custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
423429
end
424430
end)

mix.exs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
defmodule Membrane.Mixfile do
22
use Mix.Project
33

4-
@version "1.1.1"
4+
@version "1.1.2"
55
@source_ref "v#{@version}"
66

77
def project do
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
defmodule Membrane.Integration.CallbacksTest do
2+
use ExUnit.Case, async: true
3+
4+
import Membrane.Testing.Assertions
5+
import Membrane.ChildrenSpec
6+
7+
alias Membrane.Testing
8+
9+
defmodule PadlessElement do
10+
use Membrane.Endpoint
11+
end
12+
13+
defmodule PadlessElementPipeline do
14+
use Membrane.Pipeline
15+
alias Membrane.Integration.CallbacksTest.PadlessElement
16+
17+
@impl true
18+
def handle_child_terminated(child_name, ctx, state) do
19+
assert not is_map_key(ctx.children, child_name)
20+
{[spec: child(child_name, PadlessElement)], state}
21+
end
22+
end
23+
24+
test "handle_child_terminated" do
25+
pipeline = Testing.Pipeline.start_link_supervised!(module: PadlessElementPipeline)
26+
27+
Testing.Pipeline.execute_actions(pipeline, spec: child(:element, PadlessElement))
28+
first_pid = Testing.Pipeline.get_child_pid!(pipeline, :element)
29+
refute_child_terminated(pipeline, :element, 500)
30+
31+
Testing.Pipeline.execute_actions(pipeline, remove_children: :element)
32+
assert_child_terminated(pipeline, :element)
33+
second_pid = Testing.Pipeline.get_child_pid!(pipeline, :element)
34+
35+
assert first_pid != second_pid
36+
37+
Testing.Pipeline.terminate(pipeline)
38+
end
39+
end

test/membrane/integration/defer_setup_test.exs

+3-11
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ defmodule Membrane.Integration.DeferSetupTest do
112112
assert_child_playing(pipeline, bin)
113113
end
114114

115-
assert_grandhild_playing(pipeline, :bin_1, :bin_a)
115+
assert_grandchild_playing(pipeline, :bin_1, :bin_a)
116116

117117
for bin <- [:bin_b, :bin_c] do
118118
refute_grandchild_playing(pipeline, :bin_1, bin)
@@ -127,7 +127,7 @@ defmodule Membrane.Integration.DeferSetupTest do
127127
complete_grandchild_setup(pipeline, :bin_1, :bin_c)
128128

129129
for bin <- [:bin_b, :bin_c] do
130-
assert_grandhild_playing(pipeline, :bin_1, bin)
130+
assert_grandchild_playing(pipeline, :bin_1, bin)
131131
end
132132

133133
monitor_ref = Process.monitor(pipeline)
@@ -145,18 +145,10 @@ defmodule Membrane.Integration.DeferSetupTest do
145145
Pipeline.execute_actions(pipeline, notify_child: {child, {:complete_setup, grandchild}})
146146
end
147147

148-
defp assert_child_playing(pipeline, child) do
149-
assert_pipeline_notified(pipeline, child, :handle_playing)
150-
end
151-
152-
defp assert_grandhild_playing(pipeline, child, grandchild) do
148+
defp assert_grandchild_playing(pipeline, child, grandchild) do
153149
assert_pipeline_notified(pipeline, child, {^grandchild, :handle_playing})
154150
end
155151

156-
defp refute_child_playing(pipeline, child) do
157-
refute_pipeline_notified(pipeline, child, :handle_playing)
158-
end
159-
160152
defp refute_grandchild_playing(pipeline, child, grandchild) do
161153
refute_pipeline_notified(pipeline, child, {^grandchild, :handle_playing})
162154
end

0 commit comments

Comments
 (0)