Skip to content

Commit

Permalink
clean: tidy up code a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
rhblind committed Sep 23, 2024
1 parent 4ead589 commit a345715
Showing 1 changed file with 57 additions and 43 deletions.
100 changes: 57 additions & 43 deletions lib/off_broadway/emqqt/broker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ defmodule OffBroadway.EMQTT.Broker do
{:noreply, %{state | topic_subscriptions: subscriptions, buffer_threshold_ref: ref}}
end

@impl true
def handle_cast(:stop_emqtt, state) do
if Process.alive?(state.emqtt), do: :emqtt.stop(state.emqtt)
{:noreply, state}
end

@impl true
def handle_info({:publish, message}, state) do
case {:ets.info(state.ets_table, :size), state.buffer_overflow} do
Expand All @@ -93,26 +99,6 @@ defmodule OffBroadway.EMQTT.Broker do
{:noreply, state}
end

def check_buffer_threshold(buffer_size, {min_threshold, max_threshold}, ets_table, emqtt) do
case buffer_fill_percentage(buffer_size, :ets.info(ets_table, :size)) do
fill_percentage when fill_percentage >= max_threshold ->
client_id = :emqtt.info(emqtt)[:clientid]

Logger.warning(
"Buffer fill percentage for client id #{client_id} is " <>
"#{:erlang.float_to_binary(fill_percentage, decimals: 2)}%, pausing EMQTT client"
)

:ok = :emqtt.pause(emqtt)

fill_percentage when fill_percentage < min_threshold ->
:ok = :emqtt.resume(emqtt)

_fill_percentage ->
:ok
end
end

def handle_info({:DOWN, ref, :process, _, :normal}, state) when ref == state.emqtt_ref, do: {:noreply, state}

def handle_info({:DOWN, ref, :process, _, _reason}, state) when ref == state.emqtt_ref do
Expand All @@ -129,33 +115,29 @@ defmodule OffBroadway.EMQTT.Broker do
:ets.delete(state.ets_table)
end

@spec subscribe(pid(), {String.t(), term()}) :: {:ok, {:via, port()}, [pos_integer()]} | {:error, term()}
defp subscribe(emqtt, topic) when is_tuple(topic), do: :emqtt.subscribe(emqtt, topic)
@spec stop_emqtt(pid()) :: :ok
def stop_emqtt(pid), do: GenServer.cast(pid, :stop_emqtt)

defp measure_buffer_event(client_id, topic, buffer_size, event_type) do
:telemetry.execute(
[:off_broadway_emqtt, :buffer, event_type],
%{time: System.system_time(), count: 1},
%{client_id: client_id, topic: topic, buffer_size: buffer_size}
)
end
@spec check_buffer_threshold(non_neg_integer(), {non_neg_integer(), non_neg_integer()}, atom(), pid()) :: :ok
def check_buffer_threshold(buffer_size, {min_threshold, max_threshold}, ets_table, emqtt) do
case buffer_fill_percentage(buffer_size, :ets.info(ets_table, :size)) do
fill_percentage when fill_percentage >= max_threshold ->
client_id = :emqtt.info(emqtt)[:clientid]

defp buffer_fill_percentage(buffer_size, count), do: min(100.0, count * 100.0 / buffer_size)
Logger.warning(
"Buffer fill percentage for client id #{client_id} is " <>
"#{:erlang.float_to_binary(fill_percentage, decimals: 2)}%, pausing EMQTT client"
)

# @spec emqtt_message_handler(atom() | {atom(), keyword()}) :: map()
# defp emqtt_message_handler(message_handler) do
# {message_handler, args} =
# case Producer.message_handler_module(message_handler) do
# {message_handler, args} -> {message_handler, args}
# message_handler -> {message_handler, []}
# end
:ok = :emqtt.pause(emqtt)

# %{
# connected: {message_handler, :handle_connect, args},
# disconnected: {message_handler, :handle_disconnect, args},
# pubrel: {message_handler, :handle_pubrel, args}
# }
# end
fill_percentage when fill_percentage < min_threshold ->
:ok = :emqtt.resume(emqtt)

_fill_percentage ->
:ok
end
end

@spec stream_from_buffer(atom()) :: Enumerable.t()
def stream_from_buffer(ets_table) do
Expand All @@ -171,6 +153,7 @@ defmodule OffBroadway.EMQTT.Broker do
)
end

@spec receive_first(atom(), list()) :: {term(), [non_neg_integer()]} | {:halt, list()}
defp receive_first(ets_table, acc) do
with key when is_integer(key) <- :ets.first(ets_table),
spec <- [{{:"$1", :"$2"}, [{:==, :"$1", key}], [:"$2"]}],
Expand All @@ -181,6 +164,7 @@ defmodule OffBroadway.EMQTT.Broker do
end
end

@spec receive_next(atom(), list()) :: {term(), [non_neg_integer()]} | {:halt, list()}
defp receive_next(ets_table, acc) do
with key when is_integer(key) <- :ets.next(ets_table, acc),
spec <- [{{:"$1", :"$2"}, [{:==, :"$1", key}], [:"$2"]}],
Expand All @@ -190,4 +174,34 @@ defmodule OffBroadway.EMQTT.Broker do
_ -> {:halt, acc}
end
end

@spec subscribe(pid(), {String.t(), term()}) :: {:ok, {:via, port()}, [pos_integer()]} | {:error, term()}
defp subscribe(emqtt, topic) when is_tuple(topic), do: :emqtt.subscribe(emqtt, topic)

@spec measure_buffer_event(String.t(), String.t(), non_neg_integer(), atom()) :: :ok
defp measure_buffer_event(client_id, topic, buffer_size, event_type) do
:telemetry.execute(
[:off_broadway_emqtt, :buffer, event_type],
%{time: System.system_time(), count: 1},
%{client_id: client_id, topic: topic, buffer_size: buffer_size}
)
end

@spec buffer_fill_percentage(non_neg_integer(), non_neg_integer()) :: float()
defp buffer_fill_percentage(buffer_size, count), do: min(100.0, count * 100.0 / buffer_size)

# @spec emqtt_message_handler(atom() | {atom(), keyword()}) :: map()
# defp emqtt_message_handler(message_handler) do
# {message_handler, args} =
# case Producer.message_handler_module(message_handler) do
# {message_handler, args} -> {message_handler, args}
# message_handler -> {message_handler, []}
# end

# %{
# connected: {message_handler, :handle_connect, args},
# disconnected: {message_handler, :handle_disconnect, args},
# pubrel: {message_handler, :handle_pubrel, args}
# }
# end
end

0 comments on commit a345715

Please sign in to comment.