From d243ffe3a73881b9575e54ebb60a95698bb7033d Mon Sep 17 00:00:00 2001 From: noarkhh Date: Wed, 21 Feb 2024 13:41:21 +0100 Subject: [PATCH 1/7] Rename RTP.TCP.Depayloader to RTP.RTSP.Depayloader --- lib/membrane/rtp/tcp_depayloader.ex | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/membrane/rtp/tcp_depayloader.ex b/lib/membrane/rtp/tcp_depayloader.ex index 93a2364b..b784a375 100644 --- a/lib/membrane/rtp/tcp_depayloader.ex +++ b/lib/membrane/rtp/tcp_depayloader.ex @@ -1,7 +1,8 @@ -defmodule Membrane.RTP.TCP.Depayloader do +defmodule Membrane.RTP.RTSP.Depayloader do @moduledoc """ This element provides functionality of depayloading RTP Packets received by TCP and redirecting - RTSP messages received in the same stream. The encapsulation is described in RFC 7826 Section 14. + RTSP messages received in the same stream established with RTSP. The encapsulation is described + in RFC 7826 Section 14. Encapsulated packets interleaved in the stream will have the following structure: ["$" = 36 :: 1 byte][Channel id :: 1 byte][Length :: 2 bytes][packet :: bytes] From bff93952a2d4ee372005d99c78239615a4a78d16 Mon Sep 17 00:00:00 2001 From: noarkhh Date: Wed, 21 Feb 2024 13:51:37 +0100 Subject: [PATCH 2/7] Bump version --- README.md | 2 +- mix.exs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index bdc79259..515bced6 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_rtp_plugin` to your list of dep ```elixir def deps do [ - {:membrane_rtp_plugin, "~> 0.25.0"}, + {:membrane_rtp_plugin, "~> 0.26.0"}, {:ex_libsrtp, ">= 0.0.0"} # required only if SRTP/SRTCP support is needed ] end diff --git a/mix.exs b/mix.exs index a2c19584..66a7465e 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.RTP.Plugin.MixProject do use Mix.Project - @version "0.25.0" + @version "0.26.0" @github_url "https://github.com/membraneframework/membrane_rtp_plugin" def project do From 0bb37d5106d8c9148dcd27bd718a234828fb17b5 Mon Sep 17 00:00:00 2001 From: noarkhh Date: Fri, 23 Feb 2024 13:09:10 +0100 Subject: [PATCH 3/7] Add basic tests --- ...tcp_depayloader.ex => rtsp_depayloader.ex} | 4 +- test/membrane/rtp/rtsp_depayloader_test.exs | 82 +++++++++++++++++++ 2 files changed, 83 insertions(+), 3 deletions(-) rename lib/membrane/rtp/{tcp_depayloader.ex => rtsp_depayloader.ex} (96%) create mode 100644 test/membrane/rtp/rtsp_depayloader_test.exs diff --git a/lib/membrane/rtp/tcp_depayloader.ex b/lib/membrane/rtp/rtsp_depayloader.ex similarity index 96% rename from lib/membrane/rtp/tcp_depayloader.ex rename to lib/membrane/rtp/rtsp_depayloader.ex index b784a375..511e132f 100644 --- a/lib/membrane/rtp/tcp_depayloader.ex +++ b/lib/membrane/rtp/rtsp_depayloader.ex @@ -96,12 +96,10 @@ defmodule Membrane.RTP.RTSP.Depayloader do end defp get_complete_packets( - <<"$", _rest::binary>> = packets_binary, + <<"$", received_channel_id, payload_length::size(16), rest::binary>> = packets_binary, channel_id, complete_packets ) do - <<"$", received_channel_id, payload_length::size(16), rest::binary>> = packets_binary - if payload_length > byte_size(rest) do {packets_binary, Enum.reverse(complete_packets)} else diff --git a/test/membrane/rtp/rtsp_depayloader_test.exs b/test/membrane/rtp/rtsp_depayloader_test.exs new file mode 100644 index 00000000..a3148916 --- /dev/null +++ b/test/membrane/rtp/rtsp_depayloader_test.exs @@ -0,0 +1,82 @@ +defmodule Membrane.RTP.RTSPDepayloaderTest do + use ExUnit.Case + + import Membrane.Testing.Assertions + import Membrane.ChildrenSpec + + alias Membrane.RTP.RTSP.Depayloader + alias Membrane.Testing.{Sink, Source, Pipeline} + + @header_length 4 + + defp encapsulate_rtp_packets(rtp_packets) do + Enum.map(rtp_packets, &<<"$", 0, byte_size(&1)::size(16), &1::binary>>) + end + + defp create_tcp_segments(encapsulated_rtp_packets, tcp_segments_lengths) do + assert Enum.sum(tcp_segments_lengths) == + Enum.sum(Enum.map(encapsulated_rtp_packets, &byte_size(&1))) + + encaplsulated_rtp_packets_binary = Enum.join(encapsulated_rtp_packets) + + {tcp_segments, _length} = + Enum.map_reduce(tcp_segments_lengths, 0, fn len, pos -> + {:binary.part(encaplsulated_rtp_packets_binary, pos, len), pos + len} + end) + + tcp_segments + end + + defp perform_standard_test(rtp_packets_lengths, tcp_segments_lengths) do + rtp_packets = Enum.map(rtp_packets_lengths, &<<0::size(&1)-unit(8)>>) + + tcp_segments = + rtp_packets |> encapsulate_rtp_packets() |> create_tcp_segments(tcp_segments_lengths) + + pipeline = + Pipeline.start_link_supervised!( + spec: + child(:source, %Source{ + output: tcp_segments + }) + |> child(:depayloader, Depayloader) + |> child(:sink, Sink) + ) + + Enum.each(rtp_packets, fn packet -> + assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{payload: ^packet}) + end) + + Pipeline.terminate(pipeline) + end + + describe "RTSP Depayloader depayloads correctly" do + test "when one tcp segment is one rtp packet" do + rtp_packets_lengths = 10..20 + tcp_segments_lengths = Enum.map(rtp_packets_lengths, &(&1 + @header_length)) + + perform_standard_test(rtp_packets_lengths, tcp_segments_lengths) + end + + test "when there are multiple (3) rtp packets in one tcp segment" do + rtp_packets_lengths = 10..40 + + tcp_segments_lengths = + rtp_packets_lengths + |> Enum.chunk_every(3) + |> Enum.map(&(Enum.sum(&1) + length(&1) * @header_length)) + + perform_standard_test(rtp_packets_lengths, tcp_segments_lengths) + end + + test "when rtp packets are spread across multiple (3) tcp segments" do + tcp_segments_lengths = + Enum.flat_map(rtp_packets_lengths, fn len -> + tcp_segment_base_length = div(len + @header_length, 3) + [tcp_segment_base_length - 1, tcp_segment_base_length, tcp_segment_base_length + 1] + end) + + perform_standard_test(rtp_packets_lengths, tcp_segments_lengths) + end + end +end From bbab9d0770dba8246ef212964309a4dac09563ab Mon Sep 17 00:00:00 2001 From: noarkhh Date: Fri, 23 Feb 2024 13:23:10 +0100 Subject: [PATCH 4/7] Rename to decapsulator --- .../{rtsp_depayloader.ex => rtsp_decapsulator.ex} | 12 ++++++------ ...payloader_test.exs => rtsp_decapsulator_test.exs} | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) rename lib/membrane/rtp/{rtsp_depayloader.ex => rtsp_decapsulator.ex} (89%) rename test/membrane/rtp/{rtsp_depayloader_test.exs => rtsp_decapsulator_test.exs} (95%) diff --git a/lib/membrane/rtp/rtsp_depayloader.ex b/lib/membrane/rtp/rtsp_decapsulator.ex similarity index 89% rename from lib/membrane/rtp/rtsp_depayloader.ex rename to lib/membrane/rtp/rtsp_decapsulator.ex index 511e132f..34386381 100644 --- a/lib/membrane/rtp/rtsp_depayloader.ex +++ b/lib/membrane/rtp/rtsp_decapsulator.ex @@ -1,13 +1,13 @@ -defmodule Membrane.RTP.RTSP.Depayloader do +defmodule Membrane.RTP.RTSP.Decapsulator do @moduledoc """ - This element provides functionality of depayloading RTP Packets received by TCP and redirecting - RTSP messages received in the same stream established with RTSP. The encapsulation is described - in RFC 7826 Section 14. + This element provides functionality of decapsulating RTP Packets and redirecting RTSP messages + received in the same TCP stream established with RTSP. The encapsulation is described in + RFC 7826 Section 14. - Encapsulated packets interleaved in the stream will have the following structure: + Encapsulated RTP packets interleaved in the stream will have the following structure: ["$" = 36 :: 1 byte][Channel id :: 1 byte][Length :: 2 bytes][packet :: bytes] - RTSP Messages + RTSP Messages are not encapsulated this way, but can only be present between RTP packets. """ use Membrane.Filter diff --git a/test/membrane/rtp/rtsp_depayloader_test.exs b/test/membrane/rtp/rtsp_decapsulator_test.exs similarity index 95% rename from test/membrane/rtp/rtsp_depayloader_test.exs rename to test/membrane/rtp/rtsp_decapsulator_test.exs index a3148916..961d7c8c 100644 --- a/test/membrane/rtp/rtsp_depayloader_test.exs +++ b/test/membrane/rtp/rtsp_decapsulator_test.exs @@ -1,4 +1,4 @@ -defmodule Membrane.RTP.RTSPDepayloaderTest do +defmodule Membrane.RTP.RTSPDecapsulatorTest do use ExUnit.Case import Membrane.Testing.Assertions @@ -50,7 +50,7 @@ defmodule Membrane.RTP.RTSPDepayloaderTest do Pipeline.terminate(pipeline) end - describe "RTSP Depayloader depayloads correctly" do + describe "RTSP Decapsulator decapsulates correctly" do test "when one tcp segment is one rtp packet" do rtp_packets_lengths = 10..20 tcp_segments_lengths = Enum.map(rtp_packets_lengths, &(&1 + @header_length)) From f6d8c3ed8b2188a6e6c4b8200db5cdce9d6d80f6 Mon Sep 17 00:00:00 2001 From: noarkhh Date: Fri, 23 Feb 2024 14:50:23 +0100 Subject: [PATCH 5/7] Fix test --- test/membrane/rtp/rtsp_decapsulator_test.exs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/membrane/rtp/rtsp_decapsulator_test.exs b/test/membrane/rtp/rtsp_decapsulator_test.exs index 961d7c8c..0b80d622 100644 --- a/test/membrane/rtp/rtsp_decapsulator_test.exs +++ b/test/membrane/rtp/rtsp_decapsulator_test.exs @@ -4,7 +4,7 @@ defmodule Membrane.RTP.RTSPDecapsulatorTest do import Membrane.Testing.Assertions import Membrane.ChildrenSpec - alias Membrane.RTP.RTSP.Depayloader + alias Membrane.RTP.RTSP.Decapsulator alias Membrane.Testing.{Sink, Source, Pipeline} @header_length 4 @@ -39,7 +39,7 @@ defmodule Membrane.RTP.RTSPDecapsulatorTest do child(:source, %Source{ output: tcp_segments }) - |> child(:depayloader, Depayloader) + |> child(:decapsulator, Decapsulator) |> child(:sink, Sink) ) @@ -70,6 +70,8 @@ defmodule Membrane.RTP.RTSPDecapsulatorTest do end test "when rtp packets are spread across multiple (3) tcp segments" do + rtp_packets_lengths = 11..41//3 + tcp_segments_lengths = Enum.flat_map(rtp_packets_lengths, fn len -> tcp_segment_base_length = div(len + @header_length, 3) From 088936e91f4c1b4dd8e59c01e8f25b3fed6d3695 Mon Sep 17 00:00:00 2001 From: noarkhh Date: Fri, 23 Feb 2024 14:52:32 +0100 Subject: [PATCH 6/7] Satisfy credo --- test/membrane/rtp/rtsp_decapsulator_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/membrane/rtp/rtsp_decapsulator_test.exs b/test/membrane/rtp/rtsp_decapsulator_test.exs index 0b80d622..8bb5a460 100644 --- a/test/membrane/rtp/rtsp_decapsulator_test.exs +++ b/test/membrane/rtp/rtsp_decapsulator_test.exs @@ -5,7 +5,7 @@ defmodule Membrane.RTP.RTSPDecapsulatorTest do import Membrane.ChildrenSpec alias Membrane.RTP.RTSP.Decapsulator - alias Membrane.Testing.{Sink, Source, Pipeline} + alias Membrane.Testing.{Pipeline, Sink, Source} @header_length 4 From bf50ec970cb3dd4ffe272e85545eb30c3c80161c Mon Sep 17 00:00:00 2001 From: noarkhh Date: Fri, 23 Feb 2024 16:35:31 +0100 Subject: [PATCH 7/7] Apply reviewers suggestion --- lib/membrane/rtp/rtsp_decapsulator.ex | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/membrane/rtp/rtsp_decapsulator.ex b/lib/membrane/rtp/rtsp_decapsulator.ex index 34386381..fde8a677 100644 --- a/lib/membrane/rtp/rtsp_decapsulator.ex +++ b/lib/membrane/rtp/rtsp_decapsulator.ex @@ -100,17 +100,17 @@ defmodule Membrane.RTP.RTSP.Decapsulator do channel_id, complete_packets ) do - if payload_length > byte_size(rest) do - {packets_binary, Enum.reverse(complete_packets)} - else - <> = rest + case rest do + <> -> + complete_packets = + if channel_id != received_channel_id, + do: complete_packets, + else: [complete_packet_binary | complete_packets] - complete_packets = - if channel_id != received_channel_id, - do: complete_packets, - else: [complete_packet_binary | complete_packets] + get_complete_packets(rest, channel_id, complete_packets) - get_complete_packets(rest, channel_id, complete_packets) + _incomplete_packet_binary -> + {packets_binary, Enum.reverse(complete_packets)} end end