diff --git a/CHANGELOG.md b/CHANGELOG.md index 53a69d32f..27d720595 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * Implement `Membrane.Connector`. [#904](https://github.com/membraneframework/membrane_core/pull/904) * Implememnt diamonds detection. [#909](https://github.com/membraneframework/membrane_core/pull/909) * Incorporate `Membrane.Funnel`, `Membrane.Tee` and `Membane.Fake.Sink`. [#922](https://github.com/membraneframework/membrane_core/issues/922) + * Added new, revised Telemetry system [#905](https://github.com/membraneframework/membrane_core/pull/918) ## 1.1.2 * Add new callback `handle_child_terminated/3` along with new assertions. [#894](https://github.com/membraneframework/membrane_core/pull/894) diff --git a/config/config.exs b/config/config.exs index 6e62d2427..c73f8283a 100644 --- a/config/config.exs +++ b/config/config.exs @@ -2,4 +2,18 @@ import Config if config_env() == :test do config :junit_formatter, include_filename?: true + + config :membrane_core, :telemetry_flags, + tracked_callbacks: [ + element: [ + :handle_init, + :handle_playing, + :handle_setup, + :handle_terminate_request, + :handle_parent_notification + ], + bin: :all, + pipeline: :all + ], + datapoints: :all end diff --git a/lib/membrane/bin/callback_context.ex b/lib/membrane/bin/callback_context.ex index c6aa09527..aaebc1bc4 100644 --- a/lib/membrane/bin/callback_context.ex +++ b/lib/membrane/bin/callback_context.ex @@ -16,13 +16,15 @@ defmodule Membrane.Bin.CallbackContext do `c:Membrane.Bin.handle_crash_group_down/3`. """ @type t :: %{ + :children => %{Membrane.Child.name() => Membrane.ChildEntry.t()}, :clock => Membrane.Clock.t(), :parent_clock => Membrane.Clock.t(), - :pads => %{Membrane.Pad.ref() => Membrane.Bin.PadData.t()}, + :module => module(), :name => Membrane.Bin.name(), - :children => %{Membrane.Child.name() => Membrane.ChildEntry.t()}, + :pads => %{Membrane.Pad.ref() => Membrane.Bin.PadData.t()}, :playback => Membrane.Playback.t(), :resource_guard => Membrane.ResourceGuard.t(), + :setup_incomplete? => boolean(), :utility_supervisor => Membrane.UtilitySupervisor.t(), optional(:pad_options) => map(), optional(:members) => [Membrane.Child.name()], diff --git a/lib/membrane/component_path.ex b/lib/membrane/component_path.ex index 7ef0cfe1b..31eb8be83 100644 --- a/lib/membrane/component_path.ex +++ b/lib/membrane/component_path.ex @@ -8,6 +8,7 @@ defmodule Membrane.ComponentPath do @typedoc @moduledoc @type path :: list(String.t()) + @type formatted_path :: String.t() @key :membrane_path @@ -16,7 +17,7 @@ defmodule Membrane.ComponentPath do If path had already existed then replaces it. """ - @spec set(path) :: :ok + @spec set(path()) :: :ok def set(path) do Process.put(@key, path) :ok @@ -43,6 +44,6 @@ defmodule Membrane.ComponentPath do If path has not been set, empty list is returned. """ - @spec get() :: list(String.t()) + @spec get() :: path() def get(), do: Process.get(@key, []) end diff --git a/lib/membrane/core/bin.ex b/lib/membrane/core/bin.ex index 61b6862b0..8a21ff91b 100644 --- a/lib/membrane/core/bin.ex +++ b/lib/membrane/core/bin.ex @@ -13,7 +13,6 @@ defmodule Membrane.Core.Bin do Parent, ProcessHelper, SubprocessSupervisor, - Telemetry, TimerController } @@ -21,7 +20,7 @@ defmodule Membrane.Core.Bin do require Membrane.Core.Utils, as: Utils require Membrane.Core.Message - require Membrane.Core.Telemetry + require Membrane.Core.LegacyTelemetry, as: LegacyTelemetry require Membrane.Logger @type options :: %{ @@ -127,8 +126,8 @@ defmodule Membrane.Core.Bin do {:ok, resource_guard} = SubprocessSupervisor.start_utility(options.subprocess_supervisor, {ResourceGuard, self()}) - Telemetry.report_init(:bin) - ResourceGuard.register(resource_guard, fn -> Telemetry.report_terminate(:bin) end) + LegacyTelemetry.report_init(:bin) + ResourceGuard.register(resource_guard, fn -> LegacyTelemetry.report_terminate(:bin) end) state = %State{ diff --git a/lib/membrane/core/bin/callback_context.ex b/lib/membrane/core/bin/callback_context.ex index 205e90c2a..bf5272e4a 100644 --- a/lib/membrane/core/bin/callback_context.ex +++ b/lib/membrane/core/bin/callback_context.ex @@ -15,13 +15,15 @@ defmodule Membrane.Core.Bin.CallbackContext do def from_state(state, optional_fields \\ []) do Map.new(optional_fields) |> Map.merge(%{ + children: state.children, clock: state.synchronization.clock, parent_clock: state.synchronization.parent_clock, pads: state.pads_data, + module: state.module, name: state.name, - children: state.children, playback: state.playback, resource_guard: state.resource_guard, + setup_incomplete?: state.setup_incomplete?, utility_supervisor: state.subprocess_supervisor }) end diff --git a/lib/membrane/core/bin/state.ex b/lib/membrane/core/bin/state.ex index 9355018a8..2a42eefd6 100644 --- a/lib/membrane/core/bin/state.ex +++ b/lib/membrane/core/bin/state.ex @@ -16,13 +16,13 @@ defmodule Membrane.Core.Bin.State do @type t :: %__MODULE__{ internal_state: Membrane.Bin.state() | nil, - module: module, + module: module(), children: ChildrenModel.children(), subprocess_supervisor: pid(), name: Membrane.Bin.name() | nil, pads_info: PadModel.pads_info() | nil, pads_data: PadModel.pads_data() | nil, - parent_pid: pid, + parent_pid: pid(), links: %{Link.id() => Link.t()}, crash_groups: %{CrashGroup.name() => CrashGroup.t()}, synchronization: %{ diff --git a/lib/membrane/core/callback_handler.ex b/lib/membrane/core/callback_handler.ex index 731f2777a..dfb565372 100644 --- a/lib/membrane/core/callback_handler.ex +++ b/lib/membrane/core/callback_handler.ex @@ -10,6 +10,7 @@ defmodule Membrane.Core.CallbackHandler do alias Membrane.CallbackError require Membrane.Logger + require Membrane.Core.Telemetry, as: Telemetry @type state :: %{ :module => module, @@ -125,45 +126,50 @@ defmodule Membrane.Core.CallbackHandler do end @spec exec_callback(callback :: atom, args :: list, handler_params, state) :: - {list, internal_state} + {list, internal_state} | no_return() defp exec_callback( callback, args, %{context: context_fun}, %{module: module, internal_state: internal_state} = state ) do - args = args ++ [context_fun.(state), internal_state] + context = context_fun.(state) + args = args ++ [context, internal_state] - callback_result = - try do + try do + fn -> apply(module, callback, args) - rescue - e in UndefinedFunctionError -> - _ignored = - with %{module: ^module, function: ^callback, arity: arity} <- e do - reraise CallbackError, - [ - kind: :not_implemented, - callback: {module, callback}, - arity: arity, - args: args - ], - __STACKTRACE__ - end - - reraise e, __STACKTRACE__ + |> validate_callback_result!(module, callback) end + |> Telemetry.track_callback_handler(callback, args, state, context) + rescue + e in UndefinedFunctionError -> + _ignored = + with %{module: ^module, function: ^callback, arity: arity} <- e do + reraise CallbackError, + [ + kind: :not_implemented, + callback: {module, callback}, + arity: arity, + args: args + ], + __STACKTRACE__ + end + + reraise e, __STACKTRACE__ + end + end - case callback_result do - {actions, _state} when is_list(actions) -> - callback_result + defp validate_callback_result!({actions, _state} = callback_result, _module, _cb) + when is_list(actions) do + callback_result + end - _result -> - raise CallbackError, - kind: :bad_return, - callback: {module, callback}, - value: callback_result - end + defp validate_callback_result!(callback_result, module, callback) do + raise CallbackError, + kind: :bad_return, + callback: {module, callback}, + value: callback_result end @spec handle_callback_result( diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 0c639dacc..3631119e9 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -40,15 +40,16 @@ defmodule Membrane.Core.Element do require Membrane.Core.Message, as: Message require Membrane.Core.Stalker, as: Stalker require Membrane.Core.Telemetry, as: Telemetry + require Membrane.Core.LegacyTelemetry, as: LegacyTelemetry require Membrane.Logger @type options :: %{ - module: module, + module: module(), name: Membrane.Element.name(), - node: node | nil, + node: node() | nil, user_options: Membrane.Element.options(), sync: Sync.t(), - parent: pid, + parent: pid(), parent_clock: Clock.t(), parent_path: Membrane.ComponentPath.path(), log_metadata: Logger.metadata(), @@ -118,9 +119,8 @@ defmodule Membrane.Core.Element do {:ok, resource_guard} = SubprocessSupervisor.start_utility(options.subprocess_supervisor, {ResourceGuard, self()}) - Telemetry.report_init(:element) - - ResourceGuard.register(resource_guard, fn -> Telemetry.report_terminate(:element) end) + LegacyTelemetry.report_init(:element) + ResourceGuard.register(resource_guard, fn -> LegacyTelemetry.report_terminate(:element) end) self_pid = self() @@ -206,10 +206,7 @@ defmodule Membrane.Core.Element do @impl GenServer def handle_info(message, state) do Utils.log_on_error do - Telemetry.report_metric( - :queue_len, - :erlang.process_info(self(), :message_queue_len) |> elem(1) - ) + Telemetry.report_queue_len(self()) do_handle_info(message, state) end diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 4a4ec1aed..e4eb82b7f 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -8,8 +8,6 @@ defmodule Membrane.Core.Element.ActionHandler do import Membrane.Pad, only: [is_pad_ref: 1] - alias Membrane.Telemetry - alias Membrane.{ ActionError, Buffer, @@ -34,8 +32,9 @@ defmodule Membrane.Core.Element.ActionHandler do require Membrane.Core.Child.PadModel, as: PadModel require Membrane.Core.Message, as: Message - require Membrane.Core.Telemetry, as: Telemetry require Membrane.Logger + require Membrane.Core.Telemetry, as: Telemetry + require Membrane.Core.LegacyTelemetry, as: LegacyTelemetry @impl CallbackHandler def transform_actions(actions, _callback, _handler_params, state) do @@ -319,8 +318,8 @@ defmodule Membrane.Core.Element.ActionHandler do "Sending #{length(buffers)} buffer(s) through pad #{inspect(pad_ref)}" ) - Telemetry.report_metric(:buffer, length(buffers)) - Telemetry.report_bitrate(buffers) + LegacyTelemetry.report_bitrate(buffers) + Telemetry.report_buffer(buffers) Enum.each(buffers, fn %Buffer{} -> :ok diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index fff2946b9..4a53831f6 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -6,7 +6,7 @@ defmodule Membrane.Core.Element.BufferController do use Bunch alias Membrane.{Buffer, Pad} - alias Membrane.Core.{CallbackHandler, Telemetry} + alias Membrane.Core.CallbackHandler alias Membrane.Core.Child.PadModel alias Membrane.Core.Element.{ @@ -24,7 +24,8 @@ defmodule Membrane.Core.Element.BufferController do alias Membrane.Core.Telemetry require Membrane.Core.Child.PadModel - require Membrane.Core.Telemetry + require Membrane.Core.Telemetry, as: Telemetry + require Membrane.Core.LegacyTelemetry, as: LegacyTelemetry @doc """ Handles incoming buffer: either stores it in InputQueue, or executes element's @@ -105,16 +106,14 @@ defmodule Membrane.Core.Element.BufferController do """ @spec exec_buffer_callback(Pad.ref(), [Buffer.t()], State.t()) :: State.t() def exec_buffer_callback(pad_ref, buffers, %State{type: :filter} = state) do - Telemetry.report_metric(:buffer, 1, inspect(pad_ref)) + LegacyTelemetry.report_bitrate(buffers) + Telemetry.report_buffer(buffers) do_exec_buffer_callback(pad_ref, buffers, state) end def exec_buffer_callback(pad_ref, buffers, %State{type: type} = state) when type in [:sink, :endpoint] do - Telemetry.report_metric(:buffer, length(List.wrap(buffers))) - Telemetry.report_bitrate(buffers) - do_exec_buffer_callback(pad_ref, buffers, state) end diff --git a/lib/membrane/core/element/callback_context.ex b/lib/membrane/core/element/callback_context.ex index 71cee7c9b..19daa0dbf 100644 --- a/lib/membrane/core/element/callback_context.ex +++ b/lib/membrane/core/element/callback_context.ex @@ -15,9 +15,11 @@ defmodule Membrane.Core.Element.CallbackContext do pads: state.pads_data, clock: state.synchronization.clock, parent_clock: state.synchronization.parent_clock, + module: state.module, name: state.name, playback: state.playback, resource_guard: state.resource_guard, + setup_incomplete?: state.setup_incomplete?, utility_supervisor: state.subprocess_supervisor }) end diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index f97169308..c3e3a76c4 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -39,7 +39,7 @@ defmodule Membrane.Core.Element.EventController do def handle_event(pad_ref, event, state) do withl pad: {:ok, data} <- PadModel.get_data(state, pad_ref), playback: %State{playback: :playing} <- state do - Telemetry.report_metric(:event, 1, inspect(pad_ref)) + Telemetry.report_incoming_event(%{pad_ref: inspect(pad_ref)}) async? = Event.async?(event) diff --git a/lib/membrane/core/element/manual_flow_controller/input_queue.ex b/lib/membrane/core/element/manual_flow_controller/input_queue.ex index 8eb1d4ed4..e968ba1b0 100644 --- a/lib/membrane/core/element/manual_flow_controller/input_queue.ex +++ b/lib/membrane/core/element/manual_flow_controller/input_queue.ex @@ -10,12 +10,12 @@ defmodule Membrane.Core.Element.ManualFlowController.InputQueue do alias Membrane.Buffer alias Membrane.Core.Element.AtomicDemand + alias Membrane.Core.Telemetry alias Membrane.Event alias Membrane.Pad alias Membrane.StreamFormat require Membrane.Core.Stalker, as: Stalker - require Membrane.Core.Telemetry, as: Telemetry require Membrane.Logger @qe Qex @@ -124,7 +124,6 @@ defmodule Membrane.Core.Element.ManualFlowController.InputQueue do %__MODULE__{size: size} = input_queue = do_store_buffers(input_queue, v) - Telemetry.report_metric(:store, size, input_queue.log_tag) :atomics.put(stalker_metrics.size, 1, size) input_queue @@ -135,8 +134,7 @@ defmodule Membrane.Core.Element.ManualFlowController.InputQueue do def store(%__MODULE__{q: q, size: size} = input_queue, type, v) when type in @non_buf_types do "Storing #{type}" |> mk_log(input_queue) |> Membrane.Logger.debug_verbose() - - Telemetry.report_metric(:store, size, input_queue.log_tag) + Telemetry.report_store(size, input_queue.log_tag) %__MODULE__{input_queue | q: q |> @qe.push({:non_buffer, type, v})} end @@ -176,7 +174,7 @@ defmodule Membrane.Core.Element.ManualFlowController.InputQueue do input_queue = maybe_increase_atomic_demand(input_queue) %{size: size, stalker_metrics: stalker_metrics} = input_queue - Telemetry.report_metric(:take, size, input_queue.log_tag) + Telemetry.report_take(size, input_queue.log_tag) :atomics.put(stalker_metrics.size, 1, size) {out, input_queue} diff --git a/lib/membrane/core/element/state.ex b/lib/membrane/core/element/state.ex index 1c4d2d898..0e03e4038 100644 --- a/lib/membrane/core/element/state.ex +++ b/lib/membrane/core/element/state.ex @@ -13,16 +13,14 @@ defmodule Membrane.Core.Element.State do alias Membrane.Core.Element.EffectiveFlowController alias Membrane.Core.Timer - require Membrane.Pad - @type t :: %__MODULE__{ - module: module, + module: module(), type: Element.type(), name: Element.name(), internal_state: Element.state() | nil, pads_info: PadModel.pads_info() | nil, pads_data: PadModel.pads_data() | nil, - parent_pid: pid, + parent_pid: pid(), delay_demands?: boolean(), delayed_demands: MapSet.t({Pad.ref(), :supply | :redemand}), handle_demand_loop_counter: non_neg_integer(), @@ -38,7 +36,7 @@ defmodule Membrane.Core.Element.State do playback: Membrane.Playback.t(), playback_queue: Membrane.Core.Element.PlaybackQueue.t(), resource_guard: Membrane.ResourceGuard.t(), - subprocess_supervisor: pid, + subprocess_supervisor: pid(), terminating?: boolean(), setup_incomplete?: boolean(), effective_flow_control: EffectiveFlowController.effective_flow_control(), diff --git a/lib/membrane/core/element/stream_format_controller.ex b/lib/membrane/core/element/stream_format_controller.ex index 4913a248e..3b9bfd238 100644 --- a/lib/membrane/core/element/stream_format_controller.ex +++ b/lib/membrane/core/element/stream_format_controller.ex @@ -33,7 +33,7 @@ defmodule Membrane.Core.Element.StreamFormatController do withl pad: {:ok, data} <- PadModel.get_data(state, pad_ref), playback: %State{playback: :playing} <- state do %{direction: :input} = data - Telemetry.report_metric(:stream_format, 1, inspect(pad_ref)) + Telemetry.report_stream_format(stream_format, inspect(pad_ref)) queue = data.input_queue diff --git a/lib/membrane/core/filter_aggregator/context.ex b/lib/membrane/core/filter_aggregator/context.ex index e1e42463f..2ddf6b8b4 100644 --- a/lib/membrane/core/filter_aggregator/context.ex +++ b/lib/membrane/core/filter_aggregator/context.ex @@ -37,12 +37,14 @@ defmodule Membrane.Core.FilterAggregator.Context do |> Map.new(fn {name, description} -> {name, build_pad_data(description)} end) %{ + module: module, pads: pads_data, clock: nil, name: name, parent_clock: nil, playback: :stopped, resource_guard: agg_ctx.resource_guard, + setup_incomplete?: true, utility_supervisor: agg_ctx.utility_supervisor } end diff --git a/lib/membrane/core/legacy_telemetry.ex b/lib/membrane/core/legacy_telemetry.ex new file mode 100644 index 000000000..6d7145a52 --- /dev/null +++ b/lib/membrane/core/legacy_telemetry.ex @@ -0,0 +1,191 @@ +defmodule Membrane.Core.LegacyTelemetry do + @moduledoc false + + # This module is deprecated and will be removed in the next major release (2.0) + # All contents below starting from the next comment are from Legacy Telemetry system + # to retain compatibility with the old telemetry system in case of outdated configuration. + + # This module provides a way to gather events from running Membrane components, as well + # as exposing these events in a format idiomatic to [Telemetry](https://hexdocs.pm/telemetry/) + # library. It uses compile time flags from `config.exs` to determine which events should be + # collected and propagated. This avoids unnecessary runtime overhead when telemetry is not needed. + + alias Membrane.ComponentPath + + require Membrane.Pad + + @telemetry_flags Application.compile_env(:membrane_core, :telemetry_flags, []) + + @doc """ + Reports metrics such as input buffer's size inside functions, incoming events and received stream format. + """ + defmacro report_metric(metric, value, log_tag \\ nil) do + event = + quote do + [:membrane, :metric, :value] + end + + value = + quote do + %{ + component_path: ComponentPath.get_formatted() <> "/" <> (unquote(log_tag) || ""), + metric: Atom.to_string(unquote(metric)), + value: unquote(value) + } + end + + report_event( + event, + value, + Keyword.get(@telemetry_flags, :metrics, []) |> Enum.find(&(&1 == metric)) != nil + ) + end + + @doc """ + Given list of buffers (or a single buffer) calculates total size of their payloads in bits + and reports it. + """ + defmacro report_bitrate(buffers) do + event = + quote do + [:membrane, :metric, :value] + end + + value = + quote do + %{ + component_path: ComponentPath.get_formatted() <> "/", + metric: "bitrate", + value: + 8 * + Enum.reduce( + List.wrap(unquote(buffers)), + 0, + &(unquote(__MODULE__).get_payload_size(&1.payload) + &2) + ) + } + end + + report_event( + event, + value, + Keyword.get(@telemetry_flags, :metrics, []) |> Enum.find(&(&1 == :bitrate)) != nil + ) + end + + @spec get_payload_size(any()) :: non_neg_integer() + def get_payload_size(payload) when is_bitstring(payload) do + Membrane.Payload.size(payload) + end + + def get_payload_size(_payload), do: 0 + + @doc false + @spec __get_public_pad_name__(Membrane.Pad.ref()) :: Membrane.Pad.ref() + def __get_public_pad_name__(pad) do + case pad do + {:private, direction} -> direction + {Membrane.Pad, {:private, direction}, ref} -> {Membrane.Pad, direction, ref} + _pad -> pad + end + end + + @doc """ + Reports new link connection being initialized in pipeline. + """ + defmacro report_link(from, to) do + event = + quote do + [:membrane, :link, :new] + end + + value = + quote do + %{ + parent_path: Membrane.ComponentPath.get_formatted(), + from: inspect(unquote(from).child), + to: inspect(unquote(to).child), + pad_from: + unquote(__MODULE__).__get_public_pad_name__(unquote(from).pad_ref) + |> inspect(), + pad_to: + unquote(__MODULE__).__get_public_pad_name__(unquote(to).pad_ref) + |> inspect() + } + end + + report_event(event, value, Enum.find(@telemetry_flags, &(&1 == :links)) != nil) + end + + @doc """ + Reports a pipeline/bin/element initialization. + """ + defmacro report_init(type) when type in [:pipeline, :bin, :element] do + event = + quote do + [:membrane, unquote(type), :init] + end + + value = + quote do + %{path: ComponentPath.get_formatted()} + end + + metadata = + quote do + %{log_metadata: Logger.metadata()} + end + + report_event( + event, + value, + Enum.find(@telemetry_flags, &(&1 == :inits_and_terminates)) != nil, + metadata + ) + end + + @doc """ + Reports a pipeline/bin/element termination. + """ + defmacro report_terminate(type) when type in [:pipeline, :bin, :element] do + event = + quote do + [:membrane, unquote(type), :terminate] + end + + value = + quote do + %{path: ComponentPath.get_formatted()} + end + + report_event( + event, + value, + Enum.find(@telemetry_flags, &(&1 == :inits_and_terminates)) != nil + ) + end + + # Conditional event reporting of telemetry events + defp report_event(event_name, measurement, enable, metadata \\ nil) do + if enable do + quote do + :telemetry.execute( + unquote(event_name), + unquote(measurement), + unquote(metadata) || %{} + ) + end + else + # A hack to suppress the 'unused variable' warnings + quote do + _fn = fn -> + _unused = unquote(event_name) + _unused = unquote(measurement) + _unused = unquote(metadata) + end + + :ok + end + end + end +end diff --git a/lib/membrane/core/parent/child_life_controller/link_utils.ex b/lib/membrane/core/parent/child_life_controller/link_utils.ex index a276d721f..5d56be0b4 100644 --- a/lib/membrane/core/parent/child_life_controller/link_utils.ex +++ b/lib/membrane/core/parent/child_life_controller/link_utils.ex @@ -4,8 +4,9 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do use Bunch alias Membrane.Child - alias Membrane.Core.{Bin, Message, Parent, Telemetry} + alias Membrane.Core.{Bin, Message, Parent} alias Membrane.Core.Bin.PadController + alias Membrane.Core.Telemetry alias Membrane.Core.Parent.{ ChildLifeController, diff --git a/lib/membrane/core/pipeline.ex b/lib/membrane/core/pipeline.ex index 6f9fc95db..a20a5bfe1 100644 --- a/lib/membrane/core/pipeline.ex +++ b/lib/membrane/core/pipeline.ex @@ -11,8 +11,8 @@ defmodule Membrane.Core.Pipeline do require Membrane.Core.Utils, as: Utils require Membrane.Core.Message, as: Message - require Membrane.Core.Telemetry, as: Telemetry require Membrane.Core.Component + require Membrane.Core.LegacyTelemetry, as: LegacyTelemetry @spec get_stalker(pipeline :: pid()) :: Membrane.Core.Stalker.t() def get_stalker(pipeline) do @@ -43,10 +43,10 @@ defmodule Membrane.Core.Pipeline do {:ok, resource_guard} = SubprocessSupervisor.start_utility(subprocess_supervisor, {ResourceGuard, self()}) - Telemetry.report_init(:pipeline) + LegacyTelemetry.report_init(:pipeline) ResourceGuard.register(resource_guard, fn -> - Telemetry.report_terminate(:pipeline) + LegacyTelemetry.report_terminate(:pipeline) end) {:ok, clock_proxy} = diff --git a/lib/membrane/core/pipeline/callback_context.ex b/lib/membrane/core/pipeline/callback_context.ex index 0414147a6..f48fe59b4 100644 --- a/lib/membrane/core/pipeline/callback_context.ex +++ b/lib/membrane/core/pipeline/callback_context.ex @@ -17,8 +17,10 @@ defmodule Membrane.Core.Pipeline.CallbackContext do |> Map.merge(%{ clock: state.synchronization.clock_proxy, children: state.children, + module: state.module, playback: state.playback, resource_guard: state.resource_guard, + setup_incomplete?: state.setup_incomplete?, utility_supervisor: state.subprocess_supervisor }) end diff --git a/lib/membrane/core/pipeline/state.ex b/lib/membrane/core/pipeline/state.ex index 37f24ea2d..cd9c2c9eb 100644 --- a/lib/membrane/core/pipeline/state.ex +++ b/lib/membrane/core/pipeline/state.ex @@ -13,7 +13,7 @@ defmodule Membrane.Core.Pipeline.State do alias Membrane.Core.Timer @type t :: %__MODULE__{ - module: module, + module: module(), playback: Membrane.Playback.t(), internal_state: Membrane.Pipeline.state() | nil, children: ChildrenModel.children(), diff --git a/lib/membrane/core/telemetry.ex b/lib/membrane/core/telemetry.ex index ff182ea7b..0b610dc8d 100644 --- a/lib/membrane/core/telemetry.ex +++ b/lib/membrane/core/telemetry.ex @@ -1,173 +1,263 @@ defmodule Membrane.Core.Telemetry do @moduledoc false + # This module provides a way to gather datapoints and spans from running Membrane components, as well + # as exposing these events in a format idiomatic to [Telemetry](https://hexdocs.pm/telemetry/) + # library. It uses compile time flags from `config.exs` to determine which events should be + # collected and propagated. This avoids unnecessary runtime overhead when telemetry is not needed. + + alias Membrane.Core.CallbackHandler + alias Membrane.Core.Parent.Link alias Membrane.ComponentPath + alias Membrane.Element + alias Membrane.Element.WithInputPads + alias Membrane.Element.WithOutputPads + alias Membrane.Pad + alias Membrane.Telemetry - require Membrane.Pad + require Membrane.Bin, as: Bin + require Membrane.Element.Base, as: ElementBase + require Membrane.Element.WithOutputPads + require Membrane.Element.WithInputPads + require Membrane.Pipeline, as: Pipeline + require Logger - @telemetry_flags Application.compile_env(:membrane_core, :telemetry_flags, []) + require Membrane.Core.LegacyTelemetry, as: LegacyTelemetry - @doc """ - Reports metrics such as input buffer's size inside functions, incoming events and received stream format. - """ - defmacro report_metric(metric, value, log_tag \\ nil) do - event = - quote do - [:membrane, :metric, :value] - end + @component_modules [ + bin: [Bin], + pipeline: [Pipeline], + element: [ElementBase, WithInputPads, WithOutputPads] + ] - value = - quote do - %{ - component_path: ComponentPath.get_formatted() <> "/" <> (unquote(log_tag) || ""), - metric: Atom.to_string(unquote(metric)), - value: unquote(value) - } - end + @possible_callbacks (for {elem, mods} <- @component_modules do + mods + |> Enum.flat_map(& &1.behaviour_info(:callbacks)) + |> Enum.map(&elem(&1, 0)) + |> Enum.filter(&String.starts_with?(to_string(&1), "handle_")) + |> then(&{elem, &1}) + end) + + _callbacks = @possible_callbacks - report_event( - event, - value, - Keyword.get(@telemetry_flags, :metrics, []) |> Enum.find(&(&1 == metric)) != nil - ) + @config Application.compile_env(:membrane_core, :telemetry_flags, []) + @legacy? Enum.any?(@config, &is_atom(&1)) || Keyword.has_key?(@config, :metrics) + + if @legacy? do + Logger.warning(""" + Legacy telemetry is deprecated and will be removed in the next major release. + Please update your configuration to use the new telemetry system. + """) end - @doc """ - Given list of buffers (or a single buffer) calculates total size of their payloads in bits - and reports it. - """ - defmacro report_bitrate(buffers) do - event = - quote do - [:membrane, :metric, :value] - end + @spec legacy?() :: boolean() + def legacy?(), do: @legacy? - value = - quote do - %{ - component_path: ComponentPath.get_formatted() <> "/", - metric: "bitrate", - value: - 8 * - Enum.reduce( - List.wrap(unquote(buffers)), - 0, - &(Membrane.Payload.size(&1.payload) + &2) - ) - } - end + # Handles telemetry datapoints that were measured before but differ in how they were emitted + # It's public to avoid dialyzer warnings. Not intended for external use + @spec do_legacy_telemetry(atom(), Macro.t()) :: Macro.t() + def do_legacy_telemetry(:link, lazy_block) do + quote do + LegacyTelemetry.report_link(unquote(lazy_block)[:from], unquote(lazy_block)[:to]) + end + end - report_event( - event, - value, - Keyword.get(@telemetry_flags, :metrics, []) |> Enum.find(&(&1 == :bitrate)) != nil - ) + def do_legacy_telemetry(metric_name, lazy_block) do + LegacyTelemetry.report_metric(metric_name, lazy_block) end - @doc false - @spec __get_public_pad_name__(Membrane.Pad.ref()) :: Membrane.Pad.ref() - def __get_public_pad_name__(pad) do - case pad do - {:private, direction} -> direction - {Membrane.Pad, {:private, direction}, ref} -> {Membrane.Pad, direction, ref} - _pad -> pad + @spec handler_reported?(atom(), atom()) :: boolean() + for {component, callbacks} <- @config[:tracked_callbacks] || [] do + case callbacks do + :all -> + def handler_reported?(unquote(component), _callback), do: true + + nil -> + nil + + callbacks_list when is_list(callbacks_list) -> + for callback <- callbacks_list do + if callback not in @possible_callbacks[component] do + raise """ + Invalid telemetry flag: #{inspect(callback)}. + Possible values for #{component} are: #{inspect(@possible_callbacks[component])} + """ + end + + def handler_reported?(unquote(component), unquote(callback)), do: true + end end end - @doc """ - Reports new link connection being initialized in pipeline. - """ - defmacro report_link(from, to) do - event = - quote do - [:membrane, :link, :new] - end + def handler_reported?(_component, _callback), do: false - value = - quote do - %{ - parent_path: Membrane.ComponentPath.get_formatted(), - from: inspect(unquote(from).child), - to: inspect(unquote(to).child), - pad_from: - Membrane.Core.Telemetry.__get_public_pad_name__(unquote(from).pad_ref) |> inspect(), - pad_to: - Membrane.Core.Telemetry.__get_public_pad_name__(unquote(to).pad_ref) |> inspect() - } - end + @spec datapoint_gathered?(any()) :: false | nil | true + def datapoint_gathered?(datapoint) do + datapoints = @config[:datapoints] + datapoints && (datapoints == :all || datapoint in datapoints) + end - report_event(event, value, Enum.find(@telemetry_flags, &(&1 == :links)) != nil) + @spec tracked_callbacks_available() :: [ + pipeline: [atom()], + bin: [atom()], + element: [atom()] + ] + def tracked_callbacks_available do + @possible_callbacks end - @doc """ - Reports a pipeline/bin/element initialization. - """ - defmacro report_init(type) when type in [:pipeline, :bin, :element] do - event = - quote do - [:membrane, unquote(type), :init] - end + @spec tracked_callbacks() :: [ + pipeline: [atom()], + bin: [atom()], + element: [atom()] + ] + def tracked_callbacks do + for {component, callbacks} <- @config[:tracked_callbacks] || [] do + case callbacks do + :all -> + {component, @possible_callbacks[component]} - value = - quote do - %{path: ComponentPath.get_formatted()} - end + nil -> + {component, []} - metadata = - quote do - %{log_metadata: Logger.metadata()} + callbacks_list when is_list(callbacks_list) -> + {component, callbacks_list} end + end + end + + defmacrop report_datapoint(datapoint_name, do: lazy_block) do + unless Macro.quoted_literal?(datapoint_name), do: raise("Datapoint type must be a literal") + + cond do + @legacy? -> + do_legacy_telemetry(datapoint_name, lazy_block) + + datapoint_gathered?(datapoint_name) -> + quote do + value = unquote(lazy_block) - report_event( - event, - value, - Enum.find(@telemetry_flags, &(&1 == :inits_and_terminates)) != nil, - metadata - ) + :telemetry.execute( + [:membrane, :datapoint, unquote(datapoint_name)], + %{value: value}, + %{ + datapoint: unquote(datapoint_name), + component_path: ComponentPath.get(), + component_metadata: Logger.metadata() + } + ) + end + + true -> + quote do + _fn = fn -> + _unused = unquote(datapoint_name) + _unused = unquote(lazy_block) + end + + :ok + end + end end @doc """ - Reports a pipeline/bin/element termination. + Reports a span of a compoment callback function in a format consistent with `span/3` in `:telementry` """ - defmacro report_terminate(type) when type in [:pipeline, :bin, :element] do - event = - quote do - [:membrane, unquote(type), :terminate] - end + @spec track_callback_handler( + (-> CallbackHandler.callback_return() | no_return()), + atom(), + list(), + Element.state() | Bin.state() | Pipeline.state(), + Telemetry.callback_context() + ) :: CallbackHandler.callback_return() | no_return() + def track_callback_handler(f, callback, args, state, context) do + meta = + callback_meta( + state, + callback, + args, + context + ) - value = - quote do - %{path: ComponentPath.get_formatted()} - end + component_type = state_module_to_atom(state.__struct__) - report_event( - event, - value, - Enum.find(@telemetry_flags, &(&1 == :inits_and_terminates)) != nil - ) - end + if handler_reported?(component_type, callback) do + :telemetry.span([:membrane, component_type, callback], meta, fn -> + {_actions, int_state} = res = f.() - # Conditional event reporting of telemetry events - defp report_event(event_name, measurement, enable, metadata \\ nil) do - if enable do - quote do - :telemetry.execute( - unquote(event_name), - unquote(measurement), - unquote(metadata) || %{} - ) - end + # Append the internal state returned from the callback to the metadata + {res, %{meta | state_after_callback: int_state}} + end) else - # A hack to suppress the 'unused variable' warnings - quote do - _fn = fn -> - _unused = unquote(event_name) - _unused = unquote(measurement) - _unused = unquote(metadata) - end + f.() + end + end - :ok - end + defp callback_meta(state, callback, args, context) do + %{ + callback: callback, + callback_args: args, + callback_context: context, + component_path: ComponentPath.get(), + component_type: state.module, + state_before_callback: state.internal_state, + state_after_callback: nil + } + end + + @spec report_incoming_event(%{pad_ref: String.t()}) :: :ok + def report_incoming_event(meta), do: report_datapoint(:event, do: meta) + + @spec report_stream_format(Membrane.StreamFormat.t(), String.t()) :: :ok + def report_stream_format(format, pad_ref), + do: report_datapoint(:stream_format, do: %{format: format, pad_ref: pad_ref}) + + @spec report_buffer(integer() | list()) :: :ok + def report_buffer(length) + + def report_buffer(length) when is_integer(length), + do: report_datapoint(:buffer, do: length) + + def report_buffer(buffers) do + report_datapoint(:buffer, do: length(buffers)) + end + + @spec report_store(integer(), String.t()) :: :ok + def report_store(size, log_tag) do + report_datapoint :store do + %{size: size, log_tag: log_tag} + end + end + + @spec report_take(integer(), String.t()) :: :ok + def report_take(size, log_tag) do + report_datapoint :take do + %{size: size, log_tag: log_tag} end end + + @spec report_queue_len(pid()) :: :ok + def report_queue_len(pid) do + report_datapoint :queue_len do + {:message_queue_len, len} = Process.info(pid, :message_queue_len) + %{len: len} + end + end + + @spec report_link(Link.Endpoint.t(), Link.Endpoint.t()) :: :ok + def report_link(from, to) do + report_datapoint :link do + %{ + parent_component_path: ComponentPath.get_formatted(), + from: inspect(from.child), + to: inspect(to.child), + pad_from: Pad.name_by_ref(from.pad_ref) |> inspect(), + pad_to: Pad.name_by_ref(to.pad_ref) |> inspect() + } + end + end + + defp state_module_to_atom(Membrane.Core.Element.State), do: :element + defp state_module_to_atom(Membrane.Core.Bin.State), do: :bin + defp state_module_to_atom(Membrane.Core.Pipeline.State), do: :pipeline end diff --git a/lib/membrane/element/callback_context.ex b/lib/membrane/element/callback_context.ex index 8e7a783fd..de6289842 100644 --- a/lib/membrane/element/callback_context.ex +++ b/lib/membrane/element/callback_context.ex @@ -19,12 +19,14 @@ defmodule Membrane.Element.CallbackContext do `c:Membrane.Element.WithInputPads.handle_stream_format/4`. """ @type t :: %{ - :pads => %{Membrane.Pad.ref() => Membrane.Element.PadData.t()}, :clock => Membrane.Clock.t() | nil, - :parent_clock => Membrane.Clock.t() | nil, + :module => module(), :name => Membrane.Element.name(), + :pads => %{Membrane.Pad.ref() => Membrane.Element.PadData.t()}, + :parent_clock => Membrane.Clock.t() | nil, :playback => Membrane.Playback.t(), :resource_guard => Membrane.ResourceGuard.t(), + :setup_incomplete? => boolean(), :utility_supervisor => Membrane.UtilitySupervisor.t(), optional(:incoming_demand) => non_neg_integer(), optional(:pad_options) => map(), diff --git a/lib/membrane/pipeline/callback_context.ex b/lib/membrane/pipeline/callback_context.ex index 3c5a6fa6d..962bd912a 100644 --- a/lib/membrane/pipeline/callback_context.ex +++ b/lib/membrane/pipeline/callback_context.ex @@ -15,10 +15,12 @@ defmodule Membrane.Pipeline.CallbackContext do `c:Membrane.Pipeline.handle_crash_group_down/3`. """ @type t :: %{ - :clock => Membrane.Clock.t(), :children => %{Membrane.Child.name() => Membrane.ChildEntry.t()}, + :clock => Membrane.Clock.t(), + :module => module(), :playback => Membrane.Playback.t(), :resource_guard => Membrane.ResourceGuard.t(), + :setup_incomplete? => boolean(), :utility_supervisor => Membrane.UtilitySupervisor.t(), optional(:from) => [GenServer.from()], optional(:members) => [Membrane.Child.name()], diff --git a/lib/membrane/telemetry.ex b/lib/membrane/telemetry.ex index 6000b6f05..e7d72eaf2 100644 --- a/lib/membrane/telemetry.ex +++ b/lib/membrane/telemetry.ex @@ -4,95 +4,154 @@ defmodule Membrane.Telemetry do Membrane uses [Telemetry Package](https://hex.pm/packages/telemetry) for instrumentation and does not store or save any measurements by itself. - It is user's responsibility to use some sort of metrics reporter - that will be attached to `:telemetry` package to consume and process generated measurements. + It is user's responsibility to use some sort of event handler and metric reporter + that will be attached to `:telemetry` package to process generated measurements. ## Instrumentation - The following events are published by Membrane's Core with following measurement types and metadata: + The following telemetric events are published by Membrane's Core components: - * `[:membrane, :metric, :value]` - used to report metrics, such as input buffer's size inside functions, incoming events and received stream format - * Measurement: `t:metric_event_value/0` - * Metadata: `%{}` + * `[:membrane, :element | :bin | :pipeline, callback, :start | :stop | :exception]` - + where callback is any possible handle_* function defined for a component. + * `:start` - when the components begins callback's execution + * `:stop` - when the components finishes callback's execution + * `:exception` - when the component crashes during callback's execution - * `[:membrane, :link, :new]` - to report new link connection being initialized in pipeline. - * Measurement: `t:link_event_value/0` - * Metadata: `%{}` + * `[:membrane, :datapoint, datapoint_type]` - + where datapoint_type is any of the available datapoint types (see below) - * `[:membrane, :pipeline | :bin | :element, :init]` - to report pipeline/element/bin initialization - * Measurement: `t:init_or_terminate_event_value/0` - * Metadata: `%{log_metadata: keyword()}`, includes Logger's metadata of created component - - * `[:membrane, :pipeline | :bin | :element, :terminate]` - to report pipeline/element/bin termination - * Measurement: `t:init_or_terminate_event_value/0` - * Metadata: `%{}` - - - ## Enabling certain metrics/events - A lot of events can happen literally hundreds times per second such as registering that a buffer has been sent/processed. + ## Enabling specific datapoints + A lot of datapoints can happen hundreds times per second such as registering that a buffer has been sent/processed. This behaviour can come with a great performance penalties but may be helpful for certain discoveries. To avoid any runtime overhead - when the reporting is not necessary all metrics/events are hidden behind a compile-time feature flags. - To enable a particular measurement one must recompile membrane core with the following snippet put inside + when the reporting is not necessary all spans/datapoints are hidden behind a compile-time feature flags. + To enable a particular measurement one must recompile membrane core with the following config put inside user's application `config.exs` file: + ``` config :membrane_core, telemetry_flags: [ - :flag_name, - ..., - {:metrics, [list of metrics]} - ... + tracked_callbacks: [ + bin: [:handle_setup, ...] | :all, + element: [:handle_init, ...] | :all, + pipeline: [:handle_init, ...] | :all + ], + datapoints: [:buffer, ...] | :all ] + ``` - Available flags are (those are of a very low overhead): - * `:links` - enables new links notifications - * `:inits_and_terminates` - enables notifications of `init` and `terminate` events for elements/bins/pipelines + Datapoint metrics are to be deprecated in the future (2.0) in favor of spans. They are still available for now. - - Additionally one can control which metric values should get measured by passing an option of format : - `{:metrics, [list of metrics]}` - - Available metrics are: + Available datapoints are: + * `:link` - reports the number of links created in the pipeline * `:buffer` - number of buffers being sent from a particular element - * `:bitrate` - total number of bits carried by the buffers mentioned above * `:queue_len` - number of messages in element's message queue during GenServer's `handle_info` invocation * `:stream_format` - indicates that given element has received new stream format, value always equals '1' * `:event` - indicates that given element has received a new event, value always equals '1' * `:store` - reports the current size of a input buffer when storing a new buffer - * `:take_and_demand` - reports the current size of a input buffer when taking buffers and making a new demand + * `:take` - reports the number of buffers taken from the input buffer + """ + + alias Membrane.{Bin, ComponentPath, Element, Pipeline} + + @typedoc """ + Atom representation of Membrane components subject to telemetry reports """ + @type component_type :: :element | :bin | :pipeline - @type event_name :: [atom(), ...] + @type callback_context :: + Element.CallbackContext.t() | Bin.CallbackContext.t() | Pipeline.CallbackContext.t() @typedoc """ - * component_path - element's or bin's path - * metric - metric's name - * value - metric's value + Metadata included with each telemetry component's handler profiled + * callback - name of the callback function + * callback_args - arguments passed to the callback + * callback_context - context of the callback consistent with Membrane.*.CallbackContext + * component_path - path of the component in the pipeline consistent with t:ComponentPath.path/0 + * component_type - atom representation of the base component type + * state_before_callback - state of the component before the callback execution + * state_after_callback - state of the component after the callback execution, it's `nil` on :start and :exception events """ - @type metric_event_value :: %{ - component_path: String.t(), - metric: String.t(), - value: integer() + @type callback_span_metadata :: %{ + callback: atom(), + callback_args: [any()], + callback_context: callback_context(), + component_path: ComponentPath.path(), + component_type: component_type(), + state_before_callback: Element.state() | Bin.state() | Pipeline.state(), + state_after_callback: Element.state() | Bin.state() | Pipeline.state() | nil } @typedoc """ - * path - element's path + Types of telemetry datapoints reported by Membrane Core + """ + @type datapoint_type :: :link | :buffer | :queue_len | :stream_format | :event | :store | :take + + @typedoc """ + Metadata included with each telemetry event """ - @type init_or_terminate_event_value :: %{ - path: Membrane.ComponentPath.path() + @type datapoint_metadata :: %{ + datapoint: datapoint_type(), + component_path: ComponentPath.path(), + component_type: component_type() } @typedoc """ - * parent_path - process path of link's parent + Value of the link datapoint + * parent_component_path - process path of link's parent * from - from element name * to - to element name * pad_from - from's pad name * pad_to - to's pad name """ - @type link_event_value :: %{ - parent_path: String.t(), + @type link_datapoint_value :: %{ + parent_component_path: String.t(), from: String.t(), to: String.t(), pad_from: String.t(), pad_to: String.t() } + @type buffer_datapoint_value :: integer() + @type queue_len_datapoint_value :: integer() + @type stream_format_datapoint_value :: %{format: map(), pad_ref: String.t()} + @type incoming_event_datapoint_value :: String.t() + @type store_datapoint_value :: %{value: integer(), log_tag: String.t()} + + @typedoc """ + Value of the specific event gathered + """ + @type datapoint_value :: %{ + value: + buffer_datapoint_value() + | queue_len_datapoint_value() + | stream_format_datapoint_value() + | incoming_event_datapoint_value() + | store_datapoint_value() + | integer() + } + + @doc """ + Returns if the event type is configured to be gathered by Membrane's Core telemetry + """ + @spec datapoint_gathered?(any()) :: boolean() + defdelegate datapoint_gathered?(type), to: Membrane.Core.Telemetry + + @doc """ + Lists all components and their callbacks tracked by Membrane's Core telemetry + """ + @spec tracked_callbacks() :: [ + pipeline: [atom()], + bin: [atom()], + element: [atom()] + ] + defdelegate tracked_callbacks, to: Membrane.Core.Telemetry + + @doc """ + Lists all possible components and their callbacks that can be gathered when telemetry is enabled + """ + @spec tracked_callbacks_available() :: [ + pipeline: [atom()], + bin: [atom()], + element: [atom()] + ] + defdelegate tracked_callbacks_available, to: Membrane.Core.Telemetry end diff --git a/test/membrane/telemetry_test.exs b/test/membrane/telemetry_test.exs new file mode 100644 index 000000000..8d3dee49e --- /dev/null +++ b/test/membrane/telemetry_test.exs @@ -0,0 +1,304 @@ +defmodule Membrane.TelemetryTest do + @moduledoc """ + Test suite for Membrane telemetry public API. It tests if telemetry datapoints are reported + properly for all event types and span types upon attaching to the :telemetry system. + + + Remove below with 2.0.0 release: + In case of a need to test legacy telemetry paste the following snippet into the config file: + + # config to test legacy version of telemetry + # config :membrane_core, :telemetry_flags, [ + # :report_init, + # :report_terminate, + # :report_buffer, + # :report_queue, + # :report_link, + # metrics: [ + # :buffer, + # :bitrate, + # :queue_len, + # :stream_format, + # :event, + # :store, + # :take_and_demand + # ] + # ] + + ``` + """ + + use ExUnit.Case, async: false + + import ExUnit.CaptureLog + import Membrane.ChildrenSpec + import Membrane.Testing.Assertions + alias Membrane.Core.Telemetry + alias Membrane.Testing + require Logger + + @moduletag if Telemetry.legacy?(), do: :skip, else: nil + + defmodule TestFilter do + use Membrane.Filter + + def_input_pad :input, accepted_format: _any + def_output_pad :output, accepted_format: _any + + @impl true + def handle_buffer(_pad, buffer, _context, state), do: {[buffer: {:output, buffer}], state} + + @impl true + def handle_parent_notification(:crash, _context, state) do + raise "Intended Crash Test" + {[], state} + end + end + + defmodule TelemetryListener do + @spec handle_measurements(atom(), any(), map(), map()) :: :ok + def handle_measurements(name, value, metadata, %{dest: pid, ref: ref}) do + pid |> send({ref, :telemetry_ack, {name, value, metadata}}) + end + end + + setup do + if Telemetry.legacy?() do + [skip: true] + else + child_spec = + child(:source, %Testing.Source{output: ["a", "b", "c"]}) + |> child(:filter, TestFilter) + |> child(:sink, Testing.Sink) + + [child_spec: child_spec] + end + end + + @paths ~w[:filter :sink :source] + @spans [:handle_init, :handle_setup, :handle_playing, :handle_terminate_request] + @steps [:start, :stop] + + describe "Telemetry reports elements'" do + setup %{child_spec: child_spec} do + ref = make_ref() + + spans = + for handler <- @spans, + step <- @steps do + [:membrane, :element, handler, step] + end + + setup_pipeline_for_callbacks(spans, child_spec, ref) + + [ref: ref] + end + + # Test each lifecycle step for each element type + for element_type <- @paths, + span <- @spans do + test "#{element_type}/#{span}", %{ref: ref} do + element_type = unquote(element_type) + span = unquote(span) + + assert_receive {^ref, :telemetry_ack, + {[:membrane, :element, ^span, :start], results, + %{component_path: [_, ^element_type]}}}, + 1000 + + assert results.monotonic_time + + assert_receive {^ref, :telemetry_ack, + {[:membrane, :element, ^span, :stop], results, + %{component_path: [_, ^element_type]}}}, + 1000 + + assert results.duration >= 0 + end + end + end + + describe "Telemetry reports pipelines'" do + setup %{child_spec: child_spec} do + ref = make_ref() + + spans = + for span <- @spans, + step <- @steps do + [:membrane, :pipeline, span, step] + end + + setup_pipeline_for_callbacks(spans, child_spec, ref) + + [ref: ref] + end + + test "lifecycle", %{ref: ref} do + assert_receive {^ref, :telemetry_ack, + {[:membrane, :pipeline, :handle_init, :start], results, %{}}} + + assert results.monotonic_time + + assert_receive {^ref, :telemetry_ack, + {[:membrane, :pipeline, :handle_init, :stop], results, %{}}} + + assert results.duration >= 0 + + assert_receive {^ref, :telemetry_ack, + {[:membrane, :pipeline, :handle_setup, :start], results, %{}}} + + assert results.monotonic_time + + assert_receive {^ref, :telemetry_ack, + {[:membrane, :pipeline, :handle_setup, :stop], results, %{}}} + + assert results.duration >= 0 + end + end + + describe "Telemetry properly reports end of span when exception was encountered" do + setup %{child_spec: child_spec} do + ref = make_ref() + + spans = + [ + [:membrane, :element, :handle_parent_notification, :start], + [:membrane, :element, :handle_parent_notification, :stop], + [:membrane, :element, :handle_parent_notification, :exception] + ] + + :telemetry.attach_many(ref, spans, &TelemetryListener.handle_measurements/4, %{ + dest: self(), + ref: ref + }) + + pid = Testing.Pipeline.start_supervised!(spec: child_spec) + + capture_log(fn -> + :ok = Testing.Pipeline.notify_child(pid, :filter, :crash) + end) + + [ref: ref] + end + + test "in element", %{ref: ref} do + assert_receive {^ref, :telemetry_ack, + {[:membrane, :element, :handle_parent_notification, :start], _results, + %{component_path: [_, ":filter"]}}}, + 1000 + + assert_receive {^ref, :telemetry_ack, + {[:membrane, :element, :handle_parent_notification, :exception], results, + %{component_path: [_, ":filter"]}}}, + 1000 + + assert results.duration >= 0 + + refute_received {^ref, :telemetry_ack, + {[:membrane, :element, :handle_parent_notification, :stop], _results, _}} + end + end + + describe "Telemetry properly reports following datapoints: " do + test "Link", %{child_spec: child_spec} do + ref = setup_pipeline_for(:link, child_spec) + + assert_receive {^ref, :telemetry_ack, {[:membrane, :datapoint, :link], link1, metadata}} + assert_event_metadata(metadata) + + assert_receive {^ref, :telemetry_ack, {[:membrane, :datapoint, :link], link2, metadata}} + assert_event_metadata(metadata) + + assert [ + %{ + from: ":filter", + pad_from: ":output", + pad_to: ":input", + parent_component_path: _, + to: ":sink" + }, + %{ + from: ":source", + pad_from: ":output", + pad_to: ":input", + parent_component_path: _, + to: ":filter" + } + ] = Enum.sort([link1.value, link2.value]) + end + + test "Stream Format", %{child_spec: child_spec} do + ref = setup_pipeline_for(:stream_format, child_spec) + + assert_receive {^ref, :telemetry_ack, + {[:membrane, :datapoint, :stream_format], measurement, metadata}} + + assert measurement.value.format.type == :bytestream + assert_event_metadata(metadata) + end + + test "Buffer", %{child_spec: child_spec} do + ref = setup_pipeline_for(:buffer, child_spec) + + for _ <- 1..3 do + assert_receive {^ref, :telemetry_ack, + {[:membrane, :datapoint, :buffer], measurement, metadata}} + + assert measurement.value != 0 + assert_event_metadata(metadata) + end + end + + test "Event", %{child_spec: child_spec} do + ref = setup_pipeline_for(:event, child_spec) + + assert_receive {^ref, :telemetry_ack, + {[:membrane, :datapoint, :event], measurement, metadata}} + + assert measurement.value.pad_ref == ":input" + assert_event_metadata(metadata) + end + + test "Queue Length", %{child_spec: child_spec} do + ref = setup_pipeline_for(:queue_len, child_spec) + + assert_receive {^ref, :telemetry_ack, + {[:membrane, :datapoint, :queue_len], measurement, metadata}} + + assert measurement.value + assert_event_metadata(metadata) + end + end + + defp assert_event_metadata(metadata) do + assert is_atom(metadata.datapoint) + assert is_list(metadata.component_path) + assert metadata.component_metadata + end + + defp setup_pipeline_for(event, child_spec) do + ref = make_ref() + + :telemetry.attach( + ref, + [:membrane, :datapoint, event], + &TelemetryListener.handle_measurements/4, + %{dest: self(), ref: ref} + ) + + Testing.Pipeline.start_link_supervised!(spec: child_spec) + + ref + end + + defp setup_pipeline_for_callbacks(spans, child_spec, ref) do + :telemetry.attach_many(ref, spans, &TelemetryListener.handle_measurements/4, %{ + dest: self(), + ref: ref + }) + + pid = Testing.Pipeline.start_link_supervised!(spec: child_spec) + assert_end_of_stream(pid, :sink) + :ok = Testing.Pipeline.terminate(pid) + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index 2f5b09483..10efabeb7 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -3,5 +3,4 @@ System.cmd("epmd", ["-daemon"]) Node.start(:"my_node@127.0.0.1", :longnames) ExUnit.configure(formatters: [ExUnit.CLIFormatter, JUnitFormatter]) -# ExUnit.start(exclude: [:long_running], assert_receive_timeout: 500) ExUnit.start(exclude: [:long_running], capture_log: true, assert_receive_timeout: 500)