-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Forward timestamps #57
Conversation
I have crash with the following pipeline opus_decode -> convert 48khz to 32khz -> g7221 encoder -> rtp, this merge fixes it. Thanks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure, if passing pts
from input to output is a good approach here, I recommend to ask somebody more familiar with this plugin, if it doesn't require more sophisticated logic (e.g. passing on output pts
of the first buffer, which payload was part of state.queue
)
|> Map.merge(%{ | ||
current_pts: pts | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|> Map.merge(%{ | |
current_pts: pts | |
}) | |
|> Map.put(:current_pts, pts) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bartkrak actually, the right approach would be to add this field in handle_init
and do %{state | current_pts: pts}
. Adding fields to state in various callbacks makes it hard to follow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we spoke, we must take into account the fact that the resampler may queue the stream
output_frame_size = RawAudio.frame_size(state.output_stream_format) | ||
ratio = output_frame_size / input_frame_size | ||
|
||
expected_output_frames_count = byte_size(payload) / input_frame_size * ratio |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should round to an integer here
Map.update!(state, :pts_queue, fn _pts_queue -> | ||
[{out_pts, expected_frames - converted_frames_count}] ++ rest | ||
end) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can be %{state | pts_queue: ...
cond do | ||
converted_frames_count < expected_frames -> | ||
state = | ||
Map.update!(state, :pts_queue, fn _pts_queue -> | ||
[{out_pts, expected_frames - converted_frames_count}] ++ rest | ||
end) | ||
|
||
{state, out_pts} | ||
|
||
converted_frames_count >= expected_frames -> | ||
{%{state | pts_queue: rest}, out_pts} | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this seems like an if
@@ -157,7 +157,14 @@ defmodule Membrane.FFmpeg.SWResample.ConverterTest do | |||
|
|||
assert {[], new_state} = @module.handle_buffer(:input, buffer, nil, state) | |||
|
|||
assert new_state == %{state | queue: payload} | |||
# this is necesary to ignore state.pts_queue in the assertion |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can set it to nil
on both sides then
@@ -178,5 +186,21 @@ defmodule Membrane.FFmpeg.SWResample.ConverterTest do | |||
assert actions == [buffer: {:output, %Membrane.Buffer{payload: result}}] | |||
assert new_state == %{state | queue: <<0::2*8>>} | |||
end | |||
|
|||
# test "timestamps forward test", %{state: initial_state} do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leftover
|
||
test "pts forward test" do | ||
input_stream_format = %RawAudio{sample_format: :s16le, sample_rate: 16_000, channels: 2} | ||
# input_stream_format = %RawAudio{sample_format: :u8, sample_rate: 8_000, channels: 1} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
more leftovers
@@ -128,7 +128,7 @@ defmodule Membrane.FFmpeg.SWResample.Converter do | |||
output_frame_size = RawAudio.frame_size(state.output_stream_format) | |||
ratio = output_frame_size / input_frame_size | |||
|
|||
expected_output_frames_count = byte_size(payload) / input_frame_size * ratio | |||
expected_output_frames_count = (byte_size(payload) / input_frame_size * ratio) |> trunc() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's necessary to use round
. Otherwise we'd expect 2137 frames if the float result is 2137.99
output_stream_format = %RawAudio{sample_format: :s32le, sample_rate: 32_000, channels: 2} | ||
|
||
# 32 frames * 2048 bytes | ||
fixture_path = Path.expand(Path.join(__DIR__, "/../fixtures/input_s16le_stereo_16khz.raw")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the run path in tests is always the project root, so it's safe to do
fixture_path = Path.expand(Path.join(__DIR__, "/../fixtures/input_s16le_stereo_16khz.raw")) | |
fixture_path = "test/fixtures/input_s16le_stereo_16khz.raw" |
|
||
Enum.each(0..30, fn index -> | ||
assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{pts: out_pts}) | ||
assert out_pts == index * 31_250_000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's make this magic number a variable
@spec split_binary(binary(), list(binary())) :: list(binary()) | ||
def split_binary(binary, acc \\ []) | ||
|
||
def split_binary(<<binary::binary-size(2048), rest::binary>>, acc) do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be hard to have the chunk length differ from buffer to buffer? I guess we could have better coverage that way
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but if it's too much work let's leave it as is
|> Enum.map(fn {payload, index} -> | ||
%Membrane.Buffer{ | ||
payload: payload, | ||
pts: index * 31_250_000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually pts_multiplayer can be a module attribute and you can use it here as well
assert out_pts == index * pts_multiplier | ||
end) | ||
|
||
assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{pts: 937_500_000}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{pts: 937_500_000}) | |
assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{pts: 30 * pts_multiplier) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using multiplication inside a match throws compile error
assert_start_of_stream(pipeline, :sink) | ||
assert_sink_buffer(pipeline, :sink, _buffer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The second assertion makes the first one redundant
input_frame_size = RawAudio.frame_size(state.input_stream_format) | ||
output_frame_size = RawAudio.frame_size(state.output_stream_format) | ||
ratio = output_frame_size / input_frame_size | ||
|
||
expected_output_frames_count = (byte_size(payload) / input_frame_size * ratio) |> round() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is more precise to write (byte_size(payload) * output_frame_size / (input_frame_size * input_frame_size))
because of limitted accuracy of floats
@@ -212,4 +233,15 @@ defmodule Membrane.FFmpeg.SWResample.Converter do | |||
{:error, reason} -> raise "Error while flushing converter: #{inspect(reason)}" | |||
end | |||
end | |||
|
|||
defp update_pts_queue(state, converted_frames_count) do | |||
{[{out_pts, expected_frames}], rest} = Enum.split(state.pts_queue, 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can just match
{[{out_pts, expected_frames}], rest} = Enum.split(state.pts_queue, 1) | |
[{out_pts, expected_frames} | rest] = state.pts_queue |
partially solves: membraneframework/membrane_core#735