Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to record with TCP transport (interleaved mode) #52

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
30 changes: 30 additions & 0 deletions lib/membrane_rtsp/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,20 @@ defmodule Membrane.RTSP.Server do
GenServer.call(server, :port_number)
end

@doc """
In interleaved TCP mode we want to pass control over the client connection socket to the pipeline (usually).

This function allows to transfer the control over such socket to a specified process.
"""
@spec transfer_client_socket_control(
server_pid :: pid() | GenServer.name(),
client_conn_pid :: pid(),
new_controlling_process_pid :: pid()
) :: :ok | {:error, :unknown_conn | :closed | :not_owner | :badarg | :inet.posix()}
def transfer_client_socket_control(server, conn_pid, new_controlling_process) do
GenServer.call(server, {:transfer_client_socket_control, conn_pid, new_controlling_process})
end

@impl true
def init(config) do
address = config[:address] || :any
Expand Down Expand Up @@ -147,6 +161,21 @@ defmodule Membrane.RTSP.Server do
{:reply, :inet.port(state.socket), state}
end

@impl true
def handle_call(
{:transfer_client_socket_control, conn_pid, new_controlling_process},
_from,
state
) do
case Map.fetch(state.client_conns, conn_pid) do
{:ok, socket} ->
{:reply, :gen_tcp.controlling_process(socket, new_controlling_process), state}

:error ->
{:reply, {:error, :unknown_conn}, state}
end
end

@impl true
def handle_info({:new_connection, client_socket}, state) do
child_state =
Expand Down Expand Up @@ -187,6 +216,7 @@ defmodule Membrane.RTSP.Server do
defp do_listen(socket, parent_pid) do
case :gen_tcp.accept(socket) do
{:ok, client_socket} ->
:ok = :gen_tcp.controlling_process(client_socket, parent_pid)
send(parent_pid, {:new_connection, client_socket})
do_listen(socket, parent_pid)

Expand Down
43 changes: 39 additions & 4 deletions lib/membrane_rtsp/server/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,50 @@ defmodule Membrane.RTSP.Server.Conn do

@impl true
def handle_continue(:process_client_requests, state) do
do_process_client_requests(state, state.session_timeout)
state.request_handler.handle_closed_connection(state.request_handler_state)
{:stop, :normal, state}
case do_process_client_requests(state, state.session_timeout) do
%Logic.State{recording?: true} = state ->
{:noreply, state}

state ->
state.request_handler.handle_closed_connection(state.request_handler_state)
{:stop, :normal, state}
end
end

@impl true
def handle_info({:rtsp, %Request{} = rtsp_request}, state) do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The handler should just send the raw rtsp message, no need to parse it beforehand.

Copy link
Author

@maslowalex maslowalex Mar 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is to extract the message from the raw TCP stream. How do we know that the RTSP message we are building is done? Is match on "\r\n\r\n" really enough?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the handler responsibility so he can do the parsing however it wants. You can check here for an example on how it's done on the client side.

However we still need to address the following issue, when an rtsp message is sent to the server, it should parse it and then depending on the returned result, we may need to return control to the server. For example when a client send GET_PARAMETER we can just parse it and send the response, however if it sends PAUSE, we need to start listening for rtsp messages in the server, since the handler is no longer responsible for parsing them.

@impl true
def handle_info({:rtsp, raw_rtsp_request}, state) do
  with {:ok, request} <- Request.parse(raw_rtsp_request) do
    case Logic.process_request(request, state) do
      %Logic.State{recording?: true} = state ->
        {:noreply, state}

      state ->
        handle_continue(:process_client_requests, state)
    end
  end
end

I think it's fine to raise in case the rtsp message is not valid.
Anyway the rtsp session (tcp socket) should only be closed when the client close it even TEARDOWN should not close it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can delete this.

case Logic.process_request(rtsp_request, state) do
%Logic.State{recording?: true} = state ->
{:noreply, state}

state ->
handle_continue(:process_client_requests, state)
end
end

@impl true
def handle_info({:rtsp, raw_rtsp_request}, state) do
with {:ok, request} <- Request.parse(raw_rtsp_request) do
case Logic.process_request(request, state) do
%Logic.State{recording?: true} = state ->
{:noreply, state}

state ->
handle_continue(:process_client_requests, state)
end
else
{:error, error} ->
raise "Failed to parse RTSP request: #{inspect(error)}.\nRequest: #{inspect(raw_rtsp_request)}"
end
end

defp do_process_client_requests(state, timeout) do
with {:ok, request} <- get_request(state.socket, timeout) do
case Logic.process_request(request, state) do
%{session_state: :recording} = state ->
%Logic.State{recording?: true} = state ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking at the field recording?, it's confusing, what about naming it recording_with_tcp? ??

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeap, I think recording? is confusing in this context.

state

%Logic.State{session_state: :recording} = state ->
do_process_client_requests(state, :infinity)

state ->
Expand Down
53 changes: 43 additions & 10 deletions lib/membrane_rtsp/server/logic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ defmodule Membrane.RTSP.Server.Logic do
:rtcp_socket,
:request_handler_state,
:session_timeout,
:transport_opts,
configured_media: %{},
incoming_media: %{},
session_id: UUID.uuid4(),
session_state: :init
session_state: :init,
recording?: false
]

@type t :: %__MODULE__{
Expand Down Expand Up @@ -111,7 +113,9 @@ defmodule Membrane.RTSP.Server.Logic do
)

{response, state} = do_handle_setup_response(request, response, transport_opts, state)
{response, %{state | request_handler_state: request_handler_state}}

{response,
%{state | request_handler_state: request_handler_state, transport_opts: transport_opts}}
else
error ->
Logger.error("SETUP request failed due to: #{inspect(error)}")
Expand Down Expand Up @@ -144,7 +148,12 @@ defmodule Membrane.RTSP.Server.Logic do
if state.session_state == :playing, do: :paused_playing, else: :paused_recording

{response,
%{state | request_handler_state: request_handler_state, session_state: session_state}}
%{
state
| request_handler_state: request_handler_state,
session_state: session_state,
recording?: false
}}
else
{response, %{state | request_handler_state: request_handler_state}}
end
Expand Down Expand Up @@ -176,10 +185,19 @@ defmodule Membrane.RTSP.Server.Logic do
{response, handler_state} =
state.request_handler.handle_record(state.incoming_media, state.request_handler_state)

tcp_interleaved_mode? =
state.transport_opts[:transport] == :TCP

if Response.ok?(response) do
{response, %{state | request_handler_state: handler_state, session_state: :recording}}
{response,
%{
state
| request_handler_state: handler_state,
session_state: :recording,
recording?: tcp_interleaved_mode?
}}
else
{response, %{state | request_handler_state: handler_state}}
{response, %{state | request_handler_state: handler_state, recording?: false}}
end
end

Expand All @@ -189,7 +207,16 @@ defmodule Membrane.RTSP.Server.Logic do

Response.new(200)
|> inject_session_header(state)
|> then(&{&1, %{state | configured_media: %{}, incoming_media: %{}, session_state: :init}})
|> then(
&{&1,
%{
state
| configured_media: %{},
incoming_media: %{},
session_state: :init,
recording?: false
}}
)
end

defp do_handle_request(%Request{method: "TEARDOWN"}, state) do
Expand All @@ -200,7 +227,16 @@ defmodule Membrane.RTSP.Server.Logic do

response
|> inject_session_header(state)
|> then(&{&1, %{state | session_state: :init, configured_media: %{}, incoming_media: %{}}})
|> then(
&{&1,
%{
state
| session_state: :init,
configured_media: %{},
incoming_media: %{},
recording?: false
}}
)
end

defp do_handle_request(%Request{}, state) do
Expand All @@ -215,9 +251,6 @@ defmodule Membrane.RTSP.Server.Logic do
transport_opts[:network_mode] == :multicast ->
{:error, :multicast_not_supported}

transport_opts[:mode] == :record and transport != :UDP ->
{:error, :unsupported_transport}

transport_opts[:mode] == :play and transport == :UDP and is_nil(state.rtp_socket) ->
{:error, :udp_not_supported}

Expand Down
11 changes: 11 additions & 0 deletions test/membrane_rtsp/server/server_logic_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,17 @@ defmodule Membrane.RTSP.ServerLogicTest do
%Request{method: "TEARDOWN", path: @url}
|> Logic.process_request(state)
end

test "resets recording? flag", %{state: state} do
state = %State{state | recording?: true, session_state: :playing}

mock(FakeHandler, [respond: 2], fn nil, state -> {Response.new(200), state} end)
mock(:gen_tcp, [send: 2], fn %{}, response -> assert response =~ "RTSP/1.0 200 OK" end)

assert %{recording?: false} =
%Request{method: "TEARDOWN", path: @url}
|> Logic.process_request(state)
end
end

test "return 501 (Not Implemented) for not supported methods", %{state: state} do
Expand Down