diff --git a/README.md b/README.md index b9c1446b..40988261 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_rtp_plugin` to your list of dep ```elixir def deps do [ - {:membrane_rtp_plugin, "~> 0.30.0"}, + {:membrane_rtp_plugin, "~> 0.31.0"}, {:ex_libsrtp, ">= 0.0.0"} # required only if SRTP/SRTCP support is needed ] end diff --git a/lib/membrane/rtp/demuxer.ex b/lib/membrane/rtp/demuxer.ex index 54e47d96..ab77dc7c 100644 --- a/lib/membrane/rtp/demuxer.ex +++ b/lib/membrane/rtp/demuxer.ex @@ -13,7 +13,11 @@ defmodule Membrane.RTP.Demuxer do require Membrane.Pad require Membrane.Logger - alias Membrane.{Pad, RemoteStream, RTCP, RTP} + alias Membrane.Element.CallbackContext + alias Membrane.{Pad, RemoteStream, RTCP, RTP, Time} + alias Membrane.RTP.JitterBuffer.{BufferStore, Record} + + @max_timestamp Bitwise.bsl(1, 32) - 1 @typedoc """ Metadata present in each output buffer. The `ExRTP.Packet.t()` struct contains @@ -54,6 +58,28 @@ defmodule Membrane.RTP.Demuxer do then most likely it should be used, since it's unique for each stream. If it's not known (for example when the pad is being linked upfront), encoding or payload type should be provided, and the first identified stream of given encoding or payload type will be sent on this pad. """ + ], + clock_rate: [ + spec: RTP.clock_rate() | nil, + default: nil, + description: """ + Clock rate of the stream. If not provided the demuxer will attempt to resolve it from payload type. + """ + ], + use_jitter_buffer: [ + spec: boolean(), + default: true, + description: """ + Specifies whether to use an internal jitter buffer or not. The jitter buffer ensures that incoming packets are reordered based on their sequence + numbers. This functionality introduces some latency, so when the order of the packets is ensured by some other means the jitter buffer is redundant. + """ + ], + jitter_buffer_latency: [ + spec: Membrane.Time.t(), + default: Membrane.Time.milliseconds(200), + description: """ + Maximum latency introduced by the jitter buffer. Determines how long the element will wait for out-of-order packets if there are any gaps in sequence numbers. + """ ] ] @@ -80,17 +106,40 @@ defmodule Membrane.RTP.Demuxer do alias Membrane.RTP @type t :: %__MODULE__{ - queued_buffers: [ExRTP.Packet.t()], + buffer_store: RTP.JitterBuffer.BufferStore.t(), end_of_stream_buffered: boolean(), - phase: :waiting_for_link | :linked | :timed_out, + phase: :waiting_for_matching_pad | :matched_with_pad | :timed_out, + pad_match_timer: reference() | nil, payload_type: RTP.payload_type(), - link_timer: reference() | nil, - pad: Pad.ref() | nil + pad: Pad.ref() | nil, + clock_rate: non_neg_integer() | nil, + jitter_buffer_latency: Membrane.Time.t() | nil, + initial_latency_waiting: boolean(), + initialization_time: Membrane.Time.t(), + max_latency_timer: reference() | nil, + timestamp_base: Membrane.Time.t(), + previous_timestamp: Membrane.Time.t() } - @enforce_keys [:payload_type, :phase, :link_timer, :pad] + @enforce_keys [ + :payload_type, + :phase, + :pad_match_timer, + :pad, + :timestamp_base, + :initial_latency_waiting, + :previous_timestamp, + :initialization_time + ] - defstruct @enforce_keys ++ [queued_buffers: [], end_of_stream_buffered: false] + defstruct @enforce_keys ++ + [ + buffer_store: %RTP.JitterBuffer.BufferStore{}, + end_of_stream_buffered: false, + max_latency_timer: nil, + clock_rate: nil, + jitter_buffer_latency: nil + ] end @type t :: %__MODULE__{ @@ -115,9 +164,9 @@ defmodule Membrane.RTP.Demuxer do end @impl true - def handle_buffer(:input, %Membrane.Buffer{payload: payload}, _ctx, state) do + def handle_buffer(:input, %Membrane.Buffer{payload: payload}, ctx, state) do case classify_packet(payload) do - :rtp -> handle_rtp_packet(payload, state) + :rtp -> handle_rtp_packet(payload, ctx, state) :rtcp -> handle_rtcp_packets(payload, state) end end @@ -138,33 +187,64 @@ defmodule Membrane.RTP.Demuxer do state = put_in(state.pads_waiting_for_stream[pad], ctx.pad_options.stream_id) {[], state} - %{phase: :waiting_for_link} -> - Process.cancel_timer(stream_state.link_timer) + %{phase: :timed_out} -> + Membrane.Logger.warning( + "Connected a pad corresponding to a timed out stream, sending end_of_stream" + ) - buffer_action = [buffer: {pad, Enum.reverse(stream_state.queued_buffers)}] + {[end_of_stream: pad], state} - end_of_stream_action = - if stream_state.end_of_stream_buffered, do: [end_of_stream: pad], else: [] + %{phase: :waiting_for_matching_pad} -> + Process.cancel_timer(stream_state.pad_match_timer) - state = - put_in( - state.stream_states[matching_stream_ssrc], - %{stream_state | phase: :linked, queued_buffers: [], pad: pad} + %{clock_rate: clock_rate} = + RTP.PayloadFormat.resolve( + payload_type: stream_state.payload_type, + clock_rate: ctx.pad_options.clock_rate, + payload_type_mapping: state.payload_type_mapping ) - {[stream_format: {pad, %RTP{}}] ++ buffer_action ++ end_of_stream_action, state} + jitter_buffer_latency = + if ctx.pad_options.use_jitter_buffer, do: ctx.pad_options.jitter_buffer_latency, else: 0 - %{phase: :timed_out} -> - Membrane.Logger.warning( - "Connected a pad corresponding to a timed out stream, sending end_of_stream" - ) + stream_state = %State.StreamState{ + stream_state + | phase: :matched_with_pad, + pad: pad, + clock_rate: clock_rate, + jitter_buffer_latency: jitter_buffer_latency + } - {[end_of_stream: pad], state} + time_since_initialization = Time.monotonic_time() - stream_state.initialization_time + + if time_since_initialization < stream_state.jitter_buffer_latency and + ctx.pad_options.use_jitter_buffer do + initial_latency_left = stream_state.jitter_buffer_latency - time_since_initialization + + Process.send_after( + self(), + {:initial_latency_passed, matching_stream_ssrc}, + Membrane.Time.as_milliseconds(initial_latency_left, :round) + ) + end + + {buffer_actions, stream_state} = send_buffers(matching_stream_ssrc, stream_state) + + {end_of_stream_actions, stream_state} = + if stream_state.end_of_stream_buffered do + get_end_of_stream_actions(stream_state) + else + {[], stream_state} + end + + state = put_in(state.stream_states[matching_stream_ssrc], stream_state) + + {[stream_format: {pad, %RTP{}}] ++ buffer_actions ++ end_of_stream_actions, state} end end @impl true - def handle_info({:link_timeout, ssrc}, _ctx, state) do + def handle_info({:pad_match_timeout, ssrc}, _ctx, state) do case state.not_linked_pad_handling.action do :raise -> raise "Pad corresponding to ssrc #{ssrc} not connected within specified timeout" @@ -179,19 +259,40 @@ defmodule Membrane.RTP.Demuxer do end state = put_in(state.stream_states[ssrc].phase, :timed_out) - state = put_in(state.stream_states[ssrc].queued_buffers, []) + state = put_in(state.stream_states[ssrc].buffer_store, %BufferStore{}) {[], state} end + @impl true + def handle_info({:initial_latency_passed, ssrc}, _context, state) do + {actions, stream_state} = + send_buffers(ssrc, %State.StreamState{ + state.stream_states[ssrc] + | initial_latency_waiting: false + }) + + state = put_in(state.stream_states[ssrc], stream_state) + {actions, state} + end + + @impl true + def handle_info({:send_buffers, ssrc}, _context, state) do + {actions, stream_state} = + send_buffers(ssrc, %State.StreamState{state.stream_states[ssrc] | max_latency_timer: nil}) + + state = put_in(state.stream_states[ssrc], stream_state) + {actions, state} + end + @impl true def handle_end_of_stream(:input, _ctx, state) do - state = - state.stream_states - |> Enum.reduce(state, fn {ssrc, _stream_state}, state -> - put_in(state.stream_states[ssrc].end_of_stream_buffered, true) - end) + state.stream_states + |> Enum.flat_map_reduce(state, fn {ssrc, _stream_state}, state -> + {actions, stream_state} = get_end_of_stream_actions(state.stream_states[ssrc]) - {[forward: :end_of_stream], state} + state = put_in(state.stream_states[ssrc], %{stream_state | end_of_stream_buffered: true}) + {actions, state} + end) end @spec classify_packet(binary()) :: :rtp | :rtcp @@ -201,16 +302,21 @@ defmodule Membrane.RTP.Demuxer do else: :rtp end - @spec handle_rtp_packet(binary(), State.t()) :: + @spec handle_rtp_packet(binary(), CallbackContext.t(), State.t()) :: {[Membrane.Element.Action.t()], State.t()} - defp handle_rtp_packet(raw_rtp_packet, state) do + defp handle_rtp_packet(raw_rtp_packet, ctx, state) do {:ok, packet} = ExRTP.Packet.decode(raw_rtp_packet) {new_stream_actions, state} = if Map.has_key?(state.stream_states, packet.ssrc) do {[], state} else - initialize_new_stream_state(packet, state) + find_matching_pad_for_stream( + packet, + state.pads_waiting_for_stream, + state.payload_type_mapping + ) + |> initialize_new_stream_state(packet, ctx, state) end buffer = %Membrane.Buffer{ @@ -218,69 +324,132 @@ defmodule Membrane.RTP.Demuxer do metadata: %{rtp: %{packet | payload: <<>>}} } - {buffer_actions, state} = - case state.stream_states[packet.ssrc].phase do - :waiting_for_link -> - state = update_in(state.stream_states[packet.ssrc].queued_buffers, &[buffer | &1]) - {[], state} - - :linked -> - buffer_action = - {:buffer, {state.stream_states[packet.ssrc].pad, buffer}} + stream_state = insert_buffer_into_buffer_store(state.stream_states[packet.ssrc], buffer) - {[buffer_action], state} + {buffer_actions, stream_state} = maybe_send_buffers(packet.ssrc, stream_state) - :timed_out -> - {[], state} - end + state = put_in(state.stream_states[packet.ssrc], stream_state) {new_stream_actions ++ buffer_actions, state} end + @spec insert_buffer_into_buffer_store(State.StreamState.t(), Membrane.Buffer.t()) :: + State.StreamState.t() + defp insert_buffer_into_buffer_store(stream_state, buffer) do + case stream_state.phase do + :timed_out -> + stream_state + + phase when phase in [:waiting_for_matching_pad, :matched_with_pad] -> + case BufferStore.insert_buffer(stream_state.buffer_store, buffer) do + {:ok, buffer_store} -> + %State.StreamState{stream_state | buffer_store: buffer_store} + + {:error, :late_packet} -> + Membrane.Logger.debug("Late packet has arrived") + stream_state + end + end + end + + @spec maybe_send_buffers(RTP.ssrc(), State.StreamState.t()) :: + {[Membrane.Element.Action.t()], State.StreamState.t()} + defp maybe_send_buffers(ssrc, stream_state) do + if stream_state.phase == :matched_with_pad and not stream_state.initial_latency_waiting do + send_buffers(ssrc, stream_state) + else + {[], stream_state} + end + end + + @spec get_end_of_stream_actions(State.StreamState.t()) :: + {[Membrane.Element.Action.t()], State.StreamState.t()} + defp get_end_of_stream_actions(%State.StreamState{pad: nil} = stream_state) do + {[], stream_state} + end + + defp get_end_of_stream_actions(stream_state) do + {buffer_actions, stream_state} = + stream_state.buffer_store + |> BufferStore.dump() + |> Enum.flat_map_reduce(stream_state, &record_to_actions/2) + + {buffer_actions ++ [end_of_stream: stream_state.pad], + %State.StreamState{stream_state | buffer_store: %BufferStore{}}} + end + @spec handle_rtcp_packets(binary(), State.t()) :: {[Membrane.Element.Action.t()], State.t()} defp handle_rtcp_packets(rtcp_packets, state) do Membrane.Logger.debug_verbose("Received RTCP Packet(s): #{inspect(rtcp_packets)}") {[], state} end - @spec initialize_new_stream_state(ExRTP.Packet.t(), State.t()) :: - {[Membrane.Element.Action.t()], State.t()} - defp initialize_new_stream_state(packet, state) do - case find_matching_pad_for_stream( - packet, - state.pads_waiting_for_stream, - state.payload_type_mapping - ) do - nil -> - stream_state = %State.StreamState{ - phase: :waiting_for_link, - link_timer: - Process.send_after( - self(), - {:link_timeout, packet.ssrc}, - Membrane.Time.as_milliseconds(state.not_linked_pad_handling.timeout, :round) - ), - payload_type: packet.payload_type, - pad: nil - } + @spec initialize_new_stream_state( + matching_pad :: Pad.ref() | nil, + ExRTP.Packet.t(), + CallbackContext.t(), + State.t() + ) :: {[Membrane.Element.Action.t()], State.t()} + defp initialize_new_stream_state(nil, packet, _ctx, state) do + stream_state = %State.StreamState{ + phase: :waiting_for_matching_pad, + pad_match_timer: + Process.send_after( + self(), + {:pad_match_timeout, packet.ssrc}, + Membrane.Time.as_milliseconds(state.not_linked_pad_handling.timeout, :round) + ), + payload_type: packet.payload_type, + pad: nil, + timestamp_base: packet.timestamp, + previous_timestamp: packet.timestamp, + initialization_time: Membrane.Time.monotonic_time(), + initial_latency_waiting: true + } - state = put_in(state.stream_states[packet.ssrc], stream_state) + state = put_in(state.stream_states[packet.ssrc], stream_state) - {[notify_parent: create_new_stream_notification(packet)], state} + {[notify_parent: create_new_stream_notification(packet)], state} + end - pad_waiting_for_stream -> - stream_state = %State.StreamState{ - phase: :linked, - link_timer: nil, - payload_type: packet.payload_type, - pad: pad_waiting_for_stream - } + defp initialize_new_stream_state(pad_waiting_for_stream, packet, ctx, state) do + pad_options = ctx.pads[pad_waiting_for_stream].options + + jitter_buffer_latency = + if pad_options.use_jitter_buffer, do: pad_options.jitter_buffer_latency, else: 0 - state = put_in(state.stream_states[packet.ssrc], stream_state) - {_stream_id, state} = pop_in(state.pads_waiting_for_stream[pad_waiting_for_stream]) + %{clock_rate: clock_rate} = + RTP.PayloadFormat.resolve( + payload_type: packet.payload_type, + clock_rate: pad_options.clock_rate, + payload_type_mapping: state.payload_type_mapping + ) - {[stream_format: {pad_waiting_for_stream, %RTP{}}], state} + stream_state = %State.StreamState{ + phase: :matched_with_pad, + pad_match_timer: nil, + payload_type: packet.payload_type, + pad: pad_waiting_for_stream, + timestamp_base: packet.timestamp, + previous_timestamp: packet.timestamp, + initialization_time: Membrane.Time.monotonic_time(), + jitter_buffer_latency: jitter_buffer_latency, + clock_rate: clock_rate, + initial_latency_waiting: pad_options.use_jitter_buffer + } + + if pad_options.use_jitter_buffer do + Process.send_after( + self(), + {:initial_latency_passed, packet.ssrc}, + Membrane.Time.as_milliseconds(pad_options.jitter_buffer_latency, :round) + ) end + + state = put_in(state.stream_states[packet.ssrc], stream_state) + {_stream_id, state} = pop_in(state.pads_waiting_for_stream[pad_waiting_for_stream]) + + {[stream_format: {pad_waiting_for_stream, %RTP{}}], state} end @spec find_matching_pad_for_stream( @@ -360,4 +529,82 @@ defmodule Membrane.RTP.Demuxer do {:new_rtp_stream, %{ssrc: packet.ssrc, payload_type: packet.payload_type, extensions: extensions}} end + + @spec send_buffers(RTP.ssrc(), State.StreamState.t()) :: + {[Membrane.Element.Action.t()], State.StreamState.t()} + defp send_buffers(ssrc, stream_state) do + {too_old_records, buffer_store} = + BufferStore.flush_older_than(stream_state.buffer_store, stream_state.jitter_buffer_latency) + + {buffers, buffer_store} = BufferStore.flush_ordered(buffer_store) + stream_state = %{stream_state | buffer_store: buffer_store} + + {actions, stream_state} = + (too_old_records ++ buffers) |> Enum.flat_map_reduce(stream_state, &record_to_actions/2) + + stream_state = set_timer(ssrc, stream_state) + + {actions, stream_state} + end + + @spec set_timer(RTP.ssrc(), State.StreamState.t()) :: State.StreamState.t() + defp set_timer( + ssrc, + %State.StreamState{max_latency_timer: nil, jitter_buffer_latency: latency} = stream_state + ) + when latency > 0 do + new_timer = + case BufferStore.first_record_timestamp(stream_state.buffer_store) do + nil -> + nil + + buffer_ts -> + since_insertion = Time.monotonic_time() - buffer_ts + send_after_time = Time.as_milliseconds(latency - since_insertion, :round) + + if send_after_time > 0 do + Process.send_after(self(), {:send_buffers, ssrc}, send_after_time) + else + nil + end + end + + %State.StreamState{stream_state | max_latency_timer: new_timer} + end + + defp set_timer(_ssrc, stream_state) do + stream_state + end + + defp record_to_actions(nil, stream_state) do + action = [event: {stream_state.pad, %Membrane.Event.Discontinuity{}}] + {action, stream_state} + end + + defp record_to_actions(%Record{buffer: buffer}, stream_state) do + rtp_timestamp = buffer.metadata.rtp.timestamp + + # timestamps in RTP don't have to be monotonic therefore there can be + # a situation where in 2 consecutive packets the latter packet will have smaller timestamp + # than the previous one while not overflowing the timestamp number + # https://datatracker.ietf.org/doc/html/rfc3550#section-5.1 + + timestamp_base = + case RTP.Utils.from_which_rollover( + stream_state.previous_timestamp, + rtp_timestamp, + @max_timestamp + ) do + :next -> stream_state.timestamp_base - @max_timestamp + :previous -> stream_state.timestamp_base + @max_timestamp + :current -> stream_state.timestamp_base + end + + timestamp = div(Time.seconds(rtp_timestamp - timestamp_base), stream_state.clock_rate) + buffer = %Membrane.Buffer{buffer | pts: timestamp} + # An empty buffer is usually a WebRTC probing packet that should be skipped. + actions = if buffer.payload == <<>>, do: [], else: [buffer: {stream_state.pad, buffer}] + state = %{stream_state | timestamp_base: timestamp_base, previous_timestamp: rtp_timestamp} + {actions, state} + end end diff --git a/lib/membrane/rtp/jitter_buffer/buffer_store.ex b/lib/membrane/rtp/jitter_buffer/buffer_store.ex index f0f77d9f..727bff9b 100644 --- a/lib/membrane/rtp/jitter_buffer/buffer_store.ex +++ b/lib/membrane/rtp/jitter_buffer/buffer_store.ex @@ -61,44 +61,6 @@ defmodule Membrane.RTP.JitterBuffer.BufferStore do do_insert_buffer(store, buffer, seq_num) end - @spec do_insert_buffer(t(), Buffer.t(), RTP.Header.sequence_number_t()) :: - {:ok, t()} | {:error, insert_error()} - defp do_insert_buffer(%__MODULE__{flush_index: nil} = store, buffer, 0) do - store = add_record(store, Record.new(buffer, @seq_number_limit), :next) - {:ok, %__MODULE__{store | flush_index: @seq_number_limit - 1}} - end - - defp do_insert_buffer(%__MODULE__{flush_index: nil} = store, buffer, seq_num) do - store = add_record(store, Record.new(buffer, seq_num), :current) - {:ok, %__MODULE__{store | flush_index: seq_num - 1}} - end - - defp do_insert_buffer( - %__MODULE__{ - flush_index: flush_index, - highest_incoming_index: highest_incoming_index, - rollover_count: roc - } = store, - buffer, - seq_num - ) do - highest_seq_num = rem(highest_incoming_index, @seq_number_limit) - - {rollover, index} = - case Utils.from_which_rollover(highest_seq_num, seq_num, @seq_number_limit) do - :current -> {:current, seq_num + roc * @seq_number_limit} - :previous -> {:previous, seq_num + (roc - 1) * @seq_number_limit} - :next -> {:next, seq_num + (roc + 1) * @seq_number_limit} - end - - if fresh_packet?(flush_index, index) do - record = Record.new(buffer, index) - {:ok, add_record(store, record, rollover)} - else - {:error, :late_packet} - end - end - @doc """ Flushes the store to the buffer with the next sequence number. @@ -159,7 +121,7 @@ defmodule Membrane.RTP.JitterBuffer.BufferStore do @doc """ Returns all buffers that are stored in the `BufferStore`. """ - @spec dump(t()) :: [Record.t()] + @spec dump(t()) :: [Record.t() | nil] def dump(%__MODULE__{} = store) do {records, _store} = flush_while(store, fn _store, _record -> true end) records @@ -176,6 +138,44 @@ defmodule Membrane.RTP.JitterBuffer.BufferStore do end end + @spec do_insert_buffer(t(), Buffer.t(), RTP.Header.sequence_number_t()) :: + {:ok, t()} | {:error, insert_error()} + defp do_insert_buffer(%__MODULE__{flush_index: nil} = store, buffer, 0) do + store = add_record(store, Record.new(buffer, @seq_number_limit), :next) + {:ok, %__MODULE__{store | flush_index: @seq_number_limit - 1}} + end + + defp do_insert_buffer(%__MODULE__{flush_index: nil} = store, buffer, seq_num) do + store = add_record(store, Record.new(buffer, seq_num), :current) + {:ok, %__MODULE__{store | flush_index: seq_num - 1}} + end + + defp do_insert_buffer( + %__MODULE__{ + flush_index: flush_index, + highest_incoming_index: highest_incoming_index, + rollover_count: roc + } = store, + buffer, + seq_num + ) do + highest_seq_num = rem(highest_incoming_index, @seq_number_limit) + + {rollover, index} = + case Utils.from_which_rollover(highest_seq_num, seq_num, @seq_number_limit) do + :current -> {:current, seq_num + roc * @seq_number_limit} + :previous -> {:previous, seq_num + (roc - 1) * @seq_number_limit} + :next -> {:next, seq_num + (roc + 1) * @seq_number_limit} + end + + if fresh_packet?(flush_index, index) do + record = Record.new(buffer, index) + {:ok, add_record(store, record, rollover)} + else + {:error, :late_packet} + end + end + defp fresh_packet?(flush_index, index), do: index > flush_index @spec flush_while(t, (t, Record.t() -> boolean), [Record.t() | nil]) :: diff --git a/mix.exs b/mix.exs index 190a3bc2..bbf64d73 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.RTP.Plugin.MixProject do use Mix.Project - @version "0.30.0" + @version "0.31.0" @github_url "https://github.com/membraneframework/membrane_rtp_plugin" def project do diff --git a/test/membrane/rtp/demuxer_muxer_integration_test.exs b/test/membrane/rtp/demuxer_muxer_integration_test.exs index 1399309b..64e175c4 100644 --- a/test/membrane/rtp/demuxer_muxer_integration_test.exs +++ b/test/membrane/rtp/demuxer_muxer_integration_test.exs @@ -10,6 +10,9 @@ defmodule Membrane.RTP.DemuxerMuxerTest do packets: 862 } + @max_sequence_number Bitwise.bsl(1, 16) - 1 + @max_timestamp Bitwise.bsl(1, 32) - 1 + defmodule ReferencePipeline do use Membrane.Pipeline @@ -45,13 +48,11 @@ defmodule Membrane.RTP.DemuxerMuxerTest do _ctx, state ) do - %{encoding_name: encoding_name, clock_rate: clock_rate} = - Membrane.RTP.PayloadFormat.get_payload_type_mapping(pt) + %{encoding_name: encoding_name} = Membrane.RTP.PayloadFormat.get_payload_type_mapping(pt) spec = get_child(:rtp_demuxer) |> via_out(:output, options: [stream_id: {:ssrc, ssrc}]) - |> child({:jitter_buffer, ssrc}, %Membrane.RTP.JitterBuffer{clock_rate: clock_rate}) |> via_in(:input, options: [encoding: encoding_name]) |> get_child(:rtp_muxer) @@ -99,25 +100,27 @@ defmodule Membrane.RTP.DemuxerMuxerTest do packet end) |> Enum.group_by(& &1.payload_type) - |> Map.new(fn {payload_type, payload_type_buffers} -> + |> Map.new(fn {payload_type, packets} -> %{encoding_name: encoding_name} = RTP.PayloadFormat.get_payload_type_mapping(payload_type) - sorted_packets = - payload_type_buffers - |> Enum.sort(&(&1.sequence_number < &2.sequence_number)) - %{ssrc: ssrc, sequence_number: first_sequence_number, timestamp: first_timestamp} = - List.first(sorted_packets) + List.first(packets) normalized_packets = - sorted_packets + packets |> Enum.map(fn packet -> %{ packet | ssrc: packet.ssrc - ssrc, - sequence_number: packet.sequence_number - first_sequence_number, + # modulo to account for wrapping + sequence_number: + Integer.mod( + packet.sequence_number - first_sequence_number, + @max_sequence_number + 1 + ), # round to ignore insignificant differences in timestamps - timestamp: round((packet.timestamp - first_timestamp) / 10) + timestamp: + round(Integer.mod(packet.timestamp - first_timestamp, @max_timestamp + 1) / 10) } end) diff --git a/test/membrane/rtp/demuxer_test.exs b/test/membrane/rtp/demuxer_test.exs index d2af8271..60939f22 100644 --- a/test/membrane/rtp/demuxer_test.exs +++ b/test/membrane/rtp/demuxer_test.exs @@ -10,6 +10,34 @@ defmodule Membrane.RTP.DemuxerTest do video: %{ssrc: 670_572_639, packets: 842, payload_type: 96} } + defmodule Reorderer do + use Membrane.Filter + + def_input_pad :input, accepted_format: _any + def_output_pad :output, accepted_format: _any + + @impl true + def handle_init(_ctx, _opts) do + {[], %{buffers: []}} + end + + @impl true + def handle_buffer(:input, buffer, _ctx, state) do + case state.buffers do + [first_buffer, second_buffer] -> + {[buffer: {:output, [first_buffer, buffer, second_buffer]}], %{buffers: []}} + + buffer_list -> + {[], %{state | buffers: buffer_list ++ [buffer]}} + end + end + + @impl true + def handle_end_of_stream(:input, _ctx, state) do + {[buffer: {:output, state.buffers}, end_of_stream: :output], state} + end + end + defmodule UpfrontPayloadTypePipeline do use Membrane.Pipeline @@ -17,11 +45,20 @@ defmodule Membrane.RTP.DemuxerTest do def handle_init(_ctx, opts) do spec = [ child(:pcap_source, %Membrane.Pcap.Source{path: opts.pcap_path}) + |> child(if opts.reorder_packets, do: Reorderer, else: Membrane.Debug.Filter) |> child(:demuxer, Membrane.RTP.Demuxer) - |> via_out(:output, options: [stream_id: {:payload_type, opts.audio.payload_type}]) + |> via_out(:output, + options: [ + stream_id: {:payload_type, opts.audio.payload_type} + ] + ) |> child({:sink, opts.audio.ssrc}, Testing.Sink), get_child(:demuxer) - |> via_out(:output, options: [stream_id: {:payload_type, opts.video.payload_type}]) + |> via_out(:output, + options: [ + stream_id: {:payload_type, opts.video.payload_type} + ] + ) |> child({:sink, opts.video.ssrc}, Testing.Sink) ] @@ -60,6 +97,7 @@ defmodule Membrane.RTP.DemuxerTest do def handle_init(_ctx, opts) do spec = child(:pcap_source, %Membrane.Pcap.Source{path: opts.pcap_path}) + |> child(if opts.reorder_packets, do: Reorderer, else: Membrane.Debug.Filter) |> child(:demuxer, Membrane.RTP.Demuxer) {[spec: spec], %{}} @@ -76,9 +114,12 @@ defmodule Membrane.RTP.DemuxerTest do end end - defp perform_test(pipeline_module) do + defp perform_test(pipeline_module, reorder_packets \\ false) do pipeline = - Testing.Pipeline.start_supervised!(module: pipeline_module, custom_args: @rtp_input) + Testing.Pipeline.start_supervised!( + module: pipeline_module, + custom_args: Map.put(@rtp_input, :reorder_packets, reorder_packets) + ) %{audio: %{ssrc: audio_ssrc}, video: %{ssrc: video_ssrc}} = @rtp_input @@ -114,5 +155,13 @@ defmodule Membrane.RTP.DemuxerTest do test "when it's pads were linked based on notifications received" do perform_test(DynamicPipeline) end + + test "when packets are reordered in a pipeline linked upfront" do + perform_test(UpfrontPayloadTypePipeline, true) + end + + test "when packets are reordered in a dynamic pipeline" do + perform_test(DynamicPipeline, true) + end end end diff --git a/test/membrane/rtp/muxer_demuxer_integration_test.exs b/test/membrane/rtp/muxer_demuxer_integration_test.exs index c5ac12cc..754035e7 100644 --- a/test/membrane/rtp/muxer_demuxer_integration_test.exs +++ b/test/membrane/rtp/muxer_demuxer_integration_test.exs @@ -2,7 +2,6 @@ defmodule Membrane.RTP.MuxerDemuxerTest do @moduledoc false use ExUnit.Case import Membrane.Testing.Assertions - alias Membrane.RTP alias Membrane.Testing @input_path "test/fixtures/rtp/h264/bun.h264" @@ -12,8 +11,6 @@ defmodule Membrane.RTP.MuxerDemuxerTest do @impl true def handle_init(_ctx, opts) do - %{clock_rate: clock_rate} = RTP.PayloadFormat.resolve(encoding_name: :H264) - spec = [ child(:source, %Membrane.File.Source{location: opts.input_path}) |> child(:h264_parser, %Membrane.H264.Parser{ @@ -25,7 +22,6 @@ defmodule Membrane.RTP.MuxerDemuxerTest do |> child(:rtp_muxer, Membrane.RTP.Muxer) |> child(:rtp_demuxer, Membrane.RTP.Demuxer) |> via_out(:output, options: [stream_id: {:encoding_name, :H264}]) - |> child(:jitter_buffer, %Membrane.RTP.JitterBuffer{clock_rate: clock_rate}) |> child(:rtp_h264_depayloader, Membrane.RTP.H264.Depayloader) |> child(:sink, %Membrane.File.Sink{location: opts.output_path}) ]