Skip to content

Commit

Permalink
Remove handle_connected and handle_published callbacks from the clien…
Browse files Browse the repository at this point in the history
…t handler behaviour. Make handle_new_client return a struct that defines the behaviour of the newly connected client. Rename cliet_handler_for_source.ex into client_handler_impl.ex to match the module name. Update the tests and examples accordingly
  • Loading branch information
varsill committed Sep 18, 2024
1 parent 3258ba2 commit 94f9c34
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 57 deletions.
2 changes: 1 addition & 1 deletion examples/source_with_standalone_server.exs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ parent_process_pid = self()

handle_new_client = fn client_ref, app, stream_key ->
send(parent_process_pid, {:client_ref, client_ref, app, stream_key})
%Membrane.RTMP.Source.ClientHandlerImpl{}
end

# Run the standalone server
{:ok, server} =
Membrane.RTMPServer.start_link(
handler: %Membrane.RTMP.Source.ClientHandlerImpl{controlling_process: self()},
port: port,
use_ssl?: false,
handle_new_client: handle_new_client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,16 @@ defmodule Membrane.RTMP.Source.ClientHandlerImpl do

@behaviour Membrane.RTMPServer.ClientHandler

defstruct [:controlling_process]
defstruct []

@impl true
def handle_init(opts) do
def handle_init(_opts) do
%{
source_pid: nil,
buffered: [],
app: nil,
stream_key: nil,
controlling_process: opts.controlling_process
buffered: []
}
end

@impl true
def handle_connected(connected_msg, state) do
%{state | app: connected_msg.app}
end

@impl true
def handle_stream_published(publish_msg, state) do
%{state | stream_key: publish_msg.stream_key}
end

@impl true
def handle_info({:send_me_data, source_pid}, state) do
buffers_to_send = Enum.reverse(state.buffered)
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane_rtmp_plugin/rtmp/source/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ defmodule Membrane.RTMP.Source do

handle_new_client = fn client_ref, app, stream_key ->
send(parent_pid, {:client_ref, client_ref, app, stream_key})
%__MODULE__.ClientHandlerImpl{}
end

{:ok, server_pid} =
Membrane.RTMPServer.start_link(
handler: %__MODULE__.ClientHandlerImpl{controlling_process: self()},
port: port,
use_ssl?: use_ssl?,
handle_new_client: handle_new_client,
Expand Down
52 changes: 17 additions & 35 deletions lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,6 @@ defmodule Membrane.RTMPServer.ClientHandler do
"""
@callback handle_init(any()) :: t()

@doc """
The callback invoked when the client sends the `Membrane.RTMP.Messages.Connect.t()`
message.
"""
@callback handle_connected(connected_msg :: Membrane.RTMP.Messages.Connect.t(), state :: t()) ::
t()

@doc """
The callback invoked when the client sends the `Membrane.RTMP.Messages.Publish.t()`
message.
"""
@callback handle_stream_published(
publish_msg :: Membrane.RTMP.Messages.Publish.t(),
state :: t()
) :: t()

@doc """
The callback invoked when new piece of data is received from a given client.
"""
Expand Down Expand Up @@ -73,16 +57,14 @@ defmodule Membrane.RTMPServer.ClientHandler do
message_parser_state = Handshake.init_server() |> MessageParser.init()
message_handler_state = MessageHandler.init(%{socket: opts.socket, use_ssl?: opts.use_ssl?})

%handler_module{} = opts.handler

{:ok,
%{
socket: opts.socket,
use_ssl?: opts.use_ssl?,
message_parser_state: message_parser_state,
message_handler_state: message_handler_state,
handler: handler_module,
handler_state: handler_module.handle_init(opts.handler),
handler: nil,
handler_state: nil,
app: nil,
stream_key: nil,
server: opts.server,
Expand Down Expand Up @@ -163,15 +145,22 @@ defmodule Membrane.RTMPServer.ClientHandler do
%{publish_msg: %Membrane.RTMP.Messages.Publish{stream_key: stream_key}} =
message_handler_state

if is_function(state.handle_new_client) do
state.handle_new_client.(self(), state.app, stream_key)
else
raise "handle_new_client is not a function"
end
handler =
if is_function(state.handle_new_client) do
state.handle_new_client.(self(), state.app, stream_key)
else
raise "handle_new_client is not a function"
end

%handler_module{} = handler
Process.send_after(self(), {:client_timeout, state.app, stream_key}, state.client_timeout)

%{state | notified_about_client?: true}
%{
state
| notified_about_client?: true,
handler: handler_module,
handler_state: handler_module.handle_init(handler)
}
else
state
end
Expand Down Expand Up @@ -210,19 +199,12 @@ defmodule Membrane.RTMPServer.ClientHandler do
}

{:connected, connected_msg} ->
new_handler_state =
state.handler.handle_connected(connected_msg, state.handler_state)

%{state | handler_state: new_handler_state, app: connected_msg.app}
%{state | app: connected_msg.app}

{:published, publish_msg} ->
new_handler_state =
state.handler.handle_stream_published(publish_msg, state.handler_state)

%{
state
| handler_state: new_handler_state,
stream_key: publish_msg.stream_key,
| stream_key: publish_msg.stream_key,
published?: true
}
end
Expand Down
1 change: 0 additions & 1 deletion lib/membrane_rtmp_plugin/rtmp_server/listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ defmodule Membrane.RTMPServer.Listener do
GenServer.start_link(ClientHandler,
socket: client,
use_ssl?: options.use_ssl?,
handler: options.handler,
server: options.server,
handle_new_client: options.handle_new_client,
client_timeout: options.client_timeout
Expand Down
4 changes: 1 addition & 3 deletions test/membrane_rtmp_plugin/rtmp_source_bin_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,11 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do

handle_new_client = fn client_ref, app, stream_key ->
send(parent_process_pid, {:client_ref, client_ref, app, stream_key})
%Membrane.RTMP.Source.ClientHandlerImpl{}
end

{:ok, server_pid} =
Membrane.RTMPServer.start_link(
handler: %Membrane.RTMP.Source.ClientHandlerImpl{
controlling_process: self()
},
port: port,
use_ssl?: use_ssl?,
handle_new_client: handle_new_client,
Expand Down

0 comments on commit 94f9c34

Please sign in to comment.