Skip to content

Commit

Permalink
Add an option for the source to send EOS when connection is closed by…
Browse files Browse the repository at this point in the history
… the RTSP Server (#11)

* WIP

* Allow specifying behavior on connection closed

* Use released tcp_plugin

* Fix tests, use released membrane_rtsp

* Don't send keepalive after end_of_stream

* Update deps

* Send tracks notification in a batch

* Use newer membrane_rtsp

* Update deps

* Update docs

* Bump version

* Minor change in tests

* Bump membrane_rtsp

* Revert sending new_tracks in a batch
  • Loading branch information
Noarkhh authored Sep 4, 2024
1 parent ff7aeff commit bc32983
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 56 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Add the following line to your deps in `mix.exs`:
```elixir
def deps do
[
{:membrane_rtsp_plugin, "~> 0.2.0"}
{:membrane_rtsp_plugin, "~> 0.3.0"}
]
end
```
Expand Down Expand Up @@ -68,4 +68,4 @@ defmodule RtspPipeline do
{[], state}
end
end
```
```
96 changes: 78 additions & 18 deletions lib/membrane_rtsp_plugin/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ defmodule Membrane.RTSP.Source do
* `H264`
* `H265`
### Notifications
* `{:new_track, ssrc, track}` - sent when the track is parsed and available for consumption by the next
elements. An output pad `Pad.ref(:output, ssrc)` should be linked to receive the data.
When the element finishes setting up all tracks it will send a `t:set_up_tracks/0` notification.
Each time a track is parsed and available for further processing the element will send a
`t:new_track/0` notification. An output pad `Pad.ref(:output, ssrc)` should be linked to receive
the data.
"""

use Membrane.Bin
Expand All @@ -25,6 +26,15 @@ defmodule Membrane.RTSP.Source do
alias __MODULE__.ConnectionManager
alias Membrane.{RTSP, Time}

@type set_up_tracks_notification :: {:set_up_tracks, [track()]}
@type new_track_notification :: {:new_track, ssrc :: pos_integer(), track :: track()}
@type track :: %{
control_path: String.t(),
type: :video | :audio | :application,
fmtp: ExSDP.Attribute.FMTP.t() | nil,
rtpmap: ExSDP.Attribute.RTPMapping.t() | nil
}

@type transport ::
{:udp, port_range_start :: non_neg_integer(), port_range_end :: non_neg_integer()}
| :tcp
Expand Down Expand Up @@ -62,6 +72,15 @@ defmodule Membrane.RTSP.Source do
Interval of a heartbeat sent to the RTSP server at a regular interval to
keep the session alive.
"""
],
on_connection_closed: [
spec: :raise_error | :send_eos,
default: :raise_error,
description: """
Defines the element's behavior if the TCP connection is closed by the RTSP server:
- `:raise_error` - Raise an error.
- `:send_eos` - Send an `:end_of_stream` to the output pad.
"""
]

def_output_pad :output,
Expand All @@ -79,16 +98,26 @@ defmodule Membrane.RTSP.Source do
tracks: [ConnectionManager.track()],
ssrc_to_track: %{non_neg_integer() => ConnectionManager.track()},
rtsp_session: Membrane.RTSP.t() | nil,
keep_alive_timer: reference() | nil
keep_alive_timer: reference() | nil,
on_connection_closed: :raise_error | :send_eos,
end_of_stream: boolean()
}

@enforce_keys [:stream_uri, :allowed_media_types, :transport, :timeout, :keep_alive_interval]
@enforce_keys [
:stream_uri,
:allowed_media_types,
:transport,
:timeout,
:keep_alive_interval,
:on_connection_closed
]
defstruct @enforce_keys ++
[
tracks: [],
ssrc_to_track: %{},
rtsp_session: nil,
keep_alive_timer: nil
keep_alive_timer: nil,
end_of_stream: false
]
end

Expand All @@ -102,7 +131,19 @@ defmodule Membrane.RTSP.Source do
@impl true
def handle_setup(_ctx, state) do
state = ConnectionManager.establish_connection(state)
{[spec: create_sources_spec(state)], state}

{[spec: create_sources_spec(state), notify_parent: get_set_up_tracks_notification(state)],
state}
end

@impl true
def handle_child_playing(:rtp_session, _ctx, state) do
{[], ConnectionManager.play(state)}
end

@impl true
def handle_child_playing(_child, _ctx, state) do
{[], state}
end

@impl true
Expand All @@ -114,7 +155,7 @@ defmodule Membrane.RTSP.Source do
) do
case Enum.find(state.tracks, fn track -> track.rtpmap.payload_type == pt end) do
nil ->
{[], state}
raise "No track of payload type #{inspect(pt)} has been requested with SETUP"

track ->
ssrc_to_track = Map.put(state.ssrc_to_track, ssrc, track)
Expand All @@ -136,17 +177,31 @@ defmodule Membrane.RTSP.Source do
end

@impl true
def handle_child_playing(:rtp_session, _ctx, state) do
{[], ConnectionManager.play(state)}
def handle_info(
{:EXIT, rtsp_session, :socket_closed},
ctx,
%State{rtsp_session: rtsp_session, on_connection_closed: :send_eos} = state
) do
notify_udp_sources_actions =
ctx.children
|> Map.keys()
|> Enum.filter(&match?({:udp_source, _ref}, &1))
|> Enum.map(&{:notify_child, {&1, :close_socket}})

{notify_udp_sources_actions, %{state | end_of_stream: true}}
end

@impl true
def handle_child_playing(_child, _ctx, state) do
{[], state}
def handle_info(
{:EXIT, rtsp_session, reason},
_ctx,
%State{rtsp_session: rtsp_session} = state
) do
{[terminate: {:rtsp_session_crash, reason}], state}
end

@impl true
def handle_info(:keep_alive, _ctx, state) do
def handle_info(:keep_alive, _ctx, %{end_of_stream: false} = state) do
{[], ConnectionManager.keep_alive(state)}
end

Expand All @@ -173,21 +228,26 @@ defmodule Membrane.RTSP.Source do
{[terminate: :normal], state}
end

@spec get_set_up_tracks_notification(State.t()) :: set_up_tracks_notification()
def get_set_up_tracks_notification(state) do
{:set_up_tracks, Enum.map(state.tracks, &Map.delete(&1, :transport))}
end

@spec create_sources_spec(State.t()) :: Membrane.ChildrenSpec.t()
defp create_sources_spec(state) do
fmt_mapping =
Enum.map(state.tracks, fn %{rtpmap: rtpmap} ->
Map.new(state.tracks, fn %{rtpmap: rtpmap} ->
{rtpmap.payload_type, {String.to_atom(rtpmap.encoding), rtpmap.clock_rate}}
end)
|> Enum.into(%{})

case state.transport do
:tcp ->
{:tcp, socket} = List.first(state.tracks).transport

child(:tcp_source, %Membrane.TCP.Source{
connection_side: :client,
local_socket: socket
local_socket: socket,
on_connection_closed: state.on_connection_closed
})
|> child(:tcp_depayloader, %RTSP.TCP.Decapsulator{rtsp_session: state.rtsp_session})
|> via_in(Pad.ref(:rtp_input, make_ref()))
Expand All @@ -200,10 +260,10 @@ defmodule Membrane.RTSP.Source do
{:udp, rtp_port, rtcp_port} = track.transport

[
child({:udp, make_ref()}, %Membrane.UDP.Source{local_port_no: rtp_port})
child({:udp_source, make_ref()}, %Membrane.UDP.Source{local_port_no: rtp_port})
|> via_in(Pad.ref(:rtp_input, make_ref()))
|> get_child(:rtp_session),
child({:udp, make_ref()}, %Membrane.UDP.Source{local_port_no: rtcp_port})
child({:udp_source, make_ref()}, %Membrane.UDP.Source{local_port_no: rtcp_port})
|> via_in(Pad.ref(:rtp_input, make_ref()))
|> get_child(:rtp_session)
]
Expand Down
1 change: 1 addition & 0 deletions lib/membrane_rtsp_plugin/source/connection_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ defmodule Membrane.RTSP.Source.ConnectionManager do
response_timeout: Membrane.Time.as_milliseconds(state.timeout, :round)
) do
{:ok, session} ->
Process.flag(:trap_exit, true)
{:ok, %{state | rtsp_session: session}}

{:error, reason} ->
Expand Down
10 changes: 5 additions & 5 deletions mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.RTSP.Plugin.Mixfile do
use Mix.Project

@version "0.2.0"
@version "0.3.0"
@github_url "https://github.com/gBillal/membrane_rtsp_plugin"

def project do
Expand Down Expand Up @@ -37,13 +37,13 @@ defmodule Membrane.RTSP.Plugin.Mixfile do
defp deps do
[
{:membrane_core, "~> 1.1"},
{:membrane_rtsp, "~> 0.7.1"},
{:membrane_rtp_plugin, "~> 0.28.0"},
{:membrane_rtsp, "~> 0.9.0"},
{:membrane_rtp_plugin, "~> 0.29.0"},
{:membrane_rtp_h264_plugin, "~> 0.19.0"},
{:membrane_rtp_h265_plugin, "~> 0.5.1"},
{:membrane_tcp_plugin, "~> 0.4.0"},
{:membrane_tcp_plugin, "~> 0.6.0"},
{:membrane_h26x_plugin, "~> 0.10.0"},
{:membrane_udp_plugin, "~> 0.13.0"},
{:membrane_udp_plugin, "~> 0.14.0"},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
{:dialyxir, ">= 0.0.0", only: :dev, runtime: false},
{:credo, ">= 0.0.0", only: :dev, runtime: false},
Expand Down
Loading

0 comments on commit bc32983

Please sign in to comment.