Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite on the new core #2

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Membrane.OpenTelemetry.Plugs.Application do

@defined_plugs [Plugs.Launch]
@plugs_in_config Application.compile_env(:membrane_opentelemetry_plugs, :plugs, [])
@enabled_plugs @defined_plugs |> Enum.filter(& &1 in @plugs_in_config)
@enabled_plugs @defined_plugs |> Enum.filter(&(&1 in @plugs_in_config))

Enum.each(@plugs_in_config, fn plug ->
if plug not in @defined_plugs do
Expand Down
62 changes: 37 additions & 25 deletions lib/lanuch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,47 +34,59 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch do

defp attach_telemetry_handlers() do
:ok =
:telemetry.attach(
{__MODULE__, :start_span},
[:membrane, :handle_init, :start],
&HandlerFunctions.start_span/4,
nil
attach(
:start_span,
:handle_init,
:start,
&HandlerFunctions.start_span/4
)

@all_callbacks
|> Enum.each(fn callback ->
:ok =
:telemetry.attach(
{__MODULE__, callback, :start},
[:membrane, callback, :start],
&HandlerFunctions.callback_start/4,
nil
attach(
{callback, :start},
callback,
:start,
&HandlerFunctions.callback_start/4
)

:ok =
:telemetry.attach(
{__MODULE__, callback, :stop},
[:membrane, callback, :stop],
&HandlerFunctions.callback_stop/4,
nil
attach(
{callback, :stop},
callback,
:stop,
&HandlerFunctions.callback_stop/4
)
end)

:ok =
:telemetry.attach(
{__MODULE__, :maybe_end_span_on_start_of_stream},
[:membrane, :handle_start_of_stream, :stop],
&HandlerFunctions.ensure_span_ended/4,
nil
attach(
:maybe_end_span_on_start_of_stream,
:handle_start_of_stream,
:stop,
&HandlerFunctions.ensure_span_ended/4
)

:ok =
attach(
:maybe_end_span_on_playing,
:handle_playing,
:stop,
&HandlerFunctions.maybe_end_span/4
)
end

defp attach(id, callback, start_or_stop, handler) do
[:pipeline, :bin, :element]
|> Enum.each(fn component_type ->
:telemetry.attach(
{__MODULE__, :maybe_end_span_on_playing},
[:membrane, :handle_playing, :stop],
&HandlerFunctions.maybe_end_span/4,
{__MODULE__, component_type, id},
[:membrane, component_type, callback, start_or_stop],
handler,
nil
)
end)
end

defp detach_telemetry_handlers() do
Expand All @@ -87,8 +99,8 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch do
end)
|> Enum.concat([
{__MODULE__, :start_span},
{__MODULE__, :end_element_span},
{__MODULE__, :end_parent_span}
{__MODULE__, :maybe_end_span_on_start_of_stream},
{__MODULE__, :maybe_end_span_on_playing}
])
|> Enum.each(fn handler_id ->
:ok = :telemetry.detach(handler_id)
Expand Down
52 changes: 30 additions & 22 deletions lib/launch/ets_wrapper.ex
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
defmodule Membrane.OpenTelemetry.Plugs.Launch.ETSWrapper do
@pid_to_span_and_pipeline_ets :__membrane_opentelemetry_plugs_launch_pid_to_span_and_pipeline__
@pipeline_pid_to_parents_ets :__membrane_opentelemetry_plugs_launch_pipeline_pid_to_parents__
alias Membrane.ComponentPath

@component_path_to_span_ets :__membrane_opentelemetry_plugs_component_path_to_span__
@pipeline_to_parents_ets :__membrane_opentelemetry_plugs_launch_pipeline_to_parents__

@spec setup_ets_tables() :: :ok
def setup_ets_tables() do
:ets.new(@pid_to_span_and_pipeline_ets, [
:ets.new(@component_path_to_span_ets, [
:public,
# unique keys
:set,
:named_table,
{:read_concurrency, true}
])

:ets.new(@pipeline_pid_to_parents_ets, [
:ets.new(@pipeline_to_parents_ets, [
:public,
# allows duplicates
:bag,
Expand All @@ -25,40 +27,46 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.ETSWrapper do

@spec delete_ets_tables() :: :ok
def delete_ets_tables() do
:ets.delete(@pid_to_span_and_pipeline_ets)
:ets.delete(@pipeline_pid_to_parents_ets)
:ets.delete(@component_path_to_span_ets)
:ets.delete(@pipeline_to_parents_ets)
:ok
end

@spec get_span_and_pipeline(pid()) :: {:ok, OpenTelemetry.span_ctx(), pid()} | :error
def get_span_and_pipeline(pid) do
case :ets.lookup(@pid_to_span_and_pipeline_ets, pid) do
[{^pid, {span_ctx, pipeline}}] -> {:ok, span_ctx, pipeline}
@spec get_span(ComponentPath.t()) :: {:ok, OpenTelemetry.span_ctx()} | :error
def get_span(component_path) do
case :ets.lookup(@component_path_to_span_ets, component_path) do
[{^component_path, span}] -> {:ok, span}
[] -> :error
end
end

@spec store_span_and_pipeline(OpenTelemetry.span_ctx(), pid()) :: :ok
def store_span_and_pipeline(span_ctx, pipeline) do
:ets.insert(@pid_to_span_and_pipeline_ets, {self(), {span_ctx, pipeline}})
@spec store_span(ComponentPath.t(), OpenTelemetry.span_ctx()) :: :ok
def store_span(component_path, span) do
:ets.insert(@component_path_to_span_ets, {component_path, span})
:ok
end

def delete_span_and_pipeline(component_pid, span_ctx, pipeline) do
:ets.delete(@pid_to_span_and_pipeline_ets, {component_pid, {span_ctx, pipeline}})
@spec delete_span_and_pipeline(ComponentPath.t(), OpenTelemetry.span_ctx()) :: :ok
def delete_span_and_pipeline(component_path, span) do
:ets.delete(@component_path_to_span_ets, {component_path, span})
:ok
end

def store_as_parent_within_pipeline(pipeline) do
:ets.insert(@pipeline_pid_to_parents_ets, {pipeline, self()})
@spec store_as_parent_within_pipeline(ComponentPath.t(), ComponentPath.t()) :: :ok
def store_as_parent_within_pipeline(my_component_path, pipeline_path) do
:ets.insert(@pipeline_to_parents_ets, {pipeline_path, my_component_path})
:ok
end

def get_parents_within_pipeline(pipeline) do
:ets.lookup(@pipeline_pid_to_parents_ets, pipeline)
|> Enum.map(fn {^pipeline, component} -> component end)
@spec get_parents_within_pipeline(ComponentPath.t()) :: [ComponentPath.t()]
def get_parents_within_pipeline(pipeline_path) do
:ets.lookup(@pipeline_to_parents_ets, pipeline_path)
|> Enum.map(fn {^pipeline_path, component_path} -> component_path end)
end

def delete_parent_within_pipeline(pipeline, parent) do
:ets.delete(@pipeline_pid_to_parents_ets, {pipeline, parent})
@spec delete_parent_within_pipeline(ComponentPath.t(), ComponentPath.t()) :: :ok
def delete_parent_within_pipeline(pipeline_path, parent_path) do
:ets.delete(@pipeline_to_parents_ets, {pipeline_path, parent_path})
:ok
end
end
Loading