Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specify pad codec #114

Merged
merged 11 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ The package can be installed by adding `membrane_mp4_plugin` to your list of dep
```elixir
defp deps do
[
{:membrane_mp4_plugin, "~> 0.35.0"}
{:membrane_mp4_plugin, "~> 0.35.1"}
]
end
```
Expand Down
4 changes: 2 additions & 2 deletions examples/demuxer_isom.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ defmodule Example do
hackney_opts: [follow_redirect: true]
})
|> child(:demuxer, Membrane.MP4.Demuxer.ISOM)
|> via_out(Pad.ref(:output, 1))
|> via_out(:output, options: [kind: :video])
|> child(:parser_video, %Membrane.H264.Parser{output_stream_structure: :annexb})
|> child(:sink_video, %Membrane.File.Sink{location: @output_video}),
get_child(:demuxer)
|> via_out(Pad.ref(:output, 2))
|> via_out(:output, options: [kind: :audio])
|> child(:audio_parser, %Membrane.AAC.Parser{
out_encapsulation: :ADTS
})
Expand Down
220 changes: 199 additions & 21 deletions lib/membrane_mp4/demuxer/isom.ex
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update the moduledoc

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,22 @@ defmodule Membrane.MP4.Demuxer.ISOM do
The MP4 must have `fast start` enabled, i.e. the `moov` box must precede the `mdat` box.
Once the Demuxer identifies the tracks in the MP4, `t:new_tracks_t/0` notification is sent for each of the tracks.

All the tracks in the MP4 must have a corresponding output pad linked (`Pad.ref(:output, track_id)`).
All pads has to be linked either before `handle_playing/2` callback or after the Element sends `{:new_tracks, ...}`
notification.

Number of pads has to be equal to the number of demuxed tracks.

If the demuxed data contains only one track, linked pad doesn't have to specify `:kind` option.

If there are more than one track and pads are linked before `handle_playing/2`, every pad has to specify `:kind`
option.

If any of pads isn't linked before `handle_playing/2`, #{inspect(__MODULE__)} will send `{:new_tracks, ...}`
notification to the parent. Otherwise, if any of them is linked before `handle_playing/3`, this notification won't
be sent.

If pads are linked after the `{:new_tracks, ...}` notfitaction, their references must match MP4 tracks ids
(`Pad.ref(:output, track_id)`).
"""
use Membrane.Filter

Expand Down Expand Up @@ -35,7 +50,15 @@ defmodule Membrane.MP4.Demuxer.ISOM do
%Membrane.Opus{self_delimiting?: false}
),
availability: :on_request,
flow_control: :auto
options: [
kind: [
spec: :video | :audio | nil,
default: nil,
description: """
Specifies, what kind of data can be handled by a pad.
"""
]
]

def_options optimize_for_non_fast_start?: [
default: false,
Expand Down Expand Up @@ -82,7 +105,10 @@ defmodule Membrane.MP4.Demuxer.ISOM do
boxes_size: 0,
mdat_beginning: nil,
mdat_size: nil,
mdat_header_size: nil
mdat_header_size: nil,
track_to_pad_id: %{},
track_notifications_sent?: false,
pads_linked_before_notification?: false
}

{[], state}
Expand Down Expand Up @@ -147,7 +173,7 @@ defmodule Membrane.MP4.Demuxer.ISOM do
state.partial <> buffer.payload
)

buffers = get_buffer_actions(samples)
buffers = get_buffer_actions(samples, state)

{buffers, %{state | samples_info: samples_info, partial: rest}}
end
Expand Down Expand Up @@ -356,22 +382,26 @@ defmodule Membrane.MP4.Demuxer.ISOM do

state = %{state | samples_info: samples_info, partial: rest}

state = match_tracks_with_pads(ctx, state)

all_pads_connected? = all_pads_connected?(ctx, state)

{buffers, state} =
if all_pads_connected? do
{get_buffer_actions(samples), state}
{get_buffer_actions(samples, state), state}
else
{[], store_samples(state, samples)}
end

notifications = get_track_notifications(state)
notifications = maybe_get_track_notifications(state)

stream_format = if all_pads_connected?, do: get_stream_format(state), else: []

state =
%{
state
| all_pads_connected?: all_pads_connected?
| all_pads_connected?: all_pads_connected?,
track_notifications_sent?: true
}
|> update_fsm_state()

Expand All @@ -385,9 +415,10 @@ defmodule Membrane.MP4.Demuxer.ISOM do
end)
end

defp get_buffer_actions(samples) do
defp get_buffer_actions(samples, state) do
Enum.map(samples, fn {buffer, track_id} ->
{:buffer, {Pad.ref(:output, track_id), buffer}}
pad_id = state.track_to_pad_id[track_id]
{:buffer, {Pad.ref(:output, pad_id), buffer}}
end)
end

Expand All @@ -398,12 +429,98 @@ defmodule Membrane.MP4.Demuxer.ISOM do
end
end

defp get_track_notifications(state) do
defp match_tracks_with_pads(ctx, state) do
sample_tables = state.samples_info.sample_tables

output_pads_data =
ctx.pads
|> Map.values()
|> Enum.reject(fn %{ref: pad_ref} -> pad_ref == :input end)

if length(output_pads_data) not in [0, map_size(sample_tables)] do
raise_pads_not_matching_codecs_error!(ctx, state)
end

track_to_pad_id =
case output_pads_data do
[] ->
sample_tables
|> Map.new(fn {track_id, _table} -> {track_id, track_id} end)

[pad_data] ->
{track_id, table} = Enum.at(sample_tables, 0)

if pad_data.options.kind not in [
nil,
sample_description_to_kind(table.sample_description)
] do
raise_pads_not_matching_codecs_error!(ctx, state)
end

%{track_id => pad_data_to_pad_id(pad_data)}

_many ->
kind_to_pads_data = output_pads_data |> Enum.group_by(& &1.options.kind)

kind_to_tracks =
sample_tables
|> Enum.group_by(
fn {_track_id, table} -> sample_description_to_kind(table.sample_description) end,
fn {track_id, _table} -> track_id end
)

raise? =
Enum.any?(kind_to_pads_data, fn {kind, pads} ->
length(pads) != length(kind_to_tracks[kind])
end)

if raise?, do: raise_pads_not_matching_codecs_error!(ctx, state)

kind_to_tracks
|> Enum.flat_map(fn {kind, tracks} ->
pad_refs = kind_to_pads_data[kind] |> Enum.map(&pad_data_to_pad_id/1)
Enum.zip(tracks, pad_refs)
end)
|> Map.new()
end

%{state | track_to_pad_id: Map.new(track_to_pad_id)}
end

defp pad_data_to_pad_id(%{ref: Pad.ref(_name, id)}), do: id

@spec raise_pads_not_matching_codecs_error!(map(), map()) :: no_return()
defp raise_pads_not_matching_codecs_error!(ctx, state) do
pads_kinds =
ctx.pads
|> Enum.flat_map(fn
{:input, _pad_data} -> []
{_pad_ref, %{options: %{kind: kind}}} -> [kind]
end)

tracks_codecs =
state.samples_info.sample_tables
|> Enum.map(fn {_track, table} -> table.sample_description.__struct__ end)

raise """
Pads kinds don't match with tracks codecs. Pads kinds are #{inspect(pads_kinds)}. \
Tracks codecs are #{inspect(tracks_codecs)}
"""
end

defp sample_description_to_kind(%Membrane.H264{}), do: :video
defp sample_description_to_kind(%Membrane.H265{}), do: :video
defp sample_description_to_kind(%Membrane.AAC{}), do: :audio
defp sample_description_to_kind(%Membrane.Opus{}), do: :audio

defp maybe_get_track_notifications(%{pads_linked_before_notification?: true}), do: []

defp maybe_get_track_notifications(%{pads_linked_before_notification?: false} = state) do
new_tracks =
state.samples_info.sample_tables
|> Enum.map(fn {track_id, table} ->
content = table.sample_description
{track_id, content}
pad_id = state.track_to_pad_id[track_id]
{pad_id, table.sample_description}
end)

[{:notify_parent, {:new_tracks, new_tracks}}]
Expand All @@ -412,7 +529,8 @@ defmodule Membrane.MP4.Demuxer.ISOM do
defp get_stream_format(state) do
state.samples_info.sample_tables
|> Enum.map(fn {track_id, table} ->
{:stream_format, {Pad.ref(:output, track_id), table.sample_description}}
pad_id = state.track_to_pad_id[track_id]
{:stream_format, {Pad.ref(:output, pad_id), table.sample_description}}
end)
end

Expand All @@ -425,7 +543,23 @@ defmodule Membrane.MP4.Demuxer.ISOM do
raise "All tracks have corresponding pad already connected"
end

def handle_pad_added(Pad.ref(:output, _track_id), ctx, state) do
def handle_pad_added(Pad.ref(:output, _track_id) = pad_ref, ctx, state) do
state =
case ctx.playback do
:stopped ->
%{state | pads_linked_before_notification?: true}

:playing when state.track_notifications_sent? ->
state

:playing ->
raise """
Pads can be linked either before #{inspect(__MODULE__)} enters :playing playback or after it \
sends {:new_tracks, ...} notification
"""
end

:ok = validate_pad_kind!(pad_ref, ctx.pad_options.kind, ctx, state)
all_pads_connected? = all_pads_connected?(ctx, state)

{actions, state} =
Expand All @@ -444,6 +578,55 @@ defmodule Membrane.MP4.Demuxer.ISOM do
{actions, state}
end

defp validate_pad_kind!(pad_ref, pad_kind, ctx, state) do
allowed_kinds = [nil, :audio, :video]

if pad_kind not in allowed_kinds do
raise """
Pad #{inspect(pad_ref)} has :kind option set to #{inspect(pad_kind)}, while it has te be one of \
#{[:audio, :video] |> inspect()} or be unspecified.
"""
end

if not state.track_notifications_sent? and
Enum.count(ctx.pads, &match?({Pad.ref(:output, _id), %{options: %{kind: nil}}}, &1)) > 1 do
raise """
If pads are linked before :new_tracks notifications and there are more then one of them, pad option \
:kind has to be specyfied.
"""
end

if state.track_notifications_sent? do
Pad.ref(:output, pad_id) = pad_ref

related_track =
state.track_to_pad_id
|> Map.keys()
|> Enum.find(&(state.track_to_pad_id[&1] == pad_id))

if related_track == nil do
raise """
Pad #{inspect(pad_ref)} doesn't have a related track. If you link pads after #{inspect(__MODULE__)} \
sent the track notification, you have to restrict yourself to the pad occuring in this notification. \
Tracks, that occured in this notification are: #{Map.keys(state.track_to_pad_id) |> inspect()}
"""
end

track_kind =
state.samples_info.sample_tables[related_track].sample_description
|> sample_description_to_kind()

if pad_kind != nil and pad_kind != track_kind do
raise """
Pad option :kind must match with the kind of the related track or be equal nil, but pad #{inspect(pad_ref)} \
kind is #{inspect(pad_kind)}, while the related track kind is #{inspect(track_kind)}
"""
end
end

:ok
end

@impl true
def handle_end_of_stream(:input, _ctx, %{all_pads_connected?: false} = state) do
{[], %{state | end_of_stream?: true}}
Expand All @@ -465,12 +648,6 @@ defmodule Membrane.MP4.Demuxer.ISOM do
_pad -> []
end)

Enum.each(pads, fn pad ->
if pad not in tracks do
raise "An output pad connected with #{pad} id, however no matching track exists"
end
end)

Range.size(tracks) == length(pads)
end

Expand All @@ -482,7 +659,8 @@ defmodule Membrane.MP4.Demuxer.ISOM do
|> Enum.reverse()
|> Enum.map(fn {buffer, ^track_id} -> buffer end)

{:buffer, {Pad.ref(:output, track_id), buffers}}
pad_id = state.track_to_pad_id[track_id]
{:buffer, {Pad.ref(:output, pad_id), buffers}}
end)

state = %{state | buffered_samples: %{}}
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.MP4.Plugin.MixProject do
use Mix.Project

@version "0.35.0"
@version "0.35.1"
@github_url "https://github.com/membraneframework/membrane_mp4_plugin"

def project do
Expand Down
13 changes: 6 additions & 7 deletions test/membrane_mp4/demuxer/isom/demuxer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -284,28 +284,27 @@ defmodule Membrane.MP4.Demuxer.ISOM.DemuxerTest do
end

defp start_testing_pipeline!(opts) do
structure = [
spec =
child(:file, %Membrane.File.Source{location: opts[:input_file]})
|> child(:demuxer, Membrane.MP4.Demuxer.ISOM)
|> via_out(Pad.ref(:output, 1))
|> child(:sink, %Membrane.File.Sink{location: opts[:output_file]})
]

Pipeline.start_link_supervised!(spec: structure)
Pipeline.start_link_supervised!(spec: spec)
end

defp start_testing_pipeline_with_two_tracks!(opts) do
structure = [
spec = [
child(:file, %Membrane.File.Source{location: opts[:input_file]})
|> child(:demuxer, Membrane.MP4.Demuxer.ISOM)
|> via_out(Pad.ref(:output, 1))
|> via_out(:output, options: [kind: :video])
|> child(:video_sink, %Membrane.File.Sink{location: opts[:video_output_file]}),
get_child(:demuxer)
|> via_out(Pad.ref(:output, 2))
|> via_out(Pad.ref(:output, 2), options: [kind: :audio])
|> child(:audio_sink, %Membrane.File.Sink{location: opts[:audio_output_file]})
]

Pipeline.start_link_supervised!(spec: structure)
Pipeline.start_link_supervised!(spec: spec)
end

defp start_remote_pipeline!(opts) do
Expand Down
Loading