Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forward timestamps #57

Merged
merged 14 commits into from
Apr 3, 2024
Merged

Forward timestamps #57

merged 14 commits into from
Apr 3, 2024

Conversation

bartkrak
Copy link
Contributor

@bartkrak bartkrak commented Mar 12, 2024

@spscream
Copy link

I have crash with the following pipeline opus_decode -> convert 48khz to 32khz -> g7221 encoder -> rtp, this merge fixes it. Thanks

Copy link
Member

@FelonEkonom FelonEkonom left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, if passing pts from input to output is a good approach here, I recommend to ask somebody more familiar with this plugin, if it doesn't require more sophisticated logic (e.g. passing on output pts of the first buffer, which payload was part of state.queue)

Comment on lines 128 to 130
|> Map.merge(%{
current_pts: pts
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
|> Map.merge(%{
current_pts: pts
})
|> Map.put(:current_pts, pts)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bartkrak actually, the right approach would be to add this field in handle_init and do %{state | current_pts: pts}. Adding fields to state in various callbacks makes it hard to follow.

Copy link
Member

@mat-hek mat-hek left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we spoke, we must take into account the fact that the resampler may queue the stream

@bartkrak bartkrak requested a review from mat-hek March 21, 2024 18:06
output_frame_size = RawAudio.frame_size(state.output_stream_format)
ratio = output_frame_size / input_frame_size

expected_output_frames_count = byte_size(payload) / input_frame_size * ratio
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should round to an integer here

Comment on lines 243 to 245
Map.update!(state, :pts_queue, fn _pts_queue ->
[{out_pts, expected_frames - converted_frames_count}] ++ rest
end)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be %{state | pts_queue: ...

Comment on lines 240 to 251
cond do
converted_frames_count < expected_frames ->
state =
Map.update!(state, :pts_queue, fn _pts_queue ->
[{out_pts, expected_frames - converted_frames_count}] ++ rest
end)

{state, out_pts}

converted_frames_count >= expected_frames ->
{%{state | pts_queue: rest}, out_pts}
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems like an if

@@ -157,7 +157,14 @@ defmodule Membrane.FFmpeg.SWResample.ConverterTest do

assert {[], new_state} = @module.handle_buffer(:input, buffer, nil, state)

assert new_state == %{state | queue: payload}
# this is necesary to ignore state.pts_queue in the assertion
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can set it to nil on both sides then

@@ -178,5 +186,21 @@ defmodule Membrane.FFmpeg.SWResample.ConverterTest do
assert actions == [buffer: {:output, %Membrane.Buffer{payload: result}}]
assert new_state == %{state | queue: <<0::2*8>>}
end

# test "timestamps forward test", %{state: initial_state} do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leftover


test "pts forward test" do
input_stream_format = %RawAudio{sample_format: :s16le, sample_rate: 16_000, channels: 2}
# input_stream_format = %RawAudio{sample_format: :u8, sample_rate: 8_000, channels: 1}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more leftovers

@bartkrak bartkrak requested a review from mat-hek March 25, 2024 18:45
@@ -128,7 +128,7 @@ defmodule Membrane.FFmpeg.SWResample.Converter do
output_frame_size = RawAudio.frame_size(state.output_stream_format)
ratio = output_frame_size / input_frame_size

expected_output_frames_count = byte_size(payload) / input_frame_size * ratio
expected_output_frames_count = (byte_size(payload) / input_frame_size * ratio) |> trunc()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's necessary to use round. Otherwise we'd expect 2137 frames if the float result is 2137.99

output_stream_format = %RawAudio{sample_format: :s32le, sample_rate: 32_000, channels: 2}

# 32 frames * 2048 bytes
fixture_path = Path.expand(Path.join(__DIR__, "/../fixtures/input_s16le_stereo_16khz.raw"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the run path in tests is always the project root, so it's safe to do

Suggested change
fixture_path = Path.expand(Path.join(__DIR__, "/../fixtures/input_s16le_stereo_16khz.raw"))
fixture_path = "test/fixtures/input_s16le_stereo_16khz.raw"


Enum.each(0..30, fn index ->
assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{pts: out_pts})
assert out_pts == index * 31_250_000
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's make this magic number a variable

@spec split_binary(binary(), list(binary())) :: list(binary())
def split_binary(binary, acc \\ [])

def split_binary(<<binary::binary-size(2048), rest::binary>>, acc) do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be hard to have the chunk length differ from buffer to buffer? I guess we could have better coverage that way

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but if it's too much work let's leave it as is

@bartkrak bartkrak requested a review from mat-hek March 26, 2024 16:56
|> Enum.map(fn {payload, index} ->
%Membrane.Buffer{
payload: payload,
pts: index * 31_250_000
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually pts_multiplayer can be a module attribute and you can use it here as well

assert out_pts == index * pts_multiplier
end)

assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{pts: 937_500_000})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{pts: 937_500_000})
assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{pts: 30 * pts_multiplier)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using multiplication inside a match throws compile error

@bartkrak bartkrak requested a review from FelonEkonom April 2, 2024 12:39
Comment on lines 29 to 30
assert_start_of_stream(pipeline, :sink)
assert_sink_buffer(pipeline, :sink, _buffer)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second assertion makes the first one redundant

Comment on lines 127 to 131
input_frame_size = RawAudio.frame_size(state.input_stream_format)
output_frame_size = RawAudio.frame_size(state.output_stream_format)
ratio = output_frame_size / input_frame_size

expected_output_frames_count = (byte_size(payload) / input_frame_size * ratio) |> round()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is more precise to write (byte_size(payload) * output_frame_size / (input_frame_size * input_frame_size)) because of limitted accuracy of floats

@@ -212,4 +233,15 @@ defmodule Membrane.FFmpeg.SWResample.Converter do
{:error, reason} -> raise "Error while flushing converter: #{inspect(reason)}"
end
end

defp update_pts_queue(state, converted_frames_count) do
{[{out_pts, expected_frames}], rest} = Enum.split(state.pts_queue, 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just match

Suggested change
{[{out_pts, expected_frames}], rest} = Enum.split(state.pts_queue, 1)
[{out_pts, expected_frames} | rest] = state.pts_queue

@bartkrak bartkrak requested a review from FelonEkonom April 2, 2024 18:46
@bartkrak bartkrak merged commit 47f76bd into master Apr 3, 2024
3 checks passed
@bartkrak bartkrak deleted the forward_timestamps branch April 3, 2024 12:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants