Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry (#905) #918

Merged
merged 46 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
4fcfcb2
Telemetry refresh and element/pipeline telemetry testing for init and…
wende Dec 5, 2024
328d30a
Tests async false. Telemetry captured other tests' processes
wende Dec 5, 2024
e599fcb
Unnecessary code trim
wende Dec 5, 2024
e60b127
Telemetry spans for init and setup
wende Dec 6, 2024
42f099e
Telemetry for spans, metrics and events for elements and pipelines
wende Dec 12, 2024
56890e0
Report playing and links
wende Dec 16, 2024
3cb267e
CQ and cleanup
wende Dec 16, 2024
7a69d1e
Cleanup of old telemetry and more universal method of measuring spans…
wende Jan 7, 2025
a77d6c0
Tests for handlers throwing exceptions and proper santization of comp…
wende Jan 9, 2025
3861859
Proper naming, dead code cleanup, no unnecessary optimization, crash …
wende Jan 13, 2025
66eea5a
CQ
wende Jan 16, 2025
0e95d43
Comments by @FelonEkonom adressed
wende Jan 20, 2025
f6b5ce1
Comments by @FelonEkonom adressed
wende Jan 24, 2025
db19995
Old metrics revamped and adjusted to new telemetry implementation
wende Jan 24, 2025
a442758
Small changes to Telemetry.report_buffer/2
wende Jan 27, 2025
f970d20
Legacy telemetry revived and rerouted based on config usage
wende Jan 28, 2025
2b1a474
Tests and documentation for measured events and their metadata
wende Jan 28, 2025
b00b8c9
Final touches
wende Jan 28, 2025
e508240
Update lib/membrane/telemetry.ex
wende Jan 30, 2025
9c65c71
Update lib/membrane/telemetry.ex
wende Jan 30, 2025
d53708b
Update lib/membrane/telemetry.ex
wende Jan 30, 2025
ec80e2a
PR and CI Comments addressed
wende Jan 30, 2025
f910961
Credo changes
wende Jan 30, 2025
083f65e
Unnecessary element and bin state fields removed
wende Jan 30, 2025
95bdab1
Credo satisfied
wende Jan 30, 2025
c905ec5
Changed component context and added tests reflecting the changes
wende Jan 30, 2025
b1a9627
LegacyTelemetry sometimes didn't work properly
wende Jan 30, 2025
8a01d9b
Dialyzer fix
wende Jan 30, 2025
fe9e359
Dialyzer satisfied
wende Jan 31, 2025
6c7b091
Appeasing the dialyzer
wende Jan 31, 2025
4bc4c25
Credo
wende Jan 31, 2025
d7f083a
Context instead of state
wende Jan 31, 2025
66cdefd
Simpler(?) telemetry private API
wende Feb 3, 2025
24f6cb6
Documentation of metadata passed to and from telemetry modules
wende Feb 3, 2025
7041bf9
Satisfy FilterAggregator.Context
wende Feb 3, 2025
72ca1be
Documentation of metadata passed to and from telemetry modules
wende Feb 3, 2025
fe86693
Event -> Datapoint
wende Feb 3, 2025
17dfe27
Update lib/membrane/telemetry.ex
wende Feb 4, 2025
cf8efa1
Comments addressed
wende Feb 4, 2025
fe9d0eb
Track callback handler name
wende Feb 4, 2025
70cc188
Config cleanup
wende Feb 4, 2025
e846d7a
Faulty f+r
wende Feb 5, 2025
9583a5d
Better public API
wende Feb 6, 2025
9894437
Update lib/membrane/telemetry.ex
wende Feb 6, 2025
94ba04a
Documentation changes
wende Feb 6, 2025
dc707ac
faulty merge
wende Feb 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Implement `Membrane.Connector`. [#904](https://github.com/membraneframework/membrane_core/pull/904)
* Implememnt diamonds detection. [#909](https://github.com/membraneframework/membrane_core/pull/909)
* Incorporate `Membrane.Funnel`, `Membrane.Tee` and `Membane.Fake.Sink`. [#922](https://github.com/membraneframework/membrane_core/issues/922)
* Added new, revised Telemetry system [#905](https://github.com/membraneframework/membrane_core/pull/918)

## 1.1.2
* Add new callback `handle_child_terminated/3` along with new assertions. [#894](https://github.com/membraneframework/membrane_core/pull/894)
Expand Down
32 changes: 32 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,36 @@ import Config

if config_env() == :test do
config :junit_formatter, include_filename?: true

config :membrane_core, :telemetry_flags,
tracked_callbacks: [
element: [
:handle_init,
:handle_playing,
:handle_setup,
:handle_terminate_request,
:handle_parent_notification
],
bin: :all,
pipeline: :all
],
datapoints: :all
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, this enables the old, deprecated metrics. Why do we add a new config key for a deprecated feature?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not for deprecated metrics. It's for those that were renewed. So:
link, buffer, queue_len, stream_format, event, store and take (no bitrate, demand, init or terminate)

And it's only for those to be tested


# config to test legacy version of telemetry
# config :membrane_core, :telemetry_flags, [
# :report_init,
# :report_terminate,
# :report_buffer,
# :report_queue,
# :report_link,
# metrics: [
# :buffer,
# :bitrate,
# :queue_len,
# :stream_format,
# :event,
# :store,
# :take_and_demand
# ]
# ]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remember to delete it before merging PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I want to leave it in some documentation since it makes it much easier to manually test behaviour on legacy settings. Any idea where I could put it?

end
6 changes: 4 additions & 2 deletions lib/membrane/bin/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ defmodule Membrane.Bin.CallbackContext do
`c:Membrane.Bin.handle_crash_group_down/3`.
"""
@type t :: %{
:children => %{Membrane.Child.name() => Membrane.ChildEntry.t()},
:clock => Membrane.Clock.t(),
:parent_clock => Membrane.Clock.t(),
:pads => %{Membrane.Pad.ref() => Membrane.Bin.PadData.t()},
:module => module(),
:name => Membrane.Bin.name(),
:children => %{Membrane.Child.name() => Membrane.ChildEntry.t()},
:pads => %{Membrane.Pad.ref() => Membrane.Bin.PadData.t()},
:playback => Membrane.Playback.t(),
:resource_guard => Membrane.ResourceGuard.t(),
:setup_incomplete? => boolean(),
:utility_supervisor => Membrane.UtilitySupervisor.t(),
optional(:pad_options) => map(),
optional(:members) => [Membrane.Child.name()],
Expand Down
5 changes: 3 additions & 2 deletions lib/membrane/component_path.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Membrane.ComponentPath do

@typedoc @moduledoc
@type path :: list(String.t())
@type formatted_path :: String.t()

@key :membrane_path

Expand All @@ -16,7 +17,7 @@ defmodule Membrane.ComponentPath do

If path had already existed then replaces it.
"""
@spec set(path) :: :ok
@spec set(path()) :: :ok
def set(path) do
Process.put(@key, path)
:ok
Expand All @@ -43,6 +44,6 @@ defmodule Membrane.ComponentPath do

If path has not been set, empty list is returned.
"""
@spec get() :: list(String.t())
@spec get() :: path()
def get(), do: Process.get(@key, [])
end
7 changes: 3 additions & 4 deletions lib/membrane/core/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ defmodule Membrane.Core.Bin do
Parent,
ProcessHelper,
SubprocessSupervisor,
Telemetry,
TimerController
}

alias Membrane.ResourceGuard

require Membrane.Core.Utils, as: Utils
require Membrane.Core.Message
require Membrane.Core.Telemetry
require Membrane.Core.LegacyTelemetry, as: LegacyTelemetry
require Membrane.Logger

@type options :: %{
Expand Down Expand Up @@ -127,8 +126,8 @@ defmodule Membrane.Core.Bin do
{:ok, resource_guard} =
SubprocessSupervisor.start_utility(options.subprocess_supervisor, {ResourceGuard, self()})

Telemetry.report_init(:bin)
ResourceGuard.register(resource_guard, fn -> Telemetry.report_terminate(:bin) end)
LegacyTelemetry.report_init(:bin)
ResourceGuard.register(resource_guard, fn -> LegacyTelemetry.report_terminate(:bin) end)

state =
%State{
Expand Down
4 changes: 3 additions & 1 deletion lib/membrane/core/bin/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ defmodule Membrane.Core.Bin.CallbackContext do
def from_state(state, optional_fields \\ []) do
Map.new(optional_fields)
|> Map.merge(%{
children: state.children,
clock: state.synchronization.clock,
parent_clock: state.synchronization.parent_clock,
pads: state.pads_data,
module: state.module,
name: state.name,
children: state.children,
playback: state.playback,
resource_guard: state.resource_guard,
setup_incomplete?: state.setup_incomplete?,
utility_supervisor: state.subprocess_supervisor
})
end
Expand Down
4 changes: 2 additions & 2 deletions lib/membrane/core/bin/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ defmodule Membrane.Core.Bin.State do

@type t :: %__MODULE__{
internal_state: Membrane.Bin.state() | nil,
module: module,
module: module(),
children: ChildrenModel.children(),
subprocess_supervisor: pid(),
name: Membrane.Bin.name() | nil,
pads_info: PadModel.pads_info() | nil,
pads_data: PadModel.pads_data() | nil,
parent_pid: pid,
parent_pid: pid(),
links: %{Link.id() => Link.t()},
crash_groups: %{CrashGroup.name() => CrashGroup.t()},
synchronization: %{
Expand Down
80 changes: 52 additions & 28 deletions lib/membrane/core/callback_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ defmodule Membrane.Core.CallbackHandler do
use Bunch

alias Membrane.CallbackError
alias Membrane.ComponentPath

require Membrane.Logger
require Membrane.Core.Telemetry, as: Telemetry

@type state :: %{
:module => module,
Expand Down Expand Up @@ -125,45 +127,67 @@ defmodule Membrane.Core.CallbackHandler do
end

@spec exec_callback(callback :: atom, args :: list, handler_params, state) ::
{list, internal_state}
{list, internal_state} | no_return()
defp exec_callback(
callback,
args,
%{context: context_fun},
%{module: module, internal_state: internal_state} = state
) do
args = args ++ [context_fun.(state), internal_state]
context = context_fun.(state)
args = args ++ [context, internal_state]

callback_result =
try do
try do
fn ->
apply(module, callback, args)
rescue
e in UndefinedFunctionError ->
_ignored =
with %{module: ^module, function: ^callback, arity: arity} <- e do
reraise CallbackError,
[
kind: :not_implemented,
callback: {module, callback},
arity: arity,
args: args
],
__STACKTRACE__
end

reraise e, __STACKTRACE__
|> validate_callback_result!(module, callback)
end
|> report_telemetry(callback, args, state, context)
Copy link
Member

@FelonEkonom FelonEkonom Feb 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The API of Telemetry module should allow on sth like

Suggested change
|> report_telemetry(callback, args, state, context)
|> Telemetry.track_callback_handler(callback, args, context, state)

and the creation of event metadata should be fully hidden inside Telemetry module

rescue
e in UndefinedFunctionError ->
_ignored =
with %{module: ^module, function: ^callback, arity: arity} <- e do
reraise CallbackError,
[
kind: :not_implemented,
callback: {module, callback},
arity: arity,
args: args
],
__STACKTRACE__
end

reraise e, __STACKTRACE__
end
end

case callback_result do
{actions, _state} when is_list(actions) ->
callback_result
defp validate_callback_result!({actions, _state} = callback_result, _module, _cb)
when is_list(actions) do
callback_result
end

_result ->
raise CallbackError,
kind: :bad_return,
callback: {module, callback},
value: callback_result
end
defp validate_callback_result!(callback_result, module, callback) do
raise CallbackError,
kind: :bad_return,
callback: {module, callback},
value: callback_result
end

defp report_telemetry(f, callback, args, state, context) do
Telemetry.span_component_callback(
f,
state.__struct__,
callback,
%{
callback: callback,
callback_args: args,
callback_context: context,
component_path: ComponentPath.get(),
component_type: state.module,
internal_state_before: state.internal_state,
internal_state_after: nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move creating this map into Telemetry.span_component_callback/4

)
end

@spec handle_callback_result(
Expand Down
17 changes: 7 additions & 10 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@ defmodule Membrane.Core.Element do
require Membrane.Core.Message, as: Message
require Membrane.Core.Stalker, as: Stalker
require Membrane.Core.Telemetry, as: Telemetry
require Membrane.Core.LegacyTelemetry, as: LegacyTelemetry
require Membrane.Logger

@type options :: %{
module: module,
module: module(),
name: Membrane.Element.name(),
node: node | nil,
node: node() | nil,
user_options: Membrane.Element.options(),
sync: Sync.t(),
parent: pid,
parent: pid(),
parent_clock: Clock.t(),
parent_path: Membrane.ComponentPath.path(),
log_metadata: Logger.metadata(),
Expand Down Expand Up @@ -118,9 +119,8 @@ defmodule Membrane.Core.Element do
{:ok, resource_guard} =
SubprocessSupervisor.start_utility(options.subprocess_supervisor, {ResourceGuard, self()})

Telemetry.report_init(:element)

ResourceGuard.register(resource_guard, fn -> Telemetry.report_terminate(:element) end)
LegacyTelemetry.report_init(:element)
ResourceGuard.register(resource_guard, fn -> LegacyTelemetry.report_terminate(:element) end)

self_pid = self()

Expand Down Expand Up @@ -206,10 +206,7 @@ defmodule Membrane.Core.Element do
@impl GenServer
def handle_info(message, state) do
Utils.log_on_error do
Telemetry.report_metric(
:queue_len,
:erlang.process_info(self(), :message_queue_len) |> elem(1)
)
Telemetry.report_queue_len(self())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't it be LegacyTelemetry?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relates to #918 (comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All calls that were removed go directly to LegacyTelemetry so that they can be easily identified to be removed later on. Those that are both in the old and the new one go to Telemetry that redirects the calls based on the config having old or new format. When the legacy telemetry is removed this will allow it only to delete lines with LegacyTelemetry as well as the small block of redirection in new Telemetry


do_handle_info(message, state)
end
Expand Down
9 changes: 4 additions & 5 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ defmodule Membrane.Core.Element.ActionHandler do

import Membrane.Pad, only: [is_pad_ref: 1]

alias Membrane.Telemetry

alias Membrane.{
ActionError,
Buffer,
Expand All @@ -34,8 +32,9 @@ defmodule Membrane.Core.Element.ActionHandler do

require Membrane.Core.Child.PadModel, as: PadModel
require Membrane.Core.Message, as: Message
require Membrane.Core.Telemetry, as: Telemetry
require Membrane.Logger
require Membrane.Core.Telemetry, as: Telemetry
require Membrane.Core.LegacyTelemetry, as: LegacyTelemetry

@impl CallbackHandler
def transform_actions(actions, _callback, _handler_params, state) do
Expand Down Expand Up @@ -319,8 +318,8 @@ defmodule Membrane.Core.Element.ActionHandler do
"Sending #{length(buffers)} buffer(s) through pad #{inspect(pad_ref)}"
)

Telemetry.report_metric(:buffer, length(buffers))
Telemetry.report_bitrate(buffers)
LegacyTelemetry.report_bitrate(buffers)
Telemetry.report_buffer(buffers)

Enum.each(buffers, fn
%Buffer{} -> :ok
Expand Down
11 changes: 5 additions & 6 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Membrane.Core.Element.BufferController do
use Bunch

alias Membrane.{Buffer, Pad}
alias Membrane.Core.{CallbackHandler, Telemetry}
alias Membrane.Core.CallbackHandler
alias Membrane.Core.Child.PadModel

alias Membrane.Core.Element.{
Expand All @@ -24,7 +24,8 @@ defmodule Membrane.Core.Element.BufferController do
alias Membrane.Core.Telemetry

require Membrane.Core.Child.PadModel
require Membrane.Core.Telemetry
require Membrane.Core.Telemetry, as: Telemetry
require Membrane.Core.LegacyTelemetry, as: LegacyTelemetry

@doc """
Handles incoming buffer: either stores it in InputQueue, or executes element's
Expand Down Expand Up @@ -105,16 +106,14 @@ defmodule Membrane.Core.Element.BufferController do
"""
@spec exec_buffer_callback(Pad.ref(), [Buffer.t()], State.t()) :: State.t()
def exec_buffer_callback(pad_ref, buffers, %State{type: :filter} = state) do
Telemetry.report_metric(:buffer, 1, inspect(pad_ref))
LegacyTelemetry.report_bitrate(buffers)
Telemetry.report_buffer(buffers)

do_exec_buffer_callback(pad_ref, buffers, state)
end

def exec_buffer_callback(pad_ref, buffers, %State{type: type} = state)
when type in [:sink, :endpoint] do
Telemetry.report_metric(:buffer, length(List.wrap(buffers)))
Telemetry.report_bitrate(buffers)

do_exec_buffer_callback(pad_ref, buffers, state)
end

Expand Down
2 changes: 2 additions & 0 deletions lib/membrane/core/element/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ defmodule Membrane.Core.Element.CallbackContext do
pads: state.pads_data,
clock: state.synchronization.clock,
parent_clock: state.synchronization.parent_clock,
module: state.module,
name: state.name,
playback: state.playback,
resource_guard: state.resource_guard,
setup_incomplete?: state.setup_incomplete?,
utility_supervisor: state.subprocess_supervisor
})
end
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/element/event_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ defmodule Membrane.Core.Element.EventController do
def handle_event(pad_ref, event, state) do
withl pad: {:ok, data} <- PadModel.get_data(state, pad_ref),
playback: %State{playback: :playing} <- state do
Telemetry.report_metric(:event, 1, inspect(pad_ref))
Telemetry.report_incoming_event(%{pad_ref: inspect(pad_ref)})

async? = Event.async?(event)

Expand Down
Loading
Loading