Add tests for TWCCRecorder.get_feedback/1
LVala committed Jan 31, 2024
1 parent a199e0c commit 42a774b
Showing 2 changed files with 206 additions and 53 deletions.
103 changes: 51 additions & 52 deletions lib/ex_webrtc/peer_connection/twcc_recorder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,32 @@ defmodule ExWebRTC.PeerConnection.TWCCRecorder do
defmodule Timer do
@moduledoc false

@opaque t() :: %__MODULE__{base: non_neg_integer()}
@packet_window 500 * 4

@enforce_keys [:base]
defstruct @enforce_keys

@spec new() :: t()
def packet_window, do: @packet_window

def new do
time = System.monotonic_time(:microsecond)
time = if(time < 0, do: -time, else: 0)
%__MODULE__{base: time}

@spec get_time(t()) :: non_neg_integer()
# timestamps are stored as multiples of 250 microseconds
def get_time(%__MODULE__{base: base}) do
# should be always positive thanks to this
# TODO: storing such a huge values might be inefficient
System.monotonic_time(:microsecond) + base
# timestamps should be always positive thanks to this
(System.monotonic_time(:microsecond) + base)
|> round_to_250()

@spec to_ref(non_neg_integer()) :: non_neg_integer()
def to_ref(timestamp), do: div(timestamp, 1000 * 64)
# to_ref and get_delta require timestamp returned by get_time
def to_ref(timestamp), do: div(timestamp, 64 * 4)
def from_ref(timestamp), do: timestamp * 64 * 4
def get_delta(prev_ts, cur_ts), do: cur_ts - prev_ts

@spec get_delta(non_neg_integer(), non_neg_integer()) :: non_neg_integer()
def get_delta(prev_ts, cur_ts) do
delta = cur_ts - prev_ts
# thanks to this delta is rounded to the closest value
cond do
delta < 0 -> delta - 125
true -> delta + 125
|> div(250)
defp round_to_250(val), do: div(val + 125, 250)

import Bitwise
Expand All @@ -47,10 +41,10 @@ defmodule ExWebRTC.PeerConnection.TWCCRecorder do
@uint8_range 0..255
@int16_range -32_768..32_767

@max_seq_num 0xFFFF
@max_seq_no 0xFFFF
@max_fb_pkt_count 0xFF
@max_ref_time 0xFFFFFF
@breakpoint 0x7FFF
# packet window in microseconds
@packet_window 500_000

@type t() :: %__MODULE__{
media_ssrc: non_neg_integer(),
Expand All @@ -65,7 +59,7 @@ defmodule ExWebRTC.PeerConnection.TWCCRecorder do
# start, base - inclusive, end - exclusive
# start, end - actual range where map values might be set
# base - from where packets should be added to next feedback
# if end_seq_no == start_seq_no, then no packets are available
# if end == start, no packets are available
@enforce_keys [:media_ssrc, :sender_ssrc]
defstruct @enforce_keys ++
Expand Down Expand Up @@ -125,21 +119,19 @@ defmodule ExWebRTC.PeerConnection.TWCCRecorder do

defp unroll(seq_no, end_seq_no) do
end_rolled = roll(end_seq_no)
end_rolled = end_seq_no &&& @max_seq_no
delta = seq_no - end_rolled

delta =
cond do
delta in -@breakpoint..@breakpoint -> delta
delta < -@breakpoint -> delta + @max_seq_num + 1
delta > @breakpoint -> delta - @max_seq_num - 1
delta < -@breakpoint -> delta + @max_seq_no + 1
delta > @breakpoint -> delta - @max_seq_no - 1

end_seq_no + delta

defp roll(seq_no), do: seq_no &&& @max_seq_num

defp remove_old_packets(recorder, cur_timestamp) do
base_seq_no: base_seq_no,
Expand All @@ -148,16 +140,8 @@ defmodule ExWebRTC.PeerConnection.TWCCRecorder do
timestamps: timestamps
} = recorder

min_timestamp = cur_timestamp - @packet_window

last_old =
Enum.reduce_while(start_seq_no..(end_seq_no - 1), nil, fn i, last_old ->
case Map.fetch(timestamps, i) do
{:ok, timestamp} when timestamp < min_timestamp -> {:cont, i}
{:ok, _timestamp} -> {:halt, last_old}
:error -> {:cont, last_old}
min_ts = cur_timestamp - Timer.packet_window()
last_old = find_last_old(timestamps, min_ts, start_seq_no, end_seq_no)

if is_nil(last_old) do
Expand All @@ -168,7 +152,6 @@ defmodule ExWebRTC.PeerConnection.TWCCRecorder do

start_seq_no = last_old + 1

base_seq_no = if start_seq_no > base_seq_no, do: start_seq_no, else: base_seq_no

Expand All @@ -181,6 +164,22 @@ defmodule ExWebRTC.PeerConnection.TWCCRecorder do

defp find_last_old(timestamps, min_ts, start_no, end_no, last_old \\ nil)
defp find_last_old(_timestamps, _min_ts, end_no, end_no, last_old), do: last_old

defp find_last_old(timestamps, min_ts, start_no, end_no, last_old) do
case Map.fetch(timestamps, start_no) do
{:ok, timestamp} when timestamp < min_ts ->
find_last_old(timestamps, min_ts, start_no + 1, end_no, start_no)

{:ok, _timestamp} ->

:error ->
find_last_old(timestamps, min_ts, start_no + 1, end_no, last_old)

@spec get_feedback(t()) :: {t(), [CC.t()]}
def get_feedback(recorder, feedbacks \\ [])

Expand All @@ -197,23 +196,23 @@ defmodule ExWebRTC.PeerConnection.TWCCRecorder do
timestamps: timestamps
} = recorder

base_timestamp =
ref_timestamp =
base_seq_no..(end_seq_no - 1)
|> Enum.find_value(&Map.get(timestamps, &1))

# TODO: fix Timer.to_ref (we need to pass valid base_ts to add_packets)
ref_timestamp = Timer.to_ref(base_timestamp)
|> Timer.to_ref()

{chunks, deltas, new_base} =
add_packets(timestamps, base_seq_no, end_seq_no, base_timestamp)
add_packets(timestamps, base_seq_no, end_seq_no, Timer.from_ref(ref_timestamp))

# NOTICE: packet_status_count larger than max_uint16 are not handled
# Pion also caps max number of not_received packets at the beginning
feedback = %CC{
media_ssrc: media_ssrc,
sender_ssrc: sender_ssrc,
fb_pkt_count: fb_pkt_count,
base_sequence_number: roll(base_seq_no),
fb_pkt_count: fb_pkt_count &&& @max_fb_pkt_count,
base_sequence_number: base_seq_no &&& @max_seq_no,
packet_status_count: new_base - base_seq_no,
reference_time: ref_timestamp,
reference_time: ref_timestamp &&& @max_ref_time,
packet_chunks: Enum.reverse(chunks) |>,
recv_deltas: Enum.reverse(deltas)
Expand All @@ -224,7 +223,8 @@ defmodule ExWebRTC.PeerConnection.TWCCRecorder do
base_seq_no: new_base

# TODO: handle feedbacks with no packets?
# NOTICE: should we handle feedbacks with no packets?
# I don't think such case should ever occur
get_feedback(recorder, [feedback | feedbacks])

Expand Down Expand Up @@ -253,7 +253,7 @@ defmodule ExWebRTC.PeerConnection.TWCCRecorder do
{chunks, deltas, base_no}
chunks = add_to_chunk(symbol, chunks)
deltas = [delta | deltas]
deltas = if is_nil(delta), do: deltas, else: [delta | deltas]
add_packets(timestamps, base_no + 1, end_no, prev_ts, chunks, deltas)
Expand Down Expand Up @@ -290,10 +290,9 @@ defmodule ExWebRTC.PeerConnection.TWCCRecorder do

defp encode_chunk({symbols, _large?, _mixed?}) do
# only the last packet's length might be different than 7 or 14
# so I add padding of :not_received symbols
# but the packet count in feedback header does not include the padding
# so I assume it's fine and the padding is discarded by chrome?
# only the last chunk's length might be different than 7 or 14
# so "padding" of :not_received symbols is added
# but the packet count in feedback header does not include the "padding"
len = length(symbols)

pad_len =
Expand Down
156 changes: 155 additions & 1 deletion test/ex_webrtc/peer_connection/twcc_recorder_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule ExWebRTC.PeerConnection.TWCCRecorderTest do
use ExUnit.Case, async: true

alias ExRTCP.Packet.TransportFeedback.CC
alias ExWebRTC.PeerConnection.TWCCRecorder

@max_seq_no 0xFFFF
Expand Down Expand Up @@ -188,7 +189,160 @@ defmodule ExWebRTC.PeerConnection.TWCCRecorderTest do

describe "get_feedback/1" do
test "" do
test "subsequent packets in order" do
base_no = 578
base_ts = 1_300

timestamps = %{
base_no => base_ts,
(base_no + 1) => base_ts,
(base_no + 2) => base_ts + 20,
(base_no + 3) => base_ts + 32

recorder = %TWCCRecorder{
| base_seq_no: base_no,
start_seq_no: base_no,
end_seq_no: base_no + 4,
timestamps: timestamps

assert {recorder, [feedback]} = TWCCRecorder.get_feedback(recorder)

assert %CC{
base_sequence_number: ^base_no,
reference_time: 5,
fb_pkt_count: 0,
packet_status_count: 4,
packet_chunks: [%CC.RunLength{status_symbol: :small_delta, run_length: 4}],
recv_deltas: [20, 0, 20, 12]
} = feedback

# no new packets -> no feedback
assert {_recorder, []} = TWCCRecorder.get_feedback(recorder)

test "packets out of order, with gaps" do
base_no = 578
base_ts = 1_250

timestamps = %{
base_no => base_ts + 20,
(base_no + 1) => base_ts + 25,
(base_no + 2) => base_ts + 8,
(base_no + 3) => base_ts,
(base_no + 5) => base_ts + 25

recorder = %TWCCRecorder{
| base_seq_no: base_no,
start_seq_no: base_no,
end_seq_no: base_no + 6,
timestamps: timestamps

assert {recorder, [feedback]} = TWCCRecorder.get_feedback(recorder)

symbols = [

assert %CC{
base_sequence_number: ^base_no,
reference_time: 4,
fb_pkt_count: 0,
packet_status_count: 6,
packet_chunks: [%CC.StatusVector{symbols: ^symbols}],
recv_deltas: [246, 5, -17, -8, 25]
} = feedback

assert {_recorder, []} = TWCCRecorder.get_feedback(recorder)

test "mixed chunks" do
base_no = 578
end_no = 634
packet_num = end_no - base_no + 1

recorder = TWCCRecorder.record_packet(@recorder, base_no)

recorder =
Enum.reduce((base_no + 2)..end_no, recorder, fn i, recorder ->
TWCCRecorder.record_packet(recorder, i)

assert {recorder, [feedback]} = TWCCRecorder.get_feedback(recorder)

assert %CC{
base_sequence_number: ^base_no,
fb_pkt_count: 0,
packet_status_count: ^packet_num,
packet_chunks: [chunk1, chunk2],
recv_deltas: deltas
} = feedback

symbols = [:small_delta, :not_received] ++ List.duplicate(:small_delta, 12)
assert %CC.StatusVector{symbols: ^symbols} = chunk1

run_length = packet_num - 14

assert %CC.RunLength{
status_symbol: :small_delta,
run_length: ^run_length
} = chunk2

assert length(deltas) == packet_num - 1

assert {_recorder, []} = TWCCRecorder.get_feedback(recorder)

test "split into two feedbacks" do
base_no = 578

recorder =
|> TWCCRecorder.record_packet(base_no)
|> TWCCRecorder.record_packet(base_no + 1)
|> TWCCRecorder.record_packet(base_no + 2)

# simulate huge delta between 3rd and 4th packet
base_no2 = base_no + 2
timestamps = Map.update!(recorder.timestamps, base_no2, &(&1 + 35_000))
recorder = %{recorder | timestamps: timestamps}

assert {recorder, [feedback1, feedback2]} = TWCCRecorder.get_feedback(recorder)

assert %CC{
base_sequence_number: ^base_no,
fb_pkt_count: 0,
packet_status_count: 2,
reference_time: ref_time1,
packet_chunks: [%CC.RunLength{status_symbol: :small_delta, run_length: 2}],
recv_deltas: [_d1, _d2]
} = feedback1

assert %CC{
base_sequence_number: ^base_no2,
fb_pkt_count: 1,
packet_status_count: 1,
reference_time: ref_time2,
packet_chunks: [%CC.RunLength{status_symbol: :small_delta, run_length: 1}],
recv_deltas: [_d3]
} = feedback2

# ref times should differ by about 136 * 64 ms
refute_in_delta ref_time1, ref_time2, 133
assert_in_delta ref_time1, ref_time2, 143

assert {_recorder, []} = TWCCRecorder.get_feedback(recorder)

