Skip to content

Commit

Permalink
Add support for announce and record to server (#48)
Browse files Browse the repository at this point in the history
* add support for announce and record methods

* add tests

* update rtsp version in livebook
  • Loading branch information
gBillal authored Jan 21, 2025
1 parent f89a915 commit 0582419
Show file tree
Hide file tree
Showing 10 changed files with 391 additions and 105 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
[![CircleCI](https://circleci.com/gh/membraneframework/membrane_rtsp.svg?style=svg)](https://circleci.com/gh/membraneframework/membrane_rtsp)


The RTSP client for Elixir
The RTSP client and server for Elixir

Currently supports only RTSP 1.1 defined by
Currently supports only RTSP 1.0 defined by
[RFC2326](https://tools.ietf.org/html/rfc2326)

## Installation
Expand Down
8 changes: 6 additions & 2 deletions lib/membrane_rtsp/parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@ defmodule Membrane.RTSP.Parser do
{:ok, RTSP.Request.transport_header()} | {:error, :invalid_header}
def parse_transport_header(header) do
case Transport.parse_transport_header(header) do
{:ok, [transport, mode | parameters], _rest, _context, _line, _byte_offset} ->
{:ok, [transport, network_mode | parameters], _rest, _context, _line, _byte_offset} ->
parameters = Map.new(parameters)
mode = String.downcase(parameters["mode"] || "play")

{:ok,
[
transport: String.to_atom(transport),
network_mode: String.to_atom(network_mode),
mode: String.to_atom(mode),
parameters: Map.new(parameters)
parameters: Map.delete(parameters, "mode")
]}

{:error, _reason, _rest, _context, _line, _byte_offset} ->
Expand Down
8 changes: 7 additions & 1 deletion lib/membrane_rtsp/parser/transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@ defmodule Membrane.RTSP.Parser.Transport do
|> optional(string("=") |> concat(param_value))

single_value_param =
choice(Enum.map(["ssrc", "mode", "source"], &string/1))
choice(Enum.map(["ssrc", "source"], &string/1))
|> string("=")
|> concat(param_value)

play_mode = to_charlist("PLAY") |> Enum.reduce(empty(), &ascii_char(&2, [&1, &1 + 32]))
record_mode = to_charlist("RECORD") |> Enum.reduce(empty(), &ascii_char(&2, [&1, &1 + 32]))
mode_param = string("mode") |> string("=") |> choice([play_mode, record_mode])

integer_value_param =
choice(Enum.map(["ttl", "layers"], &string/1))
|> string("=")
Expand All @@ -33,6 +37,7 @@ defmodule Membrane.RTSP.Parser.Transport do
|> choice([
string("append"),
optional_value_param,
mode_param,
single_value_param,
integer_value_param,
integer_range_param
Expand All @@ -54,6 +59,7 @@ defmodule Membrane.RTSP.Parser.Transport do
case Enum.reverse(args) do
[key] -> {key, nil}
[key, "=", value] when is_binary(value) or key in ["ttl", "layers"] -> {key, value}
["mode", "=" | value] -> {"mode", to_string(value)}
[key, "=", value] -> {key, {value, value + 1}}
[key, "=", min_value, max_value] -> {key, {min_value, max_value}}
end
Expand Down
13 changes: 7 additions & 6 deletions lib/membrane_rtsp/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ defmodule Membrane.RTSP.Request do

@type transport_header :: [
transport: :TCP | :UDP,
mode: :unicast | :multicast,
network_mode: :unicast | :multicast,
mode: :play | :record,
parameters: map()
]

Expand Down Expand Up @@ -123,19 +124,19 @@ defmodule Membrane.RTSP.Request do
```
iex> req = %Request{method: "SETUP", headers: [{"Transport", "RTP/AVP;unicast;client_port=30001-30002"}]}
iex> Request.parse_transport_header(req)
{:ok, [transport: :UDP, mode: :unicast, parameters: %{"client_port" => {30001, 30002}}]}
{:ok, [transport: :UDP, network_mode: :unicast, mode: :play, parameters: %{"client_port" => {30001, 30002}}]}
iex> req = %Request{method: "SETUP", headers: [{"Transport", "RTP/AVP;ttl=15"}]}
iex> Request.parse_transport_header(req)
{:ok, [transport: :UDP, mode: :multicast, parameters: %{"ttl" => 15}]}
{:ok, [transport: :UDP, network_mode: :multicast, mode: :play, parameters: %{"ttl" => 15}]}
iex> req = %Request{method: "SETUP", headers: [{"Transport", "RTP/AVP/TCP;unicast;interleaved=0-1"}]}
iex> req = %Request{method: "SETUP", headers: [{"Transport", "RTP/AVP/TCP;unicast;interleaved=0-1;mode=record"}]}
iex> Request.parse_transport_header(req)
{:ok, [transport: :TCP, mode: :unicast, parameters: %{"interleaved" => {0, 1}}]}
{:ok, [transport: :TCP, network_mode: :unicast, mode: :record, parameters: %{"interleaved" => {0, 1}}]}
iex> req = %Request{method: "SETUP", headers: [{"Transport", "RTP/AVP"}]}
iex> Request.parse_transport_header(req)
{:ok, [transport: :UDP, mode: :multicast, parameters: %{}]}
{:ok, [transport: :UDP, network_mode: :multicast, mode: :play, parameters: %{}]}
iex> req = %Request{method: "SETUP", headers: [{"Transport", "RTP/AV"}]}
iex> Request.parse_transport_header(req)
Expand Down
61 changes: 46 additions & 15 deletions lib/membrane_rtsp/server/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ defmodule Membrane.RTSP.Server.Conn do

require Logger

alias Membrane.RTSP.Request
alias Membrane.RTSP.Server.Logic

@max_request_size 1_000_000

@spec start(map()) :: GenServer.on_start()
def start(state) do
GenServer.start(__MODULE__, state)
Expand All @@ -30,27 +29,59 @@ defmodule Membrane.RTSP.Server.Conn do

@impl true
def handle_continue(:process_client_requests, state) do
do_process_client_requests(state)
do_process_client_requests(state, state.session_timeout)
state.request_handler.handle_closed_connection(state.request_handler_state)
{:stop, :normal, state}
end

defp do_process_client_requests(state) do
with {:ok, request} <- get_request(state.socket, state.session_timeout) do
request
|> Logic.process_request(state)
|> do_process_client_requests()
defp do_process_client_requests(state, timeout) do
with {:ok, request} <- get_request(state.socket, timeout) do
case Logic.process_request(request, state) do
%{session_state: :recording} = state ->
do_process_client_requests(state, :infinity)

state ->
do_process_client_requests(state, state.session_timeout)
end
end
end

defp get_request(socket, timeout, acc \\ "") do
with {:ok, acc} <- do_recv(socket, timeout, acc) do
headers_and_body = String.split(acc, ~r/\r?\n\r?\n/, parts: 2)

case do_parse_request(headers_and_body) do
:more -> get_request(socket, timeout, acc)
other -> other
end
end
end

defp get_request(socket, timeout, request \\ "") do
with {:ok, packet} <- :gen_tcp.recv(socket, 0, timeout),
request <- request <> packet,
false <- byte_size(request) > @max_request_size do
if packet != "\r\n", do: get_request(socket, timeout, request), else: {:ok, request}
else
defp do_recv(socket, timeout, acc) do
case :gen_tcp.recv(socket, 0, timeout) do
{:ok, data} -> {:ok, acc <> data}
{:error, reason} -> {:error, reason}
true -> {:error, :max_request_size_exceeded}
end
end

defp do_parse_request([raw_request, body]) do
case Request.parse(raw_request <> "\r\n\r\n") do
{:ok, request} ->
content_length =
case Request.get_header(request, "Content-Length") do
{:ok, value} -> String.to_integer(value)
_error -> 0
end

case byte_size(body) >= content_length do
true -> {:ok, %Request{request | body: :binary.part(body, 0, content_length)}}
false -> :more
end

_error ->
{:error, :invalid_request}
end
end

defp do_parse_request(_raw_request), do: :more
end
34 changes: 29 additions & 5 deletions lib/membrane_rtsp/server/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ defmodule Membrane.RTSP.Server.Handler do
@doc """
Optional callback called when the server is initialized.
The argument is a term passed to the server as the `handler_config` option.
The returned value will be used as a state and passed as the last
The argument is a term passed to the server as the `handler_config` option.
The returned value will be used as a state and passed as the last
argument to the subsequent callbacks.
Default behavior is to return the argument unchanged.
Expand Down Expand Up @@ -142,12 +142,22 @@ defmodule Membrane.RTSP.Server.Handler do
"""
@callback handle_describe(request(), state()) :: {Response.t(), state()}

@doc """
Callback called when receiving an ANNOUNCE request.
An announce request contains media description (`body` field of the request) of
the tracks that a client wish to publish to the server.
"""
@callback handle_announce(request(), state()) :: {Response.t(), state()}

@doc """
Callback called when receiving a SETUP request.
The handler should check for the validity of the requested track (`path` field of the `Request` struct).
The `mode` argument provides the context of the setup, it's either `:play` or `:record`.
"""
@callback handle_setup(request(), state()) :: {Response.t(), state()}
@callback handle_setup(request(), mode :: :play | :record, state()) :: {Response.t(), state()}

@doc """
Callback called when receiving a PLAY request.
Expand All @@ -157,6 +167,14 @@ defmodule Membrane.RTSP.Server.Handler do
"""
@callback handle_play(configured_media_context(), state()) :: {Response.t(), state()}

@doc """
Callback called when receiving a RECORD request.
`configured_media_context` contains the needed information to start receving media packets.
Refer to the type documentation for more details
"""
@callback handle_record(configured_media_context(), state()) :: {Response.t(), state()}

@doc """
Callback called when receiving a PAUSE request.
Expand All @@ -174,7 +192,7 @@ defmodule Membrane.RTSP.Server.Handler do
"""
@callback handle_teardown(state()) :: {Response.t(), state()}

@optional_callbacks init: 1
@optional_callbacks init: 1, handle_announce: 2, handle_record: 2

defmacro __using__(_options) do
quote do
Expand All @@ -183,7 +201,13 @@ defmodule Membrane.RTSP.Server.Handler do
@impl true
def init(config), do: config

defoverridable init: 1
@impl true
def handle_announce(_req, state), do: {Response.new(501), state}

@impl true
def handle_record(_req, state), do: {Response.new(501), state}

defoverridable init: 1, handle_announce: 2, handle_record: 2
end
end
end
Loading

0 comments on commit 0582419

Please sign in to comment.