Skip to content

Commit

Permalink
Make tests pass, change get_transport
Browse files Browse the repository at this point in the history
  • Loading branch information
Noarkhh committed Apr 24, 2024
1 parent 9fe8b8a commit 293c231
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 217 deletions.
65 changes: 34 additions & 31 deletions lib/membrane_rtsp/rtsp.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Membrane.RTSP do

alias Membrane.RTSP
alias Membrane.RTSP.Logic.State
alias Membrane.RTSP.{Request, Response}
alias Membrane.RTSP.{Request, Response, TCPSocket}

@type t() :: pid()

Expand All @@ -22,21 +22,20 @@ defmodule Membrane.RTSP do
* options - a keyword list that shall be passed when executing request over
transport
"""
@spec start_link(binary(), module() | URI.t(), Keyword.t()) :: GenServer.on_start()
def start_link(url, transport \\ Membrane.RTSP.Transport.TCPSocket, options \\ []) do
do_start(url, transport, options, &GenServer.start_link/2)
@spec start_link(binary() | URI.t(), Keyword.t()) :: GenServer.on_start()
def start_link(url, options \\ []) do
do_start(url, options, &GenServer.start_link/2)
end

@spec start(binary(), module() | URI.t(), Keyword.t()) :: GenServer.on_start()
def start(url, transport \\ Membrane.RTSP.Transport.TCPSocket, options \\ []) do
do_start(url, transport, options, &GenServer.start/2)
@spec start(binary() | URI.t(), Keyword.t()) :: GenServer.on_start()
def start(url, options \\ []) do
do_start(url, options, &GenServer.start/2)
end

defp do_start(url, transport, options, start_fun) do
defp do_start(url, options, start_fun) do
case URI.parse(url) do
%URI{host: host, scheme: "rtsp"} = url when is_binary(host) ->
start_fun.(__MODULE__, %{
transport: transport,
url: %URI{url | port: url.port || @default_rtsp_port},
options: options
})
Expand All @@ -47,7 +46,7 @@ defmodule Membrane.RTSP do
end

@impl true
def init(%{url: url, options: options, transport: transport_module}) do
def init(%{url: url, options: options}) do
auth_type =
case url do
%URI{userinfo: nil} -> nil
Expand All @@ -56,10 +55,9 @@ defmodule Membrane.RTSP do
%URI{userinfo: info} when is_binary(info) -> :basic
end

with {:ok, transport} <- transport_module.init(url, options) do
with {:ok, socket} <- TCPSocket.init(url, options) do
state = %State{
transport: transport,
transport_module: transport_module,
socket: socket,
uri: url,
execution_options: options,
auth: auth_type
Expand All @@ -81,8 +79,13 @@ defmodule Membrane.RTSP do
end
end

def handle_call(:get_transport, _from, %State{transport: transport} = state) do
{:reply, transport, state}
def handle_call(
{:get_socket_control, new_controlling_process},
_from,
%State{socket: socket} = state
) do
:ok = :gen_tcp.controlling_process(socket, new_controlling_process)
{:reply, socket, state}
end

def handle_call({:parse_response, raw_response}, _from, state) do
Expand All @@ -108,16 +111,16 @@ defmodule Membrane.RTSP do
end
end

@impl true
# this might be a message for transport layer. Redirect
def handle_info(msg, %State{} = state) do
state.transport_module.handle_info(msg, state.transport)
|> translate(:transport, state)
end
# @impl true
# # this might be a message for transport layer. Redirect
# def handle_info(msg, %State{} = state) do
# TCPSocket.handle_info(msg, state.transport)
# |> translate(:transport, state)
# end

@impl true
def terminate(_reason, state) do
state.transport_module.close(state.transport)
TCPSocket.close(state.socket)
end

@spec request(pid(), binary(), RTSP.headers(), binary(), nil | binary()) :: Response.result()
Expand All @@ -135,19 +138,19 @@ defmodule Membrane.RTSP do
@spec close(pid()) :: :ok
def close(session), do: GenServer.cast(session, :terminate)

defp translate({action, new_state}, key, state) do
{action, Map.put(state, key, new_state)}
end
# defp translate({action, new_state}, key, state) do
# {action, Map.put(state, key, new_state)}
# end

defp translate({action, reply, new_state}, key, state) do
{action, reply, Map.put(state, key, new_state)}
end
# defp translate({action, reply, new_state}, key, state) do
# {action, reply, Map.put(state, key, new_state)}
# end

@type headers :: [{binary(), binary()}]

@spec get_transport(t()) :: any()
def get_transport(session) do
GenServer.call(session, :get_transport)
@spec get_socket_control(t(), pid()) :: :gen_tcp.socket()
def get_socket_control(session, new_controlling_process) do
GenServer.call(session, {:get_socket_control, new_controlling_process})
end

@spec get_parameter_no_response(t(), headers(), binary()) :: :ok
Expand Down
11 changes: 5 additions & 6 deletions lib/membrane_rtsp/session/logic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ defmodule Membrane.RTSP.Logic do
@moduledoc """
Logic for RTSP session
"""
alias Membrane.RTSP.{Request, Response}
alias Membrane.RTSP.{Request, Response, TCPSocket}
@user_agent "MembraneRTSP/#{Mix.Project.config()[:version]} (Membrane Framework RTSP Client)"

defmodule State do
@moduledoc "Struct representing the state of RTSP session"
@enforce_keys [:transport, :uri, :transport_module]
@enforce_keys [:socket, :uri]
defstruct @enforce_keys ++
[
:session_id,
Expand All @@ -24,7 +24,7 @@ defmodule Membrane.RTSP.Logic do
@type auth_t() :: nil | :basic | {:digest, digest_opts()}

@type t :: %__MODULE__{
transport: any(),
socket: :gen_tcp.socket(),
cseq: non_neg_integer(),
uri: URI.t(),
session_id: binary() | nil,
Expand All @@ -41,8 +41,7 @@ defmodule Membrane.RTSP.Logic do
def execute(request, state, get_response \\ true) do
%State{
cseq: cseq,
transport: transport,
transport_module: transport_module,
socket: socket,
uri: uri,
session_id: session_id,
execution_options: execution_options
Expand All @@ -55,7 +54,7 @@ defmodule Membrane.RTSP.Logic do
|> Request.with_header("User-Agent", @user_agent)
|> apply_credentials(uri, state.auth)
|> Request.stringify(uri)
|> transport_module.execute(transport, execution_options ++ [get_response: get_response])
|> TCPSocket.execute(socket, execution_options ++ [get_response: get_response])
end

@spec inject_session_header(Request.t(), binary()) :: Request.t()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Membrane.RTSP.Transport.TCPSocket do
defmodule Membrane.RTSP.TCPSocket do
@moduledoc """
This module implements the Transport behaviour and transmits requests over TCP
Socket keeping connection until either session is closed or connection is
Expand All @@ -8,13 +8,11 @@ defmodule Membrane.RTSP.Transport.TCPSocket do
* timeout - time after request will be deemed missing and error shall be
returned.
"""
use Membrane.RTSP.Transport
import Mockery.Macro

@connection_timeout 1000
@response_timeout 5000

@impl true
def init(%URI{} = connection_info, options \\ []) do
connection_timeout = options[:connection_timeout] || @connection_timeout

Expand All @@ -34,20 +32,19 @@ defmodule Membrane.RTSP.Transport.TCPSocket do
)
end

@impl true
@spec execute(binary(), :gen_tcp.socket(), Keyword.t()) ::
:ok | {:ok, binary()} | {:error, :closed | :timeout | :inet.posix()}
def execute(request, socket, options) do
case mockable(:gen_tcp).send(socket, request) do
:ok -> if options[:get_response], do: recv(socket, options), else: :ok
error -> error
end
end

@impl true
def handle_info({:tcp_closed, _socket}, state) do
{:stop, :socket_closed, state}
end

@impl true
def close(_state), do: :ok

defp recv(socket, options, length \\ 0, acc \\ <<>>) do
Expand Down
51 changes: 0 additions & 51 deletions lib/membrane_rtsp/transport/transport.ex

This file was deleted.

7 changes: 3 additions & 4 deletions test/membrane_rtsp/session/session_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,18 @@ defmodule Membrane.RTSP.IntegrationTest do

alias Membrane.RTSP
alias Membrane.RTSP.{Request, Response}
alias Membrane.RTSP.Transport.TCPSocket

@uri "rtsp://wowzaec2demo.streamlock.net:554/vod/mp4:BigBuckBunny_115k.mov"

describe "Session works in combination with" do
@tag external: true
test "real transport" do
integration_test(@uri, TCPSocket)
integration_test(@uri)
end
end

defp integration_test(uri, transport, options \\ []) do
{:ok, pid} = RTSP.start_link(uri, transport, options)
defp integration_test(uri, options \\ []) do
{:ok, pid} = RTSP.start_link(uri, options)

request = %Request{
method: "DESCRIBE",
Expand Down
Loading

0 comments on commit 293c231

Please sign in to comment.