Skip to content

Commit

Permalink
Make the handler's callback returns rtsp response
Browse files Browse the repository at this point in the history
  • Loading branch information
gBillal committed Feb 2, 2024
1 parent 80b8abd commit 7acc465
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 98 deletions.
2 changes: 1 addition & 1 deletion lib/membrane_rtsp/parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ defmodule Membrane.RTSP.Parser do
alpha_num
|> ignore(string(":"))
|> concat(ignored_space)
|> utf8_string([{:not, ?\r}], min: 1)
|> utf8_string([{:not, ?\r}, {:not, ?\n}], min: 1)
|> concat(ignored_delimeter)

request_line =
Expand Down
19 changes: 18 additions & 1 deletion lib/membrane_rtsp/response.ex
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,31 @@ defmodule Membrane.RTSP.Response do
```
"""
@spec get_header(__MODULE__.t(), binary()) :: {:error, :no_such_header} | {:ok, binary()}
@spec get_header(t(), binary()) :: {:error, :no_such_header} | {:ok, binary()}
def get_header(%__MODULE__{headers: headers}, name) do
case List.keyfind(headers, name, 0) do
{_name, value} -> {:ok, value}
nil -> {:error, :no_such_header}
end
end

@doc """
Returns true if the response is an OK
```
iex> Response.ok?(Response.new(204))
true
iex> Response.ok?(Response.new(400))
false
```
"""
@spec ok?(t()) :: boolean()
def ok?(%__MODULE__{status: status}) do
div(status, 100) == 2
end

@spec parse_start_line(raw_response :: binary()) ::
{:ok, {response :: t(), remainder :: binary}} | {:error, :invalid_start_line}
defp parse_start_line(binary) do
Expand Down
30 changes: 24 additions & 6 deletions lib/membrane_rtsp/server.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
defmodule Membrane.RTSP.Server do
@moduledoc """
A module representing an RTSP server.
Implementation of an RTSP server.
To start a new server
```
{:ok, server} = Membrane.RTSP.Server.start_link(config)
```
The `start_link/1` accepts a keyword list configuration:
- `port` - The port where the server will listen for connections. default to: `554`
- `handler` - An implementation of the behaviour `Membrane.RTSP.Server.Handler`. Refer to the module
documentation for more details.
"""

use GenServer
Expand All @@ -9,20 +19,28 @@ defmodule Membrane.RTSP.Server do

alias __MODULE__

@spec start_link(non_neg_integer(), module()) :: GenServer.on_start()
def start_link(port, handler) do
GenServer.start_link(__MODULE__, %{port: port, handler: handler}, name: __MODULE__)
@type server_config :: [
name: term(),
port: non_neg_integer(),
handler: module()
]

@spec start_link(server_config()) :: GenServer.on_start()
def start_link(config) do
GenServer.start_link(__MODULE__, config, name: config[:name])
end

@impl true
def init(config) do
port = Keyword.get(config, :port, 554)

{:ok, socket} =
:gen_tcp.listen(config.port, [:binary, packet: :line, active: false, reuseaddr: true])
:gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true])

parent_pid = self()
Task.start_link(fn -> do_listen(socket, parent_pid) end)

{:ok, %{socket: socket, conns: [], handler: config.handler}}
{:ok, %{socket: socket, conns: [], handler: config[:handler]}}
end

defp do_listen(socket, parent_pid) do
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane_rtsp/server/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ defmodule Membrane.RTSP.Server.Conn do
state = %State{
socket: config.socket,
request_handler: config.handler,
request_handler_state: config.handler.init()
request_handler_state: config.handler.handle_open_connection(config.socket)
}

{:ok, state, {:continue, :process_client_requests}}
Expand Down
81 changes: 45 additions & 36 deletions lib/membrane_rtsp/server/handler.ex
Original file line number Diff line number Diff line change
@@ -1,65 +1,74 @@
defmodule Membrane.RTSP.Server.Handler do
@moduledoc """
Behaviour describing client request handling for Real Time Streaming Protocol
Behaviour describing client request handling for Real Time Streaming Protocol.
"""

alias Membrane.RTSP.Request
alias Membrane.RTSP.{Request, Response}

@typep control_path :: binary()

@type state :: term()
@type ssrc :: integer() | binary()
@type ssrc :: integer()
@type conn :: :inet.socket()
@type request :: Request.t()

@doc """
Callback for initializing state.
@typedoc """
A type representing the setupped tracks.
The type is a map from a control path to the setup details. Each track contains the
following information:
* `ssrc` - The synchronisation source to use in the `RTP` packets.
* `transport` - The transport used for carrying the media packets.
* `socket` - An opened socket to use to send `RTP` media packets.
* `rtcp_socket` - An opened socket to use to send `RTCP` packets. Available only when transport is `UDP`.
* `client_port` - A pair of ports to use to send `RTP` and `RTCP` packets respectively. Available only when transport is `UDP`
* `channels` - A pair of channel numbers to use to mux `RTP` and `RTCP` packets. Available only when transport is `TCP`
The state will be passed on the subsequent callbacks as the first argument
"""
@callback init() :: state()
@type setupped_tracks :: %{
control_path() => %{
:ssrc => binary(),
:transport => :UDP | :TCP,
:socket => conn(),
optional(:rtcp_socket) => conn(),
optional(:client_port) => {:inet.port_number(), :inet.port_number()},
optional(:channels) => {non_neg_integer(), non_neg_integer()}
}
}

@doc """
Callback for handling OPTIONS client request.
Callback called when a new connection is established.
The result of this callback is ignored.
The returned value is used as a state and is passed as the last argument to
the subsequent callbacks
"""
@callback handle_options(state(), Request.t()) :: state()
@callback handle_open_connection(conn()) :: state()

@doc """
Callback for handling DESCRIBE client request.
Callback called when receiving a DESCRIBE request.
`{return_value, new_state}` is the result from calling this callback, the `new_state` will be sent to the subsequent callbacks and
the `return_value` is used by the server to set the right status code:
- `{:ok, sdp or binary}` - The server will send a 200 status code with the returned binary or SDP as the body.
- `{:error, :unauthorized}` - The server will send a 401 status code.
- `{:error, :forbidden}` - The server will send a 403 status code.
- `{:error, :not_found}` - The server will send a 404 status code.
- `{:error, other}` - The server will send a 400 status code.
The return value is the response to be sent back to the client. The implementing
module need at least set the status of the response.
"""
@callback handle_describe(state(), Request.t()) ::
{{:ok, ExSDP.t() | binary()}, state()} | {{:error, term()}, state()}
@callback handle_describe(state(), request()) :: {Response.t(), state()}

@doc """
Callback for handling SETUP client request.
The handler is responsible for checking the validity of the requested URI (`path` field of the `Request` struct )
and return an error in case the same URI is setup more than once.
Callback called when receiving a SETUP request.
It should return the `ssrc` that'll be used in the RTP paylad. The server will set the proper status code
depending on the response. Check `handle_describe` for details.
The handler should check for the validity of the requested track (`path` field of the `Request` struct).
"""
@callback handle_setup(state(), Request.t()) ::
{{:ok, ssrc}, state()} | {{:error, term()}, state()}
@callback handle_setup(request(), state()) :: {Response.t(), state()}

@doc """
Callback for handling PLAY client request.
Callback called when receiving a PLAY request.
The implementer should start sending media data to the client. Since the
client will use the same connection as the RTSP session, a `socket` will passed
as a second argument to this callback. Be careful to not to try reading from the
socket as this may cause issues with the RTSP session.
`setupped_tracks` contains the needed information to start sending media packets.
Refer to the type documentation for more details
"""
@callback handle_play(state(), :inet.socket()) :: {:ok, state()} | {{:error, term()}, state()}
@callback handle_play(state(), setupped_tracks()) :: {Response.t(), state()}

@doc """
Callback for handling TEARDOWN client request.
Callback called when receiving TEARDOWN request.
"""
@callback handle_teardown(state()) :: :ok
@callback handle_teardown(state()) :: {Response.t(), state()}
end
Loading

0 comments on commit 7acc465

Please sign in to comment.