Skip to content

Commit 75748ef

Browse files
authored
Merge branch 'master' into resource-guard-test
2 parents 0d1d460 + 2ed8f4d commit 75748ef

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+1155
-403
lines changed

.github/actions/add_pr_to_smackore_board/action.yml

+15-13
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: 'Add PR to Smackore board, if author is from community'
2-
description: 'Adds PR to "New issues by community" column in Smackore project board, if PR author is from outside Membrane Team.'
2+
description: '(disabled due to github-side bug) Adds PR to "New issues by community" column in Smackore project board, if PR author is from outside Membrane Team.'
33
inputs:
44
GITHUB_TOKEN:
55
description: 'GitHub token'
@@ -19,21 +19,23 @@ runs:
1919
repository: membraneframework/membrane_core
2020
- name: Maybe add PR to board and set ticket status
2121
run: |
22-
export PROJECT_NUMBER=19
23-
export PROJECT_ID=PVT_kwDOAYE_z84AWEIB
24-
export STATUS_FIELD_ID=PVTSSF_lADOAYE_z84AWEIBzgOGd1k
25-
export TARGET_COLUMN_ID=e6b1ee10
22+
# currently this causes github action crash, more info here: https://github.com/membraneframework/membrane_core/issues/749
23+
24+
# export PROJECT_NUMBER=19
25+
# export PROJECT_ID=PVT_kwDOAYE_z84AWEIB
26+
# export STATUS_FIELD_ID=PVTSSF_lADOAYE_z84AWEIBzgOGd1k
27+
# export TARGET_COLUMN_ID=e6b1ee10
2628
27-
export AUTHOR_ORIGIN=$(gh api -H "Accept: application/vnd.github+json" -H "X-GitHub-Api-Version: 2022-11-28" /orgs/membraneframework/teams/membraneteam/members | python scripts/python/get_author_origin.py $AUTHOR_LOGIN)
29+
# export AUTHOR_ORIGIN=$(gh api -H "Accept: application/vnd.github+json" -H "X-GitHub-Api-Version: 2022-11-28" /orgs/membraneframework/teams/membraneteam/members | python scripts/python/get_author_origin.py $AUTHOR_LOGIN)
2830
29-
if [ "$AUTHOR_ORIGIN" == "COMMUNITY" ]
30-
then
31-
gh pr edit "$PR_URL" --add-project Smackore
32-
sleep 10
31+
# if [ "$AUTHOR_ORIGIN" == "COMMUNITY" ]
32+
# then
33+
# gh pr edit "$PR_URL" --add-project Smackore
34+
# sleep 10
3335
34-
export TICKET_ID=$(gh project item-list $PROJECT_NUMBER --owner membraneframework --format json --limit 10000000 | python scripts/python/get_ticket_id.py "$PR_URL")
35-
gh project item-edit --id $TICKET_ID --field-id $STATUS_FIELD_ID --project-id $PROJECT_ID --single-select-option-id $TARGET_COLUMN_ID
36-
fi
36+
# export TICKET_ID=$(gh project item-list $PROJECT_NUMBER --owner membraneframework --format json --limit 10000000 | python scripts/python/get_ticket_id.py "$PR_URL")
37+
# gh project item-edit --id $TICKET_ID --field-id $STATUS_FIELD_ID --project-id $PROJECT_ID --single-select-option-id $TARGET_COLUMN_ID
38+
# fi
3739
3840
env:
3941
GH_TOKEN: ${{ inputs.GITHUB_TOKEN }}

CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
# Changelog
22

3+
## 1.1.0-rc0
4+
* Deprecate `handle_spec_started/3` callback in Bins and Pipelines. [#708](https://github.com/membraneframework/membrane_core/pull/708)
5+
* Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693)
6+
37
## 1.0.1
48
* Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614)
59
* Fix clock selection [#626](https://github.com/membraneframework/membrane_core/pull/626)
610
* Log messages in the default handle_info implementation [#680](https://github.com/membraneframework/membrane_core/pull/680)
711
* Fix typespecs in Membrane.UtilitySupervisor [#681](https://github.com/membraneframework/membrane_core/pull/681)
812
* Improve callback return types and group actions types [#702](https://github.com/membraneframework/membrane_core/pull/702)
13+
* Add `crash_reason` to `handle_crash_group_down/3` callback context in bins and pipelines. [#720](https://github.com/membraneframework/membrane_core/pull/720)
914

1015
## 1.0.0
1116
* Introduce `:remove_link` action in pipelines and bins.

README.md

+1-5
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,6 @@ Membrane.Pipeline.start_link(MyPipeline, mp3_url)
5353
```
5454

5555
This is an [Elixir](elixir-lang.org) snippet, that streams an mp3 via HTTP and plays it on your speaker. Here's how to run it:
56-
- Install [libmad](https://github.com/markjeee/libmad) and [portaudio](https://github.com/PortAudio/portaudio). Membrane uses these libs to decode the mp3 and to access your speaker, respectively. You can use these commands:
57-
- On Mac OS: `brew install libmad portaudio pkg-config`
58-
- On Debian: `apt install libmad0-dev portaudio19-dev`
59-
6056
- Option 1: Click the button below:
6157

6258
[![Run in Livebook](https://livebook.dev/badge/v1/blue.svg)](https://livebook.dev/run?url=https%3A%2F%2Fgithub.com%2Fmembraneframework%2Fmembrane_core%2Fblob%2Fmaster%2Fexample.livemd)
@@ -96,7 +92,7 @@ Apart from plugins, Membrane has stream formats, which live in `membrane_X_forma
9692
The API for creating pipelines (and custom elements too) is provided by [membrane_core](https://github.com/membraneframework/membrane_core). To install it, add the following line to your `deps` in `mix.exs` and run `mix deps.get`
9793

9894
```elixir
99-
{:membrane_core, "~> 1.0.0"}
95+
{:membrane_core, "~> 1.0"}
10096
```
10197

10298
**Standalone libraries**

lib/membrane/bin.ex

+4-5
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,11 @@ defmodule Membrane.Bin do
167167
) :: callback_return
168168

169169
@doc """
170+
This callback is deprecated since v1.1.0-rc0
171+
170172
Callback invoked when children of `Membrane.ChildrenSpec` are started.
171173
172-
By default, it does nothing.
174+
It is invoked, only if pipeline module contains its definition. Otherwise, nothing happens.
173175
"""
174176
@callback handle_spec_started(
175177
children :: [Child.name()],
@@ -309,6 +311,7 @@ defmodule Membrane.Bin do
309311
alias unquote(__MODULE__)
310312
@behaviour unquote(__MODULE__)
311313
@before_compile unquote(__MODULE__)
314+
@after_compile {Membrane.Core.Parent, :check_deprecated_callbacks}
312315

313316
unquote(bring_spec)
314317
unquote(bring_pad)
@@ -354,9 +357,6 @@ defmodule Membrane.Bin do
354357
{[], state}
355358
end
356359

357-
@impl true
358-
def handle_spec_started(new_children, _ctx, state), do: {[], state}
359-
360360
@impl true
361361
def handle_element_start_of_stream(_element, _pad, _ctx, state), do: {[], state}
362362

@@ -381,7 +381,6 @@ defmodule Membrane.Bin do
381381
handle_setup: 2,
382382
handle_playing: 2,
383383
handle_info: 3,
384-
handle_spec_started: 3,
385384
handle_element_start_of_stream: 4,
386385
handle_element_end_of_stream: 4,
387386
handle_child_notification: 4,

lib/membrane/bin/callback_context.ex

+3-2
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ defmodule Membrane.Bin.CallbackContext do
1212
Field `:start_of_stream_received?` is present only in
1313
`c:Membrane.Bin.handle_element_end_of_stream/4`.
1414
15-
Fields `:members` and `:crash_initiator` are present only in
16-
`c:Membrane.Pipeline.handle_crash_group_down/3`.
15+
Fields `:members`, `:crash_initiator` and `crash_reason` and are present only in
16+
`c:Membrane.Bin.handle_crash_group_down/3`.
1717
"""
1818
@type t :: %{
1919
:clock => Membrane.Clock.t(),
@@ -27,6 +27,7 @@ defmodule Membrane.Bin.CallbackContext do
2727
optional(:pad_options) => map(),
2828
optional(:members) => [Membrane.Child.name()],
2929
optional(:crash_initiator) => Membrane.Child.name(),
30+
optional(:crash_reason) => :normal | :shutdown | {:shutdown, term()} | term(),
3031
optional(:start_of_stream_received?) => boolean()
3132
}
3233
end

lib/membrane/core/bin/callback_context.ex

+5-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@ defmodule Membrane.Core.Bin.CallbackContext do
33

44
@type optional_fields ::
55
[pad_options: map()]
6-
| [members: [Membrane.Child.name()], crash_initiator: Membrane.Child.name()]
6+
| [
7+
members: [Membrane.Child.name()],
8+
crash_initiator: Membrane.Child.name(),
9+
crash_reason: :normal | :shutdown | {:shutdown, term()} | term()
10+
]
711
| [start_of_stream_received?: boolean()]
812

913
@spec from_state(Membrane.Core.Bin.State.t(), optional_fields()) ::

lib/membrane/core/callback_handler.ex

+13
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ defmodule Membrane.Core.CallbackHandler do
88
use Bunch
99

1010
alias Membrane.CallbackError
11+
alias Membrane.Core.Component
1112

1213
require Membrane.Logger
1314

@@ -191,6 +192,13 @@ defmodule Membrane.Core.CallbackHandler do
191192
was_handling_action? = state.handling_action?
192193
state = %{state | handling_action?: true}
193194

195+
# Updating :supplying_demand? flag value here is a temporal fix.
196+
# Setting it to `true` while handling actions causes postponing calls
197+
# of handle_redemand/2 and supply_demand/2 until a moment, when all
198+
# actions returned from the callback are handled
199+
was_supplying_demand? = Map.get(state, :supplying_demand?, false)
200+
state = if Component.is_element?(state), do: %{state | supplying_demand?: true}, else: state
201+
194202
state =
195203
Enum.reduce(actions, state, fn action, state ->
196204
try do
@@ -210,6 +218,11 @@ defmodule Membrane.Core.CallbackHandler do
210218
do: state,
211219
else: %{state | handling_action?: false}
212220

221+
state =
222+
if Component.is_element?(state) and not was_supplying_demand?,
223+
do: %{state | supplying_demand?: false},
224+
else: state
225+
213226
handler_module.handle_end_of_actions(state)
214227
end
215228
end

lib/membrane/core/child/pad_model.ex

-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ defmodule Membrane.Core.Child.PadModel do
4242
input_queue: Membrane.Core.Element.InputQueue.t() | nil,
4343
options: %{optional(atom) => any},
4444
auto_demand_size: pos_integer() | nil,
45-
associated_pads: [Pad.ref()] | nil,
4645
sticky_events: [Membrane.Event.t()],
4746
stalker_metrics: %{atom => :atomics.atomics_ref()}
4847
}

lib/membrane/core/element.ex

+5-14
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,12 @@ defmodule Membrane.Core.Element do
158158
setup_incomplete?: false,
159159
effective_flow_control: :push,
160160
handling_action?: false,
161+
popping_auto_flow_queue?: false,
161162
pads_to_snapshot: MapSet.new(),
162-
stalker: options.stalker
163+
stalker: options.stalker,
164+
satisfied_auto_output_pads: MapSet.new(),
165+
awaiting_auto_input_pads: MapSet.new(),
166+
auto_input_pads: []
163167
}
164168
|> PadSpecHandler.init_pads()
165169

@@ -256,20 +260,7 @@ defmodule Membrane.Core.Element do
256260
end
257261

258262
defp do_handle_info(Message.new(:timer_tick, timer_id), state) do
259-
# Guarding the `TimerController.handle_tick/2` invocation is
260-
# required since there might be a case in which `handle_tick`
261-
# callback's implementation returns demand action.
262-
# In this scenario, without this guard, there would a possibility that
263-
# the `handle_buffer` would be called immediately, returning
264-
# some action that would affect the timer and the original state
265-
# of the timer, set with actions returned from `handle_tick`,
266-
# would be overwritten with that action.
267-
#
268-
# For more information see: https://github.com/membraneframework/membrane_core/issues/670
269-
state = %{state | supplying_demand?: true}
270263
state = TimerController.handle_tick(timer_id, state)
271-
state = %{state | supplying_demand?: false}
272-
state = Membrane.Core.Element.DemandHandler.handle_delayed_demands(state)
273264
{:noreply, state}
274265
end
275266

lib/membrane/core/element/action_handler.ex

+42-35
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ defmodule Membrane.Core.Element.ActionHandler do
2424
alias Membrane.Core.Element.{
2525
DemandController,
2626
DemandHandler,
27-
PadController,
2827
State,
2928
StreamFormatController
3029
}
3130

31+
alias Membrane.Core.Element.DemandController.AutoFlowUtils
3232
alias Membrane.Core.{Events, TimerController}
3333
alias Membrane.Element.Action
3434

@@ -38,22 +38,47 @@ defmodule Membrane.Core.Element.ActionHandler do
3838
require Membrane.Logger
3939

4040
@impl CallbackHandler
41-
def transform_actions(actions, callback, _handler_params, state) do
41+
def transform_actions(actions, _callback, _handler_params, state) do
4242
actions = join_buffers(actions)
43-
ensure_nothing_after_redemand(actions, callback, state)
4443
{actions, state}
4544
end
4645

4746
defguardp is_demand_size(size) when is_integer(size) or is_function(size)
4847

4948
@impl CallbackHandler
50-
def handle_end_of_actions(state) when not state.handling_action? do
51-
Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2)
52-
|> Map.put(:pads_to_snapshot, MapSet.new())
49+
def handle_end_of_actions(state) do
50+
# Fixed order of handling demand of manual and auto pads would lead to
51+
# favoring manual pads over auto pads (or vice versa), especially after
52+
# introducting auto flow queues.
53+
manual_demands_first? = Enum.random([1, 2]) == 1
54+
55+
state =
56+
if manual_demands_first?,
57+
do: maybe_handle_delayed_demands(state),
58+
else: state
59+
60+
state = maybe_handle_pads_to_snapshot(state)
61+
62+
state =
63+
if manual_demands_first?,
64+
do: state,
65+
else: maybe_handle_delayed_demands(state)
66+
67+
state
5368
end
5469

55-
@impl CallbackHandler
56-
def handle_end_of_actions(state), do: state
70+
defp maybe_handle_delayed_demands(state) do
71+
with %{supplying_demand?: false} <- state do
72+
DemandHandler.handle_delayed_demands(state)
73+
end
74+
end
75+
76+
defp maybe_handle_pads_to_snapshot(state) do
77+
with %{handling_action?: false} <- state do
78+
Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2)
79+
|> Map.put(:pads_to_snapshot, MapSet.new())
80+
end
81+
end
5782

5883
@impl CallbackHandler
5984
def handle_action({action, _}, :handle_init, _params, _state)
@@ -176,7 +201,11 @@ defmodule Membrane.Core.Element.ActionHandler do
176201
_other -> :output
177202
end
178203

179-
pads = state |> PadModel.filter_data(%{direction: dir}) |> Map.keys()
204+
pads =
205+
Enum.flat_map(state.pads_data, fn
206+
{pad_ref, %{direction: ^dir}} -> [pad_ref]
207+
_pad_entry -> []
208+
end)
180209

181210
Enum.reduce(pads, state, fn pad, state ->
182211
action =
@@ -280,30 +309,6 @@ defmodule Membrane.Core.Element.ActionHandler do
280309
)
281310
end
282311

283-
defp ensure_nothing_after_redemand(actions, callback, state) do
284-
{redemands, actions_after_redemands} =
285-
actions
286-
|> Enum.drop_while(fn
287-
{:redemand, _args} -> false
288-
_other_action -> true
289-
end)
290-
|> Enum.split_while(fn
291-
{:redemand, _args} -> true
292-
_other_action -> false
293-
end)
294-
295-
case {redemands, actions_after_redemands} do
296-
{_redemands, []} ->
297-
:ok
298-
299-
{[redemand | _redemands], _actions_after_redemands} ->
300-
raise ActionError,
301-
reason: :actions_after_redemand,
302-
action: redemand,
303-
callback: {state.module, callback}
304-
end
305-
end
306-
307312
@spec send_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t()
308313
defp send_buffer(_pad_ref, [], state) do
309314
state
@@ -466,8 +471,10 @@ defmodule Membrane.Core.Element.ActionHandler do
466471
@spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t()
467472
defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do
468473
with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do
469-
state = PadController.remove_pad_associations(pad_ref, state)
470-
PadModel.set_data!(state, pad_ref, :end_of_stream?, true)
474+
DemandHandler.remove_pad_from_delayed_demands(pad_ref, state)
475+
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
476+
|> PadModel.set_data!(pad_ref, :end_of_stream?, true)
477+
|> AutoFlowUtils.pop_queues_and_bump_demand()
471478
else
472479
%{direction: :input} ->
473480
raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref

lib/membrane/core/element/atomic_demand.ex

+13-10
Original file line numberDiff line numberDiff line change
@@ -139,14 +139,14 @@ defmodule Membrane.Core.Element.AtomicDemand do
139139
:ok
140140
end
141141

142-
@spec decrease(t, non_neg_integer()) :: t
142+
@spec decrease(t, non_neg_integer()) :: {{:decreased, integer()}, t} | {:unchanged, t}
143143
def decrease(%__MODULE__{} = atomic_demand, value) do
144144
atomic_demand = Map.update!(atomic_demand, :buffered_decrementation, &(&1 + value))
145145

146146
if atomic_demand.buffered_decrementation >= atomic_demand.throttling_factor do
147147
flush_buffered_decrementation(atomic_demand)
148148
else
149-
atomic_demand
149+
{:unchanged, atomic_demand}
150150
end
151151
end
152152

@@ -164,14 +164,17 @@ defmodule Membrane.Core.Element.AtomicDemand do
164164

165165
atomic_demand = %{atomic_demand | buffered_decrementation: 0}
166166

167-
if not atomic_demand.toilet_overflowed? and
168-
get_receiver_status(atomic_demand) == {:resolved, :pull} and
169-
get_sender_status(atomic_demand) == {:resolved, :push} and
170-
-1 * atomic_demand_value > atomic_demand.toilet_capacity do
171-
overflow(atomic_demand, atomic_demand_value)
172-
else
173-
atomic_demand
174-
end
167+
atomic_demand =
168+
if not atomic_demand.toilet_overflowed? and
169+
get_receiver_status(atomic_demand) == {:resolved, :pull} and
170+
get_sender_status(atomic_demand) == {:resolved, :push} and
171+
-1 * atomic_demand_value > atomic_demand.toilet_capacity do
172+
overflow(atomic_demand, atomic_demand_value)
173+
else
174+
atomic_demand
175+
end
176+
177+
{{:decreased, atomic_demand_value}, atomic_demand}
175178
end
176179

177180
defp overflow(atomic_demand, atomic_demand_value) do

0 commit comments

Comments
 (0)