Skip to content

Commit

Permalink
Merge pull request #918 from membraneframework/905-telemetry
Browse files Browse the repository at this point in the history
Telemetry (#905)
  • Loading branch information
wende authored Feb 6, 2025
2 parents 0a4ca33 + dc707ac commit 7089f8a
Show file tree
Hide file tree
Showing 28 changed files with 936 additions and 266 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Implement `Membrane.Connector`. [#904](https://github.com/membraneframework/membrane_core/pull/904)
* Implememnt diamonds detection. [#909](https://github.com/membraneframework/membrane_core/pull/909)
* Incorporate `Membrane.Funnel`, `Membrane.Tee` and `Membane.Fake.Sink`. [#922](https://github.com/membraneframework/membrane_core/issues/922)
* Added new, revised Telemetry system [#905](https://github.com/membraneframework/membrane_core/pull/918)

## 1.1.2
* Add new callback `handle_child_terminated/3` along with new assertions. [#894](https://github.com/membraneframework/membrane_core/pull/894)
Expand Down
14 changes: 14 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,18 @@ import Config

if config_env() == :test do
config :junit_formatter, include_filename?: true

config :membrane_core, :telemetry_flags,
tracked_callbacks: [
element: [
:handle_init,
:handle_playing,
:handle_setup,
:handle_terminate_request,
:handle_parent_notification
],
bin: :all,
pipeline: :all
],
datapoints: :all
end
6 changes: 4 additions & 2 deletions lib/membrane/bin/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ defmodule Membrane.Bin.CallbackContext do
`c:Membrane.Bin.handle_crash_group_down/3`.
"""
@type t :: %{
:children => %{Membrane.Child.name() => Membrane.ChildEntry.t()},
:clock => Membrane.Clock.t(),
:parent_clock => Membrane.Clock.t(),
:pads => %{Membrane.Pad.ref() => Membrane.Bin.PadData.t()},
:module => module(),
:name => Membrane.Bin.name(),
:children => %{Membrane.Child.name() => Membrane.ChildEntry.t()},
:pads => %{Membrane.Pad.ref() => Membrane.Bin.PadData.t()},
:playback => Membrane.Playback.t(),
:resource_guard => Membrane.ResourceGuard.t(),
:setup_incomplete? => boolean(),
:utility_supervisor => Membrane.UtilitySupervisor.t(),
optional(:pad_options) => map(),
optional(:members) => [Membrane.Child.name()],
Expand Down
5 changes: 3 additions & 2 deletions lib/membrane/component_path.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Membrane.ComponentPath do

@typedoc @moduledoc
@type path :: list(String.t())
@type formatted_path :: String.t()

@key :membrane_path

Expand All @@ -16,7 +17,7 @@ defmodule Membrane.ComponentPath do
If path had already existed then replaces it.
"""
@spec set(path) :: :ok
@spec set(path()) :: :ok
def set(path) do
Process.put(@key, path)
:ok
Expand All @@ -43,6 +44,6 @@ defmodule Membrane.ComponentPath do
If path has not been set, empty list is returned.
"""
@spec get() :: list(String.t())
@spec get() :: path()
def get(), do: Process.get(@key, [])
end
7 changes: 3 additions & 4 deletions lib/membrane/core/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ defmodule Membrane.Core.Bin do
Parent,
ProcessHelper,
SubprocessSupervisor,
Telemetry,
TimerController
}

alias Membrane.ResourceGuard

require Membrane.Core.Utils, as: Utils
require Membrane.Core.Message
require Membrane.Core.Telemetry
require Membrane.Core.LegacyTelemetry, as: LegacyTelemetry
require Membrane.Logger

@type options :: %{
Expand Down Expand Up @@ -127,8 +126,8 @@ defmodule Membrane.Core.Bin do
{:ok, resource_guard} =
SubprocessSupervisor.start_utility(options.subprocess_supervisor, {ResourceGuard, self()})

Telemetry.report_init(:bin)
ResourceGuard.register(resource_guard, fn -> Telemetry.report_terminate(:bin) end)
LegacyTelemetry.report_init(:bin)
ResourceGuard.register(resource_guard, fn -> LegacyTelemetry.report_terminate(:bin) end)

state =
%State{
Expand Down
4 changes: 3 additions & 1 deletion lib/membrane/core/bin/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ defmodule Membrane.Core.Bin.CallbackContext do
def from_state(state, optional_fields \\ []) do
Map.new(optional_fields)
|> Map.merge(%{
children: state.children,
clock: state.synchronization.clock,
parent_clock: state.synchronization.parent_clock,
pads: state.pads_data,
module: state.module,
name: state.name,
children: state.children,
playback: state.playback,
resource_guard: state.resource_guard,
setup_incomplete?: state.setup_incomplete?,
utility_supervisor: state.subprocess_supervisor
})
end
Expand Down
4 changes: 2 additions & 2 deletions lib/membrane/core/bin/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ defmodule Membrane.Core.Bin.State do

@type t :: %__MODULE__{
internal_state: Membrane.Bin.state() | nil,
module: module,
module: module(),
children: ChildrenModel.children(),
subprocess_supervisor: pid(),
name: Membrane.Bin.name() | nil,
pads_info: PadModel.pads_info() | nil,
pads_data: PadModel.pads_data() | nil,
parent_pid: pid,
parent_pid: pid(),
links: %{Link.id() => Link.t()},
crash_groups: %{CrashGroup.name() => CrashGroup.t()},
synchronization: %{
Expand Down
62 changes: 34 additions & 28 deletions lib/membrane/core/callback_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule Membrane.Core.CallbackHandler do
alias Membrane.CallbackError

require Membrane.Logger
require Membrane.Core.Telemetry, as: Telemetry

@type state :: %{
:module => module,
Expand Down Expand Up @@ -125,45 +126,50 @@ defmodule Membrane.Core.CallbackHandler do
end

@spec exec_callback(callback :: atom, args :: list, handler_params, state) ::
{list, internal_state}
{list, internal_state} | no_return()
defp exec_callback(
callback,
args,
%{context: context_fun},
%{module: module, internal_state: internal_state} = state
) do
args = args ++ [context_fun.(state), internal_state]
context = context_fun.(state)
args = args ++ [context, internal_state]

callback_result =
try do
try do
fn ->
apply(module, callback, args)
rescue
e in UndefinedFunctionError ->
_ignored =
with %{module: ^module, function: ^callback, arity: arity} <- e do
reraise CallbackError,
[
kind: :not_implemented,
callback: {module, callback},
arity: arity,
args: args
],
__STACKTRACE__
end

reraise e, __STACKTRACE__
|> validate_callback_result!(module, callback)
end
|> Telemetry.track_callback_handler(callback, args, state, context)
rescue
e in UndefinedFunctionError ->
_ignored =
with %{module: ^module, function: ^callback, arity: arity} <- e do
reraise CallbackError,
[
kind: :not_implemented,
callback: {module, callback},
arity: arity,
args: args
],
__STACKTRACE__
end

reraise e, __STACKTRACE__
end
end

case callback_result do
{actions, _state} when is_list(actions) ->
callback_result
defp validate_callback_result!({actions, _state} = callback_result, _module, _cb)
when is_list(actions) do
callback_result
end

_result ->
raise CallbackError,
kind: :bad_return,
callback: {module, callback},
value: callback_result
end
defp validate_callback_result!(callback_result, module, callback) do
raise CallbackError,
kind: :bad_return,
callback: {module, callback},
value: callback_result
end

@spec handle_callback_result(
Expand Down
17 changes: 7 additions & 10 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@ defmodule Membrane.Core.Element do
require Membrane.Core.Message, as: Message
require Membrane.Core.Stalker, as: Stalker
require Membrane.Core.Telemetry, as: Telemetry
require Membrane.Core.LegacyTelemetry, as: LegacyTelemetry
require Membrane.Logger

@type options :: %{
module: module,
module: module(),
name: Membrane.Element.name(),
node: node | nil,
node: node() | nil,
user_options: Membrane.Element.options(),
sync: Sync.t(),
parent: pid,
parent: pid(),
parent_clock: Clock.t(),
parent_path: Membrane.ComponentPath.path(),
log_metadata: Logger.metadata(),
Expand Down Expand Up @@ -118,9 +119,8 @@ defmodule Membrane.Core.Element do
{:ok, resource_guard} =
SubprocessSupervisor.start_utility(options.subprocess_supervisor, {ResourceGuard, self()})

Telemetry.report_init(:element)

ResourceGuard.register(resource_guard, fn -> Telemetry.report_terminate(:element) end)
LegacyTelemetry.report_init(:element)
ResourceGuard.register(resource_guard, fn -> LegacyTelemetry.report_terminate(:element) end)

self_pid = self()

Expand Down Expand Up @@ -206,10 +206,7 @@ defmodule Membrane.Core.Element do
@impl GenServer
def handle_info(message, state) do
Utils.log_on_error do
Telemetry.report_metric(
:queue_len,
:erlang.process_info(self(), :message_queue_len) |> elem(1)
)
Telemetry.report_queue_len(self())

do_handle_info(message, state)
end
Expand Down
9 changes: 4 additions & 5 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ defmodule Membrane.Core.Element.ActionHandler do

import Membrane.Pad, only: [is_pad_ref: 1]

alias Membrane.Telemetry

alias Membrane.{
ActionError,
Buffer,
Expand All @@ -34,8 +32,9 @@ defmodule Membrane.Core.Element.ActionHandler do

require Membrane.Core.Child.PadModel, as: PadModel
require Membrane.Core.Message, as: Message
require Membrane.Core.Telemetry, as: Telemetry
require Membrane.Logger
require Membrane.Core.Telemetry, as: Telemetry
require Membrane.Core.LegacyTelemetry, as: LegacyTelemetry

@impl CallbackHandler
def transform_actions(actions, _callback, _handler_params, state) do
Expand Down Expand Up @@ -319,8 +318,8 @@ defmodule Membrane.Core.Element.ActionHandler do
"Sending #{length(buffers)} buffer(s) through pad #{inspect(pad_ref)}"
)

Telemetry.report_metric(:buffer, length(buffers))
Telemetry.report_bitrate(buffers)
LegacyTelemetry.report_bitrate(buffers)
Telemetry.report_buffer(buffers)

Enum.each(buffers, fn
%Buffer{} -> :ok
Expand Down
11 changes: 5 additions & 6 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Membrane.Core.Element.BufferController do
use Bunch

alias Membrane.{Buffer, Pad}
alias Membrane.Core.{CallbackHandler, Telemetry}
alias Membrane.Core.CallbackHandler
alias Membrane.Core.Child.PadModel

alias Membrane.Core.Element.{
Expand All @@ -24,7 +24,8 @@ defmodule Membrane.Core.Element.BufferController do
alias Membrane.Core.Telemetry

require Membrane.Core.Child.PadModel
require Membrane.Core.Telemetry
require Membrane.Core.Telemetry, as: Telemetry
require Membrane.Core.LegacyTelemetry, as: LegacyTelemetry

@doc """
Handles incoming buffer: either stores it in InputQueue, or executes element's
Expand Down Expand Up @@ -105,16 +106,14 @@ defmodule Membrane.Core.Element.BufferController do
"""
@spec exec_buffer_callback(Pad.ref(), [Buffer.t()], State.t()) :: State.t()
def exec_buffer_callback(pad_ref, buffers, %State{type: :filter} = state) do
Telemetry.report_metric(:buffer, 1, inspect(pad_ref))
LegacyTelemetry.report_bitrate(buffers)
Telemetry.report_buffer(buffers)

do_exec_buffer_callback(pad_ref, buffers, state)
end

def exec_buffer_callback(pad_ref, buffers, %State{type: type} = state)
when type in [:sink, :endpoint] do
Telemetry.report_metric(:buffer, length(List.wrap(buffers)))
Telemetry.report_bitrate(buffers)

do_exec_buffer_callback(pad_ref, buffers, state)
end

Expand Down
2 changes: 2 additions & 0 deletions lib/membrane/core/element/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ defmodule Membrane.Core.Element.CallbackContext do
pads: state.pads_data,
clock: state.synchronization.clock,
parent_clock: state.synchronization.parent_clock,
module: state.module,
name: state.name,
playback: state.playback,
resource_guard: state.resource_guard,
setup_incomplete?: state.setup_incomplete?,
utility_supervisor: state.subprocess_supervisor
})
end
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/element/event_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ defmodule Membrane.Core.Element.EventController do
def handle_event(pad_ref, event, state) do
withl pad: {:ok, data} <- PadModel.get_data(state, pad_ref),
playback: %State{playback: :playing} <- state do
Telemetry.report_metric(:event, 1, inspect(pad_ref))
Telemetry.report_incoming_event(%{pad_ref: inspect(pad_ref)})

async? = Event.async?(event)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ defmodule Membrane.Core.Element.ManualFlowController.InputQueue do

alias Membrane.Buffer
alias Membrane.Core.Element.AtomicDemand
alias Membrane.Core.Telemetry
alias Membrane.Event
alias Membrane.Pad
alias Membrane.StreamFormat

require Membrane.Core.Stalker, as: Stalker
require Membrane.Core.Telemetry, as: Telemetry
require Membrane.Logger

@qe Qex
Expand Down Expand Up @@ -124,7 +124,6 @@ defmodule Membrane.Core.Element.ManualFlowController.InputQueue do

%__MODULE__{size: size} = input_queue = do_store_buffers(input_queue, v)

Telemetry.report_metric(:store, size, input_queue.log_tag)
:atomics.put(stalker_metrics.size, 1, size)

input_queue
Expand All @@ -135,8 +134,7 @@ defmodule Membrane.Core.Element.ManualFlowController.InputQueue do
def store(%__MODULE__{q: q, size: size} = input_queue, type, v)
when type in @non_buf_types do
"Storing #{type}" |> mk_log(input_queue) |> Membrane.Logger.debug_verbose()

Telemetry.report_metric(:store, size, input_queue.log_tag)
Telemetry.report_store(size, input_queue.log_tag)

%__MODULE__{input_queue | q: q |> @qe.push({:non_buffer, type, v})}
end
Expand Down Expand Up @@ -176,7 +174,7 @@ defmodule Membrane.Core.Element.ManualFlowController.InputQueue do
input_queue = maybe_increase_atomic_demand(input_queue)

%{size: size, stalker_metrics: stalker_metrics} = input_queue
Telemetry.report_metric(:take, size, input_queue.log_tag)
Telemetry.report_take(size, input_queue.log_tag)
:atomics.put(stalker_metrics.size, 1, size)

{out, input_queue}
Expand Down
Loading

0 comments on commit 7089f8a

Please sign in to comment.