Skip to content

Commit

Permalink
Use metadata field if present
Browse files Browse the repository at this point in the history
  • Loading branch information
Noarkhh committed May 13, 2024
1 parent cc35d3a commit 009792d
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 9 deletions.
39 changes: 31 additions & 8 deletions lib/membrane_opus/parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule Membrane.Opus.Parser do

use Membrane.Filter

require Membrane.Logger
alias __MODULE__.{Delimitation, FrameLengths}
alias Membrane.{Buffer, Opus, RemoteStream}
alias Membrane.Opus.Util
Expand Down Expand Up @@ -48,8 +49,9 @@ defmodule Membrane.Opus.Parser do
default: false,
description: """
If this is set to true parser will try to generate timestamps
starting from 0 and increasing them by frame duration,
otherwise it will pass pts from input to output, even if it's nil.
starting from 0 and increasing them by frame duration and validating them
based on `ogg_page_pts` metadata field (if present).
Otherwise it will pass pts from input to output, even if it's nil.
"""
]

Expand Down Expand Up @@ -80,33 +82,54 @@ defmodule Membrane.Opus.Parser do

defp set_current_pts(
%{generate_best_effort_timestamps?: true, current_pts: nil} = state,
_input_pts
_input_pts,
_metadata
) do
%{state | current_pts: 0}
end

defp set_current_pts(%{generate_best_effort_timestamps?: false, queue: <<>>} = state, input_pts) do
defp set_current_pts(
%{generate_best_effort_timestamps?: true, current_pts: current_pts} = state,
_input_pts,
%{ogg_page_pts: ogg_page_pts} = _metadata
) do
if current_pts != ogg_page_pts do
Membrane.Logger.warning(
"Best effort PTS calculated from frame durations (#{current_pts}) differs from the one based on OGG granule position (#{ogg_page_pts}), assuming the latter one as correct."
)
end

%{state | current_pts: ogg_page_pts}
end

defp set_current_pts(
%{generate_best_effort_timestamps?: false, queue: <<>>} = state,
input_pts,
_metadata
) do
%{state | current_pts: input_pts}
end

defp set_current_pts(state, _input_pts), do: state
defp set_current_pts(state, _input_pts, _metadata), do: state

@impl true
def handle_buffer(:input, %Buffer{payload: data, pts: input_pts}, ctx, state) do
def handle_buffer(:input, %Buffer{payload: data, pts: pts, metadata: metadata}, ctx, state) do
{delimitation_processor, self_delimiting?} =
Delimitation.get_processor(state.delimitation, state.input_delimitted?)

check_pts_integrity? = state.queue != <<>> and not state.generate_best_effort_timestamps?

state = set_current_pts(state, pts, metadata)

{:ok, queue, packets, channels, state} =
maybe_parse(
state.queue <> data,
delimitation_processor,
set_current_pts(state, input_pts)
state
)

if check_pts_integrity? do
Util.validate_pts_integrity(packets, input_pts)
Util.validate_pts_integrity(packets, pts)
end

stream_format = %Opus{
Expand Down
5 changes: 4 additions & 1 deletion lib/membrane_opus/util.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,12 @@ defmodule Membrane.Opus.Util do
end
end

@spec validate_pts_integrity([Membrane.Buffer.t()], integer()) :: :ok
@spec validate_pts_integrity([Membrane.Buffer.t()], integer() | nil) :: :ok
def validate_pts_integrity(packets, input_pts) do
cond do
input_pts == nil ->
:ok

length(packets) < 2 or Enum.at(packets, 1).pts == input_pts ->
:ok

Expand Down

0 comments on commit 009792d

Please sign in to comment.