Skip to content

Commit

Permalink
works but goes into infinite loop
Browse files Browse the repository at this point in the history
  • Loading branch information
bartkrak committed Jun 13, 2024
1 parent 45c5bbb commit dd06232
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 19 deletions.
2 changes: 1 addition & 1 deletion examples/source_with_standalone_server.exs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Logger.configure([{:level, :info}])

# Subscribe to receive client handler that connected to the
# server with given app id and stream key
:ok = Membrane.RTMP.Server.subscribe(server, "app", "stream_key")
# :ok = Membrane.RTMP.Server.subscribe(server, "app", "stream_key")

# Wait for the client handler
client_handler =
Expand Down
21 changes: 15 additions & 6 deletions lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ defmodule Membrane.RTMP.MessageHandler do
header_sent?: false,
events: [],
receiver_pid: nil,
#
publish_msg: nil,
publish_header: nil,
# how many times the Source tries to get control of the socket
socket_retries: 3,
# epoch required for performing a handshake with the pipeline
Expand All @@ -72,6 +75,14 @@ defmodule Membrane.RTMP.MessageHandler do
end)
end

@spec send_publish_success(map()) :: {map(), list()}
def send_publish_success(state) do
Responses.publish_success(state.publish_msg.stream_key)
|> send_rtmp_payload(state.socket, chunk_stream_id: 3, stream_id: state.publish_header.stream_id)

{%{state | events: []}, [{:published, state.publish_msg} | state.events]}
end

# Expected flow of messages:
# 1. [in] c0_c1 handshake -> [out] s0_s1_s2 handshake
# 2. [in] c2 handshake -> [out] empty
Expand Down Expand Up @@ -140,13 +151,11 @@ defmodule Membrane.RTMP.MessageHandler do
defp do_handle_client_message(%Messages.Publish{} = publish_msg, %Header{} = header, state) do
%Messages.UserControl{event_type: @stream_begin_type, data: <<0, 0, 0, 1>>}
|> send_rtmp_payload(state.socket, chunk_stream_id: 2)
# at this point abort handshake and ask server if someone is subscribed to this url
IO.inspect(publish_msg.stream_key, label: "paused handshake for")

# THIS
Responses.publish_success(publish_msg.stream_key)
|> send_rtmp_payload(state.socket, chunk_stream_id: 3, stream_id: header.stream_id)

state = %{state | events: [{:published, publish_msg} | state.events]}
{:cont, state}
# state = send_publish_success(publish_msg, header.stream_id, state)
{:halt, %{state | publish_msg: publish_msg, publish_header: header}}
end

# A message containing stream metadata
Expand Down
23 changes: 17 additions & 6 deletions lib/membrane_rtmp_plugin/rtmp_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ defmodule Membrane.RTMP.Server do

@impl true
def handle_cast({:subscribe, app, stream_key, subscriber_pid}, state) do
IO.puts("handle_cast :subscribe")
# IO.puts("handle_cast :subscribe")
state = put_in(state, [:subscriptions, {app, stream_key}], subscriber_pid)
maybe_send_subscription(app, stream_key, state)
{:noreply, state}
Expand All @@ -109,20 +109,31 @@ defmodule Membrane.RTMP.Server do
{:noreply, state}
end

@impl true
def handle_info({:client_register_attempt, app, stream_key, client_handler_pid}, state) do
IO.inspect("handle_info :client_register_attempt #{app} #{stream_key}")
# check if anyone is subscribed to app and stream_key
if state.subscriptions[{app, stream_key}] != nil do
IO.inspect("sub exist")
# respond to client handler that subscription exist
send(client_handler_pid, :sub_exists)
end

# maybe_send_subscription(app, stream_key, state)
{:noreply, state}
end

@impl true
def handle_info({:port, port}, state) do
Enum.each(state.to_reply, &GenServer.reply(&1, port))
{:noreply, %{state | port: port, to_reply: []}}
end

defp maybe_send_subscription(app, stream_key, state) do
IO.inspect(state, label: "maybe_send_subscription")

# IO.inspect(state, label: "maybe_send_subscription")
if state.subscriptions[{app, stream_key}] != nil and state.mapping[{app, stream_key}] != nil do
# here also send message to client hadnler to finish handshake
# IO.inspect(target_client_handler_pid, label: "target client handler")

send(state.mapping[{app, stream_key}], :subscribed)
# send(state.mapping[{app, stream_key}], :subscribed)

send(
state.subscriptions[{app, stream_key}],
Expand Down
22 changes: 16 additions & 6 deletions lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,6 @@ defmodule Membrane.RTMP.Server.ClientHandler do
{:noreply, state}
end

@impl true
def handle_info(:subscribed, state) do
IO.puts("siemapl")
{:noreply, state}
end

@impl true
def handle_info(:control_granted, state) do
request_data(state)
Expand All @@ -88,6 +82,14 @@ defmodule Membrane.RTMP.Server.ClientHandler do
{:noreply, state}
end

@impl true
def handle_info(:sub_exists, state) do
# finish RTMP handshake
{message_handler_state, events} = MessageHandler.send_publish_success(state.message_handler_state)
state = Enum.reduce(events, state, &handle_event/2)
{:noreply, %{state | message_handler_state: message_handler_state}}
end

@impl true
def handle_info(other_msg, state) do
behaviour_state = state.behaviour.handle_info(other_msg, state.behaviour_state)
Expand All @@ -102,6 +104,13 @@ defmodule Membrane.RTMP.Server.ClientHandler do
{message_handler_state, events} =
MessageHandler.handle_client_messages(messages, state.message_handler_state)

if message_handler_state.publish_msg != nil do
%{publish_msg: %Membrane.RTMP.Messages.Publish{stream_key: stream_key}} = message_handler_state
IO.inspect(stream_key, label: "stream_key")
# ask server if someone is subscribed to this
send(state.server, {:client_register_attempt, state.app, stream_key, self()})
end

state = Enum.reduce(events, state, &handle_event/2)

request_data(state)
Expand All @@ -114,6 +123,7 @@ defmodule Membrane.RTMP.Server.ClientHandler do
}}
end


defp handle_event(event, state) do
# call callbacks
case event do
Expand Down

0 comments on commit dd06232

Please sign in to comment.