From 5bfecdf25440bc32d0d3e3c5de8368fa042e576b Mon Sep 17 00:00:00 2001 From: noarkhh Date: Thu, 30 Jan 2025 19:34:16 +0100 Subject: [PATCH 1/5] Jitter buffer incorporation prototype --- lib/membrane/rtp/demuxer.ex | 330 ++++++++++++++++-- .../rtp/jitter_buffer/buffer_store.ex | 76 ++-- 2 files changed, 330 insertions(+), 76 deletions(-) diff --git a/lib/membrane/rtp/demuxer.ex b/lib/membrane/rtp/demuxer.ex index 54e47d96..ead95239 100644 --- a/lib/membrane/rtp/demuxer.ex +++ b/lib/membrane/rtp/demuxer.ex @@ -13,7 +13,11 @@ defmodule Membrane.RTP.Demuxer do require Membrane.Pad require Membrane.Logger - alias Membrane.{Pad, RemoteStream, RTCP, RTP} + alias Membrane.Element.CallbackContext + alias Membrane.{Pad, RemoteStream, RTCP, RTP, Time} + alias Membrane.RTP.JitterBuffer.{BufferStore, Record} + + @max_timestamp Bitwise.bsl(1, 32) - 1 @typedoc """ Metadata present in each output buffer. The `ExRTP.Packet.t()` struct contains @@ -54,6 +58,18 @@ defmodule Membrane.RTP.Demuxer do then most likely it should be used, since it's unique for each stream. If it's not known (for example when the pad is being linked upfront), encoding or payload type should be provided, and the first identified stream of given encoding or payload type will be sent on this pad. """ + ], + clock_rate: [ + spec: RTP.clock_rate() | nil, + default: nil + ], + use_jitter_buffer: [ + spec: boolean(), + default: true + ], + jitter_buffer_latency: [ + spec: Membrane.Time.t(), + default: Membrane.Time.milliseconds(200) ] ] @@ -80,17 +96,42 @@ defmodule Membrane.RTP.Demuxer do alias Membrane.RTP @type t :: %__MODULE__{ - queued_buffers: [ExRTP.Packet.t()], + buffer_store: RTP.JitterBuffer.BufferStore.t(), end_of_stream_buffered: boolean(), phase: :waiting_for_link | :linked | :timed_out, payload_type: RTP.payload_type(), link_timer: reference() | nil, - pad: Pad.ref() | nil + pad: Pad.ref() | nil, + use_jitter_buffer: boolean() | nil, + clock_rate: non_neg_integer() | nil, + jitter_buffer_latency: Membrane.Time.t() | nil, + initial_latency_waiting: boolean(), + initialization_time: Membrane.Time.t(), + max_latency_timer: reference() | nil, + timestamp_base: Membrane.Time.t(), + previous_timestamp: Membrane.Time.t() } - @enforce_keys [:payload_type, :phase, :link_timer, :pad] + @enforce_keys [ + :payload_type, + :phase, + :link_timer, + :pad, + :timestamp_base, + :previous_timestamp, + :initialization_time + ] - defstruct @enforce_keys ++ [queued_buffers: [], end_of_stream_buffered: false] + defstruct @enforce_keys ++ + [ + buffer_store: %RTP.JitterBuffer.BufferStore{}, + end_of_stream_buffered: false, + initial_latency_waiting: true, + max_latency_timer: nil, + clock_rate: nil, + jitter_buffer_latency: nil, + use_jitter_buffer: nil + ] end @type t :: %__MODULE__{ @@ -115,9 +156,9 @@ defmodule Membrane.RTP.Demuxer do end @impl true - def handle_buffer(:input, %Membrane.Buffer{payload: payload}, _ctx, state) do + def handle_buffer(:input, %Membrane.Buffer{payload: payload}, ctx, state) do case classify_packet(payload) do - :rtp -> handle_rtp_packet(payload, state) + :rtp -> handle_rtp_packet(payload, ctx, state) :rtcp -> handle_rtcp_packets(payload, state) end end @@ -141,18 +182,49 @@ defmodule Membrane.RTP.Demuxer do %{phase: :waiting_for_link} -> Process.cancel_timer(stream_state.link_timer) - buffer_action = [buffer: {pad, Enum.reverse(stream_state.queued_buffers)}] + %{clock_rate: clock_rate} = + RTP.PayloadFormat.resolve( + payload_type: stream_state.payload_type, + clock_rate: ctx.pad_options.clock_rate, + payload_type_mapping: state.payload_type_mapping + ) + + stream_state = %State.StreamState{ + stream_state + | clock_rate: clock_rate, + jitter_buffer_latency: ctx.pad_options.jitter_buffer_latency, + use_jitter_buffer: ctx.pad_options.use_jitter_buffer + } + + time_since_initialization = Time.monotonic_time() - stream_state.initialization_time - end_of_stream_action = - if stream_state.end_of_stream_buffered, do: [end_of_stream: pad], else: [] + if time_since_initialization < stream_state.jitter_buffer_latency do + initial_latency_left = stream_state.jitter_buffer_latency - time_since_initialization + + Process.send_after( + self(), + {:initial_latency_passed, matching_stream_ssrc}, + initial_latency_left + ) + end + + stream_state = %State.StreamState{stream_state | phase: :linked, pad: pad} + {buffer_actions, stream_state} = send_buffers(matching_stream_ssrc, stream_state) + + {end_of_stream_actions, stream_state} = + if stream_state.end_of_stream_buffered do + get_end_of_stream_actions(stream_state) + else + {[], stream_state} + end state = put_in( state.stream_states[matching_stream_ssrc], - %{stream_state | phase: :linked, queued_buffers: [], pad: pad} + %State.StreamState{stream_state | phase: :linked, pad: pad} ) - {[stream_format: {pad, %RTP{}}] ++ buffer_action ++ end_of_stream_action, state} + {[stream_format: {pad, %RTP{}}] ++ buffer_actions ++ end_of_stream_actions, state} %{phase: :timed_out} -> Membrane.Logger.warning( @@ -179,19 +251,40 @@ defmodule Membrane.RTP.Demuxer do end state = put_in(state.stream_states[ssrc].phase, :timed_out) - state = put_in(state.stream_states[ssrc].queued_buffers, []) + state = put_in(state.stream_states[ssrc].buffer_store, %BufferStore{}) {[], state} end + @impl true + def handle_info({:initial_latency_passed, ssrc}, _context, state) do + {actions, stream_state} = + send_buffers(ssrc, %State.StreamState{ + state.stream_states[ssrc] + | initial_latency_waiting: false + }) + + state = put_in(state.stream_states[ssrc], stream_state) + {actions, state} + end + + @impl true + def handle_info({:send_buffers, ssrc}, _context, state) do + {actions, stream_state} = + send_buffers(ssrc, %State.StreamState{state.stream_states[ssrc] | max_latency_timer: nil}) + + state = put_in(state.stream_states[ssrc], stream_state) + {actions, state} + end + @impl true def handle_end_of_stream(:input, _ctx, state) do - state = - state.stream_states - |> Enum.reduce(state, fn {ssrc, _stream_state}, state -> - put_in(state.stream_states[ssrc].end_of_stream_buffered, true) - end) + state.stream_states + |> Enum.flat_map_reduce(state, fn {ssrc, _stream_state}, state -> + {actions, stream_state} = get_end_of_stream_actions(state.stream_states[ssrc]) - {[forward: :end_of_stream], state} + state = put_in(state.stream_states[ssrc], %{stream_state | end_of_stream_buffered: true}) + {actions, state} + end) end @spec classify_packet(binary()) :: :rtp | :rtcp @@ -201,16 +294,16 @@ defmodule Membrane.RTP.Demuxer do else: :rtp end - @spec handle_rtp_packet(binary(), State.t()) :: + @spec handle_rtp_packet(binary(), CallbackContext.t(), State.t()) :: {[Membrane.Element.Action.t()], State.t()} - defp handle_rtp_packet(raw_rtp_packet, state) do + defp handle_rtp_packet(raw_rtp_packet, ctx, state) do {:ok, packet} = ExRTP.Packet.decode(raw_rtp_packet) {new_stream_actions, state} = if Map.has_key?(state.stream_states, packet.ssrc) do {[], state} else - initialize_new_stream_state(packet, state) + initialize_new_stream_state(packet, ctx, state) end buffer = %Membrane.Buffer{ @@ -218,34 +311,96 @@ defmodule Membrane.RTP.Demuxer do metadata: %{rtp: %{packet | payload: <<>>}} } - {buffer_actions, state} = - case state.stream_states[packet.ssrc].phase do - :waiting_for_link -> - state = update_in(state.stream_states[packet.ssrc].queued_buffers, &[buffer | &1]) - {[], state} + stream_state = + state.stream_states[packet.ssrc] + |> insert_buffer_into_buffer_store(buffer) - :linked -> - buffer_action = - {:buffer, {state.stream_states[packet.ssrc].pad, buffer}} + {buffer_actions, stream_state} = + maybe_send_buffers(packet.ssrc, stream_state) - {[buffer_action], state} + state = put_in(state.stream_states[packet.ssrc], stream_state) + # {buffer_actions, state} = + # case stream_state.phase do + # :waiting_for_link -> + # stream_state = + # case BufferStore.insert_buffer(stream_state.buffer_store, buffer) do + # {:ok, buffer_store} -> + # %State.StreamState{stream_state | buffer_store: buffer_store} - :timed_out -> - {[], state} - end + # {:error, :late_packet} -> + # Membrane.Logger.debug("Late packet has arrived") + # stream_state + # end + + # state = put_in(state.stream_states[packet.ssrc], stream_state) + # {[], state} + + # :linked -> + # buffer_action = + # {:buffer, {state.stream_states[packet.ssrc].pad, buffer}} + + # {[buffer_action], state} + + # :timed_out -> + # {[], state} + # end {new_stream_actions ++ buffer_actions, state} end + @spec insert_buffer_into_buffer_store(State.StreamState.t(), Membrane.Buffer.t()) :: + State.StreamState.t() + defp insert_buffer_into_buffer_store(stream_state, buffer) do + case stream_state.phase do + :timed_out -> + stream_state + + phase when phase in [:waiting_for_link, :linked] -> + case BufferStore.insert_buffer(stream_state.buffer_store, buffer) do + {:ok, buffer_store} -> + %State.StreamState{stream_state | buffer_store: buffer_store} + + {:error, :late_packet} -> + Membrane.Logger.debug("Late packet has arrived") + stream_state + end + end + end + + @spec maybe_send_buffers(RTP.ssrc(), State.StreamState.t()) :: + {[Membrane.Element.Action.t()], State.StreamState.t()} + defp maybe_send_buffers(ssrc, stream_state) do + if stream_state.phase == :linked and not stream_state.initial_latency_waiting do + send_buffers(ssrc, stream_state) + else + {[], stream_state} + end + end + + @spec get_end_of_stream_actions(State.StreamState.t()) :: + {[Membrane.Element.Action.t()], State.StreamState.t()} + defp get_end_of_stream_actions(%State.StreamState{pad: nil} = stream_state) do + {[], stream_state} + end + + defp get_end_of_stream_actions(stream_state) do + {buffer_actions, stream_state} = + stream_state.buffer_store + |> BufferStore.dump() + |> Enum.flat_map_reduce(stream_state, &record_to_actions/2) + + {buffer_actions ++ [end_of_stream: stream_state.pad], stream_state} + end + @spec handle_rtcp_packets(binary(), State.t()) :: {[Membrane.Element.Action.t()], State.t()} defp handle_rtcp_packets(rtcp_packets, state) do Membrane.Logger.debug_verbose("Received RTCP Packet(s): #{inspect(rtcp_packets)}") {[], state} end - @spec initialize_new_stream_state(ExRTP.Packet.t(), State.t()) :: + @spec initialize_new_stream_state(ExRTP.Packet.t(), CallbackContext.t(), State.t()) :: {[Membrane.Element.Action.t()], State.t()} - defp initialize_new_stream_state(packet, state) do + defp initialize_new_stream_state(packet, ctx, state) do case find_matching_pad_for_stream( packet, state.pads_waiting_for_stream, @@ -261,7 +416,10 @@ defmodule Membrane.RTP.Demuxer do Membrane.Time.as_milliseconds(state.not_linked_pad_handling.timeout, :round) ), payload_type: packet.payload_type, - pad: nil + pad: nil, + timestamp_base: packet.timestamp, + previous_timestamp: packet.timestamp, + initialization_time: Membrane.Time.monotonic_time() } state = put_in(state.stream_states[packet.ssrc], stream_state) @@ -269,13 +427,34 @@ defmodule Membrane.RTP.Demuxer do {[notify_parent: create_new_stream_notification(packet)], state} pad_waiting_for_stream -> + pad_options = ctx.pads[pad_waiting_for_stream].options + + %{clock_rate: clock_rate} = + RTP.PayloadFormat.resolve( + payload_type: packet.payload_type, + clock_rate: pad_options.clock_rate, + payload_type_mapping: state.payload_type_mapping + ) + stream_state = %State.StreamState{ phase: :linked, link_timer: nil, payload_type: packet.payload_type, - pad: pad_waiting_for_stream + pad: pad_waiting_for_stream, + timestamp_base: packet.timestamp, + previous_timestamp: packet.timestamp, + initialization_time: Membrane.Time.monotonic_time(), + jitter_buffer_latency: pad_options.jitter_buffer_latency, + use_jitter_buffer: pad_options.use_jitter_buffer, + clock_rate: clock_rate } + Process.send_after( + self(), + {:initial_latency_passed, packet.ssrc}, + pad_options.jitter_buffer_latency + ) + state = put_in(state.stream_states[packet.ssrc], stream_state) {_stream_id, state} = pop_in(state.pads_waiting_for_stream[pad_waiting_for_stream]) @@ -360,4 +539,79 @@ defmodule Membrane.RTP.Demuxer do {:new_rtp_stream, %{ssrc: packet.ssrc, payload_type: packet.payload_type, extensions: extensions}} end + + @spec send_buffers(RTP.ssrc(), State.StreamState.t()) :: + {[Membrane.Element.Action.t()], State.StreamState.t()} + defp send_buffers(ssrc, stream_state) do + # Flushes buffers that stayed in queue longer than latency and any gaps before them + {too_old_records, buffer_store} = + BufferStore.flush_older_than(stream_state.buffer_store, stream_state.jitter_buffer_latency) + + # Additionally, flush buffers as long as there are no gaps + {buffers, buffer_store} = BufferStore.flush_ordered(buffer_store) + + {actions, stream_state} = + (too_old_records ++ buffers) |> Enum.flat_map_reduce(stream_state, &record_to_actions/2) + + stream_state = set_timer(ssrc, %{stream_state | buffer_store: buffer_store}) + + {actions, stream_state} + end + + @spec set_timer(RTP.ssrc(), State.StreamState.t()) :: State.StreamState.t() + defp set_timer( + ssrc, + %State.StreamState{max_latency_timer: nil, jitter_buffer_latency: latency} = + stream_state + ) do + new_timer = + case BufferStore.first_record_timestamp(stream_state.buffer_store) do + nil -> + nil + + buffer_ts -> + since_insertion = Time.monotonic_time() - buffer_ts + send_after_time = max(0, latency - since_insertion) |> Time.as_milliseconds(:round) + Process.send_after(self(), {:send_buffers, ssrc}, send_after_time) + end + + %State.StreamState{stream_state | max_latency_timer: new_timer} + end + + defp set_timer(_ssrc, %State.StreamState{max_latency_timer: timer} = stream_state) + when timer != nil do + stream_state + end + + defp record_to_actions(nil, stream_state) do + action = [event: {:output, %Membrane.Event.Discontinuity{}}] + {action, stream_state} + end + + defp record_to_actions(%Record{buffer: buffer}, stream_state) do + rtp_timestamp = buffer.metadata.rtp.timestamp + + # timestamps in RTP don't have to be monotonic therefore there can be + # a situation where in 2 consecutive packets the latter packet will have smaller timestamp + # than the previous one while not overflowing the timestamp number + # https://datatracker.ietf.org/doc/html/rfc3550#section-5.1 + + timestamp_base = + case RTP.Utils.from_which_rollover( + stream_state.previous_timestamp, + rtp_timestamp, + @max_timestamp + ) do + :next -> stream_state.timestamp_base - @max_timestamp + :previous -> stream_state.timestamp_base + @max_timestamp + :current -> stream_state.timestamp_base + end + + timestamp = div(Time.seconds(rtp_timestamp - timestamp_base), stream_state.clock_rate) + buffer = %Membrane.Buffer{buffer | pts: timestamp} + # An empty buffer is usually a WebRTC probing packet that should be skipped. + actions = if buffer.payload == <<>>, do: [], else: [buffer: {stream_state.pad, buffer}] + state = %{stream_state | timestamp_base: timestamp_base, previous_timestamp: rtp_timestamp} + {actions, state} + end end diff --git a/lib/membrane/rtp/jitter_buffer/buffer_store.ex b/lib/membrane/rtp/jitter_buffer/buffer_store.ex index f0f77d9f..0b247236 100644 --- a/lib/membrane/rtp/jitter_buffer/buffer_store.ex +++ b/lib/membrane/rtp/jitter_buffer/buffer_store.ex @@ -61,44 +61,6 @@ defmodule Membrane.RTP.JitterBuffer.BufferStore do do_insert_buffer(store, buffer, seq_num) end - @spec do_insert_buffer(t(), Buffer.t(), RTP.Header.sequence_number_t()) :: - {:ok, t()} | {:error, insert_error()} - defp do_insert_buffer(%__MODULE__{flush_index: nil} = store, buffer, 0) do - store = add_record(store, Record.new(buffer, @seq_number_limit), :next) - {:ok, %__MODULE__{store | flush_index: @seq_number_limit - 1}} - end - - defp do_insert_buffer(%__MODULE__{flush_index: nil} = store, buffer, seq_num) do - store = add_record(store, Record.new(buffer, seq_num), :current) - {:ok, %__MODULE__{store | flush_index: seq_num - 1}} - end - - defp do_insert_buffer( - %__MODULE__{ - flush_index: flush_index, - highest_incoming_index: highest_incoming_index, - rollover_count: roc - } = store, - buffer, - seq_num - ) do - highest_seq_num = rem(highest_incoming_index, @seq_number_limit) - - {rollover, index} = - case Utils.from_which_rollover(highest_seq_num, seq_num, @seq_number_limit) do - :current -> {:current, seq_num + roc * @seq_number_limit} - :previous -> {:previous, seq_num + (roc - 1) * @seq_number_limit} - :next -> {:next, seq_num + (roc + 1) * @seq_number_limit} - end - - if fresh_packet?(flush_index, index) do - record = Record.new(buffer, index) - {:ok, add_record(store, record, rollover)} - else - {:error, :late_packet} - end - end - @doc """ Flushes the store to the buffer with the next sequence number. @@ -176,6 +138,44 @@ defmodule Membrane.RTP.JitterBuffer.BufferStore do end end + @spec do_insert_buffer(t(), Buffer.t(), RTP.Header.sequence_number_t()) :: + {:ok, t()} | {:error, insert_error()} + defp do_insert_buffer(%__MODULE__{flush_index: nil} = store, buffer, 0) do + store = add_record(store, Record.new(buffer, @seq_number_limit), :next) + {:ok, %__MODULE__{store | flush_index: @seq_number_limit - 1}} + end + + defp do_insert_buffer(%__MODULE__{flush_index: nil} = store, buffer, seq_num) do + store = add_record(store, Record.new(buffer, seq_num), :current) + {:ok, %__MODULE__{store | flush_index: seq_num - 1}} + end + + defp do_insert_buffer( + %__MODULE__{ + flush_index: flush_index, + highest_incoming_index: highest_incoming_index, + rollover_count: roc + } = store, + buffer, + seq_num + ) do + highest_seq_num = rem(highest_incoming_index, @seq_number_limit) + + {rollover, index} = + case Utils.from_which_rollover(highest_seq_num, seq_num, @seq_number_limit) do + :current -> {:current, seq_num + roc * @seq_number_limit} + :previous -> {:previous, seq_num + (roc - 1) * @seq_number_limit} + :next -> {:next, seq_num + (roc + 1) * @seq_number_limit} + end + + if fresh_packet?(flush_index, index) do + record = Record.new(buffer, index) + {:ok, add_record(store, record, rollover)} + else + {:error, :late_packet} + end + end + defp fresh_packet?(flush_index, index), do: index > flush_index @spec flush_while(t, (t, Record.t() -> boolean), [Record.t() | nil]) :: From 4cfb754e017c51b4e6f4e68cb0a7e2b0beddd25a Mon Sep 17 00:00:00 2001 From: noarkhh Date: Wed, 5 Mar 2025 16:12:22 +0100 Subject: [PATCH 2/5] Integrate jitter buffer with the demuxer --- lib/membrane/rtp/demuxer.ex | 237 ++++++++---------- lib/membrane/rtp/jitter_buffer.ex | 5 +- .../rtp/jitter_buffer/buffer_store.ex | 2 +- lib/membrane/rtp/muxer.ex | 7 +- .../rtp/demuxer_muxer_integration_test.exs | 30 ++- test/membrane/rtp/demuxer_test.exs | 63 ++++- .../rtp/muxer_demuxer_integration_test.exs | 4 - 7 files changed, 193 insertions(+), 155 deletions(-) diff --git a/lib/membrane/rtp/demuxer.ex b/lib/membrane/rtp/demuxer.ex index ead95239..a2f54ba0 100644 --- a/lib/membrane/rtp/demuxer.ex +++ b/lib/membrane/rtp/demuxer.ex @@ -98,11 +98,10 @@ defmodule Membrane.RTP.Demuxer do @type t :: %__MODULE__{ buffer_store: RTP.JitterBuffer.BufferStore.t(), end_of_stream_buffered: boolean(), - phase: :waiting_for_link | :linked | :timed_out, + phase: :waiting_for_matching_pad | :matched_with_pad | :timed_out, + pad_match_timer: reference() | nil, payload_type: RTP.payload_type(), - link_timer: reference() | nil, pad: Pad.ref() | nil, - use_jitter_buffer: boolean() | nil, clock_rate: non_neg_integer() | nil, jitter_buffer_latency: Membrane.Time.t() | nil, initial_latency_waiting: boolean(), @@ -115,9 +114,10 @@ defmodule Membrane.RTP.Demuxer do @enforce_keys [ :payload_type, :phase, - :link_timer, + :pad_match_timer, :pad, :timestamp_base, + :initial_latency_waiting, :previous_timestamp, :initialization_time ] @@ -126,11 +126,9 @@ defmodule Membrane.RTP.Demuxer do [ buffer_store: %RTP.JitterBuffer.BufferStore{}, end_of_stream_buffered: false, - initial_latency_waiting: true, max_latency_timer: nil, clock_rate: nil, - jitter_buffer_latency: nil, - use_jitter_buffer: nil + jitter_buffer_latency: nil ] end @@ -179,8 +177,15 @@ defmodule Membrane.RTP.Demuxer do state = put_in(state.pads_waiting_for_stream[pad], ctx.pad_options.stream_id) {[], state} - %{phase: :waiting_for_link} -> - Process.cancel_timer(stream_state.link_timer) + %{phase: :timed_out} -> + Membrane.Logger.warning( + "Connected a pad corresponding to a timed out stream, sending end_of_stream" + ) + + {[end_of_stream: pad], state} + + %{phase: :waiting_for_matching_pad} -> + Process.cancel_timer(stream_state.pad_match_timer) %{clock_rate: clock_rate} = RTP.PayloadFormat.resolve( @@ -189,26 +194,30 @@ defmodule Membrane.RTP.Demuxer do payload_type_mapping: state.payload_type_mapping ) + jitter_buffer_latency = + if ctx.pad_options.use_jitter_buffer, do: ctx.pad_options.jitter_buffer_latency, else: 0 + stream_state = %State.StreamState{ stream_state - | clock_rate: clock_rate, - jitter_buffer_latency: ctx.pad_options.jitter_buffer_latency, - use_jitter_buffer: ctx.pad_options.use_jitter_buffer + | phase: :matched_with_pad, + pad: pad, + clock_rate: clock_rate, + jitter_buffer_latency: jitter_buffer_latency } time_since_initialization = Time.monotonic_time() - stream_state.initialization_time - if time_since_initialization < stream_state.jitter_buffer_latency do + if time_since_initialization < stream_state.jitter_buffer_latency and + ctx.pad_options.use_jitter_buffer do initial_latency_left = stream_state.jitter_buffer_latency - time_since_initialization Process.send_after( self(), {:initial_latency_passed, matching_stream_ssrc}, - initial_latency_left + Membrane.Time.as_milliseconds(initial_latency_left, :round) ) end - stream_state = %State.StreamState{stream_state | phase: :linked, pad: pad} {buffer_actions, stream_state} = send_buffers(matching_stream_ssrc, stream_state) {end_of_stream_actions, stream_state} = @@ -218,25 +227,14 @@ defmodule Membrane.RTP.Demuxer do {[], stream_state} end - state = - put_in( - state.stream_states[matching_stream_ssrc], - %State.StreamState{stream_state | phase: :linked, pad: pad} - ) + state = put_in(state.stream_states[matching_stream_ssrc], stream_state) {[stream_format: {pad, %RTP{}}] ++ buffer_actions ++ end_of_stream_actions, state} - - %{phase: :timed_out} -> - Membrane.Logger.warning( - "Connected a pad corresponding to a timed out stream, sending end_of_stream" - ) - - {[end_of_stream: pad], state} end end @impl true - def handle_info({:link_timeout, ssrc}, _ctx, state) do + def handle_info({:pad_match_timeout, ssrc}, _ctx, state) do case state.not_linked_pad_handling.action do :raise -> raise "Pad corresponding to ssrc #{ssrc} not connected within specified timeout" @@ -303,7 +301,12 @@ defmodule Membrane.RTP.Demuxer do if Map.has_key?(state.stream_states, packet.ssrc) do {[], state} else - initialize_new_stream_state(packet, ctx, state) + find_matching_pad_for_stream( + packet, + state.pads_waiting_for_stream, + state.payload_type_mapping + ) + |> initialize_new_stream_state(packet, ctx, state) end buffer = %Membrane.Buffer{ @@ -311,39 +314,11 @@ defmodule Membrane.RTP.Demuxer do metadata: %{rtp: %{packet | payload: <<>>}} } - stream_state = - state.stream_states[packet.ssrc] - |> insert_buffer_into_buffer_store(buffer) + stream_state = insert_buffer_into_buffer_store(state.stream_states[packet.ssrc], buffer) - {buffer_actions, stream_state} = - maybe_send_buffers(packet.ssrc, stream_state) + {buffer_actions, stream_state} = maybe_send_buffers(packet.ssrc, stream_state) state = put_in(state.stream_states[packet.ssrc], stream_state) - # {buffer_actions, state} = - # case stream_state.phase do - # :waiting_for_link -> - # stream_state = - # case BufferStore.insert_buffer(stream_state.buffer_store, buffer) do - # {:ok, buffer_store} -> - # %State.StreamState{stream_state | buffer_store: buffer_store} - - # {:error, :late_packet} -> - # Membrane.Logger.debug("Late packet has arrived") - # stream_state - # end - - # state = put_in(state.stream_states[packet.ssrc], stream_state) - # {[], state} - - # :linked -> - # buffer_action = - # {:buffer, {state.stream_states[packet.ssrc].pad, buffer}} - - # {[buffer_action], state} - - # :timed_out -> - # {[], state} - # end {new_stream_actions ++ buffer_actions, state} end @@ -355,7 +330,7 @@ defmodule Membrane.RTP.Demuxer do :timed_out -> stream_state - phase when phase in [:waiting_for_link, :linked] -> + phase when phase in [:waiting_for_matching_pad, :matched_with_pad] -> case BufferStore.insert_buffer(stream_state.buffer_store, buffer) do {:ok, buffer_store} -> %State.StreamState{stream_state | buffer_store: buffer_store} @@ -370,7 +345,7 @@ defmodule Membrane.RTP.Demuxer do @spec maybe_send_buffers(RTP.ssrc(), State.StreamState.t()) :: {[Membrane.Element.Action.t()], State.StreamState.t()} defp maybe_send_buffers(ssrc, stream_state) do - if stream_state.phase == :linked and not stream_state.initial_latency_waiting do + if stream_state.phase == :matched_with_pad and not stream_state.initial_latency_waiting do send_buffers(ssrc, stream_state) else {[], stream_state} @@ -389,7 +364,8 @@ defmodule Membrane.RTP.Demuxer do |> BufferStore.dump() |> Enum.flat_map_reduce(stream_state, &record_to_actions/2) - {buffer_actions ++ [end_of_stream: stream_state.pad], stream_state} + {buffer_actions ++ [end_of_stream: stream_state.pad], + %State.StreamState{stream_state | buffer_store: %BufferStore{}}} end @spec handle_rtcp_packets(binary(), State.t()) :: {[Membrane.Element.Action.t()], State.t()} @@ -398,68 +374,72 @@ defmodule Membrane.RTP.Demuxer do {[], state} end - @spec initialize_new_stream_state(ExRTP.Packet.t(), CallbackContext.t(), State.t()) :: - {[Membrane.Element.Action.t()], State.t()} - defp initialize_new_stream_state(packet, ctx, state) do - case find_matching_pad_for_stream( - packet, - state.pads_waiting_for_stream, - state.payload_type_mapping - ) do - nil -> - stream_state = %State.StreamState{ - phase: :waiting_for_link, - link_timer: - Process.send_after( - self(), - {:link_timeout, packet.ssrc}, - Membrane.Time.as_milliseconds(state.not_linked_pad_handling.timeout, :round) - ), - payload_type: packet.payload_type, - pad: nil, - timestamp_base: packet.timestamp, - previous_timestamp: packet.timestamp, - initialization_time: Membrane.Time.monotonic_time() - } + @spec initialize_new_stream_state( + matching_pad :: Pad.ref() | nil, + ExRTP.Packet.t(), + CallbackContext.t(), + State.t() + ) :: {[Membrane.Element.Action.t()], State.t()} + defp initialize_new_stream_state(nil, packet, _ctx, state) do + stream_state = %State.StreamState{ + phase: :waiting_for_matching_pad, + pad_match_timer: + Process.send_after( + self(), + {:pad_match_timeout, packet.ssrc}, + Membrane.Time.as_milliseconds(state.not_linked_pad_handling.timeout, :round) + ), + payload_type: packet.payload_type, + pad: nil, + timestamp_base: packet.timestamp, + previous_timestamp: packet.timestamp, + initialization_time: Membrane.Time.monotonic_time(), + initial_latency_waiting: true + } - state = put_in(state.stream_states[packet.ssrc], stream_state) + state = put_in(state.stream_states[packet.ssrc], stream_state) - {[notify_parent: create_new_stream_notification(packet)], state} + {[notify_parent: create_new_stream_notification(packet)], state} + end - pad_waiting_for_stream -> - pad_options = ctx.pads[pad_waiting_for_stream].options + defp initialize_new_stream_state(pad_waiting_for_stream, packet, ctx, state) do + pad_options = ctx.pads[pad_waiting_for_stream].options - %{clock_rate: clock_rate} = - RTP.PayloadFormat.resolve( - payload_type: packet.payload_type, - clock_rate: pad_options.clock_rate, - payload_type_mapping: state.payload_type_mapping - ) - - stream_state = %State.StreamState{ - phase: :linked, - link_timer: nil, - payload_type: packet.payload_type, - pad: pad_waiting_for_stream, - timestamp_base: packet.timestamp, - previous_timestamp: packet.timestamp, - initialization_time: Membrane.Time.monotonic_time(), - jitter_buffer_latency: pad_options.jitter_buffer_latency, - use_jitter_buffer: pad_options.use_jitter_buffer, - clock_rate: clock_rate - } + jitter_buffer_latency = + if pad_options.use_jitter_buffer, do: pad_options.jitter_buffer_latency, else: 0 - Process.send_after( - self(), - {:initial_latency_passed, packet.ssrc}, - pad_options.jitter_buffer_latency - ) + %{clock_rate: clock_rate} = + RTP.PayloadFormat.resolve( + payload_type: packet.payload_type, + clock_rate: pad_options.clock_rate, + payload_type_mapping: state.payload_type_mapping + ) - state = put_in(state.stream_states[packet.ssrc], stream_state) - {_stream_id, state} = pop_in(state.pads_waiting_for_stream[pad_waiting_for_stream]) + stream_state = %State.StreamState{ + phase: :matched_with_pad, + pad_match_timer: nil, + payload_type: packet.payload_type, + pad: pad_waiting_for_stream, + timestamp_base: packet.timestamp, + previous_timestamp: packet.timestamp, + initialization_time: Membrane.Time.monotonic_time(), + jitter_buffer_latency: jitter_buffer_latency, + clock_rate: clock_rate, + initial_latency_waiting: pad_options.use_jitter_buffer + } - {[stream_format: {pad_waiting_for_stream, %RTP{}}], state} + if pad_options.use_jitter_buffer do + Process.send_after( + self(), + {:initial_latency_passed, packet.ssrc}, + Membrane.Time.as_milliseconds(pad_options.jitter_buffer_latency, :round) + ) end + + state = put_in(state.stream_states[packet.ssrc], stream_state) + {_stream_id, state} = pop_in(state.pads_waiting_for_stream[pad_waiting_for_stream]) + + {[stream_format: {pad_waiting_for_stream, %RTP{}}], state} end @spec find_matching_pad_for_stream( @@ -543,17 +523,16 @@ defmodule Membrane.RTP.Demuxer do @spec send_buffers(RTP.ssrc(), State.StreamState.t()) :: {[Membrane.Element.Action.t()], State.StreamState.t()} defp send_buffers(ssrc, stream_state) do - # Flushes buffers that stayed in queue longer than latency and any gaps before them {too_old_records, buffer_store} = BufferStore.flush_older_than(stream_state.buffer_store, stream_state.jitter_buffer_latency) - # Additionally, flush buffers as long as there are no gaps {buffers, buffer_store} = BufferStore.flush_ordered(buffer_store) + stream_state = %{stream_state | buffer_store: buffer_store} {actions, stream_state} = (too_old_records ++ buffers) |> Enum.flat_map_reduce(stream_state, &record_to_actions/2) - stream_state = set_timer(ssrc, %{stream_state | buffer_store: buffer_store}) + stream_state = set_timer(ssrc, stream_state) {actions, stream_state} end @@ -561,9 +540,9 @@ defmodule Membrane.RTP.Demuxer do @spec set_timer(RTP.ssrc(), State.StreamState.t()) :: State.StreamState.t() defp set_timer( ssrc, - %State.StreamState{max_latency_timer: nil, jitter_buffer_latency: latency} = - stream_state - ) do + %State.StreamState{max_latency_timer: nil, jitter_buffer_latency: latency} = stream_state + ) + when latency > 0 do new_timer = case BufferStore.first_record_timestamp(stream_state.buffer_store) do nil -> @@ -571,20 +550,24 @@ defmodule Membrane.RTP.Demuxer do buffer_ts -> since_insertion = Time.monotonic_time() - buffer_ts - send_after_time = max(0, latency - since_insertion) |> Time.as_milliseconds(:round) - Process.send_after(self(), {:send_buffers, ssrc}, send_after_time) + send_after_time = Time.as_milliseconds(latency - since_insertion, :round) + + if send_after_time > 0 do + Process.send_after(self(), {:send_buffers, ssrc}, send_after_time) + else + nil + end end %State.StreamState{stream_state | max_latency_timer: new_timer} end - defp set_timer(_ssrc, %State.StreamState{max_latency_timer: timer} = stream_state) - when timer != nil do + defp set_timer(_ssrc, stream_state) do stream_state end defp record_to_actions(nil, stream_state) do - action = [event: {:output, %Membrane.Event.Discontinuity{}}] + action = [event: {stream_state.pad, %Membrane.Event.Discontinuity{}}] {action, stream_state} end diff --git a/lib/membrane/rtp/jitter_buffer.ex b/lib/membrane/rtp/jitter_buffer.ex index 86621680..c1aa8693 100644 --- a/lib/membrane/rtp/jitter_buffer.ex +++ b/lib/membrane/rtp/jitter_buffer.ex @@ -148,8 +148,9 @@ defmodule Membrane.RTP.JitterBuffer do buffer_ts -> since_insertion = Time.monotonic_time() - buffer_ts - send_after_time = max(0, latency - since_insertion) |> Time.as_milliseconds(:round) - Process.send_after(self(), :send_buffers, send_after_time) + send_after_time = (latency - since_insertion) |> Time.as_milliseconds(:round) + # + if send_after_time > 0, do: Process.send_after(self(), :send_buffers, send_after_time) end %State{state | max_latency_timer: new_timer} diff --git a/lib/membrane/rtp/jitter_buffer/buffer_store.ex b/lib/membrane/rtp/jitter_buffer/buffer_store.ex index 0b247236..727bff9b 100644 --- a/lib/membrane/rtp/jitter_buffer/buffer_store.ex +++ b/lib/membrane/rtp/jitter_buffer/buffer_store.ex @@ -121,7 +121,7 @@ defmodule Membrane.RTP.JitterBuffer.BufferStore do @doc """ Returns all buffers that are stored in the `BufferStore`. """ - @spec dump(t()) :: [Record.t()] + @spec dump(t()) :: [Record.t() | nil] def dump(%__MODULE__{} = store) do {records, _store} = flush_while(store, fn _store, _record -> true end) records diff --git a/lib/membrane/rtp/muxer.ex b/lib/membrane/rtp/muxer.ex index 6b30548d..44b6633d 100644 --- a/lib/membrane/rtp/muxer.ex +++ b/lib/membrane/rtp/muxer.ex @@ -103,8 +103,11 @@ defmodule Membrane.RTP.Muxer do new_stream_state = %State.StreamState{ ssrc: ssrc, - sequence_number: Enum.random(0..@max_sequence_number), - initial_timestamp: Enum.random(0..@max_timestamp), + # sequence_number: Enum.random(0..@max_sequence_number), + # initial_timestamp: Enum.random(0..@max_timestamp), + # sequence_number: 0, + sequence_number: @max_sequence_number - 1, + initial_timestamp: 0, clock_rate: clock_rate, payload_type: payload_type } diff --git a/test/membrane/rtp/demuxer_muxer_integration_test.exs b/test/membrane/rtp/demuxer_muxer_integration_test.exs index 1399309b..3df0f480 100644 --- a/test/membrane/rtp/demuxer_muxer_integration_test.exs +++ b/test/membrane/rtp/demuxer_muxer_integration_test.exs @@ -10,6 +10,9 @@ defmodule Membrane.RTP.DemuxerMuxerTest do packets: 862 } + @max_sequence_number Bitwise.bsl(1, 16) - 1 + @max_timestamp Bitwise.bsl(1, 32) - 1 + defmodule ReferencePipeline do use Membrane.Pipeline @@ -45,13 +48,11 @@ defmodule Membrane.RTP.DemuxerMuxerTest do _ctx, state ) do - %{encoding_name: encoding_name, clock_rate: clock_rate} = - Membrane.RTP.PayloadFormat.get_payload_type_mapping(pt) + %{encoding_name: encoding_name} = Membrane.RTP.PayloadFormat.get_payload_type_mapping(pt) spec = get_child(:rtp_demuxer) |> via_out(:output, options: [stream_id: {:ssrc, ssrc}]) - |> child({:jitter_buffer, ssrc}, %Membrane.RTP.JitterBuffer{clock_rate: clock_rate}) |> via_in(:input, options: [encoding: encoding_name]) |> get_child(:rtp_muxer) @@ -78,7 +79,8 @@ defmodule Membrane.RTP.DemuxerMuxerTest do reference_normalized_packets = get_normalized_packets(reference_pipeline, @rtp_input.packets) subject_normalized_packets = get_normalized_packets(subject_pipeline, @rtp_input.packets) - assert reference_normalized_packets == subject_normalized_packets + assert reference_normalized_packets[:H264] == subject_normalized_packets[:H264] + assert reference_normalized_packets[:AAC] == subject_normalized_packets[:AAC] assert_end_of_stream(reference_pipeline, :sink) assert_end_of_stream(subject_pipeline, :sink) @@ -99,25 +101,27 @@ defmodule Membrane.RTP.DemuxerMuxerTest do packet end) |> Enum.group_by(& &1.payload_type) - |> Map.new(fn {payload_type, payload_type_buffers} -> + |> Map.new(fn {payload_type, packets} -> %{encoding_name: encoding_name} = RTP.PayloadFormat.get_payload_type_mapping(payload_type) - sorted_packets = - payload_type_buffers - |> Enum.sort(&(&1.sequence_number < &2.sequence_number)) - %{ssrc: ssrc, sequence_number: first_sequence_number, timestamp: first_timestamp} = - List.first(sorted_packets) + List.first(packets) normalized_packets = - sorted_packets + packets |> Enum.map(fn packet -> %{ packet | ssrc: packet.ssrc - ssrc, - sequence_number: packet.sequence_number - first_sequence_number, + # modulo to account for wrapping + sequence_number: + Integer.mod( + packet.sequence_number - first_sequence_number, + @max_sequence_number + 1 + ), # round to ignore insignificant differences in timestamps - timestamp: round((packet.timestamp - first_timestamp) / 10) + timestamp: + round(Integer.mod(packet.timestamp - first_timestamp, @max_timestamp + 1) / 10) } end) diff --git a/test/membrane/rtp/demuxer_test.exs b/test/membrane/rtp/demuxer_test.exs index d2af8271..17192b43 100644 --- a/test/membrane/rtp/demuxer_test.exs +++ b/test/membrane/rtp/demuxer_test.exs @@ -10,6 +10,34 @@ defmodule Membrane.RTP.DemuxerTest do video: %{ssrc: 670_572_639, packets: 842, payload_type: 96} } + defmodule Reorderer do + use Membrane.Filter + + def_input_pad :input, accepted_format: _any + def_output_pad :output, accepted_format: _any + + @impl true + def handle_init(_ctx, _opts) do + {[], %{buffers: []}} + end + + @impl true + def handle_buffer(:input, buffer, _ctx, state) do + case state.buffers do + [first_buffer, second_buffer] -> + {[buffer: {:output, [first_buffer, buffer, second_buffer]}], %{buffers: []}} + + buffer_list -> + {[], %{state | buffers: buffer_list ++ [buffer]}} + end + end + + @impl true + def handle_end_of_stream(:input, _ctx, state) do + {[buffer: {:output, state.buffers}, end_of_stream: :output], state} + end + end + defmodule UpfrontPayloadTypePipeline do use Membrane.Pipeline @@ -17,11 +45,22 @@ defmodule Membrane.RTP.DemuxerTest do def handle_init(_ctx, opts) do spec = [ child(:pcap_source, %Membrane.Pcap.Source{path: opts.pcap_path}) + |> child(if opts.reorder_packets, do: Reorderer, else: Membrane.Debug.Filter) |> child(:demuxer, Membrane.RTP.Demuxer) - |> via_out(:output, options: [stream_id: {:payload_type, opts.audio.payload_type}]) + |> via_out(:output, + options: [ + stream_id: {:payload_type, opts.audio.payload_type}, + jitter_buffer_latency: Membrane.Time.milliseconds(1) + ] + ) |> child({:sink, opts.audio.ssrc}, Testing.Sink), get_child(:demuxer) - |> via_out(:output, options: [stream_id: {:payload_type, opts.video.payload_type}]) + |> via_out(:output, + options: [ + stream_id: {:payload_type, opts.video.payload_type}, + jitter_buffer_latency: Membrane.Time.milliseconds(1) + ] + ) |> child({:sink, opts.video.ssrc}, Testing.Sink) ] @@ -60,6 +99,7 @@ defmodule Membrane.RTP.DemuxerTest do def handle_init(_ctx, opts) do spec = child(:pcap_source, %Membrane.Pcap.Source{path: opts.pcap_path}) + |> child(if opts.reorder_packets, do: Reorderer, else: Membrane.Debug.Filter) |> child(:demuxer, Membrane.RTP.Demuxer) {[spec: spec], %{}} @@ -76,9 +116,12 @@ defmodule Membrane.RTP.DemuxerTest do end end - defp perform_test(pipeline_module) do + defp perform_test(pipeline_module, reorder_packets \\ false) do pipeline = - Testing.Pipeline.start_supervised!(module: pipeline_module, custom_args: @rtp_input) + Testing.Pipeline.start_supervised!( + module: pipeline_module, + custom_args: Map.put(@rtp_input, :reorder_packets, reorder_packets) + ) %{audio: %{ssrc: audio_ssrc}, video: %{ssrc: video_ssrc}} = @rtp_input @@ -87,12 +130,12 @@ defmodule Membrane.RTP.DemuxerTest do assert_start_of_stream(pipeline, {:sink, ^video_ssrc}) assert_start_of_stream(pipeline, {:sink, ^audio_ssrc}) - 1..@rtp_input.video.packets + 1..(@rtp_input.video.packets - 0) |> Enum.each(fn _i -> assert_sink_buffer(pipeline, {:sink, video_ssrc}, %Membrane.Buffer{}) end) - 1..@rtp_input.audio.packets + 1..(@rtp_input.audio.packets - 0) |> Enum.each(fn _i -> assert_sink_buffer(pipeline, {:sink, audio_ssrc}, %Membrane.Buffer{}) end) @@ -114,5 +157,13 @@ defmodule Membrane.RTP.DemuxerTest do test "when it's pads were linked based on notifications received" do perform_test(DynamicPipeline) end + + test "when packets are reordered in a pipeline linked upfront" do + perform_test(UpfrontPayloadTypePipeline, true) + end + + test "when packets are reordered in a dynamic pipeline" do + perform_test(DynamicPipeline, true) + end end end diff --git a/test/membrane/rtp/muxer_demuxer_integration_test.exs b/test/membrane/rtp/muxer_demuxer_integration_test.exs index c5ac12cc..754035e7 100644 --- a/test/membrane/rtp/muxer_demuxer_integration_test.exs +++ b/test/membrane/rtp/muxer_demuxer_integration_test.exs @@ -2,7 +2,6 @@ defmodule Membrane.RTP.MuxerDemuxerTest do @moduledoc false use ExUnit.Case import Membrane.Testing.Assertions - alias Membrane.RTP alias Membrane.Testing @input_path "test/fixtures/rtp/h264/bun.h264" @@ -12,8 +11,6 @@ defmodule Membrane.RTP.MuxerDemuxerTest do @impl true def handle_init(_ctx, opts) do - %{clock_rate: clock_rate} = RTP.PayloadFormat.resolve(encoding_name: :H264) - spec = [ child(:source, %Membrane.File.Source{location: opts.input_path}) |> child(:h264_parser, %Membrane.H264.Parser{ @@ -25,7 +22,6 @@ defmodule Membrane.RTP.MuxerDemuxerTest do |> child(:rtp_muxer, Membrane.RTP.Muxer) |> child(:rtp_demuxer, Membrane.RTP.Demuxer) |> via_out(:output, options: [stream_id: {:encoding_name, :H264}]) - |> child(:jitter_buffer, %Membrane.RTP.JitterBuffer{clock_rate: clock_rate}) |> child(:rtp_h264_depayloader, Membrane.RTP.H264.Depayloader) |> child(:sink, %Membrane.File.Sink{location: opts.output_path}) ] From 50af3da699d84e409b23d4100340d51f0d31d4c5 Mon Sep 17 00:00:00 2001 From: noarkhh Date: Wed, 5 Mar 2025 16:36:06 +0100 Subject: [PATCH 3/5] Bump version, update readme --- README.md | 2 +- mix.exs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index b9c1446b..40988261 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_rtp_plugin` to your list of dep ```elixir def deps do [ - {:membrane_rtp_plugin, "~> 0.30.0"}, + {:membrane_rtp_plugin, "~> 0.31.0"}, {:ex_libsrtp, ">= 0.0.0"} # required only if SRTP/SRTCP support is needed ] end diff --git a/mix.exs b/mix.exs index 190a3bc2..bbf64d73 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.RTP.Plugin.MixProject do use Mix.Project - @version "0.30.0" + @version "0.31.0" @github_url "https://github.com/membraneframework/membrane_rtp_plugin" def project do From 4e5927af5f51ac5e7460c7eeb664bfbe8cc33a75 Mon Sep 17 00:00:00 2001 From: noarkhh Date: Thu, 6 Mar 2025 10:25:34 +0100 Subject: [PATCH 4/5] Revert temporary hardcoded values --- lib/membrane/rtp/muxer.ex | 7 ++----- test/membrane/rtp/demuxer_test.exs | 4 ++-- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/lib/membrane/rtp/muxer.ex b/lib/membrane/rtp/muxer.ex index 44b6633d..6b30548d 100644 --- a/lib/membrane/rtp/muxer.ex +++ b/lib/membrane/rtp/muxer.ex @@ -103,11 +103,8 @@ defmodule Membrane.RTP.Muxer do new_stream_state = %State.StreamState{ ssrc: ssrc, - # sequence_number: Enum.random(0..@max_sequence_number), - # initial_timestamp: Enum.random(0..@max_timestamp), - # sequence_number: 0, - sequence_number: @max_sequence_number - 1, - initial_timestamp: 0, + sequence_number: Enum.random(0..@max_sequence_number), + initial_timestamp: Enum.random(0..@max_timestamp), clock_rate: clock_rate, payload_type: payload_type } diff --git a/test/membrane/rtp/demuxer_test.exs b/test/membrane/rtp/demuxer_test.exs index 17192b43..2a6046f2 100644 --- a/test/membrane/rtp/demuxer_test.exs +++ b/test/membrane/rtp/demuxer_test.exs @@ -130,12 +130,12 @@ defmodule Membrane.RTP.DemuxerTest do assert_start_of_stream(pipeline, {:sink, ^video_ssrc}) assert_start_of_stream(pipeline, {:sink, ^audio_ssrc}) - 1..(@rtp_input.video.packets - 0) + 1..@rtp_input.video.packets |> Enum.each(fn _i -> assert_sink_buffer(pipeline, {:sink, video_ssrc}, %Membrane.Buffer{}) end) - 1..(@rtp_input.audio.packets - 0) + 1..@rtp_input.audio.packets |> Enum.each(fn _i -> assert_sink_buffer(pipeline, {:sink, audio_ssrc}, %Membrane.Buffer{}) end) From 9f0c570863dc24f2852fd37c08a0bbf0edd6714e Mon Sep 17 00:00:00 2001 From: noarkhh Date: Thu, 6 Mar 2025 10:39:59 +0100 Subject: [PATCH 5/5] Add options descriptions --- lib/membrane/rtp/demuxer.ex | 16 +++++++++++++--- lib/membrane/rtp/jitter_buffer.ex | 5 ++--- .../rtp/demuxer_muxer_integration_test.exs | 3 +-- test/membrane/rtp/demuxer_test.exs | 6 ++---- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/lib/membrane/rtp/demuxer.ex b/lib/membrane/rtp/demuxer.ex index a2f54ba0..ab77dc7c 100644 --- a/lib/membrane/rtp/demuxer.ex +++ b/lib/membrane/rtp/demuxer.ex @@ -61,15 +61,25 @@ defmodule Membrane.RTP.Demuxer do ], clock_rate: [ spec: RTP.clock_rate() | nil, - default: nil + default: nil, + description: """ + Clock rate of the stream. If not provided the demuxer will attempt to resolve it from payload type. + """ ], use_jitter_buffer: [ spec: boolean(), - default: true + default: true, + description: """ + Specifies whether to use an internal jitter buffer or not. The jitter buffer ensures that incoming packets are reordered based on their sequence + numbers. This functionality introduces some latency, so when the order of the packets is ensured by some other means the jitter buffer is redundant. + """ ], jitter_buffer_latency: [ spec: Membrane.Time.t(), - default: Membrane.Time.milliseconds(200) + default: Membrane.Time.milliseconds(200), + description: """ + Maximum latency introduced by the jitter buffer. Determines how long the element will wait for out-of-order packets if there are any gaps in sequence numbers. + """ ] ] diff --git a/lib/membrane/rtp/jitter_buffer.ex b/lib/membrane/rtp/jitter_buffer.ex index c1aa8693..86621680 100644 --- a/lib/membrane/rtp/jitter_buffer.ex +++ b/lib/membrane/rtp/jitter_buffer.ex @@ -148,9 +148,8 @@ defmodule Membrane.RTP.JitterBuffer do buffer_ts -> since_insertion = Time.monotonic_time() - buffer_ts - send_after_time = (latency - since_insertion) |> Time.as_milliseconds(:round) - # - if send_after_time > 0, do: Process.send_after(self(), :send_buffers, send_after_time) + send_after_time = max(0, latency - since_insertion) |> Time.as_milliseconds(:round) + Process.send_after(self(), :send_buffers, send_after_time) end %State{state | max_latency_timer: new_timer} diff --git a/test/membrane/rtp/demuxer_muxer_integration_test.exs b/test/membrane/rtp/demuxer_muxer_integration_test.exs index 3df0f480..64e175c4 100644 --- a/test/membrane/rtp/demuxer_muxer_integration_test.exs +++ b/test/membrane/rtp/demuxer_muxer_integration_test.exs @@ -79,8 +79,7 @@ defmodule Membrane.RTP.DemuxerMuxerTest do reference_normalized_packets = get_normalized_packets(reference_pipeline, @rtp_input.packets) subject_normalized_packets = get_normalized_packets(subject_pipeline, @rtp_input.packets) - assert reference_normalized_packets[:H264] == subject_normalized_packets[:H264] - assert reference_normalized_packets[:AAC] == subject_normalized_packets[:AAC] + assert reference_normalized_packets == subject_normalized_packets assert_end_of_stream(reference_pipeline, :sink) assert_end_of_stream(subject_pipeline, :sink) diff --git a/test/membrane/rtp/demuxer_test.exs b/test/membrane/rtp/demuxer_test.exs index 2a6046f2..60939f22 100644 --- a/test/membrane/rtp/demuxer_test.exs +++ b/test/membrane/rtp/demuxer_test.exs @@ -49,16 +49,14 @@ defmodule Membrane.RTP.DemuxerTest do |> child(:demuxer, Membrane.RTP.Demuxer) |> via_out(:output, options: [ - stream_id: {:payload_type, opts.audio.payload_type}, - jitter_buffer_latency: Membrane.Time.milliseconds(1) + stream_id: {:payload_type, opts.audio.payload_type} ] ) |> child({:sink, opts.audio.ssrc}, Testing.Sink), get_child(:demuxer) |> via_out(:output, options: [ - stream_id: {:payload_type, opts.video.payload_type}, - jitter_buffer_latency: Membrane.Time.milliseconds(1) + stream_id: {:payload_type, opts.video.payload_type} ] ) |> child({:sink, opts.video.ssrc}, Testing.Sink)