Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to record with TCP transport (interleaved mode) #52

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
30 changes: 30 additions & 0 deletions lib/membrane_rtsp/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,20 @@ defmodule Membrane.RTSP.Server do
GenServer.call(server, :port_number)
end

@doc """
In interleaved TCP mode we want to pass control over the client connection socket to the pipeline (usually).

This function allows to transfer the control over such socket to a specified process.
"""
@spec transfer_client_socket_control(
server_pid :: pid() | GenServer.name(),
client_conn_pid :: pid(),
new_controlling_process_pid :: pid()
) :: :ok | {:error, :unknown_conn | :closed | :not_owner | :badarg | :inet.posix()}
def transfer_client_socket_control(server, conn_pid, new_controlling_process) do
GenServer.call(server, {:transfer_client_socket_control, conn_pid, new_controlling_process})
end

@impl true
def init(config) do
address = config[:address] || :any
Expand Down Expand Up @@ -147,6 +161,21 @@ defmodule Membrane.RTSP.Server do
{:reply, :inet.port(state.socket), state}
end

@impl true
def handle_call(
{:transfer_client_socket_control, conn_pid, new_controlling_process},
_from,
state
) do
case Map.fetch(state.client_conns, conn_pid) do
{:ok, socket} ->
{:reply, :gen_tcp.controlling_process(socket, new_controlling_process), state}

:error ->
{:reply, {:error, :unknown_conn}, state}
end
end

@impl true
def handle_info({:new_connection, client_socket}, state) do
child_state =
Expand Down Expand Up @@ -187,6 +216,7 @@ defmodule Membrane.RTSP.Server do
defp do_listen(socket, parent_pid) do
case :gen_tcp.accept(socket) do
{:ok, client_socket} ->
:ok = :gen_tcp.controlling_process(client_socket, parent_pid)
send(parent_pid, {:new_connection, client_socket})
do_listen(socket, parent_pid)

Expand Down
32 changes: 28 additions & 4 deletions lib/membrane_rtsp/server/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,39 @@ defmodule Membrane.RTSP.Server.Conn do

@impl true
def handle_continue(:process_client_requests, state) do
do_process_client_requests(state, state.session_timeout)
state.request_handler.handle_closed_connection(state.request_handler_state)
{:stop, :normal, state}
case do_process_client_requests(state, state.session_timeout) do
%Logic.State{recording_with_tcp?: true} = state ->
{:noreply, state}

_other ->
state.request_handler.handle_closed_connection(state.request_handler_state)
{:stop, :normal, state}
end
end

@impl true
def handle_info({:rtsp, raw_rtsp_request}, state) do
with {:ok, request} <- Request.parse(raw_rtsp_request) do
case Logic.process_request(request, state) do
%Logic.State{recording_with_tcp?: true} = state ->
{:noreply, state}

state ->
handle_continue(:process_client_requests, state)
end
else
{:error, error} ->
raise "Failed to parse RTSP request: #{inspect(error)}.\nRequest: #{inspect(raw_rtsp_request)}"
end
end

defp do_process_client_requests(state, timeout) do
with {:ok, request} <- get_request(state.socket, timeout) do
case Logic.process_request(request, state) do
%{session_state: :recording} = state ->
%Logic.State{recording_with_tcp?: true} = state ->
state

%Logic.State{session_state: :recording} = state ->
do_process_client_requests(state, :infinity)

state ->
Expand Down
53 changes: 43 additions & 10 deletions lib/membrane_rtsp/server/logic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ defmodule Membrane.RTSP.Server.Logic do
:rtcp_socket,
:request_handler_state,
:session_timeout,
:transport_opts,
configured_media: %{},
incoming_media: %{},
session_id: UUID.uuid4(),
session_state: :init
session_state: :init,
recording_with_tcp?: false
]

@type t :: %__MODULE__{
Expand Down Expand Up @@ -111,7 +113,9 @@ defmodule Membrane.RTSP.Server.Logic do
)

{response, state} = do_handle_setup_response(request, response, transport_opts, state)
{response, %{state | request_handler_state: request_handler_state}}

{response,
%{state | request_handler_state: request_handler_state, transport_opts: transport_opts}}
else
error ->
Logger.error("SETUP request failed due to: #{inspect(error)}")
Expand Down Expand Up @@ -144,7 +148,12 @@ defmodule Membrane.RTSP.Server.Logic do
if state.session_state == :playing, do: :paused_playing, else: :paused_recording

{response,
%{state | request_handler_state: request_handler_state, session_state: session_state}}
%{
state
| request_handler_state: request_handler_state,
session_state: session_state,
recording_with_tcp?: false
}}
else
{response, %{state | request_handler_state: request_handler_state}}
end
Expand Down Expand Up @@ -176,10 +185,19 @@ defmodule Membrane.RTSP.Server.Logic do
{response, handler_state} =
state.request_handler.handle_record(state.incoming_media, state.request_handler_state)

tcp_interleaved_mode? =
state.transport_opts[:transport] == :TCP

if Response.ok?(response) do
{response, %{state | request_handler_state: handler_state, session_state: :recording}}
{response,
%{
state
| request_handler_state: handler_state,
session_state: :recording,
recording_with_tcp?: tcp_interleaved_mode?
}}
else
{response, %{state | request_handler_state: handler_state}}
{response, %{state | request_handler_state: handler_state, recording_with_tcp?: false}}
end
end

Expand All @@ -189,7 +207,16 @@ defmodule Membrane.RTSP.Server.Logic do

Response.new(200)
|> inject_session_header(state)
|> then(&{&1, %{state | configured_media: %{}, incoming_media: %{}, session_state: :init}})
|> then(
&{&1,
%{
state
| configured_media: %{},
incoming_media: %{},
session_state: :init,
recording_with_tcp?: false
}}
)
end

defp do_handle_request(%Request{method: "TEARDOWN"}, state) do
Expand All @@ -200,7 +227,16 @@ defmodule Membrane.RTSP.Server.Logic do

response
|> inject_session_header(state)
|> then(&{&1, %{state | session_state: :init, configured_media: %{}, incoming_media: %{}}})
|> then(
&{&1,
%{
state
| session_state: :init,
configured_media: %{},
incoming_media: %{},
recording_with_tcp?: false
}}
)
end

defp do_handle_request(%Request{}, state) do
Expand All @@ -215,9 +251,6 @@ defmodule Membrane.RTSP.Server.Logic do
transport_opts[:network_mode] == :multicast ->
{:error, :multicast_not_supported}

transport_opts[:mode] == :record and transport != :UDP ->
{:error, :unsupported_transport}

transport_opts[:mode] == :play and transport == :UDP and is_nil(state.rtp_socket) ->
{:error, :udp_not_supported}

Expand Down
11 changes: 11 additions & 0 deletions test/membrane_rtsp/server/server_logic_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,17 @@ defmodule Membrane.RTSP.ServerLogicTest do
%Request{method: "TEARDOWN", path: @url}
|> Logic.process_request(state)
end

test "resets recording_with_tcp? flag", %{state: state} do
state = %State{state | recording_with_tcp?: true, session_state: :playing}

mock(FakeHandler, [respond: 2], fn nil, state -> {Response.new(200), state} end)
mock(:gen_tcp, [send: 2], fn %{}, response -> assert response =~ "RTSP/1.0 200 OK" end)

assert %{recording_with_tcp?: false} =
%Request{method: "TEARDOWN", path: @url}
|> Logic.process_request(state)
end
end

test "return 501 (Not Implemented) for not supported methods", %{state: state} do
Expand Down