diff --git a/README.md b/README.md index f58fb15..5215f89 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ Add the following line to your deps in `mix.exs`: ```elixir def deps do [ - {:membrane_rtsp_plugin, github: "gBillal/membrane_rtsp_plugin", tag: "v0.1.3"} + {:membrane_rtsp_plugin, "~> 0.2.0"} ] end ``` diff --git a/lib/membrane_rtsp_plugin/source.ex b/lib/membrane_rtsp_plugin/source.ex index 1e40775..ee0f939 100644 --- a/lib/membrane_rtsp_plugin/source.ex +++ b/lib/membrane_rtsp_plugin/source.ex @@ -2,31 +2,37 @@ defmodule Membrane.RTSP.Source do @moduledoc """ Source bin responsible for connecting to an RTSP server. - This element connects to an RTSP server, depayload and parse the received media if possible. + This element connects to an RTSP server, depayload and parses the received media if possible. If there's no suitable depayloader and parser, the raw payload is sent to the subsequent elements in the pipeline. + In case connection can't be established or is severed during streaming this bin will crash. + The following codecs are depayloaded and parsed: * `H264` * `H265` ### Notifications - * `{:new_track, ssrc, track}` - sent when the track is parsed and available for consumption by the upper - elements. The output pad should be linked to receive the data. - * `{:connection_failed, reason}` - sent when the element cannot establish connection or a connection is lost - during streaming. This element will try to reconnect to the server, this event is sent only once even if the error - persist. + * `{:new_track, ssrc, track}` - sent when the track is parsed and available for consumption by the next + elements. An output pad `Pad.ref(:output, ssrc)` should be linked to receive the data. """ use Membrane.Bin require Membrane.Logger - alias __MODULE__.{ConnectionManager, Decapsulator, TCP} + alias __MODULE__ + alias __MODULE__.{ConnectionManager, ReadyNotifier} + alias Membrane.RTSP.TCP.Decapsulator + alias Membrane.Time + + @type transport :: + {:udp, port_range_start :: non_neg_integer(), port_range_end :: non_neg_integer()} + | :tcp def_options stream_uri: [ spec: binary(), - description: "The RTSP uri of the resource to stream." + description: "The RTSP URI of the resource to stream." ], allowed_media_types: [ spec: [:video | :audio | :application], @@ -36,20 +42,25 @@ defmodule Membrane.RTSP.Source do """ ], transport: [ - spec: [:udp | :tcp], + spec: transport(), default: :tcp, - description: "Set the rtsp transport protocol." + description: """ + Transport protocol that will be used in the established RTSP stream. In case of + UDP a range needs to provided from which receiving ports will be chosen. + """ ], timeout: [ - spec: non_neg_integer(), - default: :timer.seconds(15), - description: "Set RTSP response timeout" + spec: Time.t(), + default: Time.seconds(15), + default_inspector: &Time.pretty_duration/1, + description: "RTSP response timeout" ], keep_alive_interval: [ - spec: non_neg_integer(), - default: :timer.seconds(15), + spec: Time.t(), + default: Time.seconds(15), + default_inspector: &Time.pretty_duration/1, description: """ - Send a heartbeat to the RTSP server at a regular interval to + Interval of a heartbeat sent to the RTSP server at a regular interval to keep the session alive. """ ] @@ -58,18 +69,49 @@ defmodule Membrane.RTSP.Source do accepted_format: _any, availability: :on_request + defmodule State do + @moduledoc false + @type t :: %__MODULE__{ + stream_uri: binary(), + allowed_media_types: ConnectionManager.media_types(), + transport: Source.transport(), + timeout: Time.t(), + keep_alive_interval: Time.t(), + connection_manager: ConnectionManager.t(), + tracks: [ConnectionManager.track()], + ssrc_to_track: %{non_neg_integer() => ConnectionManager.track()} + } + + @enforce_keys [:stream_uri, :allowed_media_types, :transport, :timeout, :keep_alive_interval] + defstruct @enforce_keys ++ + [ + connection_manager: nil, + tracks: [], + ssrc_to_track: %{} + ] + end + + defmodule ReadyNotifier do + @moduledoc false + # This element's only purpose is to send a notification to it's parent when it's entered playing + # state, meaning all it's siblings also did. + use Membrane.Source + + @impl true + def handle_playing(_ctx, state) do + {[notify_parent: :ready], state} + end + end + @impl true def handle_init(_ctx, options) do - state = - options - |> Map.from_struct() - |> Map.merge(%{connection_manager: nil, tracks: [], ssrc_to_track: %{}, tcp_socket: nil}) + state = struct(State, Map.from_struct(options)) {[], state} end @impl true - def handle_playing(_ctx, state) do + def handle_setup(_ctx, state) do opts = Map.take(state, [ :stream_uri, @@ -90,69 +132,71 @@ defmodule Membrane.RTSP.Source do _ctx, state ) do - if track = Enum.find(state.tracks, fn track -> track.rtpmap.payload_type == pt end) do - ssrc_to_track = Map.put(state.ssrc_to_track, ssrc, track) + case Enum.find(state.tracks, fn track -> track.rtpmap.payload_type == pt end) do + nil -> + {[], state} + + track -> + ssrc_to_track = Map.put(state.ssrc_to_track, ssrc, track) - {[notify_parent: {:new_track, ssrc, Map.delete(track, :transport)}], - %{state | ssrc_to_track: ssrc_to_track}} - else - {[], state} + {[notify_parent: {:new_track, ssrc, Map.delete(track, :transport)}], + %{state | ssrc_to_track: ssrc_to_track}} end end @impl true - def handle_child_notification({:pid, pid}, :source, _ctx, %{tcp_socket: socket} = state) do - # Socket active mode consumes much less cpu that non-active mode. - :ok = :gen_tcp.controlling_process(socket, pid) - :ok = :inet.setopts(socket, active: true) + def handle_child_notification({:request_socket_control, _socket, pid}, :tcp_source, _ctx, state) do + ConnectionManager.transfer_rtsp_socket_control(state.connection_manager, pid) {[], state} end @impl true - def handle_child_notification(_notification, _element, _ctx, state) do + def handle_child_notification(:ready, :ready_notifier, _ctx, state) do + send(state.connection_manager, :source_ready) {[], state} end @impl true - def handle_pad_added(Pad.ref(:output, ssrc) = pad, _ctx, state) do - track = Map.fetch!(state.ssrc_to_track, ssrc) - - spec = - get_child(:rtp_session) - |> via_out(Pad.ref(:output, ssrc), options: [depayloader: get_rtp_depayloader(track)]) - |> parser(track) - |> bin_output(pad) - - {[spec: spec], state} + def handle_child_notification(_notification, _element, _ctx, state) do + {[], state} end @impl true - def handle_info({:tracks, tracks}, _ctx, state) do - Membrane.Logger.info("Received tracks: #{inspect(tracks)}") - + def handle_info(%{tracks: tracks}, _ctx, state) do fmt_mapping = Enum.map(tracks, fn %{rtpmap: rtpmap} -> {rtpmap.payload_type, {String.to_atom(rtpmap.encoding), rtpmap.clock_rate}} end) |> Enum.into(%{}) + common_spec = [ + child(:rtp_session, %Membrane.RTP.SessionBin{fmt_mapping: fmt_mapping}), + child(:ready_notifier, ReadyNotifier) + ] + case state.transport do :tcp -> - local_socket = List.first(tracks).transport + {:tcp, socket} = List.first(tracks).transport spec = - child(:source, %TCP{local_socket: local_socket}) - |> child(:tcp_depayloader, Decapsulator) - |> via_in(Pad.ref(:rtp_input, make_ref())) - |> child(:rtp_session, %Membrane.RTP.SessionBin{fmt_mapping: fmt_mapping}) + common_spec ++ + [ + child(:tcp_source, %Membrane.TCP.Source{ + connection_side: :client, + local_socket: socket + }) + |> child(:tcp_depayloader, Decapsulator) + |> via_in(Pad.ref(:rtp_input, make_ref())) + |> get_child(:rtp_session) + ] - {[spec: spec], %{state | tracks: tracks, tcp_socket: local_socket}} + {[spec: spec], %{state | tracks: tracks}} - :udp -> + {:udp, _port_range_start, _port_range_end} -> spec = - [child(:rtp_session, %Membrane.RTP.SessionBin{fmt_mapping: fmt_mapping})] ++ + common_spec ++ Enum.flat_map(tracks, fn track -> - {rtp_port, rtcp_port} = track.transport + {:udp, rtp_port, rtcp_port} = track.transport [ child({:udp, make_ref()}, %Membrane.UDP.Source{local_port_no: rtp_port}) @@ -169,14 +213,21 @@ defmodule Membrane.RTSP.Source do end @impl true - def handle_info({:connection_failed, reason}, ctx, state) do - {[remove_children: Map.keys(ctx.children), notify_parent: {:connection_failed, reason}], - state} + def handle_info(_message, _ctx, state) do + {[], state} end @impl true - def handle_info(_message, _ctx, state) do - {[], state} + def handle_pad_added(Pad.ref(:output, ssrc) = pad, _ctx, state) do + track = Map.fetch!(state.ssrc_to_track, ssrc) + + spec = + get_child(:rtp_session) + |> via_out(Pad.ref(:output, ssrc), options: [depayloader: get_rtp_depayloader(track)]) + |> parser(track) + |> bin_output(pad) + + {[spec: spec], state} end @impl true @@ -185,10 +236,12 @@ defmodule Membrane.RTSP.Source do {[terminate: :normal], state} end + @spec get_rtp_depayloader(ConnectionManager.track()) :: module() | nil defp get_rtp_depayloader(%{rtpmap: %{encoding: "H264"}}), do: Membrane.RTP.H264.Depayloader defp get_rtp_depayloader(%{rtpmap: %{encoding: "H265"}}), do: Membrane.RTP.H265.Depayloader defp get_rtp_depayloader(_track), do: nil + @spec parser(ChildrenSpec.builder(), ConnectionManager.track()) :: ChildrenSpec.builder() defp parser(link_builder, %{rtpmap: %{encoding: "H264"}} = track) do sps = track.fmtp.sprop_parameter_sets && track.fmtp.sprop_parameter_sets.sps pps = track.fmtp.sprop_parameter_sets && track.fmtp.sprop_parameter_sets.pps @@ -213,6 +266,7 @@ defmodule Membrane.RTSP.Source do # a strange issue with one of Milesight camera where the parameter sets has # <<0, 0, 0, 1>> at the end + @spec clean_parameter_set(binary()) :: binary() defp clean_parameter_set(ps) do case :binary.part(ps, byte_size(ps), -4) do <<0, 0, 0, 1>> -> :binary.part(ps, 0, byte_size(ps) - 4) diff --git a/lib/membrane_rtsp_plugin/source/connection_manager.ex b/lib/membrane_rtsp_plugin/source/connection_manager.ex index 09b105f..593e95f 100644 --- a/lib/membrane_rtsp_plugin/source/connection_manager.ex +++ b/lib/membrane_rtsp_plugin/source/connection_manager.ex @@ -5,19 +5,57 @@ defmodule Membrane.RTSP.Source.ConnectionManager do require Membrane.Logger + alias __MODULE__ alias Membrane.RTSP - alias Membrane.RTSP.Source.Transport.TCPWrapper @content_type_header [{"accept", "application/sdp"}] - @udp_min_port 5000 - @udp_max_port 65_000 - - @base_back_off_in_ms 10 - @max_back_off_in_ms :timer.minutes(2) - + @type t() :: pid() @type media_types :: [:video | :audio | :application] @type connection_opts :: %{stream_uri: binary(), allowed_media_types: media_types()} + @type track_transport :: + {:tcp, :gen_tcp.socket()} + | {:udp, rtp_port :: :inet.port_number(), rtcp_port :: :inet.port_number()} + @type track :: %{ + control_path: String.t(), + type: :video | :audio | :application, + fmtp: ExSDP.Attribute.FMTP.t() | nil, + rtpmap: ExSDP.Attribute.RTPMapping.t() | nil, + transport: track_transport() + } + + defmodule State do + @moduledoc false + @type t :: %__MODULE__{ + stream_uri: binary(), + allowed_media_types: ConnectionManager.media_types(), + transport: RTSP.Source.transport(), + timeout: Membrane.Time.t(), + keep_alive_interval: Membrane.Time.t(), + parent_pid: pid(), + rtsp_session: RTSP.t() | nil, + tracks: [ConnectionManager.track()], + keep_alive_timer: reference() + } + + @enforce_keys [ + :stream_uri, + :allowed_media_types, + :transport, + :timeout, + :keep_alive_interval, + :parent_pid + ] + defstruct @enforce_keys ++ + [ + rtsp_session: nil, + tracks: [], + keep_alive_timer: nil + ] + end + + @typep connection_establishment_phase_return() :: + {:ok, State.t()} | {:error, reason :: term(), State.t()} @spec start_link(connection_opts()) :: GenServer.on_start() def start_link(options) do @@ -29,18 +67,15 @@ defmodule Membrane.RTSP.Source.ConnectionManager do GenServer.call(server, :stop) end + @spec transfer_rtsp_socket_control(connection_manager :: pid(), new_controller :: pid()) :: :ok + def transfer_rtsp_socket_control(connection_manager, new_controller) do + GenServer.call(connection_manager, {:transfer_rtsp_socket_control, new_controller}) + end + @impl true def init(options) do - state = - Map.merge(options, %{ - rtsp_session: nil, - tracks: [], - status: :init, - keep_alive_timer: nil, - reconnect_attempt: 0 - }) - - Process.send_after(self(), :connect, 0) + state = struct(State, options) + send(self(), :connect) {:ok, state} end @@ -50,43 +85,32 @@ defmodule Membrane.RTSP.Source.ConnectionManager do with {:ok, state} <- start_rtsp_connection(state), {:ok, state} <- get_rtsp_description(state), {:ok, state} <- setup_rtsp_connection(state), - :ok <- play(state) do - %{state | status: :connected, reconnect_attempt: 0} - |> notify_parent({:tracks, Map.values(state.tracks)}) - |> keep_alive() + {:ok, state} <- prepare_source(state) do + state else - {:error, reason, state} -> - Membrane.Logger.error("could not connect to RTSP server due to: #{inspect(reason)}") - if is_pid(state.rtsp_session), do: RTSP.close(state.rtsp_session) - - state = notify_parent(state, {:connection_failed, reason}) |> retry() - %{state | status: :failed} + {:error, reason, state} -> handle_rtsp_error(reason, state) end {:noreply, state} end @impl true - def handle_info(:keep_alive, state) do - {:noreply, keep_alive(state)} - end - - @impl true - def handle_info({:DOWN, _ref, :process, pid, reason}, %{rtsp_session: pid} = state) do + def handle_info(:source_ready, state) do state = - case state.status do - :connected -> - state - |> notify_parent({:connection_failed, reason}) - |> cancel_keep_alive() - |> retry() - |> then(&%{&1 | status: :failed}) + case play(state) do + {:ok, state} -> + %{state | keep_alive_timer: start_keep_alive_timer(state)} - _other -> - state + {:error, reason, state} -> + handle_rtsp_error(reason, state) end - {:noreply, %{state | rtsp_session: nil}} + {:noreply, state} + end + + @impl true + def handle_info(:keep_alive, state) do + {:noreply, keep_alive(state)} end @impl true @@ -97,16 +121,21 @@ defmodule Membrane.RTSP.Source.ConnectionManager do @impl true def handle_call(:stop, _from, state) do - Membrane.RTSP.close(state.rtsp_session) + RTSP.close(state.rtsp_session) {:stop, :normal, :ok, state} end - defp start_rtsp_connection(state) do - options = [response_timeout: state.timeout, controlling_process: state.parent_pid] + @impl true + def handle_call({:transfer_rtsp_socket_control, new_controller}, _from, state) do + {:reply, RTSP.transfer_socket_control(state.rtsp_session, new_controller), state} + end - case RTSP.start(state.stream_uri, TCPWrapper, options) do + @spec start_rtsp_connection(State.t()) :: connection_establishment_phase_return() + defp start_rtsp_connection(state) do + case RTSP.start_link(state.stream_uri, + response_timeout: Membrane.Time.as_milliseconds(state.timeout, :round) + ) do {:ok, session} -> - Process.monitor(session) {:ok, %{state | rtsp_session: session}} {:error, reason} -> @@ -114,6 +143,7 @@ defmodule Membrane.RTSP.Source.ConnectionManager do end end + @spec get_rtsp_description(State.t()) :: connection_establishment_phase_return() defp get_rtsp_description(%{rtsp_session: rtsp_session} = state, retry \\ true) do Membrane.Logger.debug("ConnectionManager: Getting RTSP description") @@ -130,116 +160,176 @@ defmodule Membrane.RTSP.Source.ConnectionManager do end end - defp setup_rtsp_connection(%{rtsp_session: rtsp_session} = state) do - Membrane.Logger.debug("ConnectionManager: Setting up RTSP connection") + @spec setup_rtsp_connection(State.t()) :: connection_establishment_phase_return() + defp setup_rtsp_connection(%{transport: :tcp} = state) do + case setup_rtsp_connection_with_tcp(state.rtsp_session, state.tracks) do + {:ok, tracks} -> {:ok, %{state | tracks: tracks}} + {:error, reason} -> {:error, reason, state} + end + end + + defp setup_rtsp_connection(%{transport: {:udp, min_port, max_port}} = state) do + case setup_rtsp_connection_with_udp(state.rtsp_session, min_port, max_port, state.tracks) do + {:ok, tracks} -> {:ok, %{state | tracks: tracks}} + {:error, reason} -> {:error, reason, state} + end + end + + @spec prepare_source(State.t()) :: connection_establishment_phase_return() + defp prepare_source(state) do + notify_parent(%{tracks: state.tracks}, state) + + {:ok, state} + end + + @spec play(State.t()) :: connection_establishment_phase_return() + defp play(%{rtsp_session: rtsp_session, transport: {:udp, _, _}} = state) do + Membrane.Logger.debug("ConnectionManager: Setting RTSP on play mode") + + case RTSP.play(rtsp_session) do + {:ok, %{status: 200}} -> {:ok, state} + _error -> {:error, :play_rtsp_failed, state} + end + end - state.tracks + defp play(%{rtsp_session: rtsp_session, transport: :tcp} = state) do + Membrane.Logger.debug("ConnectionManager: Setting RTSP on play mode") + + RTSP.play_no_response(rtsp_session) + {:ok, state} + end + + @spec keep_alive(State.t()) :: State.t() + defp keep_alive(state) do + Membrane.Logger.debug("Send GET_PARAMETER to keep session alive") + RTSP.get_parameter_no_response(state.rtsp_session) + + %{state | keep_alive_timer: start_keep_alive_timer(state)} + end + + @spec start_keep_alive_timer(State.t()) :: reference() + defp start_keep_alive_timer(%{keep_alive_interval: interval}) do + Process.send_after(self(), :keep_alive, interval |> Membrane.Time.as_milliseconds(:round)) + end + + @spec setup_rtsp_connection_with_tcp(RTSP.t(), [track()]) :: + {:ok, tracks :: [track()]} | {:error, reason :: term()} + defp setup_rtsp_connection_with_tcp(rtsp_session, tracks) do + socket = RTSP.get_socket(rtsp_session) + + tracks |> Enum.with_index() - |> Enum.reduce_while({:ok, state}, fn {{control_path, _track}, idx}, {:ok, state} -> - with {:ok, transport, transport_header} <- build_transport_header(state, idx), - {:ok, %{status: 200}} <- RTSP.setup(rtsp_session, control_path, transport_header) do - tracks = Map.update!(state.tracks, control_path, &%{&1 | transport: transport}) - {:cont, {:ok, %{state | tracks: tracks}}} - else + |> Enum.reduce_while({:ok, []}, fn {track, idx}, {:ok, set_up_tracks} -> + transport_header = + [{"Transport", "RTP/AVP/TCP;unicast;interleaved=#{idx * 2}-#{idx * 2 + 1}"}] + + case RTSP.setup(rtsp_session, track.control_path, transport_header) do + {:ok, %{status: 200}} -> + {:cont, {:ok, [%{track | transport: {:tcp, socket}} | set_up_tracks]}} + error -> Membrane.Logger.debug( "ConnectionManager: Setting up RTSP connection failed: #{inspect(error)}" ) - {:halt, {:error, :setting_up_sdp_connection_failed, state}} + {:halt, {:error, :setting_up_rtsp_connection_failed}} end end) end - defp play(%{rtsp_session: rtsp_session} = state) do - Membrane.Logger.debug("ConnectionManager: Setting RTSP on play mode") - - case RTSP.play(rtsp_session) do - {:ok, %{status: 200}} -> :ok - _error -> {:error, :play_rtsp_failed, state} - end + @spec setup_rtsp_connection_with_udp( + RTSP.t(), + :inet.port_number(), + :inet.port_number(), + [track()], + [track()] + ) :: {:ok, tracks :: [track()]} | {:error, reason :: term()} + defp setup_rtsp_connection_with_udp( + rtsp_session, + port, + max_port, + tracks, + set_up_tracks \\ [] + ) + + defp setup_rtsp_connection_with_udp(_rtsp_session, _port, _max_port, [], set_up_tracks) do + {:ok, Enum.reverse(set_up_tracks)} end - defp build_transport_header(%{transport: :tcp} = state, media_id) do - {:ok, RTSP.get_transport(state.rtsp_session), - [{"Transport", "RTP/AVP/TCP;unicast;interleaved=#{media_id * 2}-#{media_id * 2 + 1}"}]} + defp setup_rtsp_connection_with_udp(_rtsp_session, max_port, max_port, _tracks, _set_up_tracks) do + # when current port is equal to max_port the range is already exceeded, because port + 1 is also required. + {:error, :port_range_exceeded} end - defp build_transport_header(%{transport: :udp}, _media_id) do - @udp_min_port..@udp_max_port//2 - |> Enum.shuffle() - |> Enum.reduce_while({:error, :no_free_port}, fn rtp_port, acc -> - if free_port?(rtp_port) and free_port?(rtp_port + 1) do - {:halt, - {:ok, {rtp_port, rtp_port + 1}, - [{"Transport", "RTP/AVP;unicast;client_port=#{rtp_port}-#{rtp_port + 1}"}]}} - else - {:cont, acc} + defp setup_rtsp_connection_with_udp(rtsp_session, port, max_port, tracks, set_up_tracks) do + if port_taken?(port) or port_taken?(port + 1) do + setup_rtsp_connection_with_udp(rtsp_session, port + 1, max_port, tracks, set_up_tracks) + else + transport_header = [{"Transport", "RTP/AVP/UDP;unicast;client_port=#{port}-#{port + 1}"}] + [track | rest_tracks] = tracks + + case RTSP.setup(rtsp_session, track.control_path, transport_header) do + {:ok, %{status: 200}} -> + set_up_tracks = [%{track | transport: {:udp, port, port + 1}} | set_up_tracks] + + setup_rtsp_connection_with_udp( + rtsp_session, + port + 2, + max_port, + rest_tracks, + set_up_tracks + ) + + _other -> + {:error, :setup_failed} end - end) + end end - defp free_port?(port) do + @spec port_taken?(:inet.port_number()) :: boolean() + defp port_taken?(port) do case :gen_udp.open(port, reuseaddr: true) do {:ok, socket} -> :inet.close(socket) - true + false _error -> - false + true end end - defp keep_alive(state) do - Membrane.Logger.debug("Send GET_PARAMETER to keep session alive") - RTSP.get_parameter_no_response(state.rtsp_session) - - %{ - state - | keep_alive_timer: Process.send_after(self(), :keep_alive, state.keep_alive_interval) - } - end + @spec handle_rtsp_error(term(), State.t()) :: no_return() + defp handle_rtsp_error(reason, state) do + Membrane.Logger.error("could not connect to RTSP server due to: #{inspect(reason)}") + if state.rtsp_session != nil, do: RTSP.close(state.rtsp_session) - defp cancel_keep_alive(state) do - Process.cancel_timer(state.keep_alive_timer) - %{state | keep_alive_timer: nil} + raise "RTSP connection failed, reason: #{inspect(reason)}" end - # notify the parent only once on successive failures - defp notify_parent(%{status: :failed} = state, _msg), do: state - - defp notify_parent(state, msg) do + @spec notify_parent(term(), State.t()) :: State.t() + defp notify_parent(msg, state) do send(state.parent_pid, msg) state end - defp retry(%{reconnect_attempt: attempt} = state) do - delay = - :math.pow(2, attempt) - |> Kernel.*(@base_back_off_in_ms) - |> min(@max_back_off_in_ms) - |> trunc() - - Membrane.Logger.info("retry connection in #{delay} ms") - Process.send_after(self(), :connect, delay) - %{state | reconnect_attempt: attempt + 1} - end - + @spec get_tracks(RTSP.Response.t(), media_types()) :: [track()] defp get_tracks(%{body: %ExSDP{media: media_list}}, stream_types) do media_list |> Enum.filter(&(&1.type in stream_types)) |> Enum.map(fn media -> - {get_attribute(media, "control", ""), - %{ - type: media.type, - rtpmap: get_attribute(media, ExSDP.Attribute.RTPMapping), - fmtp: get_attribute(media, ExSDP.Attribute.FMTP), - transport: nil - }} + %{ + control_path: get_attribute(media, "control", ""), + type: media.type, + rtpmap: get_attribute(media, ExSDP.Attribute.RTPMapping), + fmtp: get_attribute(media, ExSDP.Attribute.FMTP), + transport: nil + } end) - |> Map.new() end + @spec get_attribute(ExSDP.Media.t(), ExSDP.Attribute.key(), default) :: + ExSDP.Attribute.t() | default + when default: var defp get_attribute(video_attributes, attribute, default \\ nil) do case ExSDP.get_attribute(video_attributes, attribute) do {^attribute, value} -> value diff --git a/lib/membrane_rtsp_plugin/source/rtsp_decapsulator.ex b/lib/membrane_rtsp_plugin/source/rtsp_decapsulator.ex deleted file mode 100644 index 1014906..0000000 --- a/lib/membrane_rtsp_plugin/source/rtsp_decapsulator.ex +++ /dev/null @@ -1,105 +0,0 @@ -defmodule Membrane.RTSP.Source.Decapsulator do - @moduledoc false - - use Membrane.Filter - - alias Membrane.{Buffer, RemoteStream, RTP, RTSP} - - def_options rtsp_session: [ - spec: pid() | nil, - default: nil, - description: """ - PID of a RTSP Session (returned from Membrane.RTSP.start or Membrane.RTSP.start_link) - that received RTSP responses will be forwarded to. If nil the responses will be - discarded. - """ - ] - - def_input_pad :input, accepted_format: %RemoteStream{type: :bytestream} - - def_output_pad :output, accepted_format: %RemoteStream{type: :packetized, content_format: RTP} - - @impl true - def handle_init(_ctx, opts) do - state = - Map.from_struct(opts) - |> Map.merge(%{ - unprocessed_data: <<>> - }) - - {[], state} - end - - @impl true - def handle_playing(_ctx, state) do - stream_format = %RemoteStream{type: :packetized, content_format: RTP} - {[stream_format: {:output, stream_format}], state} - end - - @impl true - def handle_stream_format(:input, _stream_format, _ctx, state) do - {[], state} - end - - @impl true - def handle_buffer(:input, %Buffer{payload: payload, metadata: metadata}, _ctx, state) do - # Some IP Cameras doesn't send whole RTP packets, so RTSP messages may come with - # RTP data in the same network packet - packets_binary = - if String.starts_with?(state.unprocessed_data, "RTSP"), - do: parse_rtsp_response(state.unprocessed_data <> payload), - else: state.unprocessed_data <> payload - - {unprocessed_data, complete_packets_binaries} = get_complete_packets(packets_binary) - - packets_buffers = - Enum.map(complete_packets_binaries, &%Buffer{payload: &1, metadata: metadata}) - - {[buffer: {:output, packets_buffers}], %{state | unprocessed_data: unprocessed_data}} - end - - defp parse_rtsp_response(data) do - with {:ok, %{status: 200} = resp} <- RTSP.Response.parse(data), - length <- get_resp_content_length(resp), - true <- byte_size(resp.body) >= length do - :binary.part(resp.body, length, byte_size(resp.body) - length) - else - _other -> data - end - end - - defp get_resp_content_length(resp) do - case RTSP.Response.get_header(resp, "Content-Length") do - {:ok, length} -> String.to_integer(length) - _error -> 0 - end - end - - @spec get_complete_packets(binary()) :: - {unprocessed_data :: binary(), complete_packets :: [binary()]} - defp get_complete_packets(packets_binary, complete_packets \\ []) - - defp get_complete_packets(packets_binary, complete_packets) - when byte_size(packets_binary) <= 4 do - {packets_binary, Enum.reverse(complete_packets)} - end - - defp get_complete_packets( - <<"$", _received_channel_id, payload_length::size(16), rest::binary>> = packets_binary, - complete_packets - ) do - case rest do - <> -> - complete_packets = [complete_packet_binary | complete_packets] - get_complete_packets(rest, complete_packets) - - _incomplete_packet_binary -> - {packets_binary, Enum.reverse(complete_packets)} - end - end - - defp get_complete_packets(rtsp_message, complete_packets_binaries) do - # If the payload doesn't start with a "$" then it must be a RTSP message (or a part of it) - {rtsp_message, complete_packets_binaries} - end -end diff --git a/lib/membrane_rtsp_plugin/source/tcp.ex b/lib/membrane_rtsp_plugin/source/tcp.ex deleted file mode 100644 index 0ace47b..0000000 --- a/lib/membrane_rtsp_plugin/source/tcp.ex +++ /dev/null @@ -1,68 +0,0 @@ -defmodule Membrane.RTSP.Source.TCP do - @moduledoc """ - Element that reads packets from a TCP socket and sends their payloads through the output pad. - """ - use Membrane.Source - - require Membrane.Logger - alias Membrane.RemoteStream - - def_options local_socket: [ - spec: :gen_tcp.socket(), - description: "Already connected TCP socket." - ] - - def_output_pad :output, accepted_format: %RemoteStream{type: :bytestream}, flow_control: :push - - @impl true - def handle_init(_context, opts) do - {:ok, {peer_address, peer_port_no}} = :inet.peername(opts.local_socket) - - state = %{ - local_socket: opts.local_socket, - peer_address: peer_address, - peer_port_no: peer_port_no - } - - {[], state} - end - - @impl true - def handle_playing(_ctx, state) do - {[stream_format: {:output, %RemoteStream{type: :bytestream}}, notify_parent: {:pid, self()}], - state} - end - - @impl true - def handle_info({:tcp, _socket, msg}, _ctx, state) do - metadata = - Map.new() - |> Map.put(:tcp_source_address, state.peer_address) - |> Map.put(:tcp_source_port, state.peer_port_no) - |> Map.put(:arrival_ts, Membrane.Time.vm_time()) - - {[buffer: {:output, %Membrane.Buffer{payload: msg, metadata: metadata}}], state} - end - - @impl true - def handle_info({:tcp_closed, _socket}, _ctx, state) do - {[end_of_stream: :output], state} - end - - @impl true - def handle_info({:tcp_error, _socket, reason}, _ctx, _state) do - raise "TCP Socket receiving error, reason: #{inspect(reason)}" - end - - @impl true - def handle_info(msg, _ctx, state) do - Membrane.Logger.warning("Received unexpected message: #{inspect(msg)}") - {[], state} - end - - @impl true - def handle_terminate_request(_ctx, state) do - :gen_tcp.close(state.local_socket) - {[terminate: :normal], state} - end -end diff --git a/lib/membrane_rtsp_plugin/source/transport/tcp_wrapper.ex b/lib/membrane_rtsp_plugin/source/transport/tcp_wrapper.ex deleted file mode 100644 index 5b4cc1d..0000000 --- a/lib/membrane_rtsp_plugin/source/transport/tcp_wrapper.ex +++ /dev/null @@ -1,27 +0,0 @@ -defmodule Membrane.RTSP.Source.Transport.TCPWrapper do - @moduledoc false - - use Membrane.RTSP.Transport - - alias Membrane.RTSP.Transport.TCPSocket - - @impl true - def init(url, options) do - with {:ok, tcp_socket} <- TCPSocket.init(url, options) do - if pid = options[:controlling_process] do - :gen_tcp.controlling_process(tcp_socket, pid) - end - - {:ok, tcp_socket} - end - end - - @impl true - defdelegate execute(request, socket, options), to: TCPSocket - - @impl true - defdelegate handle_info(msg, state), to: TCPSocket - - @impl true - defdelegate close(state), to: TCPSocket -end diff --git a/lib/membrane_rtsp_plugin/tcp_decapsulator.ex b/lib/membrane_rtsp_plugin/tcp_decapsulator.ex new file mode 100644 index 0000000..001a91e --- /dev/null +++ b/lib/membrane_rtsp_plugin/tcp_decapsulator.ex @@ -0,0 +1,121 @@ +defmodule Membrane.RTSP.TCP.Decapsulator do + @moduledoc """ + This element provides functionality of decapsulating RTP Packets and redirecting RTSP messages + received in the same TCP stream established with RTSP. The encapsulation is described in + RFC 7826 Section 14. + + Encapsulated RTP packets interleaved in the stream will have the following structure: + ["$" = 36 :: 1 byte][Channel id :: 1 byte][Length :: 2 bytes][packet :: bytes] + + RTSP Messages are not encapsulated this way, but can only be present between RTP packets. + """ + use Membrane.Filter + + alias Membrane.{Buffer, RemoteStream, RTP, RTSP} + + def_options rtsp_session: [ + spec: pid() | nil, + default: nil, + description: """ + PID of a RTSP Session (returned from Membrane.RTSP.start or Membrane.RTSP.start_link) + that received RTSP responses will be forwarded to. If nil the responses will be + discarded. + """ + ] + + def_input_pad :input, accepted_format: %RemoteStream{type: :bytestream} + + def_output_pad :output, accepted_format: %RemoteStream{type: :packetized, content_format: RTP} + + @impl true + def handle_init(_ctx, opts) do + state = + Map.from_struct(opts) + |> Map.merge(%{ + unprocessed_data: <<>> + }) + + {[], state} + end + + @impl true + def handle_playing(_ctx, state) do + stream_format = %RemoteStream{type: :packetized, content_format: RTP} + {[stream_format: {:output, stream_format}], state} + end + + @impl true + def handle_stream_format(:input, _stream_format, _ctx, state) do + {[], state} + end + + @impl true + def handle_buffer(:input, %Buffer{payload: payload, metadata: metadata}, _ctx, state) do + packets_binary = state.unprocessed_data <> payload + + {unprocessed_data, complete_packets_binaries} = + get_complete_packets(packets_binary, state.rtsp_session) + + packets_buffers = + Enum.map(complete_packets_binaries, &%Buffer{payload: &1, metadata: metadata}) + + {[buffer: {:output, packets_buffers}], %{state | unprocessed_data: unprocessed_data}} + end + + @spec get_complete_packets(binary(), pid() | nil, [binary()]) :: + {unprocessed_data :: binary(), complete_packets :: [binary()]} + defp get_complete_packets(packets_binary, rtsp_session, complete_packets \\ []) + + defp get_complete_packets(packets_binary, _rtsp_session, complete_packets) + when byte_size(packets_binary) <= 4 do + {packets_binary, Enum.reverse(complete_packets)} + end + + defp get_complete_packets( + <<"$", _channel_id, payload_length::size(16), rest::binary>> = packets_binary, + rtsp_session, + complete_packets + ) do + case rest do + <> -> + complete_packets = [complete_packet_binary | complete_packets] + + get_complete_packets(rest, rtsp_session, complete_packets) + + _incomplete_packet_binary -> + {packets_binary, Enum.reverse(complete_packets)} + end + end + + defp get_complete_packets( + <<"RTSP", _rest::binary>> = rtsp_message_start, + rtsp_session, + complete_packets_binaries + ) do + case RTSP.Response.verify_content_length(rtsp_message_start) do + {:ok, _expected_length, _actual_length} -> + if rtsp_session != nil do + {:ok, %RTSP.Response{status: 200}} = + RTSP.handle_response(rtsp_session, rtsp_message_start) + end + + {<<>>, complete_packets_binaries} + + {:error, expected_length, actual_length} when actual_length > expected_length -> + rest_length = actual_length - expected_length + rtsp_message_length = byte_size(rtsp_message_start) - rest_length + + <> = + rtsp_message_start + + if rtsp_session != nil do + {:ok, %RTSP.Response{status: 200}} = RTSP.handle_response(rtsp_session, rtsp_message) + end + + get_complete_packets(rest, rtsp_session, complete_packets_binaries) + + {:error, expected_length, actual_length} when actual_length <= expected_length -> + {rtsp_message_start, Enum.reverse(complete_packets_binaries)} + end + end +end diff --git a/mix.exs b/mix.exs index 7c3bff7..f344608 100644 --- a/mix.exs +++ b/mix.exs @@ -1,8 +1,8 @@ defmodule Membrane.RTSP.Plugin.Mixfile do use Mix.Project - @version "0.1.3" - @github_url "https://github.com/gBillal/membrane_rtsp_plugin" + @version "0.2.0" + @github_url "https://github.com/membraneframework-labs/membrane_rtsp_plugin" def project do [ @@ -37,11 +37,15 @@ defmodule Membrane.RTSP.Plugin.Mixfile do defp deps do [ {:membrane_core, "~> 1.0"}, - {:membrane_rtsp, "~> 0.6.0"}, - {:membrane_rtp_plugin, "~> 0.27.1"}, + {:membrane_rtsp, "~> 0.7.1"}, + {:membrane_rtp_plugin, + github: "membraneframework/membrane_rtp_plugin", + branch: "move-rtsp-decapsulator", + override: true}, + # {:membrane_rtp_plugin, "~> 0.27.1"}, {:membrane_rtp_h264_plugin, "~> 0.19.0"}, {:membrane_rtp_h265_plugin, "~> 0.5.0"}, - {:membrane_tcp_plugin, "~> 0.2.0"}, + {:membrane_tcp_plugin, "~> 0.4.0"}, {:membrane_h26x_plugin, "~> 0.10.0"}, {:membrane_udp_plugin, "~> 0.13.0"}, {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, @@ -67,7 +71,7 @@ defmodule Membrane.RTSP.Plugin.Mixfile do defp package do [ - maintainers: ["Billal Ghilas"], + maintainers: ["Membrane Team"], licenses: ["Apache-2.0"], links: %{ "GitHub" => @github_url diff --git a/mix.lock b/mix.lock index 9ae9b79..28e084b 100644 --- a/mix.lock +++ b/mix.lock @@ -8,7 +8,7 @@ "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, - "ex_doc": {:hex, :ex_doc, "0.31.2", "8b06d0a5ac69e1a54df35519c951f1f44a7b7ca9a5bb7a260cd8a174d6322ece", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "317346c14febaba9ca40fd97b5b5919f7751fb85d399cc8e7e8872049f37e0af"}, + "ex_doc": {:hex, :ex_doc, "0.32.1", "21e40f939515373bcdc9cffe65f3b3543f05015ac6c3d01d991874129d173420", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "5142c9db521f106d61ff33250f779807ed2a88620e472ac95dc7d59c380113da"}, "ex_sdp": {:hex, :ex_sdp, "0.15.0", "53815fb5b5e4fae0f3b26de90f372446bb8e0eed62a3cc20394d3c29519698be", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "d3f23596b73e7057521ff0f0d55b1189c6320a2f04388aa3a80a0aa97ffb379f"}, "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, "heap": {:hex, :heap, "2.0.2", "d98cb178286cfeb5edbcf17785e2d20af73ca57b5a2cf4af584118afbcf917eb", [:mix], [], "hexpm", "ba9ea2fe99eb4bcbd9a8a28eaf71cbcac449ca1d8e71731596aace9028c9d429"}, @@ -24,11 +24,11 @@ "membrane_h265_format": {:hex, :membrane_h265_format, "0.2.0", "1903c072cf7b0980c4d0c117ab61a2cd33e88782b696290de29570a7fab34819", [:mix], [], "hexpm", "6df418bdf242c0d9f7dbf2e5aea4c2d182e34ac9ad5a8b8cef2610c290002e83"}, "membrane_h26x_plugin": {:hex, :membrane_h26x_plugin, "0.10.1", "d7aeb166da55c6573b2178e18caeea290b09fd6f3cca428454085223e81476a0", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}], "hexpm", "9cd63a67ffed0654a932efff34395ded04a05e48d08ea996c93daebf889dac08"}, "membrane_rtp_format": {:hex, :membrane_rtp_format, "0.8.0", "828924bbd27efcf85b2015ae781e824c4a9928f0a7dc132abc66817b2c6edfc4", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "bc75d2a649dfaef6df563212fbb9f9f62eebc871393692f9dae8d289bd4f94bb"}, - "membrane_rtp_h264_plugin": {:hex, :membrane_rtp_h264_plugin, "0.19.0", "112bfedc14fb83bdb549ef1a03da23908feedeb165fd3e4512a549f1af532ae7", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}], "hexpm", "76fd159e7406cadbef15124cba30eca3fffcf71a7420964f26e23d4cffd9b29d"}, + "membrane_rtp_h264_plugin": {:hex, :membrane_rtp_h264_plugin, "0.19.1", "8c61b3d2968e54e1b459a42f070ea71f597056eba4059df780eaa8da8aee6035", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}], "hexpm", "fb2bb5d4ed18f38523851cb449a2c78ed533e58028dc058342b8cfd659f812d5"}, "membrane_rtp_h265_plugin": {:hex, :membrane_rtp_h265_plugin, "0.5.0", "c4009d1ab0d59df659d05a493d4a32bfc5adf6a63cf6e915022db05040e7ab91", [:mix], [{:membrane_core, "~> 1.0.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}], "hexpm", "17537a7a4ad39602ccb2b0f37fa5327e5be61f0a3bb2471eda4f381311be0f48"}, - "membrane_rtp_plugin": {:hex, :membrane_rtp_plugin, "0.27.1", "bee67c272f2ba252a98e302c251dfa04fa4e061f154e6112f154d5eb58d6b199", [:mix], [{:bimap, "~> 1.2", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_libsrtp, "~> 0.6.0 or ~> 0.7.0", [hex: :ex_libsrtp, repo: "hexpm", optional: true]}, {:heap, "~> 2.0.2", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_funnel_plugin, "~> 0.9.0", [hex: :membrane_funnel_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_rtsp, "~> 0.6.0", [hex: :membrane_rtsp, repo: "hexpm", optional: false]}, {:membrane_telemetry_metrics, "~> 0.1.0", [hex: :membrane_telemetry_metrics, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}], "hexpm", "61410fa44b71d6bc4c31cbb2c832417bc71f5f2bf9a29337536b12ec089614b5"}, - "membrane_rtsp": {:hex, :membrane_rtsp, "0.6.2", "782050a1dd604721647c984741c85ed0b6eaf1b0940fbdaab459a267c6dca71c", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_sdp, "~> 0.15.0", [hex: :ex_sdp, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3", [hex: :mockery, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.4.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d2f9aae6d81aeb6df35d2f45f2f99414473e5bf978b554930a25c6fb89311e80"}, - "membrane_tcp_plugin": {:hex, :membrane_tcp_plugin, "0.2.0", "b7eb928b6879570051660a6e71582a69e03adda07d303a3502a7453476de93dd", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3.0", [hex: :mockery, repo: "hexpm", optional: false]}], "hexpm", "1868fef575b43c03f60ddb555d6475bfe65262656378e2be18f11bb62c8db6dc"}, + "membrane_rtp_plugin": {:git, "https://github.com/membraneframework/membrane_rtp_plugin.git", "9df1417dfbcd5d8775c7f49a815ad016c076bc8d", [branch: "move-rtsp-decapsulator"]}, + "membrane_rtsp": {:hex, :membrane_rtsp, "0.7.1", "4f54d0d335fe9c2fb6e6e5c10d244db78f7e3ba781b6daf18094c9feff296c6c", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_sdp, "~> 0.15.0", [hex: :ex_sdp, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3", [hex: :mockery, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.4.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e47f9cd0caf02730d723dc3785bcce4a1cfafcc3bb0f434c63abb5a5db0f7978"}, + "membrane_tcp_plugin": {:hex, :membrane_tcp_plugin, "0.4.0", "95a9a2fcbe7c94e3f18b61ef1d35ea07ccb96838185b8c2f2e3ea890a33f3473", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3.0", [hex: :mockery, repo: "hexpm", optional: false]}], "hexpm", "27df6c36381cbfabf039a6bfd53469e1de61d10cda54f1015d1e4ac12f152834"}, "membrane_telemetry_metrics": {:hex, :membrane_telemetry_metrics, "0.1.0", "cb93d28356b436b0597736c3e4153738d82d2a14ff547f831df7e9051e54fc06", [:mix], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6.1", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "aba28dc8311f70ced95d984509be930fac55857d2d18bffcf768815e627be3f0"}, "membrane_udp_plugin": {:hex, :membrane_udp_plugin, "0.13.0", "c4d10b4cb152a95779e36fac4338e11ef0b0cb545c78ca337d7676f6df5d5709", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3.0", [hex: :mockery, repo: "hexpm", optional: false]}], "hexpm", "47a1661038ef65025fe36cfcae8ce23c022f9dc0867b8340c46dd4963f5a1bcb"}, "mimic": {:hex, :mimic, "1.7.4", "cd2772ffbc9edefe964bc668bfd4059487fa639a5b7f1cbdf4fd22946505aa4f", [:mix], [], "hexpm", "437c61041ecf8a7fae35763ce89859e4973bb0666e6ce76d75efc789204447c3"}, diff --git a/test/membrane_rtsp_plugin/source/connection_manager_test.exs b/test/membrane_rtsp_plugin/source/connection_manager_test.exs index 3755bcf..8fd73b1 100644 --- a/test/membrane_rtsp_plugin/source/connection_manager_test.exs +++ b/test/membrane_rtsp_plugin/source/connection_manager_test.exs @@ -30,8 +30,8 @@ defmodule Membrane.RTSP.Source.ConnectionManagerTest do stream_uri: @stream_uri, allowed_media_types: @allowed_media_types, transport: :tcp, - timeout: :timer.seconds(15), - keep_alive_interval: :timer.seconds(15), + timeout: Membrane.Time.seconds(15), + keep_alive_interval: Membrane.Time.seconds(15), parent_pid: self() } @@ -40,18 +40,16 @@ defmodule Membrane.RTSP.Source.ConnectionManagerTest do test "initialize state", %{opts: opts} do assert {:ok, - %{ + %ConnectionManager.State{ stream_uri: @stream_uri, allowed_media_types: @allowed_media_types, transport: :tcp, - timeout: :timer.seconds(15), - keep_alive_interval: :timer.seconds(15), + timeout: Membrane.Time.seconds(15), + keep_alive_interval: Membrane.Time.seconds(15), rtsp_session: nil, tracks: [], keep_alive_timer: nil, - status: :init, - parent_pid: self(), - reconnect_attempt: 0 + parent_pid: self() }} == ConnectionManager.init(opts) end @@ -59,23 +57,21 @@ defmodule Membrane.RTSP.Source.ConnectionManagerTest do test "successful connection", %{opts: opts} do pid = :c.pid(0, 1, 1) - expect(RTSP.start(@stream_uri, _transport, _options), do: {:ok, pid}) + expect(RTSP.start_link(@stream_uri, _options), do: {:ok, pid}) expect RTSP.describe(^pid, [{"accept", "application/sdp"}]) do {:ok, %Response{Response.new(200) | body: ExSDP.parse!(@sdp)}} end expect(RTSP.setup(^pid, _control, _headers), [num_calls: 3], do: {:ok, Response.new(200)}) - expect(RTSP.play(^pid), do: {:ok, Response.new(200)}) - expect(RTSP.get_transport(^pid), [num_calls: 3], do: %{}) + expect(RTSP.get_socket(^pid), do: :socket) assert {:ok, state} = ConnectionManager.init(opts) assert {:noreply, state} = ConnectionManager.handle_info(:connect, state) - assert state.status == :connected assert state.rtsp_session == pid - assert_received {:tracks, tracks} + assert_received %{tracks: tracks} assert length(tracks) == 3 assert [:application, :audio, :video] == Enum.map(tracks, & &1.type) end @@ -83,36 +79,20 @@ defmodule Membrane.RTSP.Source.ConnectionManagerTest do test "failed connection", %{opts: opts} do pid = :c.pid(0, 1, 1) - expect(RTSP.start(@stream_uri, _transport, _options), do: {:error, :econnrefused}) - expect(RTSP.start(@stream_uri, _transport, _options), do: {:ok, pid}) + expect(RTSP.start_link(@stream_uri, _options), do: {:error, :econnrefused}) + expect(RTSP.start_link(@stream_uri, _options), do: {:ok, pid}) expect(RTSP.describe(^pid, [{"accept", "application/sdp"}]), do: {:ok, Response.new(401)}) expect(RTSP.describe(^pid, [{"accept", "application/sdp"}]), do: {:ok, Response.new(404)}) assert {:ok, state} = ConnectionManager.init(opts) - assert {:noreply, %{status: :failed} = state} = ConnectionManager.handle_info(:connect, state) - assert_received {:connection_failed, :econnrefused} - assert {:noreply, %{status: :failed}} = ConnectionManager.handle_info(:connect, state) - refute_received {:connection_failed, :setting_up_sdp_connection_failed} - end - - test "lost connection reset state", %{opts: opts} do - pid = :c.pid(0, 1, 1) - - assert {:ok, state} = ConnectionManager.init(opts) - - state = %{ - state - | parent_pid: self(), - rtsp_session: pid, - status: :connected, - keep_alive_timer: make_ref() - } - - assert {:noreply, %{rtsp_session: nil, status: :failed, keep_alive_timer: nil}} = - ConnectionManager.handle_info({:DOWN, make_ref(), :process, pid, :crash}, state) + assert_raise RuntimeError, + "RTSP connection failed, reason: :econnrefused", + fn -> ConnectionManager.handle_info(:connect, state) end - assert_received {:connection_failed, :crash} + assert_raise RuntimeError, + "RTSP connection failed, reason: :getting_rtsp_description_failed", + fn -> ConnectionManager.handle_info(:connect, state) end end end diff --git a/test/membrane_rtsp_plugin/source_test.exs b/test/membrane_rtsp_plugin/source_test.exs index c21416f..e026b52 100644 --- a/test/membrane_rtsp_plugin/source_test.exs +++ b/test/membrane_rtsp_plugin/source_test.exs @@ -25,8 +25,8 @@ defmodule Membrane.RTSP.SourceTest do transport: opts[:transport] || :tcp, allowed_media_types: opts[:allowed_media_types] || [:video, :audio, :application], stream_uri: "rtsp://localhost:#{opts[:port]}/", - timeout: opts[:timeout] || :timer.seconds(15), - keep_alive_interval: opts[:keep_alive_interval] || :timer.seconds(15) + timeout: opts[:timeout] || Membrane.Time.seconds(15), + keep_alive_interval: opts[:keep_alive_interval] || Membrane.Time.seconds(15) }) {[spec: spec], %{dest_folder: opts[:dest_folder]}} @@ -153,12 +153,15 @@ defmodule Membrane.RTSP.SourceTest do custom_args: %{ port: port, dest_folder: tmp_dir, - transport: :udp, - timeout: :timer.seconds(1), - keep_alive_interval: :timer.seconds(1) + transport: {:udp, 20_000, 20_020}, + timeout: Membrane.Time.seconds(1), + keep_alive_interval: Membrane.Time.seconds(10) } ] + {:ok, blocking_socket1} = :gen_udp.open(20_000) + {:ok, blocking_socket2} = :gen_udp.open(20_003) + pid = Membrane.Testing.Pipeline.start_link_supervised!(options) assert_pipeline_notified( @@ -179,20 +182,26 @@ defmodule Membrane.RTSP.SourceTest do {:new_track, _ssrc, %{type: :application, rtpmap: %{encoding: "plain"}}} ) - assert_pipeline_notified(pid, :source, {:connection_failed, _reason}, 5_000) + Process.sleep(500) :ok = Membrane.Testing.Pipeline.terminate(pid) + :gen_udp.close(blocking_socket1) + :gen_udp.close(blocking_socket2) + assert File.exists?(Path.join(tmp_dir, "out.h264")) assert File.exists?(Path.join(tmp_dir, "out.hevc")) assert File.exists?(Path.join(tmp_dir, "out.txt")) - assert File.read!(Path.join(tmp_dir, "out.h264")) == File.read!("test/fixtures/in.h264"), + assert File.read!("test/fixtures/in.h264") + |> String.starts_with?(File.read!(Path.join(tmp_dir, "out.h264"))), "content is not the same" - assert File.read!(Path.join(tmp_dir, "out.hevc")) == File.read!("test/fixtures/in.hevc"), + assert File.read!("test/fixtures/in.hevc") + |> String.starts_with?(File.read!(Path.join(tmp_dir, "out.hevc"))), "content is not the same" - assert File.read!(Path.join(tmp_dir, "out.txt")) == File.read!("test/fixtures/in.txt"), + assert File.read!("test/fixtures/in.txt") + |> String.starts_with?(File.read!(Path.join(tmp_dir, "out.txt"))), "content is not the same" end end diff --git a/test/membrane_rtsp_plugin/tcp_decapsulator_test.exs b/test/membrane_rtsp_plugin/tcp_decapsulator_test.exs new file mode 100644 index 0000000..7d5d2ff --- /dev/null +++ b/test/membrane_rtsp_plugin/tcp_decapsulator_test.exs @@ -0,0 +1,84 @@ +defmodule Membrane.RTSP.DecapsulatorTest do + use ExUnit.Case + + import Membrane.Testing.Assertions + import Membrane.ChildrenSpec + + alias Membrane.RTSP.TCP.Decapsulator + alias Membrane.Testing.{Pipeline, Sink, Source} + + @header_length 4 + + defp encapsulate_rtp_packets(rtp_packets) do + Enum.map(rtp_packets, &<<"$", 0, byte_size(&1)::size(16), &1::binary>>) + end + + defp create_tcp_segments(encapsulated_rtp_packets, tcp_segments_lengths) do + assert Enum.sum(tcp_segments_lengths) == + Enum.sum(Enum.map(encapsulated_rtp_packets, &byte_size(&1))) + + encaplsulated_rtp_packets_binary = Enum.join(encapsulated_rtp_packets) + + {tcp_segments, _length} = + Enum.map_reduce(tcp_segments_lengths, 0, fn len, pos -> + {:binary.part(encaplsulated_rtp_packets_binary, pos, len), pos + len} + end) + + tcp_segments + end + + defp perform_standard_test(rtp_packets_lengths, tcp_segments_lengths) do + rtp_packets = Enum.map(rtp_packets_lengths, &<<0::size(&1)-unit(8)>>) + + tcp_segments = + rtp_packets |> encapsulate_rtp_packets() |> create_tcp_segments(tcp_segments_lengths) + + pipeline = + Pipeline.start_link_supervised!( + spec: + child(:source, %Source{ + output: tcp_segments + }) + |> child(:decapsulator, Decapsulator) + |> child(:sink, Sink) + ) + + Enum.each(rtp_packets, fn packet -> + assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{payload: ^packet}) + end) + + Pipeline.terminate(pipeline) + end + + describe "RTSP Decapsulator decapsulates correctly" do + test "when one tcp segment is one rtp packet" do + rtp_packets_lengths = 10..20 + tcp_segments_lengths = Enum.map(rtp_packets_lengths, &(&1 + @header_length)) + + perform_standard_test(rtp_packets_lengths, tcp_segments_lengths) + end + + test "when there are multiple (3) rtp packets in one tcp segment" do + rtp_packets_lengths = 10..40 + + tcp_segments_lengths = + rtp_packets_lengths + |> Enum.chunk_every(3) + |> Enum.map(&(Enum.sum(&1) + length(&1) * @header_length)) + + perform_standard_test(rtp_packets_lengths, tcp_segments_lengths) + end + + test "when rtp packets are spread across multiple (3) tcp segments" do + rtp_packets_lengths = 11..41//3 + + tcp_segments_lengths = + Enum.flat_map(rtp_packets_lengths, fn len -> + tcp_segment_base_length = div(len + @header_length, 3) + [tcp_segment_base_length - 1, tcp_segment_base_length, tcp_segment_base_length + 1] + end) + + perform_standard_test(rtp_packets_lengths, tcp_segments_lengths) + end + end +end diff --git a/test/support/server_pipeline.ex b/test/support/server_pipeline.ex index da54451..cd669c5 100644 --- a/test/support/server_pipeline.ex +++ b/test/support/server_pipeline.ex @@ -32,7 +32,7 @@ defmodule Membrane.RTSP.UDP.Sink do def_options( socket: [spec: :inet.socket()], - address: [spec: :inet.ipaddress()], + address: [spec: :inet.ip_address()], port: [spec: :inet.port_number()] )