-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Forward timestamps #57
Changes from 8 commits
81933ba
0ac7da5
58039a7
c39aa63
bffb6fb
e0285c7
3ccd0bf
f3de578
cd2486d
7e7c7bb
d78762e
1473e39
9ebe020
1751941
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -64,7 +64,8 @@ defmodule Membrane.FFmpeg.SWResample.Converter do | |||||
|> Map.merge(%{ | ||||||
native: nil, | ||||||
queue: <<>>, | ||||||
input_stream_format_provided?: options.input_stream_format != nil | ||||||
input_stream_format_provided?: options.input_stream_format != nil, | ||||||
pts_queue: [] | ||||||
}) | ||||||
|
||||||
{[], state} | ||||||
|
@@ -122,16 +123,29 @@ defmodule Membrane.FFmpeg.SWResample.Converter do | |||||
end | ||||||
|
||||||
@impl true | ||||||
def handle_buffer(:input, %Buffer{payload: payload}, _ctx, state) do | ||||||
def handle_buffer(:input, %Buffer{payload: payload, pts: input_pts}, _ctx, state) do | ||||||
input_frame_size = RawAudio.frame_size(state.input_stream_format) | ||||||
output_frame_size = RawAudio.frame_size(state.output_stream_format) | ||||||
ratio = output_frame_size / input_frame_size | ||||||
|
||||||
expected_output_frames_count = byte_size(payload) / input_frame_size * ratio | ||||||
|
||||||
state = | ||||||
Map.update!(state, :pts_queue, fn pts_queue -> | ||||||
pts_queue ++ [{input_pts, expected_output_frames_count}] | ||||||
end) | ||||||
|
||||||
conversion_result = | ||||||
convert!(state.native, RawAudio.frame_size(state.input_stream_format), payload, state.queue) | ||||||
convert!(state.native, input_frame_size, payload, state.queue) | ||||||
|
||||||
case conversion_result do | ||||||
{<<>>, queue} -> | ||||||
{[], %{state | queue: queue}} | ||||||
|
||||||
{converted, queue} -> | ||||||
{[buffer: {:output, %Buffer{payload: converted}}], %{state | queue: queue}} | ||||||
{state, out_pts} = update_pts_queue(state, byte_size(converted) / output_frame_size) | ||||||
|
||||||
{[buffer: {:output, %Buffer{payload: converted, pts: out_pts}}], %{state | queue: queue}} | ||||||
end | ||||||
end | ||||||
|
||||||
|
@@ -154,8 +168,15 @@ defmodule Membrane.FFmpeg.SWResample.Converter do | |||||
{[end_of_stream: :output], %{state | queue: <<>>}} | ||||||
|
||||||
converted -> | ||||||
{[buffer: {:output, %Buffer{payload: converted}}, end_of_stream: :output], | ||||||
%{state | queue: <<>>}} | ||||||
converted_frames_count = | ||||||
byte_size(converted) / RawAudio.frame_size(state.output_stream_format) | ||||||
|
||||||
{state, out_pts} = update_pts_queue(state, converted_frames_count) | ||||||
|
||||||
{[ | ||||||
buffer: {:output, %Buffer{payload: converted, pts: out_pts}}, | ||||||
end_of_stream: :output | ||||||
], %{state | queue: <<>>}} | ||||||
end | ||||||
end | ||||||
|
||||||
|
@@ -212,4 +233,21 @@ defmodule Membrane.FFmpeg.SWResample.Converter do | |||||
{:error, reason} -> raise "Error while flushing converter: #{inspect(reason)}" | ||||||
end | ||||||
end | ||||||
|
||||||
defp update_pts_queue(state, converted_frames_count) do | ||||||
{[{out_pts, expected_frames}], rest} = Enum.split(state.pts_queue, 1) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can just match
Suggested change
|
||||||
|
||||||
cond do | ||||||
converted_frames_count < expected_frames -> | ||||||
state = | ||||||
Map.update!(state, :pts_queue, fn _pts_queue -> | ||||||
[{out_pts, expected_frames - converted_frames_count}] ++ rest | ||||||
end) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this can be |
||||||
|
||||||
{state, out_pts} | ||||||
|
||||||
converted_frames_count >= expected_frames -> | ||||||
{%{state | pts_queue: rest}, out_pts} | ||||||
end | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this seems like an if |
||||||
end | ||||||
end |
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,8 @@ defmodule Membrane.FFmpeg.SWResample.ConverterTest do | |
output_stream_format: @u8_format, | ||
frames_per_buffer: 2048, | ||
native: nil, | ||
queue: <<>> | ||
queue: <<>>, | ||
pts_queue: [] | ||
} | ||
} | ||
end | ||
|
@@ -156,7 +157,14 @@ defmodule Membrane.FFmpeg.SWResample.ConverterTest do | |
|
||
assert {[], new_state} = @module.handle_buffer(:input, buffer, nil, state) | ||
|
||
assert new_state == %{state | queue: payload} | ||
# this is necesary to ignore state.pts_queue in the assertion | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can set it to |
||
assert new_state.frames_per_buffer == state.frames_per_buffer | ||
assert new_state.input_stream_format == state.input_stream_format | ||
assert new_state.input_stream_format_provided? == state.input_stream_format_provided? | ||
assert new_state.native == state.native | ||
assert new_state.output_stream_format == state.output_stream_format | ||
assert new_state.queue == payload | ||
|
||
refute_called(@native, :convert) | ||
end | ||
|
||
|
@@ -178,5 +186,21 @@ defmodule Membrane.FFmpeg.SWResample.ConverterTest do | |
assert actions == [buffer: {:output, %Membrane.Buffer{payload: result}}] | ||
assert new_state == %{state | queue: <<0::2*8>>} | ||
end | ||
|
||
# test "timestamps forward test", %{state: initial_state} do | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. leftover |
||
# state = %{ | ||
# initial_state | ||
# | queue: <<250, 250, 0>>, | ||
# native: :mock_handle, | ||
# input_stream_format: @s16le_format | ||
# } | ||
# payload = <<0::7*8>> | ||
# pts = 1000 | ||
# buffer = %Membrane.Buffer{payload: payload, pts: pts} | ||
# result = <<250, 0, 0, 0>> | ||
# mock(@native, [convert: 2], {:ok, result}) | ||
# assert {[buffer: {:output, %Membrane.Buffer{pts: ^pts}}], _state} = | ||
# @module.handle_buffer(:input, buffer, nil, state) | ||
# end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,63 @@ | ||||||
defmodule Membrane.FFmpeg.SWResample.PtsForwardTest do | ||||||
use ExUnit.Case | ||||||
|
||||||
import Membrane.ChildrenSpec | ||||||
import Membrane.Testing.Assertions | ||||||
|
||||||
alias Membrane.FFmpeg.SWResample.Converter | ||||||
alias Membrane.{RawAudio, Testing} | ||||||
|
||||||
test "pts forward test" do | ||||||
input_stream_format = %RawAudio{sample_format: :s16le, sample_rate: 16_000, channels: 2} | ||||||
# input_stream_format = %RawAudio{sample_format: :u8, sample_rate: 8_000, channels: 1} | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. more leftovers |
||||||
output_stream_format = %RawAudio{sample_format: :s32le, sample_rate: 32_000, channels: 2} | ||||||
|
||||||
# 32 frames * 2048 bytes | ||||||
fixture_path = Path.expand(Path.join(__DIR__, "/../fixtures/input_s16le_stereo_16khz.raw")) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the run path in tests is always the project root, so it's safe to do
Suggested change
|
||||||
# fixture_path = Path.expand(Path.join(__DIR__, "/../fixtures/input_u8_mono_8khz.raw")) | ||||||
|
||||||
spec = [ | ||||||
child(:source, %Membrane.Testing.Source{output: buffers_from_file(fixture_path)}) | ||||||
|> child(:resampler, %Converter{ | ||||||
input_stream_format: input_stream_format, | ||||||
output_stream_format: output_stream_format | ||||||
}) | ||||||
|> child(:sink, Membrane.Testing.Sink) | ||||||
] | ||||||
|
||||||
pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) | ||||||
assert_start_of_stream(pipeline, :sink) | ||||||
|
||||||
# Enum.each(0..31, fn index -> | ||||||
# assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{pts: out_pts}) | ||||||
# assert out_pts == index * 31250000 | ||||||
# end) | ||||||
|
||||||
assert_end_of_stream(pipeline, :sink) | ||||||
Testing.Pipeline.terminate(pipeline) | ||||||
end | ||||||
|
||||||
defp buffers_from_file(path) do | ||||||
binary = File.read!(path) | ||||||
|
||||||
split_binary(binary) | ||||||
|> Enum.with_index() | ||||||
|> Enum.map(fn {payload, index} -> | ||||||
%Membrane.Buffer{ | ||||||
payload: payload, | ||||||
pts: index * 31_250_000 | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. actually pts_multiplayer can be a module attribute and you can use it here as well |
||||||
} | ||||||
end) | ||||||
end | ||||||
|
||||||
@spec split_binary(binary(), list(binary())) :: list(binary()) | ||||||
def split_binary(binary, acc \\ []) | ||||||
|
||||||
def split_binary(<<binary::binary-size(2048), rest::binary>>, acc) do | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be hard to have the chunk length differ from buffer to buffer? I guess we could have better coverage that way There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but if it's too much work let's leave it as is |
||||||
split_binary(rest, acc ++ [binary]) | ||||||
end | ||||||
|
||||||
def split_binary(rest, acc) when byte_size(rest) <= 2048 do | ||||||
Enum.concat(acc, [rest]) | ||||||
end | ||||||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should round to an integer here