From 99664fc6f845aa8447a25b5ee9683e89db4d9151 Mon Sep 17 00:00:00 2001 From: Billal GHILAS Date: Sat, 21 Dec 2024 23:51:45 +0100 Subject: [PATCH 1/7] add support for announce and record methods --- lib/membrane_rtsp/parser.ex | 8 +- lib/membrane_rtsp/parser/transport.ex | 8 +- lib/membrane_rtsp/request.ex | 13 +- lib/membrane_rtsp/server/conn.ex | 61 ++++-- lib/membrane_rtsp/server/handler.ex | 34 ++- lib/membrane_rtsp/server/logic.ex | 200 +++++++++++++++--- .../server/server_logic_test.exs | 52 ++--- test/support/fake_handler.ex | 11 +- 8 files changed, 286 insertions(+), 101 deletions(-) diff --git a/lib/membrane_rtsp/parser.ex b/lib/membrane_rtsp/parser.ex index ccb3450..b9fb16c 100644 --- a/lib/membrane_rtsp/parser.ex +++ b/lib/membrane_rtsp/parser.ex @@ -32,12 +32,16 @@ defmodule Membrane.RTSP.Parser do {:ok, RTSP.Request.transport_header()} | {:error, :invalid_header} def parse_transport_header(header) do case Transport.parse_transport_header(header) do - {:ok, [transport, mode | parameters], _rest, _context, _line, _byte_offset} -> + {:ok, [transport, network_mode | parameters], _rest, _context, _line, _byte_offset} -> + parameters = Map.new(parameters) + mode = String.downcase(parameters["mode"] || "play") + {:ok, [ transport: String.to_atom(transport), + network_mode: String.to_atom(network_mode), mode: String.to_atom(mode), - parameters: Map.new(parameters) + parameters: Map.delete(parameters, "mode") ]} {:error, _reason, _rest, _context, _line, _byte_offset} -> diff --git a/lib/membrane_rtsp/parser/transport.ex b/lib/membrane_rtsp/parser/transport.ex index fc68ffb..1356d3c 100644 --- a/lib/membrane_rtsp/parser/transport.ex +++ b/lib/membrane_rtsp/parser/transport.ex @@ -13,10 +13,14 @@ defmodule Membrane.RTSP.Parser.Transport do |> optional(string("=") |> concat(param_value)) single_value_param = - choice(Enum.map(["ssrc", "mode", "source"], &string/1)) + choice(Enum.map(["ssrc", "source"], &string/1)) |> string("=") |> concat(param_value) + play_mode = to_charlist("PLAY") |> Enum.reduce(empty(), &ascii_char(&2, [&1, &1 + 32])) + record_mode = to_charlist("RECORD") |> Enum.reduce(empty(), &ascii_char(&2, [&1, &1 + 32])) + mode_param = string("mode") |> string("=") |> choice([play_mode, record_mode]) + integer_value_param = choice(Enum.map(["ttl", "layers"], &string/1)) |> string("=") @@ -33,6 +37,7 @@ defmodule Membrane.RTSP.Parser.Transport do |> choice([ string("append"), optional_value_param, + mode_param, single_value_param, integer_value_param, integer_range_param @@ -54,6 +59,7 @@ defmodule Membrane.RTSP.Parser.Transport do case Enum.reverse(args) do [key] -> {key, nil} [key, "=", value] when is_binary(value) or key in ["ttl", "layers"] -> {key, value} + ["mode", "=" | value] -> {"mode", to_string(value)} [key, "=", value] -> {key, {value, value + 1}} [key, "=", min_value, max_value] -> {key, {min_value, max_value}} end diff --git a/lib/membrane_rtsp/request.ex b/lib/membrane_rtsp/request.ex index 9ff81d2..022baac 100644 --- a/lib/membrane_rtsp/request.ex +++ b/lib/membrane_rtsp/request.ex @@ -17,7 +17,8 @@ defmodule Membrane.RTSP.Request do @type transport_header :: [ transport: :TCP | :UDP, - mode: :unicast | :multicast, + network_mode: :unicast | :multicast, + mode: :play | :record, parameters: map() ] @@ -123,19 +124,19 @@ defmodule Membrane.RTSP.Request do ``` iex> req = %Request{method: "SETUP", headers: [{"Transport", "RTP/AVP;unicast;client_port=30001-30002"}]} iex> Request.parse_transport_header(req) - {:ok, [transport: :UDP, mode: :unicast, parameters: %{"client_port" => {30001, 30002}}]} + {:ok, [transport: :UDP, network_mode: :unicast, mode: :play, parameters: %{"client_port" => {30001, 30002}}]} iex> req = %Request{method: "SETUP", headers: [{"Transport", "RTP/AVP;ttl=15"}]} iex> Request.parse_transport_header(req) - {:ok, [transport: :UDP, mode: :multicast, parameters: %{"ttl" => 15}]} + {:ok, [transport: :UDP, network_mode: :multicast, mode: :play, parameters: %{"ttl" => 15}]} - iex> req = %Request{method: "SETUP", headers: [{"Transport", "RTP/AVP/TCP;unicast;interleaved=0-1"}]} + iex> req = %Request{method: "SETUP", headers: [{"Transport", "RTP/AVP/TCP;unicast;interleaved=0-1;mode=record"}]} iex> Request.parse_transport_header(req) - {:ok, [transport: :TCP, mode: :unicast, parameters: %{"interleaved" => {0, 1}}]} + {:ok, [transport: :TCP, network_mode: :unicast, mode: :record, parameters: %{"interleaved" => {0, 1}}]} iex> req = %Request{method: "SETUP", headers: [{"Transport", "RTP/AVP"}]} iex> Request.parse_transport_header(req) - {:ok, [transport: :UDP, mode: :multicast, parameters: %{}]} + {:ok, [transport: :UDP, network_mode: :multicast, mode: :play, parameters: %{}]} iex> req = %Request{method: "SETUP", headers: [{"Transport", "RTP/AV"}]} iex> Request.parse_transport_header(req) diff --git a/lib/membrane_rtsp/server/conn.ex b/lib/membrane_rtsp/server/conn.ex index b491dc8..870e6d9 100644 --- a/lib/membrane_rtsp/server/conn.ex +++ b/lib/membrane_rtsp/server/conn.ex @@ -4,10 +4,9 @@ defmodule Membrane.RTSP.Server.Conn do require Logger + alias Membrane.RTSP.Request alias Membrane.RTSP.Server.Logic - @max_request_size 1_000_000 - @spec start(map()) :: GenServer.on_start() def start(state) do GenServer.start(__MODULE__, state) @@ -30,27 +29,59 @@ defmodule Membrane.RTSP.Server.Conn do @impl true def handle_continue(:process_client_requests, state) do - do_process_client_requests(state) + do_process_client_requests(state, state.session_timeout) state.request_handler.handle_closed_connection(state.request_handler_state) {:stop, :normal, state} end - defp do_process_client_requests(state) do - with {:ok, request} <- get_request(state.socket, state.session_timeout) do - request - |> Logic.process_request(state) - |> do_process_client_requests() + defp do_process_client_requests(state, timeout) do + with {:ok, request} <- get_request(state.socket, timeout) do + case Logic.process_request(request, state) do + %{session_state: :recording} = state -> + do_process_client_requests(state, :infinity) + + state -> + do_process_client_requests(state, state.session_timeout) + end + end + end + + defp get_request(socket, timeout, acc \\ "") do + with {:ok, acc} <- do_recv(socket, timeout, acc) do + headers_and_body = String.split(acc, ~r/\r?\n\r?\n/, parts: 2) + + case do_parse_request(headers_and_body) do + :more -> get_request(socket, timeout, acc) + other -> other + end end end - defp get_request(socket, timeout, request \\ "") do - with {:ok, packet} <- :gen_tcp.recv(socket, 0, timeout), - request <- request <> packet, - false <- byte_size(request) > @max_request_size do - if packet != "\r\n", do: get_request(socket, timeout, request), else: {:ok, request} - else + defp do_recv(socket, timeout, acc) do + case :gen_tcp.recv(socket, 0, timeout) do + {:ok, data} -> {:ok, acc <> data} {:error, reason} -> {:error, reason} - true -> {:error, :max_request_size_exceeded} end end + + defp do_parse_request([raw_request, body]) do + case Request.parse(raw_request <> "\r\n\r\n") do + {:ok, request} -> + content_length = + case Request.get_header(request, "Content-Length") do + {:ok, value} -> String.to_integer(value) + _error -> 0 + end + + case byte_size(body) >= content_length do + true -> {:ok, %Request{request | body: :binary.part(body, 0, content_length)}} + false -> :more + end + + _error -> + {:error, :invalid_request} + end + end + + defp do_parse_request(_raw_request), do: :more end diff --git a/lib/membrane_rtsp/server/handler.ex b/lib/membrane_rtsp/server/handler.ex index f8af0cb..990a5b6 100644 --- a/lib/membrane_rtsp/server/handler.ex +++ b/lib/membrane_rtsp/server/handler.ex @@ -113,8 +113,8 @@ defmodule Membrane.RTSP.Server.Handler do @doc """ Optional callback called when the server is initialized. - The argument is a term passed to the server as the `handler_config` option. - The returned value will be used as a state and passed as the last + The argument is a term passed to the server as the `handler_config` option. + The returned value will be used as a state and passed as the last argument to the subsequent callbacks. Default behavior is to return the argument unchanged. @@ -142,12 +142,22 @@ defmodule Membrane.RTSP.Server.Handler do """ @callback handle_describe(request(), state()) :: {Response.t(), state()} + @doc """ + Callback called when receiving an ANNOUNCE request. + + An announce request contains media description (`body` field of the request) of + the tracks that a client wish to publish to the server. + """ + @callback handle_announce(request(), state()) :: {Response.t(), state()} + @doc """ Callback called when receiving a SETUP request. The handler should check for the validity of the requested track (`path` field of the `Request` struct). + + The `mode` argument provides the context of the setup, it's either `:play` or `:record`. """ - @callback handle_setup(request(), state()) :: {Response.t(), state()} + @callback handle_setup(request(), mode :: :play | :record, state()) :: {Response.t(), state()} @doc """ Callback called when receiving a PLAY request. @@ -157,6 +167,14 @@ defmodule Membrane.RTSP.Server.Handler do """ @callback handle_play(configured_media_context(), state()) :: {Response.t(), state()} + @doc """ + Callback called when receiving a RECORD request. + + `configured_media_context` contains the needed information to start receving media packets. + Refer to the type documentation for more details + """ + @callback handle_record(configured_media_context(), state()) :: {Response.t(), state()} + @doc """ Callback called when receiving a PAUSE request. @@ -174,7 +192,7 @@ defmodule Membrane.RTSP.Server.Handler do """ @callback handle_teardown(state()) :: {Response.t(), state()} - @optional_callbacks init: 1 + @optional_callbacks init: 1, handle_announce: 2, handle_record: 2 defmacro __using__(_options) do quote do @@ -183,7 +201,13 @@ defmodule Membrane.RTSP.Server.Handler do @impl true def init(config), do: config - defoverridable init: 1 + @impl true + def handle_announce(_req, state), do: {Response.new(501), state} + + @impl true + def handle_record(_req, state), do: {Response.new(501), state} + + defoverridable init: 1, handle_announce: 2, handle_record: 2 end end end diff --git a/lib/membrane_rtsp/server/logic.ex b/lib/membrane_rtsp/server/logic.ex index f903dbf..278908b 100644 --- a/lib/membrane_rtsp/server/logic.ex +++ b/lib/membrane_rtsp/server/logic.ex @@ -1,11 +1,36 @@ defmodule Membrane.RTSP.Server.Logic do - @moduledoc false + @moduledoc """ + Logic for RTSP Server + """ + + require Logger + import Mockery.Macro alias Membrane.RTSP.{Request, Response, Server} @server "MembraneRTSP/#{Mix.Project.config()[:version]} (Membrane Framework RTSP Server)" - @allowed_methods ["GET_PARAMETER", "OPTIONS", "DESCRIBE", "SETUP", "PLAY", "PAUSE", "TEARDOWN"] + @allowed_methods [ + "GET_PARAMETER", + "OPTIONS", + "ANNOUNCE", + "DESCRIBE", + "SETUP", + "PLAY", + "RECORD", + "PAUSE", + "TEARDOWN" + ] + + @udp_port_range 1000..65_000//2 + + defguardp can_play(state) + when map_size(state.configured_media) != 0 and + state.session_state in [:ready, :paused] + + defguardp can_record(state) + when map_size(state.incoming_media) != 0 and + state.session_state in [:ready, :paused] defmodule State do @moduledoc false @@ -17,6 +42,7 @@ defmodule Membrane.RTSP.Server.Logic do :request_handler_state, :session_timeout, configured_media: %{}, + incoming_media: %{}, session_id: UUID.uuid4(), session_state: :init ] @@ -28,8 +54,9 @@ defmodule Membrane.RTSP.Server.Logic do request_handler: module(), request_handler_state: term(), configured_media: Server.Handler.configured_media_context(), + incoming_media: Server.Handler.configured_media_context(), session_id: binary(), - session_state: :init | :ready | :playing | :paused, + session_state: :init | :ready | :playing | :recording | :paused, session_timeout: non_neg_integer() } end @@ -37,17 +64,10 @@ defmodule Membrane.RTSP.Server.Logic do @spec allowed_methods() :: [binary()] def allowed_methods(), do: @allowed_methods - @spec process_request(binary(), State.t()) :: State.t() - def process_request(raw_request, %State{} = state) do - {response, state} = - case Request.parse(raw_request) do - {:ok, request} -> - {response, state} = do_handle_request(request, state) - {maybe_add_cseq_header(response, request), state} - - {:error, _reason} -> - {Response.new(400), state} - end + @spec process_request(Request.t(), State.t()) :: State.t() + def process_request(request, %State{} = state) do + {response, state} = do_handle_request(request, state) + response = maybe_add_cseq_header(response, request) response |> Response.with_header("Server", @server) @@ -83,18 +103,22 @@ defmodule Membrane.RTSP.Server.Logic do with {:ok, transport_opts} <- Request.parse_transport_header(request), :ok <- validate_transport_parameters(transport_opts, state) do {response, request_handler_state} = - state.request_handler.handle_setup(request, state.request_handler_state) + state.request_handler.handle_setup( + request, + transport_opts[:mode], + state.request_handler_state + ) {response, state} = do_handle_setup_response(request, response, transport_opts, state) {response, %{state | request_handler_state: request_handler_state}} else - _error -> + error -> + Logger.error("SETUP request failed due to: #{inspect(error)}") {Response.new(400), %{state | request_handler_state: state.request_handler_state}} end end - defp do_handle_request(%Request{method: "PLAY"}, state) - when state.session_state in [:ready, :paused] do + defp do_handle_request(%Request{method: "PLAY"}, state) when can_play(state) do {response, request_handler_state} = state.request_handler.handle_play(state.configured_media, state.request_handler_state) @@ -120,20 +144,53 @@ defmodule Membrane.RTSP.Server.Logic do end end + defp do_handle_request(%Request{method: "ANNOUNCE"} = req, state) do + case ExSDP.parse(req.body) do + {:ok, sdp} -> + {response, request_handler_state} = + state.request_handler.handle_announce( + %Request{req | body: sdp}, + state.request_handler_state + ) + + {response, %{state | request_handler_state: request_handler_state}} + + error -> + Logger.error(""" + ANNOUNCE request failed: could not parse the request body to a valid sdp + error: #{inspect(error)} + body: #{inspect(req.body, limit: :infinity)} + """) + + {Response.new(400), state} + end + + {Response.new(200), state} + end + + defp do_handle_request(%Request{method: "RECORD"}, state) when can_record(state) do + {response, handler_state} = state.request_handler.handle_record(state.incoming_media, state) + {response, %{state | request_handler_state: handler_state, session_state: :recording}} + end + defp do_handle_request(%Request{method: "TEARDOWN"}, state) when state.session_state in [:init, :ready] do + close_open_ports(state) + Response.new(200) |> inject_session_header(state) - |> then(&{&1, %{state | configured_media: %{}, session_state: :init}}) + |> then(&{&1, %{state | configured_media: %{}, incoming_media: %{}, session_state: :init}}) end defp do_handle_request(%Request{method: "TEARDOWN"}, state) do {response, _handler_state} = state.request_handler.handle_teardown(state.request_handler_state) + close_open_ports(state) + response |> inject_session_header(state) - |> then(&{&1, %{state | session_state: :init, configured_media: %{}}}) + |> then(&{&1, %{state | session_state: :init, configured_media: %{}, incoming_media: %{}}}) end defp do_handle_request(%Request{}, state) do @@ -142,11 +199,16 @@ defmodule Membrane.RTSP.Server.Logic do # TODO: Add more validation for transport parameters defp validate_transport_parameters(transport_opts, state) do + transport = transport_opts[:transport] + cond do - transport_opts[:mode] == :multicast -> + transport_opts[:network_mode] == :multicast -> {:error, :multicast_not_supported} - transport_opts[:transport] == :UDP and is_nil(state.rtp_socket) -> + transport_opts[:mode] == :record and transport != :UDP -> + {:error, :unsupported_transport} + + transport_opts[:mode] == :play and transport == :UDP and is_nil(state.rtp_socket) -> {:error, :udp_not_supported} true -> @@ -159,24 +221,36 @@ defmodule Membrane.RTSP.Server.Logic do if Response.ok?(response) do track_config = build_track_config(transport_opts, state) - resp_transport_header = build_resp_transport_header(request, track_config) - configured_media = Map.put(state.configured_media, request.path, track_config) + state = + case transport_opts[:mode] do + :play -> + %{ + state + | configured_media: Map.put(state.configured_media, request.path, track_config) + } + + :record -> + %{state | incoming_media: Map.put(state.incoming_media, request.path, track_config)} + end + resp_transport_header = build_resp_transport_header(request, track_config) response = Response.with_header(response, "Transport", resp_transport_header) - {response, - %{ - state - | configured_media: configured_media, - session_state: :ready - }} + {response, %{state | session_state: :ready}} else {response, state} end end defp build_track_config(transport_opts, state) do + case transport_opts[:mode] do + :play -> build_play_track_config(transport_opts, state) + :record -> build_record_track_config(transport_opts, state) + end + end + + defp build_play_track_config(transport_opts, state) do <> = :crypto.strong_rand_bytes(4) case transport_opts[:transport] do @@ -202,9 +276,63 @@ defmodule Membrane.RTSP.Server.Logic do end end + defp build_record_track_config(transport_opts, state) do + case transport_opts[:transport] do + :TCP -> + %{ + transport: :TCP, + tcp_socket: state.socket, + channels: transport_opts[:parameters]["interleaved"] + } + + :UDP -> + case find_rtp_ports(0) do + {rtp_socket, rtcp_socket} -> + {:ok, {address, _port}} = :inet.peername(state.socket) + + %{ + transport: :UDP, + rtp_socket: rtp_socket, + rtcp_socket: rtcp_socket, + address: address, + client_port: transport_opts[:parameters]["client_port"] + } + + :error -> + {:error, :udp_port_failure} + end + end + end + + defp find_rtp_ports(attempt) when attempt >= 100, do: :error + + defp find_rtp_ports(attempt) do + rtp_port = Enum.random(@udp_port_range) + + case :gen_udp.open(rtp_port, [:binary, active: false]) do + {:ok, rtp_socket} -> + case :gen_udp.open(rtp_port + 1, [:binary, active: false]) do + {:ok, rtcp_socket} -> + {rtp_socket, rtcp_socket} + + _error -> + :gen_udp.close(rtp_socket) + find_rtp_ports(attempt + 1) + end + + _error -> + find_rtp_ports(attempt + 1) + end + end + defp build_resp_transport_header(request, track_config) do {:ok, req_header} = Request.get_header(request, "Transport") - resp_header = req_header <> ";ssrc=#{Integer.to_string(track_config.ssrc, 16)}" + + resp_header = + case track_config[:ssrc] do + nil -> req_header + ssrc -> req_header <> ";ssrc=#{Integer.to_string(ssrc, 16)}" + end case track_config.transport do :TCP -> @@ -233,4 +361,14 @@ defmodule Membrane.RTSP.Server.Logic do state.session_id <> ";timeout=#{timeout_in_seconds}" ) end + + defp close_open_ports(%State{} = state) do + state.incoming_media + |> Map.values() + |> Enum.filter(&(&1.transport == :UDP)) + |> Enum.each(fn in_media -> + :ok = :inet.close(in_media.rtp_socket) + :ok = :inet.close(in_media.rtcp_socket) + end) + end end diff --git a/test/membrane_rtsp/server/server_logic_test.exs b/test/membrane_rtsp/server/server_logic_test.exs index 9a9bd69..b949f19 100644 --- a/test/membrane_rtsp/server/server_logic_test.exs +++ b/test/membrane_rtsp/server/server_logic_test.exs @@ -15,8 +15,10 @@ defmodule Membrane.RTSP.ServerLogicTest do state = %State{ socket: %{}, request_handler: FakeHandler, - request_handler_state: FakeHandler.handle_open_connection(nil), - session_timeout: :timer.minutes(1) + request_handler_state: FakeHandler.handle_open_connection(nil, []), + session_timeout: :timer.minutes(1), + incoming_media: %{}, + configured_media: %{} } [state: state] @@ -30,7 +32,6 @@ defmodule Membrane.RTSP.ServerLogicTest do assert state == %Request{method: "OPTIONS"} - |> Request.stringify(@url) |> Logic.process_request(state) end @@ -39,7 +40,6 @@ defmodule Membrane.RTSP.ServerLogicTest do assert state == %Request{method: "GET_PARAMETER"} - |> Request.stringify(@url) |> Logic.process_request(state) end @@ -65,9 +65,7 @@ defmodule Membrane.RTSP.ServerLogicTest do expected_uri = URI.to_string(@url) assert %{request_handler_state: %{described_url: ^expected_uri}} = - %Request{method: "DESCRIBE"} - |> Request.stringify(@url) - |> Logic.process_request(state) + Logic.process_request(%Request{method: "DESCRIBE", path: expected_uri}, state) end describe "handle SETUP request" do @@ -85,9 +83,8 @@ defmodule Membrane.RTSP.ServerLogicTest do end) state = - %Request{method: "SETUP"} + %Request{method: "SETUP", path: control_path} |> Request.with_header("Transport", "RTP/AVP/TCP;unicast;interleaved=0-1") - |> Request.stringify(%URI{@url | path: "/stream/trackId=0"}) |> Logic.process_request(state) assert state.session_state == :ready @@ -109,13 +106,9 @@ defmodule Membrane.RTSP.ServerLogicTest do assert ^state = %Request{method: "SETUP"} |> Request.with_header("Transport", "RTP/AVP") - |> Request.stringify(@url) |> Logic.process_request(state) - assert ^state = - %Request{method: "SETUP"} - |> Request.stringify(@url) - |> Logic.process_request(state) + assert ^state = Logic.process_request(%Request{method: "SETUP"}, state) end test "multicast not supported", %{state: state} do @@ -126,7 +119,6 @@ defmodule Membrane.RTSP.ServerLogicTest do assert ^state = %Request{method: "SETUP"} |> Request.with_header("Transport", "RTP/AVP;multicast") - |> Request.stringify(@url) |> Logic.process_request(state) end @@ -138,7 +130,6 @@ defmodule Membrane.RTSP.ServerLogicTest do assert ^state = %Request{method: "SETUP"} |> Request.with_header("Transport", "RTP/AVP;unicast;client_port=3000-3001") - |> Request.stringify(@url) |> Logic.process_request(state) end @@ -150,18 +141,17 @@ defmodule Membrane.RTSP.ServerLogicTest do end) assert ^state = - %Request{method: "SETUP"} - |> Request.stringify(@url) + %Request{method: "SETUP", path: @url} |> Logic.process_request(state) end end describe "handle PLAY request" do test "handle PLAY request", %{state: state} do - uri = %URI{@url | path: "/stream/trackId=0"} + uri = %URI{@url | path: "/stream/trackId=0"} |> URI.to_string() configured_media = %{ - URI.to_string(uri) => %{ + uri => %{ ssrc: :rand.uniform(100_000), transport: :UDP, rtp_socket: %{}, @@ -182,8 +172,7 @@ defmodule Membrane.RTSP.ServerLogicTest do end) assert %{session_state: :playing} = - %Request{method: "PLAY"} - |> Request.stringify(uri) + %Request{method: "PLAY", path: uri} |> Logic.process_request(state) end @@ -193,8 +182,7 @@ defmodule Membrane.RTSP.ServerLogicTest do end) assert ^state = - %Request{method: "PLAY"} - |> Request.stringify(@url) + %Request{method: "PLAY", path: @url} |> Logic.process_request(state) end end @@ -211,7 +199,6 @@ defmodule Membrane.RTSP.ServerLogicTest do assert %{session_state: :init, configured_media: %{}} = %Request{method: "TEARDOWN"} - |> Request.stringify(@url) |> Logic.process_request(state) end @@ -222,8 +209,7 @@ defmodule Membrane.RTSP.ServerLogicTest do mock(:gen_tcp, [send: 2], fn %{}, response -> assert response =~ "RTSP/1.0 200 OK" end) assert %{session_state: :init, configured_media: %{}} = - %Request{method: "TEARDOWN"} - |> Request.stringify(@url) + %Request{method: "TEARDOWN", path: @url} |> Logic.process_request(state) end end @@ -233,16 +219,6 @@ defmodule Membrane.RTSP.ServerLogicTest do assert response =~ "RTSP/1.0 501 Not Implemented" end) - request = "ANNOUNCE rtsp://localhost:554/stream RTSP/1.0\r\n\r\n" - assert ^state = Logic.process_request(request, state) - end - - test "parse invalid request returns Bad Request", %{state: state} do - mock(:gen_tcp, [send: 2], fn %{}, response -> - assert response =~ "RTSP/1.0 400 Bad Request" - end) - - request = "OPTIONS rtsp://localhost:554/stream RTSP\r\n" - assert ^state = Logic.process_request(request, state) + assert ^state = Logic.process_request(%Request{method: "SET_PARAMETER"}, state) end end diff --git a/test/support/fake_handler.ex b/test/support/fake_handler.ex index 571dc36..6b80f3c 100644 --- a/test/support/fake_handler.ex +++ b/test/support/fake_handler.ex @@ -1,12 +1,12 @@ defmodule Membrane.RTSP.Server.FakeHandler do @moduledoc false - @behaviour Membrane.RTSP.Server.Handler + use Membrane.RTSP.Server.Handler import Mockery.Macro @impl true - def handle_open_connection(_conn), do: %{} + def handle_open_connection(_conn, _state), do: %{} @impl true def handle_closed_connection(_state), do: :ok @@ -17,7 +17,12 @@ defmodule Membrane.RTSP.Server.FakeHandler do end @impl true - def handle_setup(request, state) do + def handle_setup(request, _mode, state) do + mockable(__MODULE__).respond(request, state) + end + + @impl true + def handle_announce(request, state) do mockable(__MODULE__).respond(request, state) end From ae4bce1373b8d52e42972c045dbc47d1e5de6364 Mon Sep 17 00:00:00 2001 From: Billal GHILAS Date: Sun, 22 Dec 2024 23:07:15 +0100 Subject: [PATCH 2/7] add tests --- README.md | 4 +- lib/membrane_rtsp/server/logic.ex | 23 +++-- .../server/server_logic_test.exs | 91 +++++++++++++++++++ 3 files changed, 107 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 5fec56c..58795a9 100644 --- a/README.md +++ b/README.md @@ -5,9 +5,9 @@ [![CircleCI](https://circleci.com/gh/membraneframework/membrane_rtsp.svg?style=svg)](https://circleci.com/gh/membraneframework/membrane_rtsp) -The RTSP client for Elixir +The RTSP client and server for Elixir -Currently supports only RTSP 1.1 defined by +Currently supports only RTSP 1.0 defined by [RFC2326](https://tools.ietf.org/html/rfc2326) ## Installation diff --git a/lib/membrane_rtsp/server/logic.ex b/lib/membrane_rtsp/server/logic.ex index 278908b..1250838 100644 --- a/lib/membrane_rtsp/server/logic.ex +++ b/lib/membrane_rtsp/server/logic.ex @@ -23,14 +23,15 @@ defmodule Membrane.RTSP.Server.Logic do ] @udp_port_range 1000..65_000//2 + @find_udp_port_max_attempts 1000 defguardp can_play(state) when map_size(state.configured_media) != 0 and - state.session_state in [:ready, :paused] + state.session_state in [:ready, :paused_playing] defguardp can_record(state) when map_size(state.incoming_media) != 0 and - state.session_state in [:ready, :paused] + state.session_state in [:ready, :paused_recording] defmodule State do @moduledoc false @@ -56,7 +57,8 @@ defmodule Membrane.RTSP.Server.Logic do configured_media: Server.Handler.configured_media_context(), incoming_media: Server.Handler.configured_media_context(), session_id: binary(), - session_state: :init | :ready | :playing | :recording | :paused, + session_state: + :init | :ready | :playing | :recording | :paused_playing | :paused_recording, session_timeout: non_neg_integer() } end @@ -131,14 +133,19 @@ defmodule Membrane.RTSP.Server.Logic do end end - defp do_handle_request(%Request{method: "PAUSE"}, %{session_state: :playing} = state) do + defp do_handle_request(%Request{method: "PAUSE"}, state) + when state.session_state in [:playing, :recording] do {response, request_handler_state} = state.request_handler.handle_pause(state.request_handler_state) response = inject_session_header(response, state) if Response.ok?(response) do - {response, %{state | request_handler_state: request_handler_state, session_state: :paused}} + session_state = + if state.session_state == :playing, do: :paused_playing, else: :paused_recording + + {response, + %{state | request_handler_state: request_handler_state, session_state: session_state}} else {response, %{state | request_handler_state: request_handler_state}} end @@ -164,8 +171,6 @@ defmodule Membrane.RTSP.Server.Logic do {Response.new(400), state} end - - {Response.new(200), state} end defp do_handle_request(%Request{method: "RECORD"}, state) when can_record(state) do @@ -288,7 +293,7 @@ defmodule Membrane.RTSP.Server.Logic do :UDP -> case find_rtp_ports(0) do {rtp_socket, rtcp_socket} -> - {:ok, {address, _port}} = :inet.peername(state.socket) + {:ok, {address, _port}} = mockable(:inet).peername(state.socket) %{ transport: :UDP, @@ -304,7 +309,7 @@ defmodule Membrane.RTSP.Server.Logic do end end - defp find_rtp_ports(attempt) when attempt >= 100, do: :error + defp find_rtp_ports(attempt) when attempt >= @find_udp_port_max_attempts, do: :error defp find_rtp_ports(attempt) do rtp_port = Enum.random(@udp_port_range) diff --git a/test/membrane_rtsp/server/server_logic_test.exs b/test/membrane_rtsp/server/server_logic_test.exs index b949f19..fc13c26 100644 --- a/test/membrane_rtsp/server/server_logic_test.exs +++ b/test/membrane_rtsp/server/server_logic_test.exs @@ -68,6 +68,59 @@ defmodule Membrane.RTSP.ServerLogicTest do Logic.process_request(%Request{method: "DESCRIBE", path: expected_uri}, state) end + describe "handle ANNOUNCE request" do + test "announce media streams", %{state: state} do + req_body = """ + v=0 + o=- 0 0 IN IP4 127.0.0.1 + s=No Name + c=IN IP4 127.0.0.1 + t=0 0 + a=tool:libavformat 58.29.100 + m=video 0 RTP/AVP 96 + b=AS:1529 + a=rtpmap:96 H264/90000 + a=fmtp:96 packetization-mode=1; sprop-parameter-sets=Z0LAKNoB4BnsBagICAoAAAMAAgAAAwBkHjBlQA==,aM48gA==; profile-level-id=42C028 + a=control:streamid=0 + """ + + mock(FakeHandler, [respond: 2], fn %Request{ + body: %ExSDP{session_name: "No Name", media: medias} + }, + %{} -> + assert length(medias) == 1 + {Response.new(200), %{}} + end) + + mock(:gen_tcp, [send: 2], fn %{}, response -> + assert response =~ "RTSP/1.0 200 OK" + end) + + expected_uri = URI.to_string(@url) + + assert ^state = + Logic.process_request( + %Request{method: "ANNOUNCE", body: req_body, path: expected_uri}, + state + ) + end + + test "announce with invalid sdp", %{state: state} do + req_body = """ + v=0 + o=- 0 0 IN IP4 127.0.0.1 + s=No Name + b=X-YZ:256 + """ + + mock(:gen_tcp, [send: 2], fn %{}, response -> + assert response =~ "RTSP/1.0 400 Bad Request" + end) + + assert ^state = Logic.process_request(%Request{method: "ANNOUNCE", body: req_body}, state) + end + end + describe "handle SETUP request" do test "setup track", %{state: state} do control_path = URI.to_string(@url) <> "/trackId=0" @@ -98,6 +151,44 @@ defmodule Membrane.RTSP.ServerLogicTest do } = state.configured_media end + test "setup record track", %{state: state} do + control_path = URI.to_string(@url) <> "/trackId=0" + + mock(FakeHandler, [respond: 2], fn request, %{} -> + assert request.path == control_path + {Response.new(200), state} + end) + + mock(:inet, [peername: 1], fn _socket -> {:ok, {{127, 0, 0, 1}, 0}} end) + + mock(:gen_tcp, [send: 2], fn %{}, response -> + assert response =~ "RTSP/1.0 200 OK" + assert response =~ "\r\nSession: #{state.session_id};timeout=60\r\n" + end) + + state = + %Request{method: "SETUP", path: control_path} + |> Request.with_header("Transport", "RTP/AVP;unicast;client_port=1000-1001;mode=record") + |> Logic.process_request(state) + + assert state.session_state == :ready + + assert %{ + ^control_path => %{ + transport: :UDP, + client_port: {1000, 1001}, + rtp_socket: rtp_socket, + rtcp_socket: rtcp_socket + } + } = state.incoming_media + + assert rtp_socket + assert rtcp_socket + + :gen_udp.close(rtp_socket) + :gen_udp.close(rtcp_socket) + end + test "invalid/missing transport header", %{state: state} do mock(:gen_tcp, [send: 2], fn %{}, response -> assert response =~ "RTSP/1.0 400 Bad Request" From 8a8f4ec39dd9326b1d4a243912a8d0daba7a01db Mon Sep 17 00:00:00 2001 From: Billal GHILAS Date: Sun, 22 Dec 2024 23:16:32 +0100 Subject: [PATCH 3/7] fix handle record request --- lib/membrane_rtsp/server/logic.ex | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/membrane_rtsp/server/logic.ex b/lib/membrane_rtsp/server/logic.ex index 1250838..1b2b83d 100644 --- a/lib/membrane_rtsp/server/logic.ex +++ b/lib/membrane_rtsp/server/logic.ex @@ -175,7 +175,12 @@ defmodule Membrane.RTSP.Server.Logic do defp do_handle_request(%Request{method: "RECORD"}, state) when can_record(state) do {response, handler_state} = state.request_handler.handle_record(state.incoming_media, state) - {response, %{state | request_handler_state: handler_state, session_state: :recording}} + + if Response.ok?(response) do + {response, %{state | request_handler_state: handler_state, session_state: :recording}} + else + {response, %{state | request_handler_state: handler_state}} + end end defp do_handle_request(%Request{method: "TEARDOWN"}, state) From a651181b84c7f780bdb15d6d8ad5c6ceb204b61d Mon Sep 17 00:00:00 2001 From: Billal GHILAS Date: Tue, 24 Dec 2024 23:47:26 +0100 Subject: [PATCH 4/7] pass handler state to handle record --- lib/membrane_rtsp/server/logic.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/membrane_rtsp/server/logic.ex b/lib/membrane_rtsp/server/logic.ex index 1b2b83d..de63b97 100644 --- a/lib/membrane_rtsp/server/logic.ex +++ b/lib/membrane_rtsp/server/logic.ex @@ -174,7 +174,8 @@ defmodule Membrane.RTSP.Server.Logic do end defp do_handle_request(%Request{method: "RECORD"}, state) when can_record(state) do - {response, handler_state} = state.request_handler.handle_record(state.incoming_media, state) + {response, handler_state} = + state.request_handler.handle_record(state.incoming_media, state.request_handler_state) if Response.ok?(response) do {response, %{state | request_handler_state: handler_state, session_state: :recording}} From f9df8a5f6402e8ae95fffe44deb0c58a36ed7ed8 Mon Sep 17 00:00:00 2001 From: Billal GHILAS Date: Tue, 21 Jan 2025 12:18:48 +0100 Subject: [PATCH 5/7] update rtsp version in livebook --- livebook/basic_server.livemd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/livebook/basic_server.livemd b/livebook/basic_server.livemd index 1ac9ac6..5b0f611 100644 --- a/livebook/basic_server.livemd +++ b/livebook/basic_server.livemd @@ -3,7 +3,7 @@ ```elixir Mix.install([ {:membrane_core, "~> 1.0.0"}, - {:membrane_rtsp, "~> 0.6.0"}, + {:membrane_rtsp, "~> 0.6"}, {:membrane_rtp_plugin, "~> 0.24.0"}, {:membrane_rtp_h264_plugin, "~> 0.19.0"}, {:membrane_h264_plugin, "~> 0.9"}, From 744290b11c1ee5ffab32f150f1753ba5e5423874 Mon Sep 17 00:00:00 2001 From: Billal GHILAS Date: Tue, 21 Jan 2025 12:22:41 +0100 Subject: [PATCH 6/7] fix credo warning --- lib/membrane_rtsp/server/logic.ex | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/membrane_rtsp/server/logic.ex b/lib/membrane_rtsp/server/logic.ex index de63b97..0c57859 100644 --- a/lib/membrane_rtsp/server/logic.ex +++ b/lib/membrane_rtsp/server/logic.ex @@ -2,11 +2,10 @@ defmodule Membrane.RTSP.Server.Logic do @moduledoc """ Logic for RTSP Server """ + import Mockery.Macro require Logger - import Mockery.Macro - alias Membrane.RTSP.{Request, Response, Server} @server "MembraneRTSP/#{Mix.Project.config()[:version]} (Membrane Framework RTSP Server)" From deb719bb14ae2f2267950fb64558860951155895 Mon Sep 17 00:00:00 2001 From: Billal GHILAS Date: Tue, 21 Jan 2025 12:27:28 +0100 Subject: [PATCH 7/7] make server logic module private --- lib/membrane_rtsp/server/logic.ex | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/membrane_rtsp/server/logic.ex b/lib/membrane_rtsp/server/logic.ex index 0c57859..a5f8afb 100644 --- a/lib/membrane_rtsp/server/logic.ex +++ b/lib/membrane_rtsp/server/logic.ex @@ -1,7 +1,7 @@ defmodule Membrane.RTSP.Server.Logic do - @moduledoc """ - Logic for RTSP Server - """ + @moduledoc false + # Logic for RTSP Server + import Mockery.Macro require Logger