From 3dfca7aa5f50bf2fb1c0817f68886119ccf4d8c8 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Mon, 10 Mar 2025 17:26:09 +0100 Subject: [PATCH 1/4] Rewrite on new core wip --- lib/launch/ets_wrapper.ex | 36 ++++++----- lib/launch/handler_functions.ex | 105 ++++++++++++++++++-------------- mix.exs | 6 +- 3 files changed, 82 insertions(+), 65 deletions(-) diff --git a/lib/launch/ets_wrapper.ex b/lib/launch/ets_wrapper.ex index f688294..ac7e31a 100644 --- a/lib/launch/ets_wrapper.ex +++ b/lib/launch/ets_wrapper.ex @@ -1,10 +1,14 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.ETSWrapper do + alias Membrane.ComponentPath @pid_to_span_and_pipeline_ets :__membrane_opentelemetry_plugs_launch_pid_to_span_and_pipeline__ @pipeline_pid_to_parents_ets :__membrane_opentelemetry_plugs_launch_pipeline_pid_to_parents__ + @component_path_to_span_ets :__membrane_opentelemetry_plugs_component_path_to_span__ + @pipeline_to_parents_ets :__membrane_opentelemetry_plugs_launch_pipeline_to_parents__ + @spec setup_ets_tables() :: :ok def setup_ets_tables() do - :ets.new(@pid_to_span_and_pipeline_ets, [ + :ets.new(@component_path_to_span_ets, [ :public, # unique keys :set, @@ -12,7 +16,7 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.ETSWrapper do {:read_concurrency, true} ]) - :ets.new(@pipeline_pid_to_parents_ets, [ + :ets.new(@pipeline_to_parents_ets, [ :public, # allows duplicates :bag, @@ -25,8 +29,8 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.ETSWrapper do @spec delete_ets_tables() :: :ok def delete_ets_tables() do - :ets.delete(@pid_to_span_and_pipeline_ets) - :ets.delete(@pipeline_pid_to_parents_ets) + :ets.delete(@component_path_to_span_ets) + :ets.delete(@pipeline_to_parents_ets) :ok end @@ -38,27 +42,27 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.ETSWrapper do end end - @spec store_span_and_pipeline(OpenTelemetry.span_ctx(), pid()) :: :ok - def store_span_and_pipeline(span_ctx, pipeline) do - :ets.insert(@pid_to_span_and_pipeline_ets, {self(), {span_ctx, pipeline}}) + @spec store_span(ComponentPath.t(), OpenTelemetry.span_ctx()) :: :ok + def store_span(component_path, span_ctx) do + :ets.insert(@pid_to_span_and_pipeline_ets, {component_path, span_ctx}) :ok end - def delete_span_and_pipeline(component_pid, span_ctx, pipeline) do - :ets.delete(@pid_to_span_and_pipeline_ets, {component_pid, {span_ctx, pipeline}}) + def delete_span_and_pipeline(component_path, span_ctx) do + :ets.delete(@pid_to_span_and_pipeline_ets, {component_path, span_ctx}) end - def store_as_parent_within_pipeline(pipeline) do - :ets.insert(@pipeline_pid_to_parents_ets, {pipeline, self()}) + def store_as_parent_within_pipeline(my_component_path, pipeline_path) do + :ets.insert(@pipeline_to_parents_ets, {pipeline, my_component_path}) :ok end - def get_parents_within_pipeline(pipeline) do - :ets.lookup(@pipeline_pid_to_parents_ets, pipeline) - |> Enum.map(fn {^pipeline, component} -> component end) + def get_parents_within_pipeline(pipeline_path) do + :ets.lookup(@pipeline_pid_to_parents_ets, pipeline_path) + |> Enum.map(fn {^pipeline_path, component_path} -> component_path end) end - def delete_parent_within_pipeline(pipeline, parent) do - :ets.delete(@pipeline_pid_to_parents_ets, {pipeline, parent}) + def delete_parent_within_pipeline(pipeline_path, parent_path) do + :ets.delete(@pipeline_pid_to_parents_ets, {pipeline_path, parent_path}) end end diff --git a/lib/launch/handler_functions.ex b/lib/launch/handler_functions.ex index 8b1b3c1..ee5b410 100644 --- a/lib/launch/handler_functions.ex +++ b/lib/launch/handler_functions.ex @@ -3,6 +3,7 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do require Membrane.OpenTelemetry require Membrane.Logger + alias Membrane.ComponentPath alias Membrane.OpenTelemetry.Plugs.Launch.ETSWrapper @tracked_pipelines Application.compile_env( @@ -15,62 +16,64 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do @spec tracked_pipelines() :: atom() | [module()] def tracked_pipelines(), do: @tracked_pipelines - @spec start_span(:telemetry.event_name(), map(), map(), any()) :: :ok + @spec start_span(:telemetry.event_name(), measurements :: map(), metadata :: Membrane.Telemetry.callback_span_metadata(), config :: any()) :: :ok def start_span(_name, _measurements, metadata, _config) do - metadata.component_state.module.membrane_component_type() - |> do_start_span(metadata.component_state) - + do_start_span(metadata) :ok end - defp do_start_span(component_type, component_state) + @spec do_start_span(Membrane.Telemetry.callback_span_metadata()) :: any() + defp do_start_span(metadata) - defp do_start_span(:pipeline, component_state) do + defp do_start_span(%{component_type: :pipeline} = metadata) do # trick to mute dialyzer tracked_pipelines = apply(__MODULE__, :tracked_pipelines, []) - if tracked_pipelines == :all or component_state.module in tracked_pipelines do - span_id = get_span_id(:pipeline, component_state) + if tracked_pipelines == :all or metadata.callback_context.module in tracked_pipelines do + span_id = get_span_id(metadata) Process.put(@pdict_span_id_key, span_id) Membrane.OpenTelemetry.start_span(span_id) - pipeline = self() + pipeline_path = ComponentPath.get() - Membrane.OpenTelemetry.get_span(span_id) - |> ETSWrapper.store_span_and_pipeline(pipeline) + span = Membrane.OpenTelemetry.get_span(span_id) + ETSWrapper.store_span(pipeline_path, span) - ETSWrapper.store_as_parent_within_pipeline(pipeline) - set_span_attributes(component_state) + ETSWrapper.metadata(pipeline_path, pipeline_path) + set_span_attributes(metadata) - Task.start(__MODULE__, :pipeline_monitor, [pipeline]) + Task.start(__MODULE__, :pipeline_monitor, [self(), pipeline_path]) end end - defp do_start_span(:bin, component_state) do - with {:ok, parent_span_ctx, pipeline} <- - ETSWrapper.get_span_and_pipeline(component_state.parent_pid) do - span_id = get_span_id(:bin, component_state) + defp do_start_span(%{component_type: :bin} = metadata) do + with {:ok, parent_span} <- + get_parent_component_path() |> ETSWrapper.get_span() do + span_id = get_span_id(metadata) Process.put(@pdict_span_id_key, span_id) - Membrane.OpenTelemetry.start_span(span_id, parent_span: parent_span_ctx) + Membrane.OpenTelemetry.start_span(span_id, parent_span: parent_span) + + span = Membrane.OpenTelemetry.get_span(span_id) - Membrane.OpenTelemetry.get_span(span_id) - |> ETSWrapper.store_span_and_pipeline(pipeline) + [pipeline_name | _tail] = my_path = ComponentPath.get() - ETSWrapper.store_as_parent_within_pipeline(pipeline) - set_span_attributes(component_state) + ETSWrapper.store_span(my_path, span) + ETSWrapper.store_as_parent_within_pipeline(my_path, [pipeline_name]) + set_span_attributes(metadata) end end - defp do_start_span(:element, component_state) do - with {:ok, parent_span_ctx, _pipeline} <- - ETSWrapper.get_span_and_pipeline(component_state.parent_pid) do - span_id = get_span_id(:element, component_state) + defp do_start_span(%{component_type: :element} = metadata) do + with {:ok, parent_span} <- + get_parent_component_path() |> ETSWrapper.get_span() do + + span_id = get_span_id(metadata) Process.put(@pdict_span_id_key, span_id) Membrane.OpenTelemetry.start_span(span_id, parent_span: parent_span_ctx) - set_span_attributes(component_state) + set_span_attributes(metadata) end end @@ -129,12 +132,13 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do :ok end - @spec pipeline_monitor(pid()) :: :ok - def pipeline_monitor(pipeline) do - ref = Process.monitor(pipeline) + @spec pipeline_monitor(pid(), ComponentPath.t()) :: :ok + def pipeline_monitor(pipeline_pid, pipeline_path) do + ref = Process.monitor(pipeline_pid) receive do - {:DOWN, ^ref, _process, _pid, _reason} -> cleanup_pipeline(pipeline) + {:DOWN, ^ref, _process, _pid, _reason} -> + cleanup_pipeline(pipeline_path) end :ok @@ -143,21 +147,22 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do defp cleanup_pipeline(pipeline) do ETSWrapper.get_parents_within_pipeline(pipeline) |> Enum.each(fn parent -> - {:ok, span_ctx, ^pipeline} = ETSWrapper.get_span_and_pipeline(parent) + {:ok, span_ctx, ^pipeline} = ETSWrapper.get_span(parent) ETSWrapper.delete_span_and_pipeline(parent, span_ctx, pipeline) ETSWrapper.delete_parent_within_pipeline(pipeline, parent) end) end - defp set_span_attributes(component_state) do + @spec set_span_attributes(Membrane.Telemetry.callback_span_metadata()) :: :ok + defp set_span_attributes(metadata) do with span_id when span_id != nil <- Process.get(@pdict_span_id_key) do - type = component_state.module.membrane_component_type() |> inspect() + type = metadata |> get_type() |> inspect() Membrane.OpenTelemetry.set_attribute(span_id, :component_type, type) - name = get_pretty_name(component_state) + name = get_pretty_name(metadata) Membrane.OpenTelemetry.set_attribute(span_id, :component_name, name) - module = component_state.module |> inspect() + module = metadata.callback_context.module |> inspect() Membrane.OpenTelemetry.set_attribute(span_id, :component_module, module) end @@ -177,29 +182,35 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do |> Enum.map(fn {key, value} -> {key, inspect(value)} end) end - defp get_span_id(:pipeline, component_state) do - "membrane_pipeline_launch_#{inspect(component_state.module)}" + defp get_span_id(%{component_type: :pipeline} = metadata) do + "membrane_pipeline_launch_#{inspect(metadata.callback_context.module)}" end - defp get_span_id(_bin_or_element, component_state) do - "membrane_#{get_type(component_state)}_launch_#{get_pretty_name(component_state)}" + defp get_span_id(metadata) do + "membrane_#{get_type(metadata)}_launch_#{get_pretty_name(metadata)}" end - defp get_pretty_name(component_state) do - type = get_type(component_state) + defp get_pretty_name(metadata) do + type = get_type(metadata) - case component_state do + case metadata.callback_context do %{name: name} when is_binary(name) -> name %{name: name} when name != nil -> inspect(name) %{} -> "#{Atom.to_string(type) |> String.capitalize()} #{self() |> inspect()}" end end - defp get_type(component_state) do - case component_state.module.membrane_component_type() do - :element -> component_state.module.membrane_element_type() + defp get_type(%{component_type: component_type} = metadata) do + case component_type do + :element -> metadata.callback_context.module.membrane_element_type() :bin -> :bin :pipeline -> :pipeline end end + + def get_parent_component_path() do + my_path = ComponentPath.get() + {_my_name, parent_path} = List.pop_at(my_path, length(my_path) - 1) + parent_path + end end diff --git a/mix.exs b/mix.exs index 017b1bd..357ef83 100644 --- a/mix.exs +++ b/mix.exs @@ -38,8 +38,10 @@ defmodule Membrane.Template.Mixfile do defp deps do [ - {:membrane_core, - github: "membraneframework/membrane_core", branch: "additional-telemetry-events"}, + # {:membrane_core, + # github: "membraneframework/membrane_core", branch: "additional-telemetry-events"}, + # {:membrane_core, "~> 1.2"}, + {:membrane_core, path: "../membrane_core"}, {:membrane_opentelemetry, "~> 0.1.0"}, {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, {:dialyxir, ">= 0.0.0", only: :dev, runtime: false}, From 7f385ff658cf195d2c2a229b7670b7eda04de391 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 11 Mar 2025 16:48:08 +0100 Subject: [PATCH 2/4] Rewrite on new core WiP --- lib/application.ex | 2 +- lib/lanuch.ex | 62 ++++++++++++++++++++------------- lib/launch/ets_wrapper.ex | 28 ++++++++------- lib/launch/handler_functions.ex | 40 ++++++++++++--------- mix.exs | 3 +- 5 files changed, 80 insertions(+), 55 deletions(-) diff --git a/lib/application.ex b/lib/application.ex index 161f4af..23d01aa 100644 --- a/lib/application.ex +++ b/lib/application.ex @@ -6,7 +6,7 @@ defmodule Membrane.OpenTelemetry.Plugs.Application do @defined_plugs [Plugs.Launch] @plugs_in_config Application.compile_env(:membrane_opentelemetry_plugs, :plugs, []) - @enabled_plugs @defined_plugs |> Enum.filter(& &1 in @plugs_in_config) + @enabled_plugs @defined_plugs |> Enum.filter(&(&1 in @plugs_in_config)) Enum.each(@plugs_in_config, fn plug -> if plug not in @defined_plugs do diff --git a/lib/lanuch.ex b/lib/lanuch.ex index df4dd7f..9827141 100644 --- a/lib/lanuch.ex +++ b/lib/lanuch.ex @@ -34,47 +34,59 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch do defp attach_telemetry_handlers() do :ok = - :telemetry.attach( - {__MODULE__, :start_span}, - [:membrane, :handle_init, :start], - &HandlerFunctions.start_span/4, - nil + attach( + :start_span, + :handle_init, + :start, + &HandlerFunctions.start_span/4 ) @all_callbacks |> Enum.each(fn callback -> :ok = - :telemetry.attach( - {__MODULE__, callback, :start}, - [:membrane, callback, :start], - &HandlerFunctions.callback_start/4, - nil + attach( + {callback, :start}, + callback, + :start, + &HandlerFunctions.callback_start/4 ) :ok = - :telemetry.attach( - {__MODULE__, callback, :stop}, - [:membrane, callback, :stop], - &HandlerFunctions.callback_stop/4, - nil + attach( + {callback, :stop}, + callback, + :stop, + &HandlerFunctions.callback_stop/4 ) end) :ok = - :telemetry.attach( - {__MODULE__, :maybe_end_span_on_start_of_stream}, - [:membrane, :handle_start_of_stream, :stop], - &HandlerFunctions.ensure_span_ended/4, - nil + attach( + :maybe_end_span_on_start_of_stream, + :handle_start_of_stream, + :stop, + &HandlerFunctions.ensure_span_ended/4 ) :ok = + attach( + :maybe_end_span_on_playing, + :handle_playing, + :stop, + &HandlerFunctions.maybe_end_span/4 + ) + end + + defp attach(id, callback, start_or_stop, handler) do + [:pipeline, :bin, :element] + |> Enum.each(fn component_type -> :telemetry.attach( - {__MODULE__, :maybe_end_span_on_playing}, - [:membrane, :handle_playing, :stop], - &HandlerFunctions.maybe_end_span/4, + {__MODULE__, component_type, id}, + [:membrane, component_type, callback, start_or_stop], + handler, nil ) + end) end defp detach_telemetry_handlers() do @@ -87,8 +99,8 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch do end) |> Enum.concat([ {__MODULE__, :start_span}, - {__MODULE__, :end_element_span}, - {__MODULE__, :end_parent_span} + {__MODULE__, :maybe_end_span_on_start_of_stream}, + {__MODULE__, :maybe_end_span_on_playing} ]) |> Enum.each(fn handler_id -> :ok = :telemetry.detach(handler_id) diff --git a/lib/launch/ets_wrapper.ex b/lib/launch/ets_wrapper.ex index ac7e31a..b0046bf 100644 --- a/lib/launch/ets_wrapper.ex +++ b/lib/launch/ets_wrapper.ex @@ -1,7 +1,5 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.ETSWrapper do alias Membrane.ComponentPath - @pid_to_span_and_pipeline_ets :__membrane_opentelemetry_plugs_launch_pid_to_span_and_pipeline__ - @pipeline_pid_to_parents_ets :__membrane_opentelemetry_plugs_launch_pipeline_pid_to_parents__ @component_path_to_span_ets :__membrane_opentelemetry_plugs_component_path_to_span__ @pipeline_to_parents_ets :__membrane_opentelemetry_plugs_launch_pipeline_to_parents__ @@ -34,35 +32,41 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.ETSWrapper do :ok end - @spec get_span_and_pipeline(pid()) :: {:ok, OpenTelemetry.span_ctx(), pid()} | :error - def get_span_and_pipeline(pid) do - case :ets.lookup(@pid_to_span_and_pipeline_ets, pid) do - [{^pid, {span_ctx, pipeline}}] -> {:ok, span_ctx, pipeline} + @spec get_span(ComponentPath.t()) :: {:ok, OpenTelemetry.span_ctx()} | :error + def get_span(component_path) do + case :ets.lookup(@component_path_to_span_ets, component_path) do + [{^component_path, span}] -> {:ok, span} [] -> :error end end @spec store_span(ComponentPath.t(), OpenTelemetry.span_ctx()) :: :ok - def store_span(component_path, span_ctx) do - :ets.insert(@pid_to_span_and_pipeline_ets, {component_path, span_ctx}) + def store_span(component_path, span) do + :ets.insert(@component_path_to_span_ets, {component_path, span}) :ok end - def delete_span_and_pipeline(component_path, span_ctx) do - :ets.delete(@pid_to_span_and_pipeline_ets, {component_path, span_ctx}) + @spec delete_span_and_pipeline(ComponentPath.t(), OpenTelemetry.span_ctx()) :: :ok + def delete_span_and_pipeline(component_path, span) do + :ets.delete(@component_path_to_span_ets, {component_path, span}) + :ok end + @spec store_as_parent_within_pipeline(ComponentPath.t(), ComponentPath.t()) :: :ok def store_as_parent_within_pipeline(my_component_path, pipeline_path) do :ets.insert(@pipeline_to_parents_ets, {pipeline, my_component_path}) :ok end + @spec get_parents_within_pipeline(ComponentPath.t()) :: [ComponentPath.t()] def get_parents_within_pipeline(pipeline_path) do - :ets.lookup(@pipeline_pid_to_parents_ets, pipeline_path) + :ets.lookup(@pipeline_to_parents_ets, pipeline_path) |> Enum.map(fn {^pipeline_path, component_path} -> component_path end) end + @spec delete_parent_within_pipeline(ComponentPath.t(), ComponentPath.t()) :: :ok def delete_parent_within_pipeline(pipeline_path, parent_path) do - :ets.delete(@pipeline_pid_to_parents_ets, {pipeline_path, parent_path}) + :ets.delete(@pipeline_to_parents_ets, {pipeline_path, parent_path}) + :ok end end diff --git a/lib/launch/handler_functions.ex b/lib/launch/handler_functions.ex index ee5b410..408ff56 100644 --- a/lib/launch/handler_functions.ex +++ b/lib/launch/handler_functions.ex @@ -16,7 +16,12 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do @spec tracked_pipelines() :: atom() | [module()] def tracked_pipelines(), do: @tracked_pipelines - @spec start_span(:telemetry.event_name(), measurements :: map(), metadata :: Membrane.Telemetry.callback_span_metadata(), config :: any()) :: :ok + @spec start_span( + :telemetry.event_name(), + measurements :: map(), + metadata :: Membrane.Telemetry.callback_span_metadata(), + config :: any() + ) :: :ok def start_span(_name, _measurements, metadata, _config) do do_start_span(metadata) :ok @@ -49,7 +54,7 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do defp do_start_span(%{component_type: :bin} = metadata) do with {:ok, parent_span} <- - get_parent_component_path() |> ETSWrapper.get_span() do + get_parent_component_path() |> ETSWrapper.get_span() do span_id = get_span_id(metadata) Process.put(@pdict_span_id_key, span_id) @@ -67,12 +72,11 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do defp do_start_span(%{component_type: :element} = metadata) do with {:ok, parent_span} <- - get_parent_component_path() |> ETSWrapper.get_span() do - + get_parent_component_path() |> ETSWrapper.get_span() do span_id = get_span_id(metadata) Process.put(@pdict_span_id_key, span_id) - Membrane.OpenTelemetry.start_span(span_id, parent_span: parent_span_ctx) + Membrane.OpenTelemetry.start_span(span_id, parent_span: parent_span) set_span_attributes(metadata) end end @@ -90,16 +94,18 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do @spec maybe_end_span(:telemetry.event_name(), map(), map(), any()) :: :ok def maybe_end_span([:membrane, :handle_playing, :stop], _mesaurements, metadata, _config) do - component_state = metadata.component_state - - if get_type(component_state) in [:source, :bin, :pipeline] or - Enum.all?(component_state.pads_data, fn {_pad, data} -> data.direction == :output end) do + if get_type(metadata) in [:source, :bin, :pipeline] or only_output_pads?(metadata) do do_ensure_span_ended() end :ok end + defp only_output_pads?(metadata) do + metadata.callback_context.pads + |> Enum.all?(fn {_pad, data} -> data.direction == :output end) + end + defp do_ensure_span_ended() do with span_id when span_id != nil <- Process.delete(@pdict_span_id_key) do Membrane.OpenTelemetry.end_span(span_id) @@ -138,19 +144,21 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do receive do {:DOWN, ^ref, _process, _pid, _reason} -> - cleanup_pipeline(pipeline_path) + :ok = cleanup_pipeline(pipeline_path) end :ok end - defp cleanup_pipeline(pipeline) do - ETSWrapper.get_parents_within_pipeline(pipeline) - |> Enum.each(fn parent -> - {:ok, span_ctx, ^pipeline} = ETSWrapper.get_span(parent) - ETSWrapper.delete_span_and_pipeline(parent, span_ctx, pipeline) - ETSWrapper.delete_parent_within_pipeline(pipeline, parent) + defp cleanup_pipeline(pipeline_path) do + ETSWrapper.get_parents_within_pipeline(pipeline_path) + |> Enum.each(fn parent_path -> + {:ok, span} = ETSWrapper.get_span(parent_path) + ETSWrapper.delete_span_and_pipeline(parent_path, span) + ETSWrapper.delete_parent_within_pipeline(pipeline_path, parent_path) end) + + :ok end @spec set_span_attributes(Membrane.Telemetry.callback_span_metadata()) :: :ok diff --git a/mix.exs b/mix.exs index 357ef83..06c8c50 100644 --- a/mix.exs +++ b/mix.exs @@ -41,7 +41,8 @@ defmodule Membrane.Template.Mixfile do # {:membrane_core, # github: "membraneframework/membrane_core", branch: "additional-telemetry-events"}, # {:membrane_core, "~> 1.2"}, - {:membrane_core, path: "../membrane_core"}, + {:membrane_core, + github: "membraneframework/membrane_core", branch: "add-pipeline-name-to-callback-context"}, {:membrane_opentelemetry, "~> 0.1.0"}, {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, {:dialyxir, ">= 0.0.0", only: :dev, runtime: false}, From 22378a2bc1f62f6637f31cfd9afba90423302fbe Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 11 Mar 2025 17:07:43 +0100 Subject: [PATCH 3/4] Fix bug --- lib/launch/ets_wrapper.ex | 2 +- lib/launch/handler_functions.ex | 2 +- mix.lock | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/launch/ets_wrapper.ex b/lib/launch/ets_wrapper.ex index b0046bf..f650c3e 100644 --- a/lib/launch/ets_wrapper.ex +++ b/lib/launch/ets_wrapper.ex @@ -54,7 +54,7 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.ETSWrapper do @spec store_as_parent_within_pipeline(ComponentPath.t(), ComponentPath.t()) :: :ok def store_as_parent_within_pipeline(my_component_path, pipeline_path) do - :ets.insert(@pipeline_to_parents_ets, {pipeline, my_component_path}) + :ets.insert(@pipeline_to_parents_ets, {pipeline_path, my_component_path}) :ok end diff --git a/lib/launch/handler_functions.ex b/lib/launch/handler_functions.ex index 408ff56..62be1d3 100644 --- a/lib/launch/handler_functions.ex +++ b/lib/launch/handler_functions.ex @@ -45,7 +45,7 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do span = Membrane.OpenTelemetry.get_span(span_id) ETSWrapper.store_span(pipeline_path, span) - ETSWrapper.metadata(pipeline_path, pipeline_path) + ETSWrapper.store_as_parent_within_pipeline(pipeline_path, pipeline_path) set_span_attributes(metadata) Task.start(__MODULE__, :pipeline_monitor, [self(), pipeline_path]) diff --git a/mix.lock b/mix.lock index ce8fa88..62efd99 100644 --- a/mix.lock +++ b/mix.lock @@ -12,7 +12,7 @@ "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"}, - "membrane_core": {:git, "https://github.com/membraneframework/membrane_core.git", "72e599aaaef2b1f5a7568c0bd52be7f6f8d0c352", [branch: "additional-telemetry-events"]}, + "membrane_core": {:git, "https://github.com/membraneframework/membrane_core.git", "de6318f24ac8de2c9a2ec1b7227a347c4e0cf512", [branch: "add-pipeline-name-to-callback-context"]}, "membrane_opentelemetry": {:hex, :membrane_opentelemetry, "0.1.0", "af774bc5b9bad3a822e9a26d8530819b0291b569a282c65a7dd51cc498e6e9cd", [:mix], [{:opentelemetry_api, "~> 1.0.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "2e4072a5f14eb95514701e242a7667a7178dfc6afd313d4643ec0726391c243b"}, "nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"}, "numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"}, From f3454cf77568b62828b61280698d0b583550b0cc Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 12 Mar 2025 14:57:19 +0100 Subject: [PATCH 4/4] Add init_to_playing span --- lib/launch/handler_functions.ex | 76 +++++++++++++++++++++++---------- 1 file changed, 53 insertions(+), 23 deletions(-) diff --git a/lib/launch/handler_functions.ex b/lib/launch/handler_functions.ex index 62be1d3..b80e5bd 100644 --- a/lib/launch/handler_functions.ex +++ b/lib/launch/handler_functions.ex @@ -11,7 +11,7 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do :tracked_pipelines, :all ) - @pdict_span_id_key :__membrane_opentelemetry_launch_span_name__ + @pdict_launch_span_id_key :__membrane_opentelemetry_launch_span_name__ @spec tracked_pipelines() :: atom() | [module()] def tracked_pipelines(), do: @tracked_pipelines @@ -35,8 +35,8 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do tracked_pipelines = apply(__MODULE__, :tracked_pipelines, []) if tracked_pipelines == :all or metadata.callback_context.module in tracked_pipelines do - span_id = get_span_id(metadata) - Process.put(@pdict_span_id_key, span_id) + span_id = get_launch_span_id(metadata) + Process.put(@pdict_launch_span_id_key, span_id) Membrane.OpenTelemetry.start_span(span_id) @@ -46,7 +46,9 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do ETSWrapper.store_span(pipeline_path, span) ETSWrapper.store_as_parent_within_pipeline(pipeline_path, pipeline_path) - set_span_attributes(metadata) + set_launch_span_attributes(metadata) + + start_init_to_playing_span(metadata) Task.start(__MODULE__, :pipeline_monitor, [self(), pipeline_path]) end @@ -55,8 +57,8 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do defp do_start_span(%{component_type: :bin} = metadata) do with {:ok, parent_span} <- get_parent_component_path() |> ETSWrapper.get_span() do - span_id = get_span_id(metadata) - Process.put(@pdict_span_id_key, span_id) + span_id = get_launch_span_id(metadata) + Process.put(@pdict_launch_span_id_key, span_id) Membrane.OpenTelemetry.start_span(span_id, parent_span: parent_span) @@ -66,18 +68,22 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do ETSWrapper.store_span(my_path, span) ETSWrapper.store_as_parent_within_pipeline(my_path, [pipeline_name]) - set_span_attributes(metadata) + set_launch_span_attributes(metadata) + + start_init_to_playing_span(metadata) end end defp do_start_span(%{component_type: :element} = metadata) do with {:ok, parent_span} <- get_parent_component_path() |> ETSWrapper.get_span() do - span_id = get_span_id(metadata) - Process.put(@pdict_span_id_key, span_id) + span_id = get_launch_span_id(metadata) + Process.put(@pdict_launch_span_id_key, span_id) Membrane.OpenTelemetry.start_span(span_id, parent_span: parent_span) - set_span_attributes(metadata) + set_launch_span_attributes(metadata) + + start_init_to_playing_span(metadata) end end @@ -88,14 +94,16 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do _metadata, _config ) do - do_ensure_span_ended() + do_ensure_launch_span_ended() :ok end @spec maybe_end_span(:telemetry.event_name(), map(), map(), any()) :: :ok def maybe_end_span([:membrane, :handle_playing, :stop], _mesaurements, metadata, _config) do + end_init_to_playing_span(metadata) + if get_type(metadata) in [:source, :bin, :pipeline] or only_output_pads?(metadata) do - do_ensure_span_ended() + do_ensure_launch_span_ended() end :ok @@ -106,15 +114,15 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do |> Enum.all?(fn {_pad, data} -> data.direction == :output end) end - defp do_ensure_span_ended() do - with span_id when span_id != nil <- Process.delete(@pdict_span_id_key) do + defp do_ensure_launch_span_ended() do + with span_id when span_id != nil <- Process.delete(@pdict_launch_span_id_key) do Membrane.OpenTelemetry.end_span(span_id) end end @spec callback_start(:telemetry.event_name(), map(), map(), any()) :: :ok def callback_start([:membrane, callback, :start] = name, _measurements, metadata, _config) do - with span_id when span_id != nil <- Process.get(@pdict_span_id_key) do + with span_id when span_id != nil <- Process.get(@pdict_launch_span_id_key) do event_name = name |> Enum.map_join("_", &Atom.to_string/1) event_attributes = get_callback_attributes(callback, metadata.callback_args) Membrane.OpenTelemetry.add_event(span_id, event_name, event_attributes) @@ -130,7 +138,7 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do _metadata, _config ) do - with span_id when span_id != nil <- Process.get(@pdict_span_id_key) do + with span_id when span_id != nil <- Process.get(@pdict_launch_span_id_key) do event_name = name |> Enum.map_join("_", &Atom.to_string/1) Membrane.OpenTelemetry.add_event(span_id, event_name, duration: duration) end @@ -161,9 +169,9 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do :ok end - @spec set_span_attributes(Membrane.Telemetry.callback_span_metadata()) :: :ok - defp set_span_attributes(metadata) do - with span_id when span_id != nil <- Process.get(@pdict_span_id_key) do + @spec set_launch_span_attributes(Membrane.Telemetry.callback_span_metadata()) :: :ok + defp set_launch_span_attributes(metadata) do + with span_id when span_id != nil <- Process.get(@pdict_launch_span_id_key) do type = metadata |> get_type() |> inspect() Membrane.OpenTelemetry.set_attribute(span_id, :component_type, type) @@ -190,12 +198,34 @@ defmodule Membrane.OpenTelemetry.Plugs.Launch.HandlerFunctions do |> Enum.map(fn {key, value} -> {key, inspect(value)} end) end - defp get_span_id(%{component_type: :pipeline} = metadata) do - "membrane_pipeline_launch_#{inspect(metadata.callback_context.module)}" + defp start_init_to_playing_span(metadata) do + launch_span = + get_launch_span_id(metadata) + |> Membrane.OpenTelemetry.get_span() + + get_init_to_playing_span_id(metadata) + |> Membrane.OpenTelemetry.start_span(launch_span) + + :ok + end + + defp end_init_to_playing_span(metadata) do + get_init_to_playing_span_id(metadata) + |> Membrane.OpenTelemetry.end_span() end - defp get_span_id(metadata) do - "membrane_#{get_type(metadata)}_launch_#{get_pretty_name(metadata)}" + defp get_launch_span_id(metadata), do: get_span_id("launch", metadata) + + defp get_init_to_playing_span_id(metadata), do: get_span_id("init_to_playing", metadata) + + defp get_span_id(span_type, metadata) do + pretty_name_or_module = + if metadata.component_type == :pipeline, + do: inspect(metadata.callback_context.module), + else: get_pretty_name(metadata) + + ["membrane", get_type(metadata), span_type, pretty_name_or_module] + |> Enum.join("_") end defp get_pretty_name(metadata) do