Skip to content

Commit

Permalink
Remove ConnectionManager as a separate process (#10)
Browse files Browse the repository at this point in the history
* Remove ConnectionManager as a separate process

* Rename initialize to establish

* Apply reviewer suggestion
  • Loading branch information
Noarkhh authored Jul 11, 2024
1 parent 7290230 commit ff7aeff
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 223 deletions.
115 changes: 52 additions & 63 deletions lib/membrane_rtsp_plugin/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ defmodule Membrane.RTSP.Source do

alias __MODULE__
alias __MODULE__.ConnectionManager
alias Membrane.RTSP.TCP.Decapsulator
alias Membrane.Time
alias Membrane.{RTSP, Time}

@type transport ::
{:udp, port_range_start :: non_neg_integer(), port_range_end :: non_neg_integer()}
Expand Down Expand Up @@ -77,17 +76,19 @@ defmodule Membrane.RTSP.Source do
transport: Source.transport(),
timeout: Time.t(),
keep_alive_interval: Time.t(),
connection_manager: ConnectionManager.t(),
tracks: [ConnectionManager.track()],
ssrc_to_track: %{non_neg_integer() => ConnectionManager.track()}
ssrc_to_track: %{non_neg_integer() => ConnectionManager.track()},
rtsp_session: Membrane.RTSP.t() | nil,
keep_alive_timer: reference() | nil
}

@enforce_keys [:stream_uri, :allowed_media_types, :transport, :timeout, :keep_alive_interval]
defstruct @enforce_keys ++
[
connection_manager: nil,
tracks: [],
ssrc_to_track: %{}
ssrc_to_track: %{},
rtsp_session: nil,
keep_alive_timer: nil
]
end

Expand All @@ -100,17 +101,8 @@ defmodule Membrane.RTSP.Source do

@impl true
def handle_setup(_ctx, state) do
opts =
Map.take(state, [
:stream_uri,
:allowed_media_types,
:transport,
:timeout,
:keep_alive_interval
])

{:ok, connection_manager} = ConnectionManager.start_link(opts)
{[], %{state | connection_manager: connection_manager}}
state = ConnectionManager.establish_connection(state)
{[spec: create_sources_spec(state)], state}
end

@impl true
Expand All @@ -134,7 +126,7 @@ defmodule Membrane.RTSP.Source do

@impl true
def handle_child_notification({:request_socket_control, _socket, pid}, :tcp_source, _ctx, state) do
ConnectionManager.transfer_rtsp_socket_control(state.connection_manager, pid)
RTSP.transfer_socket_control(state.rtsp_session, pid)
{[], state}
end

Expand All @@ -145,8 +137,7 @@ defmodule Membrane.RTSP.Source do

@impl true
def handle_child_playing(:rtp_session, _ctx, state) do
send(state.connection_manager, :source_ready)
{[], state}
{[], ConnectionManager.play(state)}
end

@impl true
Expand All @@ -155,48 +146,8 @@ defmodule Membrane.RTSP.Source do
end

@impl true
def handle_info(%{rtsp_session: rtsp_session, tracks: tracks}, _ctx, state) do
fmt_mapping =
Enum.map(tracks, fn %{rtpmap: rtpmap} ->
{rtpmap.payload_type, {String.to_atom(rtpmap.encoding), rtpmap.clock_rate}}
end)
|> Enum.into(%{})

case state.transport do
:tcp ->
{:tcp, socket} = List.first(tracks).transport

spec =
child(:tcp_source, %Membrane.TCP.Source{
connection_side: :client,
local_socket: socket
})
|> child(:tcp_depayloader, %Decapsulator{rtsp_session: rtsp_session})
|> via_in(Pad.ref(:rtp_input, make_ref()))
|> child(:rtp_session, %Membrane.RTP.SessionBin{fmt_mapping: fmt_mapping})

{[spec: spec], %{state | tracks: tracks}}

{:udp, _port_range_start, _port_range_end} ->
spec =
[
child(:rtp_session, %Membrane.RTP.SessionBin{fmt_mapping: fmt_mapping})
| Enum.flat_map(tracks, fn track ->
{:udp, rtp_port, rtcp_port} = track.transport

[
child({:udp, make_ref()}, %Membrane.UDP.Source{local_port_no: rtp_port})
|> via_in(Pad.ref(:rtp_input, make_ref()))
|> get_child(:rtp_session),
child({:udp, make_ref()}, %Membrane.UDP.Source{local_port_no: rtcp_port})
|> via_in(Pad.ref(:rtp_input, make_ref()))
|> get_child(:rtp_session)
]
end)
]

{[spec: spec], %{state | tracks: tracks}}
end
def handle_info(:keep_alive, _ctx, state) do
{[], ConnectionManager.keep_alive(state)}
end

@impl true
Expand All @@ -219,10 +170,48 @@ defmodule Membrane.RTSP.Source do

@impl true
def handle_terminate_request(_ctx, state) do
ConnectionManager.stop(state.connection_manager)
{[terminate: :normal], state}
end

@spec create_sources_spec(State.t()) :: Membrane.ChildrenSpec.t()
defp create_sources_spec(state) do
fmt_mapping =
Enum.map(state.tracks, fn %{rtpmap: rtpmap} ->
{rtpmap.payload_type, {String.to_atom(rtpmap.encoding), rtpmap.clock_rate}}
end)
|> Enum.into(%{})

case state.transport do
:tcp ->
{:tcp, socket} = List.first(state.tracks).transport

child(:tcp_source, %Membrane.TCP.Source{
connection_side: :client,
local_socket: socket
})
|> child(:tcp_depayloader, %RTSP.TCP.Decapsulator{rtsp_session: state.rtsp_session})
|> via_in(Pad.ref(:rtp_input, make_ref()))
|> child(:rtp_session, %Membrane.RTP.SessionBin{fmt_mapping: fmt_mapping})

{:udp, _port_range_start, _port_range_end} ->
[
child(:rtp_session, %Membrane.RTP.SessionBin{fmt_mapping: fmt_mapping})
| Enum.flat_map(state.tracks, fn track ->
{:udp, rtp_port, rtcp_port} = track.transport

[
child({:udp, make_ref()}, %Membrane.UDP.Source{local_port_no: rtp_port})
|> via_in(Pad.ref(:rtp_input, make_ref()))
|> get_child(:rtp_session),
child({:udp, make_ref()}, %Membrane.UDP.Source{local_port_no: rtcp_port})
|> via_in(Pad.ref(:rtp_input, make_ref()))
|> get_child(:rtp_session)
]
end)
]
end
end

@spec get_rtp_depayloader(ConnectionManager.track()) :: module() | nil
defp get_rtp_depayloader(%{rtpmap: %{encoding: "H264"}}), do: Membrane.RTP.H264.Depayloader
defp get_rtp_depayloader(%{rtpmap: %{encoding: "H265"}}), do: Membrane.RTP.H265.Depayloader
Expand Down
163 changes: 34 additions & 129 deletions lib/membrane_rtsp_plugin/source/connection_manager.ex
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
defmodule Membrane.RTSP.Source.ConnectionManager do
@moduledoc false

use GenServer

require Membrane.Logger
require Membrane.Pad

alias __MODULE__
alias Membrane.RTSP
alias Membrane.RTSP.Source.State

@content_type_header [{"accept", "application/sdp"}]

Expand All @@ -24,110 +23,61 @@ defmodule Membrane.RTSP.Source.ConnectionManager do
transport: track_transport()
}

defmodule State do
@moduledoc false
@type t :: %__MODULE__{
stream_uri: binary(),
allowed_media_types: ConnectionManager.media_types(),
transport: RTSP.Source.transport(),
timeout: Membrane.Time.t(),
keep_alive_interval: Membrane.Time.t(),
parent_pid: pid(),
rtsp_session: RTSP.t() | nil,
tracks: [ConnectionManager.track()],
keep_alive_timer: reference()
}

@enforce_keys [
:stream_uri,
:allowed_media_types,
:transport,
:timeout,
:keep_alive_interval,
:parent_pid
]
defstruct @enforce_keys ++
[
rtsp_session: nil,
tracks: [],
keep_alive_timer: nil
]
end

@typep connection_establishment_phase_return() ::
{:ok, State.t()} | {:error, reason :: term(), State.t()}

@spec start_link(connection_opts()) :: GenServer.on_start()
def start_link(options) do
GenServer.start_link(__MODULE__, Map.put(options, :parent_pid, self()))
end

@spec stop(pid()) :: :ok
def stop(server) do
GenServer.call(server, :stop)
end

@spec transfer_rtsp_socket_control(connection_manager :: pid(), new_controller :: pid()) :: :ok
def transfer_rtsp_socket_control(connection_manager, new_controller) do
GenServer.call(connection_manager, {:transfer_rtsp_socket_control, new_controller})
@spec transfer_rtsp_socket_control(RTSP.t(), pid()) :: :ok
def transfer_rtsp_socket_control(rtsp_session, new_controller) do
RTSP.transfer_socket_control(rtsp_session, new_controller)
end

@impl true
def init(options) do
state = struct(State, options)
send(self(), :connect)
{:ok, state}
end

@impl true
def handle_info(:connect, state) do
@spec establish_connection(State.t()) :: State.t()
def establish_connection(state) do
state =
with {:ok, state} <- start_rtsp_connection(state),
{:ok, state} <- get_rtsp_description(state),
{:ok, state} <- setup_rtsp_connection(state),
{:ok, state} <- prepare_source(state) do
{:ok, state} <- setup_rtsp_connection(state) do
state
else
{:error, reason, state} -> handle_rtsp_error(reason, state)
end

{:noreply, state}
state
end

@impl true
def handle_info(:source_ready, state) do
state =
case play(state) do
{:ok, state} ->
%{state | keep_alive_timer: start_keep_alive_timer(state)}
@spec play(State.t()) :: State.t()
def play(%State{transport: {:udp, _, _}} = state) do
Membrane.Logger.debug("ConnectionManager: Setting RTSP on play mode")

{:error, reason, state} ->
handle_rtsp_error(reason, state)
end
case RTSP.play(state.rtsp_session) do
{:ok, %{status: 200}} ->
%{state | keep_alive_timer: start_keep_alive_timer(state)}

{:noreply, state}
_error ->
handle_rtsp_error(:play_rtsp_failed, state)
end
end

@impl true
def handle_info(:keep_alive, state) do
{:noreply, keep_alive(state)}
end
def play(%State{transport: :tcp} = state) do
Membrane.Logger.debug("ConnectionManager: Setting RTSP on play mode")

@impl true
def handle_info(message, state) do
Membrane.Logger.warning("received unexpected message: #{inspect(message)}")
{:noreply, state}
RTSP.play_no_response(state.rtsp_session)
%{state | keep_alive_timer: start_keep_alive_timer(state)}
end

@impl true
def handle_call(:stop, _from, state) do
RTSP.close(state.rtsp_session)
{:stop, :normal, :ok, state}
end
@spec keep_alive(State.t()) :: State.t()
def keep_alive(state) do
Membrane.Logger.debug("Send GET_PARAMETER to keep session alive")

@impl true
def handle_call({:transfer_rtsp_socket_control, new_controller}, _from, state) do
{:reply, RTSP.transfer_socket_control(state.rtsp_session, new_controller), state}
case state.transport do
:tcp ->
RTSP.get_parameter_no_response(state.rtsp_session)

{:udp, _port_range_start, _port_range_end} ->
{:ok, %{status: 200}} = RTSP.get_parameter(state.rtsp_session)
end

%{state | keep_alive_timer: start_keep_alive_timer(state)}
end

@spec start_rtsp_connection(State.t()) :: connection_establishment_phase_return()
Expand Down Expand Up @@ -175,45 +125,6 @@ defmodule Membrane.RTSP.Source.ConnectionManager do
end
end

@spec prepare_source(State.t()) :: connection_establishment_phase_return()
defp prepare_source(state) do
notify_parent(%{rtsp_session: state.rtsp_session, tracks: state.tracks}, state)

{:ok, state}
end

@spec play(State.t()) :: connection_establishment_phase_return()
defp play(%{rtsp_session: rtsp_session, transport: {:udp, _, _}} = state) do
Membrane.Logger.debug("ConnectionManager: Setting RTSP on play mode")

case RTSP.play(rtsp_session) do
{:ok, %{status: 200}} -> {:ok, state}
_error -> {:error, :play_rtsp_failed, state}
end
end

defp play(%{rtsp_session: rtsp_session, transport: :tcp} = state) do
Membrane.Logger.debug("ConnectionManager: Setting RTSP on play mode")

RTSP.play_no_response(rtsp_session)
{:ok, state}
end

@spec keep_alive(State.t()) :: State.t()
defp keep_alive(state) do
Membrane.Logger.debug("Send GET_PARAMETER to keep session alive")

case state.transport do
:tcp ->
RTSP.get_parameter_no_response(state.rtsp_session)

{:udp, _port_range_start, _port_range_end} ->
{:ok, %{status: 200}} = RTSP.get_parameter(state.rtsp_session)
end

%{state | keep_alive_timer: start_keep_alive_timer(state)}
end

@spec start_keep_alive_timer(State.t()) :: reference()
defp start_keep_alive_timer(%{keep_alive_interval: interval}) do
Process.send_after(self(), :keep_alive, interval |> Membrane.Time.as_milliseconds(:round))
Expand Down Expand Up @@ -313,12 +224,6 @@ defmodule Membrane.RTSP.Source.ConnectionManager do
raise "RTSP connection failed, reason: #{inspect(reason)}"
end

@spec notify_parent(term(), State.t()) :: State.t()
defp notify_parent(msg, state) do
send(state.parent_pid, msg)
state
end

@spec get_tracks(RTSP.Response.t(), media_types()) :: [track()]
defp get_tracks(%{body: %ExSDP{media: media_list}}, stream_types) do
media_list
Expand Down
Loading

0 comments on commit ff7aeff

Please sign in to comment.