From ec5ab5f9bcd1e710cd207c1e62538d198c5b085d Mon Sep 17 00:00:00 2001 From: Alex Maslo Date: Fri, 28 Feb 2025 10:46:49 +0200 Subject: [PATCH 01/10] Allow to record with TCP transport (interleaved mode) --- lib/membrane_rtsp/server.ex | 30 ++++++++++++++++++++++++++++++ lib/membrane_rtsp/server/conn.ex | 16 ++++++++++++---- lib/membrane_rtsp/server/logic.ex | 25 ++++++++++++++++++------- 3 files changed, 60 insertions(+), 11 deletions(-) diff --git a/lib/membrane_rtsp/server.ex b/lib/membrane_rtsp/server.ex index f176038..36d65d4 100644 --- a/lib/membrane_rtsp/server.ex +++ b/lib/membrane_rtsp/server.ex @@ -97,6 +97,20 @@ defmodule Membrane.RTSP.Server do GenServer.call(server, :port_number) end + @doc """ + In interleaved TCP mode we want to pass control over the client connection socket to the pipeline (usually). + + This function allows to transfer the control over such socket to a specified process. + """ + @spec transfer_client_socket_control( + server_pid :: pid() | GenServer.name(), + client_conn_pid :: pid(), + new_controlling_process_pid :: pid() + ) :: :ok | {:error, :unknown_conn | :closed | :not_owner | :badarg | :inet.posix()} + def transfer_client_socket_control(server, conn_pid, new_controlling_process) do + GenServer.call(server, {:transfer_client_socket_control, conn_pid, new_controlling_process}) + end + @impl true def init(config) do address = config[:address] || :any @@ -147,6 +161,21 @@ defmodule Membrane.RTSP.Server do {:reply, :inet.port(state.socket), state} end + @impl true + def handle_call( + {:transfer_client_socket_control, conn_pid, new_controlling_process}, + _from, + state + ) do + case Map.fetch(state.client_conns, conn_pid) do + {:ok, socket} -> + {:reply, :gen_tcp.controlling_process(socket, new_controlling_process), state} + + :error -> + {:reply, {:error, :unknown_conn}, state} + end + end + @impl true def handle_info({:new_connection, client_socket}, state) do child_state = @@ -187,6 +216,7 @@ defmodule Membrane.RTSP.Server do defp do_listen(socket, parent_pid) do case :gen_tcp.accept(socket) do {:ok, client_socket} -> + :ok = :gen_tcp.controlling_process(client_socket, parent_pid) send(parent_pid, {:new_connection, client_socket}) do_listen(socket, parent_pid) diff --git a/lib/membrane_rtsp/server/conn.ex b/lib/membrane_rtsp/server/conn.ex index 870e6d9..813bd64 100644 --- a/lib/membrane_rtsp/server/conn.ex +++ b/lib/membrane_rtsp/server/conn.ex @@ -29,15 +29,23 @@ defmodule Membrane.RTSP.Server.Conn do @impl true def handle_continue(:process_client_requests, state) do - do_process_client_requests(state, state.session_timeout) - state.request_handler.handle_closed_connection(state.request_handler_state) - {:stop, :normal, state} + case do_process_client_requests(state, state.session_timeout) do + %Logic.State{recording?: true} = state -> + {:noreply, state} + + state -> + state.request_handler.handle_closed_connection(state.request_handler_state) + {:stop, :normal, state} + end end defp do_process_client_requests(state, timeout) do with {:ok, request} <- get_request(state.socket, timeout) do case Logic.process_request(request, state) do - %{session_state: :recording} = state -> + %Logic.State{recording?: true} = state -> + state + + %Logic.State{session_state: :recording} = state -> do_process_client_requests(state, :infinity) state -> diff --git a/lib/membrane_rtsp/server/logic.ex b/lib/membrane_rtsp/server/logic.ex index a5f8afb..b032fda 100644 --- a/lib/membrane_rtsp/server/logic.ex +++ b/lib/membrane_rtsp/server/logic.ex @@ -41,10 +41,12 @@ defmodule Membrane.RTSP.Server.Logic do :rtcp_socket, :request_handler_state, :session_timeout, + :transport_opts, configured_media: %{}, incoming_media: %{}, session_id: UUID.uuid4(), - session_state: :init + session_state: :init, + recording?: false ] @type t :: %__MODULE__{ @@ -111,7 +113,9 @@ defmodule Membrane.RTSP.Server.Logic do ) {response, state} = do_handle_setup_response(request, response, transport_opts, state) - {response, %{state | request_handler_state: request_handler_state}} + + {response, + %{state | request_handler_state: request_handler_state, transport_opts: transport_opts}} else error -> Logger.error("SETUP request failed due to: #{inspect(error)}") @@ -176,10 +180,20 @@ defmodule Membrane.RTSP.Server.Logic do {response, handler_state} = state.request_handler.handle_record(state.incoming_media, state.request_handler_state) + tcp_interleaved_mode? = + state.transport_opts[:transport] == :TCP && state.transport_opts[:mode] == :record + if Response.ok?(response) do - {response, %{state | request_handler_state: handler_state, session_state: :recording}} + {response, + %{ + state + | request_handler_state: handler_state, + session_state: :recording, + recording?: tcp_interleaved_mode? + }} else - {response, %{state | request_handler_state: handler_state}} + {response, + %{state | request_handler_state: handler_state, recording?: tcp_interleaved_mode?}} end end @@ -215,9 +229,6 @@ defmodule Membrane.RTSP.Server.Logic do transport_opts[:network_mode] == :multicast -> {:error, :multicast_not_supported} - transport_opts[:mode] == :record and transport != :UDP -> - {:error, :unsupported_transport} - transport_opts[:mode] == :play and transport == :UDP and is_nil(state.rtp_socket) -> {:error, :udp_not_supported} From ef449d8238de09e24f885ce7b320bfa46e0c7a4a Mon Sep 17 00:00:00 2001 From: Alex Maslo Date: Fri, 28 Feb 2025 11:06:41 +0200 Subject: [PATCH 02/10] Allow Server.Conn to handle :rtsp requests from outside the process --- lib/membrane_rtsp/server/conn.ex | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/lib/membrane_rtsp/server/conn.ex b/lib/membrane_rtsp/server/conn.ex index 813bd64..ce73962 100644 --- a/lib/membrane_rtsp/server/conn.ex +++ b/lib/membrane_rtsp/server/conn.ex @@ -39,6 +39,18 @@ defmodule Membrane.RTSP.Server.Conn do end end + @impl true + def handle_info({:rtsp, %Request{} = rtsp_request}, state) do + case Logic.process_request(rtsp_request, state) do + %Logic.State{recording?: true} = state -> + {:noreply, state} + + state -> + state.request_handler.handle_closed_connection(state.request_handler_state) + {:stop, :normal, state} + end + end + defp do_process_client_requests(state, timeout) do with {:ok, request} <- get_request(state.socket, timeout) do case Logic.process_request(request, state) do From 168a08af3482debac74b66f11c672e9714a4d30e Mon Sep 17 00:00:00 2001 From: Alex Maslo Date: Mon, 3 Mar 2025 10:27:32 +0200 Subject: [PATCH 03/10] support handling of raw rtsp messages in Conn --- lib/membrane_rtsp/server/conn.ex | 12 ++++++++++++ lib/membrane_rtsp/server/logic.ex | 5 ++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/lib/membrane_rtsp/server/conn.ex b/lib/membrane_rtsp/server/conn.ex index ce73962..ea85fed 100644 --- a/lib/membrane_rtsp/server/conn.ex +++ b/lib/membrane_rtsp/server/conn.ex @@ -51,6 +51,18 @@ defmodule Membrane.RTSP.Server.Conn do end end + @impl true + def handle_info({:rtsp, raw_rtsp_request}, state) do + case Request.parse(raw_rtsp_request) do + {:ok, rtsp_request} -> + handle_info({:rtsp, rtsp_request}, state) + + _error -> + Logger.warning("Failed to parse RTSP request: #{inspect(raw_rtsp_request)}") + {:noreply, state} + end + end + defp do_process_client_requests(state, timeout) do with {:ok, request} <- get_request(state.socket, timeout) do case Logic.process_request(request, state) do diff --git a/lib/membrane_rtsp/server/logic.ex b/lib/membrane_rtsp/server/logic.ex index b032fda..bd2b062 100644 --- a/lib/membrane_rtsp/server/logic.ex +++ b/lib/membrane_rtsp/server/logic.ex @@ -181,7 +181,7 @@ defmodule Membrane.RTSP.Server.Logic do state.request_handler.handle_record(state.incoming_media, state.request_handler_state) tcp_interleaved_mode? = - state.transport_opts[:transport] == :TCP && state.transport_opts[:mode] == :record + state.transport_opts[:transport] == :TCP if Response.ok?(response) do {response, @@ -192,8 +192,7 @@ defmodule Membrane.RTSP.Server.Logic do recording?: tcp_interleaved_mode? }} else - {response, - %{state | request_handler_state: handler_state, recording?: tcp_interleaved_mode?}} + {response, %{state | request_handler_state: handler_state, recording?: false}} end end From 650f49a88e3c76c2f9b2632d6ce21ecb0816898b Mon Sep 17 00:00:00 2001 From: Alex Maslo Date: Fri, 7 Mar 2025 11:27:04 +0200 Subject: [PATCH 04/10] do not initiate the connection closing when handling RTSP request in Conn --- lib/membrane_rtsp/server/conn.ex | 16 +++++++------- lib/membrane_rtsp/server/logic.ex | 22 +++++++++++++++++-- .../server/server_logic_test.exs | 11 ++++++++++ 3 files changed, 39 insertions(+), 10 deletions(-) diff --git a/lib/membrane_rtsp/server/conn.ex b/lib/membrane_rtsp/server/conn.ex index ea85fed..a641fe6 100644 --- a/lib/membrane_rtsp/server/conn.ex +++ b/lib/membrane_rtsp/server/conn.ex @@ -46,20 +46,20 @@ defmodule Membrane.RTSP.Server.Conn do {:noreply, state} state -> - state.request_handler.handle_closed_connection(state.request_handler_state) - {:stop, :normal, state} + handle_continue(:process_client_requests, state) end end @impl true def handle_info({:rtsp, raw_rtsp_request}, state) do - case Request.parse(raw_rtsp_request) do - {:ok, rtsp_request} -> - handle_info({:rtsp, rtsp_request}, state) + with {:ok, request} <- Request.parse(raw_rtsp_request) do + case Logic.process_request(request, state) do + %Logic.State{recording?: true} = state -> + {:noreply, state} - _error -> - Logger.warning("Failed to parse RTSP request: #{inspect(raw_rtsp_request)}") - {:noreply, state} + state -> + handle_continue(:process_client_requests, state) + end end end diff --git a/lib/membrane_rtsp/server/logic.ex b/lib/membrane_rtsp/server/logic.ex index bd2b062..1d7555c 100644 --- a/lib/membrane_rtsp/server/logic.ex +++ b/lib/membrane_rtsp/server/logic.ex @@ -202,7 +202,16 @@ defmodule Membrane.RTSP.Server.Logic do Response.new(200) |> inject_session_header(state) - |> then(&{&1, %{state | configured_media: %{}, incoming_media: %{}, session_state: :init}}) + |> then( + &{&1, + %{ + state + | configured_media: %{}, + incoming_media: %{}, + session_state: :init, + recording?: false + }} + ) end defp do_handle_request(%Request{method: "TEARDOWN"}, state) do @@ -213,7 +222,16 @@ defmodule Membrane.RTSP.Server.Logic do response |> inject_session_header(state) - |> then(&{&1, %{state | session_state: :init, configured_media: %{}, incoming_media: %{}}}) + |> then( + &{&1, + %{ + state + | session_state: :init, + configured_media: %{}, + incoming_media: %{}, + recording?: false + }} + ) end defp do_handle_request(%Request{}, state) do diff --git a/test/membrane_rtsp/server/server_logic_test.exs b/test/membrane_rtsp/server/server_logic_test.exs index fc13c26..e4bf65a 100644 --- a/test/membrane_rtsp/server/server_logic_test.exs +++ b/test/membrane_rtsp/server/server_logic_test.exs @@ -303,6 +303,17 @@ defmodule Membrane.RTSP.ServerLogicTest do %Request{method: "TEARDOWN", path: @url} |> Logic.process_request(state) end + + test "resets recording? flag", %{state: state} do + state = %State{state | recording?: true, session_state: :playing} + + mock(FakeHandler, [respond: 2], fn nil, state -> {Response.new(200), state} end) + mock(:gen_tcp, [send: 2], fn %{}, response -> assert response =~ "RTSP/1.0 200 OK" end) + + assert %{recording?: false} = + %Request{method: "TEARDOWN", path: @url} + |> Logic.process_request(state) + end end test "return 501 (Not Implemented) for not supported methods", %{state: state} do From 279f762d6666c82c1d757d829789c2dd3649cd3f Mon Sep 17 00:00:00 2001 From: Alex Maslo Date: Fri, 7 Mar 2025 11:31:31 +0200 Subject: [PATCH 05/10] raise in Conn on invalid raw RTSP message --- lib/membrane_rtsp/server/conn.ex | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/membrane_rtsp/server/conn.ex b/lib/membrane_rtsp/server/conn.ex index a641fe6..d79505f 100644 --- a/lib/membrane_rtsp/server/conn.ex +++ b/lib/membrane_rtsp/server/conn.ex @@ -60,6 +60,9 @@ defmodule Membrane.RTSP.Server.Conn do state -> handle_continue(:process_client_requests, state) end + else + {:error, error} -> + raise "Failed to parse RTSP request: #{inspect(error)}.\nRequest: #{inspect(raw_rtsp_request)}" end end From 6f31ea222ef5bd8682acbb25853660c56bdc7f58 Mon Sep 17 00:00:00 2001 From: Alex Maslo Date: Fri, 7 Mar 2025 12:29:39 +0200 Subject: [PATCH 06/10] set recording? to false on successful PAUSE request --- lib/membrane_rtsp/server/logic.ex | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/membrane_rtsp/server/logic.ex b/lib/membrane_rtsp/server/logic.ex index 1d7555c..5b8bfbe 100644 --- a/lib/membrane_rtsp/server/logic.ex +++ b/lib/membrane_rtsp/server/logic.ex @@ -148,7 +148,12 @@ defmodule Membrane.RTSP.Server.Logic do if state.session_state == :playing, do: :paused_playing, else: :paused_recording {response, - %{state | request_handler_state: request_handler_state, session_state: session_state}} + %{ + state + | request_handler_state: request_handler_state, + session_state: session_state, + recording?: false + }} else {response, %{state | request_handler_state: request_handler_state}} end From 9f9755e746d04a0030d519b6832edfe9d30be7aa Mon Sep 17 00:00:00 2001 From: Alex Maslo Date: Fri, 7 Mar 2025 15:59:43 +0200 Subject: [PATCH 07/10] handle {:error, :closed} gracefully in Conn --- lib/membrane_rtsp/server/conn.ex | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/membrane_rtsp/server/conn.ex b/lib/membrane_rtsp/server/conn.ex index d79505f..94744bd 100644 --- a/lib/membrane_rtsp/server/conn.ex +++ b/lib/membrane_rtsp/server/conn.ex @@ -33,9 +33,14 @@ defmodule Membrane.RTSP.Server.Conn do %Logic.State{recording?: true} = state -> {:noreply, state} - state -> + %Logic.State{} = state -> state.request_handler.handle_closed_connection(state.request_handler_state) {:stop, :normal, state} + + {:error, :closed} -> + state.request_handler.handle_closed_connection(state.request_handler_state) + + {:stop, :normal, state} end end From d125b73600ca2b7da242de655082461f7b9af860 Mon Sep 17 00:00:00 2001 From: Alex Maslo Date: Fri, 7 Mar 2025 16:09:53 +0200 Subject: [PATCH 08/10] refactored code, fixed dialyzer error --- lib/membrane_rtsp/server/conn.ex | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/lib/membrane_rtsp/server/conn.ex b/lib/membrane_rtsp/server/conn.ex index 94744bd..e81c4f9 100644 --- a/lib/membrane_rtsp/server/conn.ex +++ b/lib/membrane_rtsp/server/conn.ex @@ -33,14 +33,9 @@ defmodule Membrane.RTSP.Server.Conn do %Logic.State{recording?: true} = state -> {:noreply, state} - %Logic.State{} = state -> + _other -> state.request_handler.handle_closed_connection(state.request_handler_state) {:stop, :normal, state} - - {:error, :closed} -> - state.request_handler.handle_closed_connection(state.request_handler_state) - - {:stop, :normal, state} end end From 105c620847bcd8ae03b92fe15b1b6bc091bfc1ee Mon Sep 17 00:00:00 2001 From: Alex Maslo Date: Fri, 7 Mar 2025 16:11:55 +0200 Subject: [PATCH 09/10] renamed recording? flag to recording_with_tcp? --- lib/membrane_rtsp/server/conn.ex | 8 ++++---- lib/membrane_rtsp/server/logic.ex | 12 ++++++------ test/membrane_rtsp/server/server_logic_test.exs | 6 +++--- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/lib/membrane_rtsp/server/conn.ex b/lib/membrane_rtsp/server/conn.ex index e81c4f9..e08220b 100644 --- a/lib/membrane_rtsp/server/conn.ex +++ b/lib/membrane_rtsp/server/conn.ex @@ -30,7 +30,7 @@ defmodule Membrane.RTSP.Server.Conn do @impl true def handle_continue(:process_client_requests, state) do case do_process_client_requests(state, state.session_timeout) do - %Logic.State{recording?: true} = state -> + %Logic.State{recording_with_tcp?: true} = state -> {:noreply, state} _other -> @@ -42,7 +42,7 @@ defmodule Membrane.RTSP.Server.Conn do @impl true def handle_info({:rtsp, %Request{} = rtsp_request}, state) do case Logic.process_request(rtsp_request, state) do - %Logic.State{recording?: true} = state -> + %Logic.State{recording_with_tcp?: true} = state -> {:noreply, state} state -> @@ -54,7 +54,7 @@ defmodule Membrane.RTSP.Server.Conn do def handle_info({:rtsp, raw_rtsp_request}, state) do with {:ok, request} <- Request.parse(raw_rtsp_request) do case Logic.process_request(request, state) do - %Logic.State{recording?: true} = state -> + %Logic.State{recording_with_tcp?: true} = state -> {:noreply, state} state -> @@ -69,7 +69,7 @@ defmodule Membrane.RTSP.Server.Conn do defp do_process_client_requests(state, timeout) do with {:ok, request} <- get_request(state.socket, timeout) do case Logic.process_request(request, state) do - %Logic.State{recording?: true} = state -> + %Logic.State{recording_with_tcp?: true} = state -> state %Logic.State{session_state: :recording} = state -> diff --git a/lib/membrane_rtsp/server/logic.ex b/lib/membrane_rtsp/server/logic.ex index 5b8bfbe..65e869f 100644 --- a/lib/membrane_rtsp/server/logic.ex +++ b/lib/membrane_rtsp/server/logic.ex @@ -46,7 +46,7 @@ defmodule Membrane.RTSP.Server.Logic do incoming_media: %{}, session_id: UUID.uuid4(), session_state: :init, - recording?: false + recording_with_tcp?: false ] @type t :: %__MODULE__{ @@ -152,7 +152,7 @@ defmodule Membrane.RTSP.Server.Logic do state | request_handler_state: request_handler_state, session_state: session_state, - recording?: false + recording_with_tcp?: false }} else {response, %{state | request_handler_state: request_handler_state}} @@ -194,10 +194,10 @@ defmodule Membrane.RTSP.Server.Logic do state | request_handler_state: handler_state, session_state: :recording, - recording?: tcp_interleaved_mode? + recording_with_tcp?: tcp_interleaved_mode? }} else - {response, %{state | request_handler_state: handler_state, recording?: false}} + {response, %{state | request_handler_state: handler_state, recording_with_tcp?: false}} end end @@ -214,7 +214,7 @@ defmodule Membrane.RTSP.Server.Logic do | configured_media: %{}, incoming_media: %{}, session_state: :init, - recording?: false + recording_with_tcp?: false }} ) end @@ -234,7 +234,7 @@ defmodule Membrane.RTSP.Server.Logic do | session_state: :init, configured_media: %{}, incoming_media: %{}, - recording?: false + recording_with_tcp?: false }} ) end diff --git a/test/membrane_rtsp/server/server_logic_test.exs b/test/membrane_rtsp/server/server_logic_test.exs index e4bf65a..671ff68 100644 --- a/test/membrane_rtsp/server/server_logic_test.exs +++ b/test/membrane_rtsp/server/server_logic_test.exs @@ -304,13 +304,13 @@ defmodule Membrane.RTSP.ServerLogicTest do |> Logic.process_request(state) end - test "resets recording? flag", %{state: state} do - state = %State{state | recording?: true, session_state: :playing} + test "resets recording_with_tcp? flag", %{state: state} do + state = %State{state | recording_with_tcp?: true, session_state: :playing} mock(FakeHandler, [respond: 2], fn nil, state -> {Response.new(200), state} end) mock(:gen_tcp, [send: 2], fn %{}, response -> assert response =~ "RTSP/1.0 200 OK" end) - assert %{recording?: false} = + assert %{recording_with_tcp?: false} = %Request{method: "TEARDOWN", path: @url} |> Logic.process_request(state) end From 8048a26aa01b28dabb3bb3c760ab097e8c660abe Mon Sep 17 00:00:00 2001 From: Alex Maslo Date: Fri, 7 Mar 2025 17:50:24 +0200 Subject: [PATCH 10/10] removed the clause that allowed to process a parsed RTSP request in Conn --- lib/membrane_rtsp/server/conn.ex | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/lib/membrane_rtsp/server/conn.ex b/lib/membrane_rtsp/server/conn.ex index e08220b..1648cb9 100644 --- a/lib/membrane_rtsp/server/conn.ex +++ b/lib/membrane_rtsp/server/conn.ex @@ -39,17 +39,6 @@ defmodule Membrane.RTSP.Server.Conn do end end - @impl true - def handle_info({:rtsp, %Request{} = rtsp_request}, state) do - case Logic.process_request(rtsp_request, state) do - %Logic.State{recording_with_tcp?: true} = state -> - {:noreply, state} - - state -> - handle_continue(:process_client_requests, state) - end - end - @impl true def handle_info({:rtsp, raw_rtsp_request}, state) do with {:ok, request} <- Request.parse(raw_rtsp_request) do