-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Forward timestamps #57
Changes from 10 commits
81933ba
0ac7da5
58039a7
c39aa63
bffb6fb
e0285c7
3ccd0bf
f3de578
cd2486d
7e7c7bb
d78762e
1473e39
9ebe020
1751941
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -64,7 +64,8 @@ defmodule Membrane.FFmpeg.SWResample.Converter do | |||||
|> Map.merge(%{ | ||||||
native: nil, | ||||||
queue: <<>>, | ||||||
input_stream_format_provided?: options.input_stream_format != nil | ||||||
input_stream_format_provided?: options.input_stream_format != nil, | ||||||
pts_queue: [] | ||||||
}) | ||||||
|
||||||
{[], state} | ||||||
|
@@ -122,16 +123,29 @@ defmodule Membrane.FFmpeg.SWResample.Converter do | |||||
end | ||||||
|
||||||
@impl true | ||||||
def handle_buffer(:input, %Buffer{payload: payload}, _ctx, state) do | ||||||
def handle_buffer(:input, %Buffer{payload: payload, pts: input_pts}, _ctx, state) do | ||||||
input_frame_size = RawAudio.frame_size(state.input_stream_format) | ||||||
output_frame_size = RawAudio.frame_size(state.output_stream_format) | ||||||
ratio = output_frame_size / input_frame_size | ||||||
|
||||||
expected_output_frames_count = (byte_size(payload) / input_frame_size * ratio) |> trunc() | ||||||
|
||||||
state = | ||||||
Map.update!(state, :pts_queue, fn pts_queue -> | ||||||
pts_queue ++ [{input_pts, expected_output_frames_count}] | ||||||
end) | ||||||
|
||||||
conversion_result = | ||||||
convert!(state.native, RawAudio.frame_size(state.input_stream_format), payload, state.queue) | ||||||
convert!(state.native, input_frame_size, payload, state.queue) | ||||||
|
||||||
case conversion_result do | ||||||
{<<>>, queue} -> | ||||||
{[], %{state | queue: queue}} | ||||||
|
||||||
{converted, queue} -> | ||||||
{[buffer: {:output, %Buffer{payload: converted}}], %{state | queue: queue}} | ||||||
{state, out_pts} = update_pts_queue(state, byte_size(converted) / output_frame_size) | ||||||
|
||||||
{[buffer: {:output, %Buffer{payload: converted, pts: out_pts}}], %{state | queue: queue}} | ||||||
end | ||||||
end | ||||||
|
||||||
|
@@ -154,8 +168,15 @@ defmodule Membrane.FFmpeg.SWResample.Converter do | |||||
{[end_of_stream: :output], %{state | queue: <<>>}} | ||||||
|
||||||
converted -> | ||||||
{[buffer: {:output, %Buffer{payload: converted}}, end_of_stream: :output], | ||||||
%{state | queue: <<>>}} | ||||||
converted_frames_count = | ||||||
byte_size(converted) / RawAudio.frame_size(state.output_stream_format) | ||||||
|
||||||
{state, out_pts} = update_pts_queue(state, converted_frames_count) | ||||||
|
||||||
{[ | ||||||
buffer: {:output, %Buffer{payload: converted, pts: out_pts}}, | ||||||
end_of_stream: :output | ||||||
], %{state | queue: <<>>}} | ||||||
end | ||||||
end | ||||||
|
||||||
|
@@ -212,4 +233,15 @@ defmodule Membrane.FFmpeg.SWResample.Converter do | |||||
{:error, reason} -> raise "Error while flushing converter: #{inspect(reason)}" | ||||||
end | ||||||
end | ||||||
|
||||||
defp update_pts_queue(state, converted_frames_count) do | ||||||
{[{out_pts, expected_frames}], rest} = Enum.split(state.pts_queue, 1) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can just match
Suggested change
|
||||||
|
||||||
if converted_frames_count < expected_frames do | ||||||
{%{state | pts_queue: [{out_pts, expected_frames - converted_frames_count}] ++ rest}, | ||||||
out_pts} | ||||||
else | ||||||
{%{state | pts_queue: rest}, out_pts} | ||||||
end | ||||||
end | ||||||
end |
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,64 @@ | ||||||
defmodule Membrane.FFmpeg.SWResample.PtsForwardTest do | ||||||
use ExUnit.Case | ||||||
|
||||||
import Membrane.ChildrenSpec | ||||||
import Membrane.Testing.Assertions | ||||||
|
||||||
alias Membrane.FFmpeg.SWResample.Converter | ||||||
alias Membrane.{RawAudio, Testing} | ||||||
|
||||||
test "pts forward test" do | ||||||
input_stream_format = %RawAudio{sample_format: :s16le, sample_rate: 16_000, channels: 2} | ||||||
output_stream_format = %RawAudio{sample_format: :s32le, sample_rate: 32_000, channels: 2} | ||||||
|
||||||
# 32 frames * 2048 bytes | ||||||
fixture_path = Path.expand(Path.join(__DIR__, "/../fixtures/input_s16le_stereo_16khz.raw")) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the run path in tests is always the project root, so it's safe to do
Suggested change
|
||||||
|
||||||
spec = [ | ||||||
child(:source, %Membrane.Testing.Source{output: buffers_from_file(fixture_path)}) | ||||||
|> child(:resampler, %Converter{ | ||||||
input_stream_format: input_stream_format, | ||||||
output_stream_format: output_stream_format | ||||||
}) | ||||||
|> child(:sink, Membrane.Testing.Sink) | ||||||
] | ||||||
|
||||||
pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) | ||||||
assert_start_of_stream(pipeline, :sink) | ||||||
assert_sink_buffer(pipeline, :sink, _buffer) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The second assertion makes the first one redundant |
||||||
|
||||||
Enum.each(0..30, fn index -> | ||||||
assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{pts: out_pts}) | ||||||
assert out_pts == index * 31_250_000 | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's make this magic number a variable |
||||||
end) | ||||||
|
||||||
assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{pts: 937_500_000}) | ||||||
mat-hek marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. using multiplication inside a match throws compile error |
||||||
|
||||||
assert_end_of_stream(pipeline, :sink) | ||||||
Testing.Pipeline.terminate(pipeline) | ||||||
end | ||||||
|
||||||
defp buffers_from_file(path) do | ||||||
binary = File.read!(path) | ||||||
|
||||||
split_binary(binary) | ||||||
|> Enum.with_index() | ||||||
|> Enum.map(fn {payload, index} -> | ||||||
%Membrane.Buffer{ | ||||||
payload: payload, | ||||||
pts: index * 31_250_000 | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. actually pts_multiplayer can be a module attribute and you can use it here as well |
||||||
} | ||||||
end) | ||||||
end | ||||||
|
||||||
@spec split_binary(binary(), list(binary())) :: list(binary()) | ||||||
def split_binary(binary, acc \\ []) | ||||||
|
||||||
def split_binary(<<binary::binary-size(2048), rest::binary>>, acc) do | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be hard to have the chunk length differ from buffer to buffer? I guess we could have better coverage that way There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but if it's too much work let's leave it as is |
||||||
split_binary(rest, [binary] ++ acc) | ||||||
end | ||||||
|
||||||
def split_binary(rest, acc) when byte_size(rest) <= 2048 do | ||||||
Enum.reverse(acc) ++ [rest] | ||||||
end | ||||||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's necessary to use
round
. Otherwise we'd expect 2137 frames if the float result is 2137.99