Skip to content

Commit

Permalink
Merge branch 'master' into pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
bradhanks authored Feb 20, 2024
2 parents 36cbf75 + f112f94 commit 573031b
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Log messages in the default handle_info implementation [#680](https://github.com/membraneframework/membrane_core/pull/680)
* Fix typespecs in Membrane.UtilitySupervisor [#681](https://github.com/membraneframework/membrane_core/pull/681)
* Improve callback return types and group actions types [#702](https://github.com/membraneframework/membrane_core/pull/702)
* Add `crash_reason` to `handle_crash_group_down/3` callback context in bins and pipelines. [#720](https://github.com/membraneframework/membrane_core/pull/720)

## 1.0.0
* Introduce `:remove_link` action in pipelines and bins.
Expand Down
5 changes: 3 additions & 2 deletions lib/membrane/bin/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ defmodule Membrane.Bin.CallbackContext do
Field `:start_of_stream_received?` is present only in
`c:Membrane.Bin.handle_element_end_of_stream/4`.
Fields `:members` and `:crash_initiator` are present only in
`c:Membrane.Pipeline.handle_crash_group_down/3`.
Fields `:members`, `:crash_initiator` and `crash_reason` and are present only in
`c:Membrane.Bin.handle_crash_group_down/3`.
"""
@type t :: %{
:clock => Membrane.Clock.t(),
Expand All @@ -27,6 +27,7 @@ defmodule Membrane.Bin.CallbackContext do
optional(:pad_options) => map(),
optional(:members) => [Membrane.Child.name()],
optional(:crash_initiator) => Membrane.Child.name(),
optional(:crash_reason) => :normal | :shutdown | {:shutdown, term()} | term(),
optional(:start_of_stream_received?) => boolean()
}
end
6 changes: 5 additions & 1 deletion lib/membrane/core/bin/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ defmodule Membrane.Core.Bin.CallbackContext do

@type optional_fields ::
[pad_options: map()]
| [members: [Membrane.Child.name()], crash_initiator: Membrane.Child.name()]
| [
members: [Membrane.Child.name()],
crash_initiator: Membrane.Child.name(),
crash_reason: :normal | :shutdown | {:shutdown, term()} | term()
]
| [start_of_stream_received?: boolean()]

@spec from_state(Membrane.Core.Bin.State.t(), optional_fields()) ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do
end
end

def handle_crash_group_member_death(child_name, %CrashGroup{} = group, _reason, state) do
def handle_crash_group_member_death(child_name, %CrashGroup{} = group, crash_reason, state) do
state =
if group.detonating? do
state
else
detonate_crash_group(child_name, group, state)
detonate_crash_group(child_name, group, crash_reason, state)
end

all_members_dead? =
Expand All @@ -72,7 +72,7 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do
end
end

defp detonate_crash_group(crash_initiator, %CrashGroup{} = group, state) do
defp detonate_crash_group(crash_initiator, %CrashGroup{} = group, crash_reason, state) do
state = ChildLifeController.remove_children_from_specs(group.members, state)
state = LinkUtils.unlink_crash_group(group, state)

Expand All @@ -88,7 +88,8 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do
&%CrashGroup{
&1
| detonating?: true,
crash_initiator: crash_initiator
crash_initiator: crash_initiator,
crash_reason: crash_reason
}
)
end
Expand All @@ -108,7 +109,8 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do
context_generator =
&Component.context_from_state(&1,
members: crash_group.members,
crash_initiator: crash_group.crash_initiator
crash_initiator: crash_group.crash_initiator,
crash_reason: crash_group.crash_reason
)

CallbackHandler.exec_and_handle_callback(
Expand Down
7 changes: 5 additions & 2 deletions lib/membrane/core/parent/crash_group.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Membrane.Core.Parent.CrashGroup do
# * name - name that identifies the group
# * type - responsible for restart policy of members of groups
# * members - list of members of group
# * reason - reason of the crash

use Bunch.Access

Expand All @@ -15,9 +16,11 @@ defmodule Membrane.Core.Parent.CrashGroup do
mode: :temporary,
members: [Membrane.Child.name()],
detonating?: boolean(),
crash_initiator: Membrane.Child.name()
crash_initiator: Membrane.Child.name(),
crash_reason: :normal | :shutdown | {:shutdown, term()} | term()
}

@enforce_keys [:name, :mode]
defstruct @enforce_keys ++ [members: [], detonating?: false, crash_initiator: nil]
defstruct @enforce_keys ++
[members: [], detonating?: false, crash_initiator: nil, crash_reason: nil]
end
6 changes: 5 additions & 1 deletion lib/membrane/core/pipeline/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ defmodule Membrane.Core.Pipeline.CallbackContext do

@type optional_fields ::
[from: GenServer.from()]
| [members: [Membrane.Child.name()], crash_initiator: Membrane.Child.name()]
| [
members: [Membrane.Child.name()],
crash_initiator: Membrane.Child.name(),
crash_reason: :normal | :shutdown | {:shutdown, term()} | term()
]
| [start_of_stream_received?: boolean()]

@spec from_state(Membrane.Core.Pipeline.State.t(), optional_fields()) ::
Expand Down
3 changes: 2 additions & 1 deletion lib/membrane/pipeline/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule Membrane.Pipeline.CallbackContext do
Field `:start_of_stream_received?` is present only in
`c:Membrane.Pipeline.handle_element_end_of_stream/4`.
Fields `:members` and `:crash_initiator` are present only in
Fields `:members`, `:crash_initiator` and `:crash_reason` are present only in
`c:Membrane.Pipeline.handle_crash_group_down/3`.
"""
@type t :: %{
Expand All @@ -23,6 +23,7 @@ defmodule Membrane.Pipeline.CallbackContext do
optional(:from) => [GenServer.from()],
optional(:members) => [Membrane.Child.name()],
optional(:crash_initiator) => Membrane.Child.name(),
optional(:crash_reason) => :normal | :shutdown | {:shutdown, term()} | term(),
optional(:start_of_stream_received?) => boolean()
}
end
17 changes: 17 additions & 0 deletions test/membrane/integration/child_crash_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,23 @@ defmodule Membrane.Integration.ChildCrashTest do
assert_pipeline_crash_group_down(pipeline_pid, 1)
end

test "Pipeline receives correct crash reason" do
pipeline_pid = Testing.Pipeline.start_supervised!(module: ChildCrashTest.Pipeline)
ChildCrashTest.Pipeline.add_path(pipeline_pid, [], :source, 1, :group_1)

# time for pipeline to start :source
Process.sleep(100)

ChildCrashTest.Pipeline.inform_about_details_in_case_of_crash(pipeline_pid)

Testing.Pipeline.get_child_pid!(pipeline_pid, :source)
|> Process.exit(:custom_crash_reason)

assert_receive {:crash, crash_reason: :custom_crash_reason}

Testing.Pipeline.terminate(pipeline_pid)
end

test "Crash group consisting of bin crashes" do
Process.flag(:trap_exit, true)

Expand Down
23 changes: 22 additions & 1 deletion test/support/child_crash_test/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,30 @@ defmodule Membrane.Support.ChildCrashTest.Pipeline do
child(:center_filter, Filter)
|> child(:sink, Testing.Sink)

{[spec: spec], %{}}
{[spec: spec], %{send_to: nil}}
end

@impl true
def handle_info({:create_path, spec}, _ctx, state) do
{[spec: spec], state}
end

@impl true
def handle_info({:inform_about_crash, send_to}, _ctx, state) do
{[], %{state | send_to: send_to}}
end

@impl true
def handle_crash_group_down(_group_name, _ctx, %{send_to: nil} = state) do
{[], state}
end

@impl true
def handle_crash_group_down(_group_name, %{crash_reason: crash_reason}, %{send_to: pid} = state) do
send(pid, {:crash, crash_reason: crash_reason})
{[], state}
end

@spec add_single_source(pid(), any(), any(), any()) :: any()
def add_single_source(pid, source_name, group \\ nil, source \\ Testing.Source) do
spec = child(source_name, source) |> get_child(:center_filter)
Expand Down Expand Up @@ -92,4 +108,9 @@ defmodule Membrane.Support.ChildCrashTest.Pipeline do

send(pid, {:create_path, spec})
end

@spec inform_about_details_in_case_of_crash(pid()) :: any()
def inform_about_details_in_case_of_crash(pid) do
send(pid, {:inform_about_crash, self()})
end
end

0 comments on commit 573031b

Please sign in to comment.