Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry (#905) #918

Merged
merged 46 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
4fcfcb2
Telemetry refresh and element/pipeline telemetry testing for init and…
wende Dec 5, 2024
328d30a
Tests async false. Telemetry captured other tests' processes
wende Dec 5, 2024
e599fcb
Unnecessary code trim
wende Dec 5, 2024
e60b127
Telemetry spans for init and setup
wende Dec 6, 2024
42f099e
Telemetry for spans, metrics and events for elements and pipelines
wende Dec 12, 2024
56890e0
Report playing and links
wende Dec 16, 2024
3cb267e
CQ and cleanup
wende Dec 16, 2024
7a69d1e
Cleanup of old telemetry and more universal method of measuring spans…
wende Jan 7, 2025
a77d6c0
Tests for handlers throwing exceptions and proper santization of comp…
wende Jan 9, 2025
3861859
Proper naming, dead code cleanup, no unnecessary optimization, crash …
wende Jan 13, 2025
66eea5a
CQ
wende Jan 16, 2025
0e95d43
Comments by @FelonEkonom adressed
wende Jan 20, 2025
f6b5ce1
Comments by @FelonEkonom adressed
wende Jan 24, 2025
db19995
Old metrics revamped and adjusted to new telemetry implementation
wende Jan 24, 2025
a442758
Small changes to Telemetry.report_buffer/2
wende Jan 27, 2025
f970d20
Legacy telemetry revived and rerouted based on config usage
wende Jan 28, 2025
2b1a474
Tests and documentation for measured events and their metadata
wende Jan 28, 2025
b00b8c9
Final touches
wende Jan 28, 2025
e508240
Update lib/membrane/telemetry.ex
wende Jan 30, 2025
9c65c71
Update lib/membrane/telemetry.ex
wende Jan 30, 2025
d53708b
Update lib/membrane/telemetry.ex
wende Jan 30, 2025
ec80e2a
PR and CI Comments addressed
wende Jan 30, 2025
f910961
Credo changes
wende Jan 30, 2025
083f65e
Unnecessary element and bin state fields removed
wende Jan 30, 2025
95bdab1
Credo satisfied
wende Jan 30, 2025
c905ec5
Changed component context and added tests reflecting the changes
wende Jan 30, 2025
b1a9627
LegacyTelemetry sometimes didn't work properly
wende Jan 30, 2025
8a01d9b
Dialyzer fix
wende Jan 30, 2025
fe9e359
Dialyzer satisfied
wende Jan 31, 2025
6c7b091
Appeasing the dialyzer
wende Jan 31, 2025
4bc4c25
Credo
wende Jan 31, 2025
d7f083a
Context instead of state
wende Jan 31, 2025
66cdefd
Simpler(?) telemetry private API
wende Feb 3, 2025
24f6cb6
Documentation of metadata passed to and from telemetry modules
wende Feb 3, 2025
7041bf9
Satisfy FilterAggregator.Context
wende Feb 3, 2025
72ca1be
Documentation of metadata passed to and from telemetry modules
wende Feb 3, 2025
fe86693
Event -> Datapoint
wende Feb 3, 2025
17dfe27
Update lib/membrane/telemetry.ex
wende Feb 4, 2025
cf8efa1
Comments addressed
wende Feb 4, 2025
fe9d0eb
Track callback handler name
wende Feb 4, 2025
70cc188
Config cleanup
wende Feb 4, 2025
e846d7a
Faulty f+r
wende Feb 5, 2025
9583a5d
Better public API
wende Feb 6, 2025
9894437
Update lib/membrane/telemetry.ex
wende Feb 6, 2025
94ba04a
Documentation changes
wende Feb 6, 2025
dc707ac
faulty merge
wende Feb 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Implement `Membrane.Connector`. [#904](https://github.com/membraneframework/membrane_core/pull/904)
* Implememnt diamonds detection. [#909](https://github.com/membraneframework/membrane_core/pull/909)
* Incorporate `Membrane.Funnel`, `Membrane.Tee` and `Membane.Fake.Sink`. [#922](https://github.com/membraneframework/membrane_core/issues/922)
* Added new, revised Telemetry system [#905](https://github.com/membraneframework/membrane_core/pull/918)

## 1.1.2
* Add new callback `handle_child_terminated/3` along with new assertions. [#894](https://github.com/membraneframework/membrane_core/pull/894)
Expand Down
11 changes: 9 additions & 2 deletions lib/membrane/core/callback_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ defmodule Membrane.Core.CallbackHandler do
%{context: context_fun},
%{module: module, internal_state: internal_state} = state
) do
args = args ++ [context_fun.(state), internal_state]
context = context_fun.(state)
args = args ++ [context, internal_state]

callback_result =
try do
Expand All @@ -143,7 +144,13 @@ defmodule Membrane.Core.CallbackHandler do
fn ->
res = {_actions, new_internal_state} = apply(module, callback, args)

Telemetry.state_result(res, args, internal_state, new_internal_state, state)
Telemetry.state_result(res, %{
args: args,
module: module,
internal_state_before: internal_state,
internal_state_after: new_internal_state,
component_context: context
})
end
)
rescue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ defmodule Membrane.Core.Element.ManualFlowController.InputQueue do

alias Membrane.Buffer
alias Membrane.Core.Element.AtomicDemand
alias Membrane.Core.Telemetry
alias Membrane.Event
alias Membrane.Pad
alias Membrane.StreamFormat
alias Membrane.Core.Telemetry

require Membrane.Core.Stalker, as: Stalker
require Membrane.Logger
Expand Down Expand Up @@ -174,6 +174,7 @@ defmodule Membrane.Core.Element.ManualFlowController.InputQueue do
input_queue = maybe_increase_atomic_demand(input_queue)

%{size: size, stalker_metrics: stalker_metrics} = input_queue
Telemetry.report_take(size, input_queue.log_tag)
:atomics.put(stalker_metrics.size, 1, size)

{out, input_queue}
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/element/stream_format_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ defmodule Membrane.Core.Element.StreamFormatController do
withl pad: {:ok, data} <- PadModel.get_data(state, pad_ref),
playback: %State{playback: :playing} <- state do
%{direction: :input} = data
Telemetry.report_stream_format(stream_format, %{pad_ref: inspect(pad_ref)})
Telemetry.report_stream_format(stream_format, inspect(pad_ref))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be in legacy telemetry?


queue = data.input_queue

Expand Down
22 changes: 14 additions & 8 deletions lib/membrane/core/legacy_telemetry.ex
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file is almost entirely identical to what it was before this PR. Small adjustments were made to make it more resilient, discontinued telemetry events are rerouted to this module. Events that appear in both implementations are routed based on telemetry config indicating legacy format or the new one.

Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
defmodule Membrane.Core.LegacyTelemetry do
@moduledoc false

# This module is deprecated and will be removed in the next major release (2.0)
# All contents below starting from the next comment are from Legacy Telemetry system
# to retain compatibility with the old telemetry system in case of outdated configuration.

# This module provides a way to gather events from running Membrane components, as well
# as exposing these events in a format idiomatic to [Telemetry](https://hexdocs.pm/telemetry/)
# library. It uses compile time flags from `config.exs` to determine which events should be
Expand All @@ -14,7 +19,7 @@ defmodule Membrane.Core.LegacyTelemetry do
@doc """
Reports metrics such as input buffer's size inside functions, incoming events and received stream format.
"""
def report_metric(metric, value, log_tag \\ nil) do
defmacro report_metric(metric, value, log_tag \\ nil) do
event =
quote do
[:membrane, :metric, :value]
Expand All @@ -40,7 +45,7 @@ defmodule Membrane.Core.LegacyTelemetry do
Given list of buffers (or a single buffer) calculates total size of their payloads in bits
and reports it.
"""
def report_bitrate(buffers) do
defmacro report_bitrate(buffers) do
event =
quote do
[:membrane, :metric, :value]
Expand All @@ -56,7 +61,7 @@ defmodule Membrane.Core.LegacyTelemetry do
Enum.reduce(
List.wrap(unquote(buffers)),
0,
&(Membrane.Core.LegacyTelemetry.get_payload_size(&1.payload) + &2)
&(unquote(__MODULE__).get_payload_size(&1.payload) + &2)
)
}
end
Expand All @@ -68,6 +73,7 @@ defmodule Membrane.Core.LegacyTelemetry do
)
end

@spec get_payload_size(any()) :: non_neg_integer()
def get_payload_size(payload) when is_bitstring(payload) do
Membrane.Payload.size(payload)
end
Expand All @@ -87,7 +93,7 @@ defmodule Membrane.Core.LegacyTelemetry do
@doc """
Reports new link connection being initialized in pipeline.
"""
def report_link(from, to) do
defmacro report_link(from, to) do
event =
quote do
[:membrane, :link, :new]
Expand All @@ -100,10 +106,10 @@ defmodule Membrane.Core.LegacyTelemetry do
from: inspect(unquote(from).child),
to: inspect(unquote(to).child),
pad_from:
Membrane.Core.LegacyTelemetry.__get_public_pad_name__(unquote(from).pad_ref)
unquote(__MODULE__).__get_public_pad_name__(unquote(from).pad_ref)
|> inspect(),
pad_to:
Membrane.Core.LegacyTelemetry.__get_public_pad_name__(unquote(to).pad_ref)
unquote(__MODULE__).__get_public_pad_name__(unquote(to).pad_ref)
|> inspect()
}
end
Expand All @@ -114,7 +120,7 @@ defmodule Membrane.Core.LegacyTelemetry do
@doc """
Reports a pipeline/bin/element initialization.
"""
def report_init(type) when type in [:pipeline, :bin, :element] do
defmacro report_init(type) when type in [:pipeline, :bin, :element] do
event =
quote do
[:membrane, unquote(type), :init]
Expand All @@ -141,7 +147,7 @@ defmodule Membrane.Core.LegacyTelemetry do
@doc """
Reports a pipeline/bin/element termination.
"""
def report_terminate(type) when type in [:pipeline, :bin, :element] do
defmacro report_terminate(type) when type in [:pipeline, :bin, :element] do
event =
quote do
[:membrane, unquote(type), :terminate]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do

alias Membrane.Child
alias Membrane.Core.{Bin, Message, Parent}
alias Membrane.Core.Telemetry
alias Membrane.Core.Bin.PadController
alias Membrane.Core.Telemetry

alias Membrane.Core.Parent.{
ChildLifeController,
Expand Down
Loading
Loading