diff --git a/examples/source_with_standalone_server.exs b/examples/source_with_standalone_server.exs index 1f22a78..66566b3 100644 --- a/examples/source_with_standalone_server.exs +++ b/examples/source_with_standalone_server.exs @@ -53,12 +53,12 @@ parent_process_pid = self() handle_new_client = fn client_ref, app, stream_key -> send(parent_process_pid, {:client_ref, client_ref, app, stream_key}) + %Membrane.RTMP.Source.ClientHandlerImpl{} end # Run the standalone server {:ok, server} = Membrane.RTMPServer.start_link( - handler: %Membrane.RTMP.Source.ClientHandlerImpl{controlling_process: self()}, port: port, use_ssl?: false, handle_new_client: handle_new_client, diff --git a/lib/membrane_rtmp_plugin/rtmp/source/client_handler_for_source.ex b/lib/membrane_rtmp_plugin/rtmp/source/client_handler_impl.ex similarity index 74% rename from lib/membrane_rtmp_plugin/rtmp/source/client_handler_for_source.ex rename to lib/membrane_rtmp_plugin/rtmp/source/client_handler_impl.ex index 6039e11..ad5164b 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/client_handler_for_source.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/client_handler_impl.ex @@ -6,29 +6,16 @@ defmodule Membrane.RTMP.Source.ClientHandlerImpl do @behaviour Membrane.RTMPServer.ClientHandler - defstruct [:controlling_process] + defstruct [] @impl true - def handle_init(opts) do + def handle_init(_opts) do %{ source_pid: nil, - buffered: [], - app: nil, - stream_key: nil, - controlling_process: opts.controlling_process + buffered: [] } end - @impl true - def handle_connected(connected_msg, state) do - %{state | app: connected_msg.app} - end - - @impl true - def handle_stream_published(publish_msg, state) do - %{state | stream_key: publish_msg.stream_key} - end - @impl true def handle_info({:send_me_data, source_pid}, state) do buffers_to_send = Enum.reverse(state.buffered) diff --git a/lib/membrane_rtmp_plugin/rtmp/source/source.ex b/lib/membrane_rtmp_plugin/rtmp/source/source.ex index 41881fd..f1c063f 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/source.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/source.ex @@ -86,11 +86,11 @@ defmodule Membrane.RTMP.Source do handle_new_client = fn client_ref, app, stream_key -> send(parent_pid, {:client_ref, client_ref, app, stream_key}) + %__MODULE__.ClientHandlerImpl{} end {:ok, server_pid} = Membrane.RTMPServer.start_link( - handler: %__MODULE__.ClientHandlerImpl{controlling_process: self()}, port: port, use_ssl?: use_ssl?, handle_new_client: handle_new_client, diff --git a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex index 5a08a66..f173b42 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex @@ -23,22 +23,6 @@ defmodule Membrane.RTMPServer.ClientHandler do """ @callback handle_init(any()) :: t() - @doc """ - The callback invoked when the client sends the `Membrane.RTMP.Messages.Connect.t()` - message. - """ - @callback handle_connected(connected_msg :: Membrane.RTMP.Messages.Connect.t(), state :: t()) :: - t() - - @doc """ - The callback invoked when the client sends the `Membrane.RTMP.Messages.Publish.t()` - message. - """ - @callback handle_stream_published( - publish_msg :: Membrane.RTMP.Messages.Publish.t(), - state :: t() - ) :: t() - @doc """ The callback invoked when new piece of data is received from a given client. """ @@ -73,16 +57,14 @@ defmodule Membrane.RTMPServer.ClientHandler do message_parser_state = Handshake.init_server() |> MessageParser.init() message_handler_state = MessageHandler.init(%{socket: opts.socket, use_ssl?: opts.use_ssl?}) - %handler_module{} = opts.handler - {:ok, %{ socket: opts.socket, use_ssl?: opts.use_ssl?, message_parser_state: message_parser_state, message_handler_state: message_handler_state, - handler: handler_module, - handler_state: handler_module.handle_init(opts.handler), + handler: nil, + handler_state: nil, app: nil, stream_key: nil, server: opts.server, @@ -163,15 +145,22 @@ defmodule Membrane.RTMPServer.ClientHandler do %{publish_msg: %Membrane.RTMP.Messages.Publish{stream_key: stream_key}} = message_handler_state - if is_function(state.handle_new_client) do - state.handle_new_client.(self(), state.app, stream_key) - else - raise "handle_new_client is not a function" - end + handler = + if is_function(state.handle_new_client) do + state.handle_new_client.(self(), state.app, stream_key) + else + raise "handle_new_client is not a function" + end + %handler_module{} = handler Process.send_after(self(), {:client_timeout, state.app, stream_key}, state.client_timeout) - %{state | notified_about_client?: true} + %{ + state + | notified_about_client?: true, + handler: handler_module, + handler_state: handler_module.handle_init(handler) + } else state end @@ -210,19 +199,12 @@ defmodule Membrane.RTMPServer.ClientHandler do } {:connected, connected_msg} -> - new_handler_state = - state.handler.handle_connected(connected_msg, state.handler_state) - - %{state | handler_state: new_handler_state, app: connected_msg.app} + %{state | app: connected_msg.app} {:published, publish_msg} -> - new_handler_state = - state.handler.handle_stream_published(publish_msg, state.handler_state) - %{ state - | handler_state: new_handler_state, - stream_key: publish_msg.stream_key, + | stream_key: publish_msg.stream_key, published?: true } end diff --git a/lib/membrane_rtmp_plugin/rtmp_server/listener.ex b/lib/membrane_rtmp_plugin/rtmp_server/listener.ex index 6cf2865..a24312e 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/listener.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/listener.ex @@ -51,7 +51,6 @@ defmodule Membrane.RTMPServer.Listener do GenServer.start_link(ClientHandler, socket: client, use_ssl?: options.use_ssl?, - handler: options.handler, server: options.server, handle_new_client: options.handle_new_client, client_timeout: options.client_timeout diff --git a/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs index 9d5d489..9b9bd31 100644 --- a/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs +++ b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs @@ -222,13 +222,11 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do handle_new_client = fn client_ref, app, stream_key -> send(parent_process_pid, {:client_ref, client_ref, app, stream_key}) + %Membrane.RTMP.Source.ClientHandlerImpl{} end {:ok, server_pid} = Membrane.RTMPServer.start_link( - handler: %Membrane.RTMP.Source.ClientHandlerImpl{ - controlling_process: self() - }, port: port, use_ssl?: use_ssl?, handle_new_client: handle_new_client,