-
Notifications
You must be signed in to change notification settings - Fork 40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Telemetry (#905) #918
Telemetry (#905) #918
Changes from 37 commits
4fcfcb2
328d30a
e599fcb
e60b127
42f099e
56890e0
3cb267e
7a69d1e
a77d6c0
3861859
66eea5a
0e95d43
f6b5ce1
db19995
a442758
f970d20
2b1a474
b00b8c9
e508240
9c65c71
d53708b
ec80e2a
f910961
083f65e
95bdab1
c905ec5
b1a9627
8a01d9b
fe9e359
6c7b091
4bc4c25
d7f083a
66cdefd
24f6cb6
7041bf9
72ca1be
fe86693
17dfe27
cf8efa1
fe9d0eb
70cc188
e846d7a
9583a5d
9894437
94ba04a
dc707ac
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,4 +2,36 @@ import Config | |
|
||
if config_env() == :test do | ||
config :junit_formatter, include_filename?: true | ||
|
||
config :membrane_core, :telemetry_flags, | ||
tracked_callbacks: [ | ||
element: [ | ||
:handle_init, | ||
:handle_playing, | ||
:handle_setup, | ||
:handle_terminate_request, | ||
:handle_parent_notification | ||
], | ||
bin: :all, | ||
pipeline: :all | ||
], | ||
datapoints: :all | ||
|
||
# config to test legacy version of telemetry | ||
# config :membrane_core, :telemetry_flags, [ | ||
# :report_init, | ||
# :report_terminate, | ||
# :report_buffer, | ||
# :report_queue, | ||
# :report_link, | ||
# metrics: [ | ||
# :buffer, | ||
# :bitrate, | ||
# :queue_len, | ||
# :stream_format, | ||
# :event, | ||
# :store, | ||
# :take_and_demand | ||
# ] | ||
# ] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remember to delete it before merging PR There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I want to leave it in some documentation since it makes it much easier to manually test behaviour on legacy settings. Any idea where I could put it? |
||
end |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -8,8 +8,10 @@ defmodule Membrane.Core.CallbackHandler do | |||||
use Bunch | ||||||
|
||||||
alias Membrane.CallbackError | ||||||
alias Membrane.ComponentPath | ||||||
|
||||||
require Membrane.Logger | ||||||
require Membrane.Core.Telemetry, as: Telemetry | ||||||
|
||||||
@type state :: %{ | ||||||
:module => module, | ||||||
|
@@ -125,45 +127,67 @@ defmodule Membrane.Core.CallbackHandler do | |||||
end | ||||||
|
||||||
@spec exec_callback(callback :: atom, args :: list, handler_params, state) :: | ||||||
{list, internal_state} | ||||||
{list, internal_state} | no_return() | ||||||
defp exec_callback( | ||||||
callback, | ||||||
args, | ||||||
%{context: context_fun}, | ||||||
%{module: module, internal_state: internal_state} = state | ||||||
) do | ||||||
args = args ++ [context_fun.(state), internal_state] | ||||||
context = context_fun.(state) | ||||||
args = args ++ [context, internal_state] | ||||||
|
||||||
callback_result = | ||||||
try do | ||||||
try do | ||||||
fn -> | ||||||
apply(module, callback, args) | ||||||
rescue | ||||||
e in UndefinedFunctionError -> | ||||||
_ignored = | ||||||
with %{module: ^module, function: ^callback, arity: arity} <- e do | ||||||
reraise CallbackError, | ||||||
[ | ||||||
kind: :not_implemented, | ||||||
callback: {module, callback}, | ||||||
arity: arity, | ||||||
args: args | ||||||
], | ||||||
__STACKTRACE__ | ||||||
end | ||||||
|
||||||
reraise e, __STACKTRACE__ | ||||||
|> validate_callback_result!(module, callback) | ||||||
end | ||||||
|> report_telemetry(callback, args, state, context) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The API of
Suggested change
and the creation of event metadata should be fully hidden inside |
||||||
rescue | ||||||
e in UndefinedFunctionError -> | ||||||
_ignored = | ||||||
with %{module: ^module, function: ^callback, arity: arity} <- e do | ||||||
reraise CallbackError, | ||||||
[ | ||||||
kind: :not_implemented, | ||||||
callback: {module, callback}, | ||||||
arity: arity, | ||||||
args: args | ||||||
], | ||||||
__STACKTRACE__ | ||||||
end | ||||||
|
||||||
reraise e, __STACKTRACE__ | ||||||
end | ||||||
end | ||||||
|
||||||
case callback_result do | ||||||
{actions, _state} when is_list(actions) -> | ||||||
callback_result | ||||||
defp validate_callback_result!({actions, _state} = callback_result, _module, _cb) | ||||||
when is_list(actions) do | ||||||
callback_result | ||||||
end | ||||||
|
||||||
_result -> | ||||||
raise CallbackError, | ||||||
kind: :bad_return, | ||||||
callback: {module, callback}, | ||||||
value: callback_result | ||||||
end | ||||||
defp validate_callback_result!(callback_result, module, callback) do | ||||||
raise CallbackError, | ||||||
kind: :bad_return, | ||||||
callback: {module, callback}, | ||||||
value: callback_result | ||||||
end | ||||||
|
||||||
defp report_telemetry(f, callback, args, state, context) do | ||||||
Telemetry.span_component_callback( | ||||||
f, | ||||||
state.__struct__, | ||||||
callback, | ||||||
%{ | ||||||
callback: callback, | ||||||
callback_args: args, | ||||||
callback_context: context, | ||||||
component_path: ComponentPath.get(), | ||||||
component_type: state.module, | ||||||
internal_state_before: state.internal_state, | ||||||
internal_state_after: nil | ||||||
} | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's move creating this map into |
||||||
) | ||||||
end | ||||||
|
||||||
@spec handle_callback_result( | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,15 +40,16 @@ defmodule Membrane.Core.Element do | |
require Membrane.Core.Message, as: Message | ||
require Membrane.Core.Stalker, as: Stalker | ||
require Membrane.Core.Telemetry, as: Telemetry | ||
require Membrane.Core.LegacyTelemetry, as: LegacyTelemetry | ||
require Membrane.Logger | ||
|
||
@type options :: %{ | ||
module: module, | ||
module: module(), | ||
name: Membrane.Element.name(), | ||
node: node | nil, | ||
node: node() | nil, | ||
user_options: Membrane.Element.options(), | ||
sync: Sync.t(), | ||
parent: pid, | ||
parent: pid(), | ||
wende marked this conversation as resolved.
Show resolved
Hide resolved
|
||
parent_clock: Clock.t(), | ||
parent_path: Membrane.ComponentPath.path(), | ||
log_metadata: Logger.metadata(), | ||
|
@@ -118,9 +119,8 @@ defmodule Membrane.Core.Element do | |
{:ok, resource_guard} = | ||
SubprocessSupervisor.start_utility(options.subprocess_supervisor, {ResourceGuard, self()}) | ||
|
||
Telemetry.report_init(:element) | ||
|
||
ResourceGuard.register(resource_guard, fn -> Telemetry.report_terminate(:element) end) | ||
LegacyTelemetry.report_init(:element) | ||
ResourceGuard.register(resource_guard, fn -> LegacyTelemetry.report_terminate(:element) end) | ||
|
||
self_pid = self() | ||
|
||
|
@@ -206,10 +206,7 @@ defmodule Membrane.Core.Element do | |
@impl GenServer | ||
def handle_info(message, state) do | ||
Utils.log_on_error do | ||
Telemetry.report_metric( | ||
:queue_len, | ||
:erlang.process_info(self(), :message_queue_len) |> elem(1) | ||
) | ||
Telemetry.report_queue_len(self()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shouldn't it be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. relates to #918 (comment) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All calls that were removed go directly to LegacyTelemetry so that they can be easily identified to be removed later on. Those that are both in the old and the new one go to Telemetry that redirects the calls based on the config having old or new format. When the legacy telemetry is removed this will allow it only to delete lines with LegacyTelemetry as well as the small block of redirection in new Telemetry |
||
|
||
do_handle_info(message, state) | ||
end | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIRC, this enables the old, deprecated metrics. Why do we add a new config key for a deprecated feature?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not for deprecated metrics. It's for those that were renewed. So:
link, buffer, queue_len, stream_format, event, store and take (no bitrate, demand, init or terminate)
And it's only for those to be tested