Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for announce and record to server #48

Merged
merged 7 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
[![CircleCI](https://circleci.com/gh/membraneframework/membrane_rtsp.svg?style=svg)](https://circleci.com/gh/membraneframework/membrane_rtsp)


The RTSP client for Elixir
The RTSP client and server for Elixir

Currently supports only RTSP 1.1 defined by
Currently supports only RTSP 1.0 defined by
[RFC2326](https://tools.ietf.org/html/rfc2326)

## Installation
Expand Down
8 changes: 6 additions & 2 deletions lib/membrane_rtsp/parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@ defmodule Membrane.RTSP.Parser do
{:ok, RTSP.Request.transport_header()} | {:error, :invalid_header}
def parse_transport_header(header) do
case Transport.parse_transport_header(header) do
{:ok, [transport, mode | parameters], _rest, _context, _line, _byte_offset} ->
{:ok, [transport, network_mode | parameters], _rest, _context, _line, _byte_offset} ->
parameters = Map.new(parameters)
mode = String.downcase(parameters["mode"] || "play")

{:ok,
[
transport: String.to_atom(transport),
network_mode: String.to_atom(network_mode),
mode: String.to_atom(mode),
parameters: Map.new(parameters)
parameters: Map.delete(parameters, "mode")
]}

{:error, _reason, _rest, _context, _line, _byte_offset} ->
Expand Down
8 changes: 7 additions & 1 deletion lib/membrane_rtsp/parser/transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@ defmodule Membrane.RTSP.Parser.Transport do
|> optional(string("=") |> concat(param_value))

single_value_param =
choice(Enum.map(["ssrc", "mode", "source"], &string/1))
choice(Enum.map(["ssrc", "source"], &string/1))
|> string("=")
|> concat(param_value)

play_mode = to_charlist("PLAY") |> Enum.reduce(empty(), &ascii_char(&2, [&1, &1 + 32]))
record_mode = to_charlist("RECORD") |> Enum.reduce(empty(), &ascii_char(&2, [&1, &1 + 32]))
mode_param = string("mode") |> string("=") |> choice([play_mode, record_mode])

integer_value_param =
choice(Enum.map(["ttl", "layers"], &string/1))
|> string("=")
Expand All @@ -33,6 +37,7 @@ defmodule Membrane.RTSP.Parser.Transport do
|> choice([
string("append"),
optional_value_param,
mode_param,
single_value_param,
integer_value_param,
integer_range_param
Expand All @@ -54,6 +59,7 @@ defmodule Membrane.RTSP.Parser.Transport do
case Enum.reverse(args) do
[key] -> {key, nil}
[key, "=", value] when is_binary(value) or key in ["ttl", "layers"] -> {key, value}
["mode", "=" | value] -> {"mode", to_string(value)}
[key, "=", value] -> {key, {value, value + 1}}
[key, "=", min_value, max_value] -> {key, {min_value, max_value}}
end
Expand Down
13 changes: 7 additions & 6 deletions lib/membrane_rtsp/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ defmodule Membrane.RTSP.Request do

@type transport_header :: [
transport: :TCP | :UDP,
mode: :unicast | :multicast,
network_mode: :unicast | :multicast,
mode: :play | :record,
parameters: map()
]

Expand Down Expand Up @@ -123,19 +124,19 @@ defmodule Membrane.RTSP.Request do
```
iex> req = %Request{method: "SETUP", headers: [{"Transport", "RTP/AVP;unicast;client_port=30001-30002"}]}
iex> Request.parse_transport_header(req)
{:ok, [transport: :UDP, mode: :unicast, parameters: %{"client_port" => {30001, 30002}}]}
{:ok, [transport: :UDP, network_mode: :unicast, mode: :play, parameters: %{"client_port" => {30001, 30002}}]}

iex> req = %Request{method: "SETUP", headers: [{"Transport", "RTP/AVP;ttl=15"}]}
iex> Request.parse_transport_header(req)
{:ok, [transport: :UDP, mode: :multicast, parameters: %{"ttl" => 15}]}
{:ok, [transport: :UDP, network_mode: :multicast, mode: :play, parameters: %{"ttl" => 15}]}

iex> req = %Request{method: "SETUP", headers: [{"Transport", "RTP/AVP/TCP;unicast;interleaved=0-1"}]}
iex> req = %Request{method: "SETUP", headers: [{"Transport", "RTP/AVP/TCP;unicast;interleaved=0-1;mode=record"}]}
iex> Request.parse_transport_header(req)
{:ok, [transport: :TCP, mode: :unicast, parameters: %{"interleaved" => {0, 1}}]}
{:ok, [transport: :TCP, network_mode: :unicast, mode: :record, parameters: %{"interleaved" => {0, 1}}]}

iex> req = %Request{method: "SETUP", headers: [{"Transport", "RTP/AVP"}]}
iex> Request.parse_transport_header(req)
{:ok, [transport: :UDP, mode: :multicast, parameters: %{}]}
{:ok, [transport: :UDP, network_mode: :multicast, mode: :play, parameters: %{}]}

iex> req = %Request{method: "SETUP", headers: [{"Transport", "RTP/AV"}]}
iex> Request.parse_transport_header(req)
Expand Down
61 changes: 46 additions & 15 deletions lib/membrane_rtsp/server/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ defmodule Membrane.RTSP.Server.Conn do

require Logger

alias Membrane.RTSP.Request
alias Membrane.RTSP.Server.Logic

@max_request_size 1_000_000

@spec start(map()) :: GenServer.on_start()
def start(state) do
GenServer.start(__MODULE__, state)
Expand All @@ -30,27 +29,59 @@ defmodule Membrane.RTSP.Server.Conn do

@impl true
def handle_continue(:process_client_requests, state) do
do_process_client_requests(state)
do_process_client_requests(state, state.session_timeout)
state.request_handler.handle_closed_connection(state.request_handler_state)
{:stop, :normal, state}
end

defp do_process_client_requests(state) do
with {:ok, request} <- get_request(state.socket, state.session_timeout) do
request
|> Logic.process_request(state)
|> do_process_client_requests()
defp do_process_client_requests(state, timeout) do
with {:ok, request} <- get_request(state.socket, timeout) do
case Logic.process_request(request, state) do
%{session_state: :recording} = state ->
do_process_client_requests(state, :infinity)

state ->
do_process_client_requests(state, state.session_timeout)
end
end
end

defp get_request(socket, timeout, acc \\ "") do
with {:ok, acc} <- do_recv(socket, timeout, acc) do
headers_and_body = String.split(acc, ~r/\r?\n\r?\n/, parts: 2)

case do_parse_request(headers_and_body) do
:more -> get_request(socket, timeout, acc)
other -> other
end
end
end

defp get_request(socket, timeout, request \\ "") do
with {:ok, packet} <- :gen_tcp.recv(socket, 0, timeout),
request <- request <> packet,
false <- byte_size(request) > @max_request_size do
if packet != "\r\n", do: get_request(socket, timeout, request), else: {:ok, request}
else
defp do_recv(socket, timeout, acc) do
case :gen_tcp.recv(socket, 0, timeout) do
{:ok, data} -> {:ok, acc <> data}
{:error, reason} -> {:error, reason}
true -> {:error, :max_request_size_exceeded}
end
end

defp do_parse_request([raw_request, body]) do
case Request.parse(raw_request <> "\r\n\r\n") do
{:ok, request} ->
content_length =
case Request.get_header(request, "Content-Length") do
{:ok, value} -> String.to_integer(value)
_error -> 0
end

case byte_size(body) >= content_length do
true -> {:ok, %Request{request | body: :binary.part(body, 0, content_length)}}
false -> :more
end

_error ->
{:error, :invalid_request}
end
end

defp do_parse_request(_raw_request), do: :more
end
34 changes: 29 additions & 5 deletions lib/membrane_rtsp/server/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ defmodule Membrane.RTSP.Server.Handler do
@doc """
Optional callback called when the server is initialized.

The argument is a term passed to the server as the `handler_config` option.
The returned value will be used as a state and passed as the last
The argument is a term passed to the server as the `handler_config` option.
The returned value will be used as a state and passed as the last
argument to the subsequent callbacks.

Default behavior is to return the argument unchanged.
Expand Down Expand Up @@ -142,12 +142,22 @@ defmodule Membrane.RTSP.Server.Handler do
"""
@callback handle_describe(request(), state()) :: {Response.t(), state()}

@doc """
Callback called when receiving an ANNOUNCE request.

An announce request contains media description (`body` field of the request) of
the tracks that a client wish to publish to the server.
"""
@callback handle_announce(request(), state()) :: {Response.t(), state()}

@doc """
Callback called when receiving a SETUP request.

The handler should check for the validity of the requested track (`path` field of the `Request` struct).

The `mode` argument provides the context of the setup, it's either `:play` or `:record`.
"""
@callback handle_setup(request(), state()) :: {Response.t(), state()}
@callback handle_setup(request(), mode :: :play | :record, state()) :: {Response.t(), state()}

@doc """
Callback called when receiving a PLAY request.
Expand All @@ -157,6 +167,14 @@ defmodule Membrane.RTSP.Server.Handler do
"""
@callback handle_play(configured_media_context(), state()) :: {Response.t(), state()}

@doc """
Callback called when receiving a RECORD request.

`configured_media_context` contains the needed information to start receving media packets.
Refer to the type documentation for more details
"""
@callback handle_record(configured_media_context(), state()) :: {Response.t(), state()}

@doc """
Callback called when receiving a PAUSE request.

Expand All @@ -174,7 +192,7 @@ defmodule Membrane.RTSP.Server.Handler do
"""
@callback handle_teardown(state()) :: {Response.t(), state()}

@optional_callbacks init: 1
@optional_callbacks init: 1, handle_announce: 2, handle_record: 2

defmacro __using__(_options) do
quote do
Expand All @@ -183,7 +201,13 @@ defmodule Membrane.RTSP.Server.Handler do
@impl true
def init(config), do: config

defoverridable init: 1
@impl true
def handle_announce(_req, state), do: {Response.new(501), state}

@impl true
def handle_record(_req, state), do: {Response.new(501), state}

defoverridable init: 1, handle_announce: 2, handle_record: 2
end
end
end
Loading