-
Notifications
You must be signed in to change notification settings - Fork 40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Telemetry (#905) #918
Telemetry (#905) #918
Conversation
I took a brief look at your changes and I commented things that caught my attention |
@FelonEkonom, could you provide an example of how the .config.exs file should look for the end user? This would help me a lot with separation of concerns.
Is this example requiring excessive detail from the end user? Or would you consider it an adequate level of granularity? |
@wende the example you proposed demands enough details from the user and in general it looks OK 👍 I would replace |
It could look like
or simply
|
lib/membrane/core/telemetry.ex
Outdated
def report_metric(metric_name, measurement, metadata \\ %{}) do | ||
if metric_measured?(metric_name) do | ||
:telemetry.execute([:membrane, :metric, metric_name], %{value: measurement}, metadata) | ||
else | ||
# A hack to suppress the 'unused variable' warnings | ||
quote do | ||
_fn = fn -> | ||
_unused = unquote(event_name) | ||
_unused = unquote(measurement) | ||
_unused = unquote(metadata) | ||
end | ||
|
||
:ok | ||
end | ||
measurement |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one is currently unused. There used to be some metrics measured by legacy code of this purpose, but since it wasn't tested anywhere I treated it as dead code and removed it
A list of metrics that you want to measure would be appreciated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reimplemented metrics for: [link, buffer, bitrate, queue, stream_format,
event, store]
This is an MVP version of this PR. I'm going to need a list of desirable metrics to be measured, as well as some guidance on how to document |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't finished my review, but I am publishing some of the comments that I've made.
Further review soon 🔜
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to have a piece of documentation that describes
- how to enable metrics in config
- what events are emitted and when
- what is passed as events metadata
but you can wait with it until the code and API are more or less stable within this PR
Another thing is, please take a look at the CI: test_performance
job doesn't work at all for now, but test
and lint
should pass
Telemetry.report_span( | ||
state.__struct__, | ||
callback, | ||
fn -> | ||
apply(module, callback, args) | ||
|> Telemetry.state_result(internal_state, state) | ||
end | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO the structure of the code here and in related functions (Telemetry.report_span/3
, Telemetry.state_result/3
) is overcomplicated and it is hard to understand, which function or lambda does and returns what. I recommend to avoid any type of circular dependencies like the one we have here between Telemetry
module and CallbackHandler
.
What about creating a macro that would be used like
callback_result =
Telemetry.with_callback_execution_span callback, internal_state, state, do
apply(module, callback, args)
end
Or eventually a function in Telemetry
module that will take all data necessary to 1. execute a callback and 2. pass proper metadata to :telemetry
event?
If you will have any question regarding writing such a macro - let me know
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The purpose of Telemetry.state_result
was exactly that to avoid circular dependency. I did incorrectly use it after all though.
Take a look at the most recent version. I don't believe there is a way to avoid circ dependency with a macro. Using this state_result/2
workaround while not ideal achieves better decoupling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not to mention there is little to no benefit of using a macro here, I don't think it's warranted but i'm open to discussion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The circular dependency could by avoided by writing function looking more or less like that
Telemetry.apply_within_span(
fn () -> apply(module, callback, args) end,
callback,
args,
internal_state,
state
)
and in Telemetry
module:
def apply_within_span(lambda, callback, callback_args, internal_state, state) do
report_span(
state.__struct__,
callback,
fn -> apply(lambda) |> state_result(internal_state, state) end
)
end
This same could be done with macro that could be used like in the example from my previous comment
defmacro with_callback_execution_span(callback, internal_state, state, do_block) do
quote do
if unquote(callback) |> tracked?(unquote(state).__struct__) do
unquote(__MODULE__).apply_within_span(
fn -> unquote(do_block) end,
unquote(callback),
unquote(args),
unquote(internal_state),
unquote(state)
)
else
unquote(do_block)
end
end
end
The advantage of macro is that the code will look similar to e.g. OpenTelemetry.Tracer.with_span "id", do .... end
and the usage of macro will be easier and more intuitive to read for somebody else
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def apply_within_span(lambda, callback, callback_args, internal_state, state) do
report_span(
state.__struct__,
callback,
fn -> apply(lambda) |> state_result(internal_state, state) end
)
end
This solution has the problem that internal state passed this way would be an old internal state, instead of the new one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can get the new one from the value returned from apply(lambda)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For me it is overcomplicated and I still think that it is possible to write functions in Membrane.Core.Telemetry
in a way that they will be much more intuitive to read. We can do pair-coding session to do so
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can get the new one from the value returned from
apply(lambda)
But then we get a circular dependency of CallbackHandler using Telemetry and Telemetry module being specifically coded for CallbackHandler response of {actions, state}
@wende @FelonEkonom performance tests should be fixed, you just need to merge changes from current master branch for them to run properly |
It's all there in lib/membrane/telemetry.ex, but still requires some work. I don't want to document it mid-development especially when measured metrics are still unspecified
I am planning to restore these in accordance with new changes. Please keep in mind though that these were calls at the very heart of the core functionality while being 100% uncovered by tests. I want to make sure these are properly TDDed before they are reintroduced |
using :telemetry.span
lib/membrane/core/telemetry.ex
Outdated
quote do | ||
LegacyTelemetry.report_link(unquote(lazy_block)[:from], unquote(lazy_block)[:to]) | ||
end | ||
end | ||
|
||
defp do_legacy_telemetry(metric_name, lazy_block) do | ||
def do_legacy_telemetry(metric_name, lazy_block) do | ||
LegacyTelemetry.report_metric(metric_name, lazy_block) | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How can the first clause return quote
, while the second clause doesn't?
Telemetry.report_span( | ||
state.__struct__, | ||
callback, | ||
fn -> | ||
apply(module, callback, args) | ||
|> Telemetry.state_result(internal_state, state) | ||
end | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For me it is overcomplicated and I still think that it is possible to write functions in Membrane.Core.Telemetry
in a way that they will be much more intuitive to read. We can do pair-coding session to do so
test/membrane/telemetry_test.exs
Outdated
%{component_path: [_, ^element_type]}}}, | ||
1000 | ||
|
||
assert results.monotonic_time < 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you check, if monotonic_time
is smaller than zero? Shouldn't it be a positive number?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On systems I checked at the monotonic time was always negative. But I did some research and it appears it can be positive on some systems. I'll change it to just be different than 0 to make sure it doesn't fail randomly for some
lib/membrane/telemetry.ex
Outdated
* `[:membrane, :element | :bin | :pipeline, handler, :start | :stop | :exception]` - | ||
where handler is any possible handle_* function defined for a component. | ||
* `:start` - when the handler begins execution | ||
* `:stop` - when the handler finishes execution | ||
* `:exception` - when the handler crashes during execution |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is still undone
lib/membrane/telemetry.ex
Outdated
@typedoc """ | ||
Value of the specific event gathered | ||
""" | ||
@type event_value :: %{ | ||
value: | ||
buffer_event_value() | ||
| queue_len_event_value() | ||
| stream_format_event_value() | ||
| incoming_event_value() | ||
| store_event_value() | ||
| integer() | ||
} | ||
|
||
@typedoc """ | ||
Metadata included with each telemetry event | ||
""" | ||
@type event_metadata :: %{ | ||
event: event_type(), | ||
component_path: ComponentPath.path(), | ||
component_metadata: any() | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is for legacy events, right?
@@ -33,7 +33,7 @@ defmodule Membrane.Core.Element.StreamFormatController do | |||
withl pad: {:ok, data} <- PadModel.get_data(state, pad_ref), | |||
playback: %State{playback: :playing} <- state do | |||
%{direction: :input} = data | |||
Telemetry.report_stream_format(stream_format, %{pad_ref: inspect(pad_ref)}) | |||
Telemetry.report_stream_format(stream_format, inspect(pad_ref)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't it be in legacy telemetry?
lib/membrane/core/telemetry.ex
Outdated
@type telemetry_callback_metadata() :: %{ | ||
args: list(any()), | ||
module: module(), | ||
internal_state_before: any(), | ||
internal_state_after: any(), | ||
component_context: Membrane.Telemetry.component_context() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add module
, component_path and setup_incomplete?
(the last one is the value stored in state) to Membrane.Element/Bin/Pipeline.CallbackContext.t()
and to values returned from functions defined in Membrane.Core.Element/Bin/Pipeline.CallbackContext
Since module
will be inside callback_context, there will be no need to have it explicitly in telemetry_callback_metadata
lib/membrane/telemetry.ex
Outdated
@type component_context :: | ||
Element.CallbackContext.t() | Bin.CallbackContext.t() | Pipeline.CallbackContext.t() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename it to callback_context
* `:bitrate` - total number of bits carried by the buffers mentioned above | ||
* `:queue_len` - number of messages in element's message queue during GenServer's `handle_info` invocation | ||
* `:stream_format` - indicates that given element has received new stream format, value always equals '1' | ||
* `:event` - indicates that given element has received a new event, value always equals '1' | ||
* `:store` - reports the current size of a input buffer when storing a new buffer | ||
* `:take_and_demand` - reports the current size of a input buffer when taking buffers and making a new demand |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a mention, that events
will be removed in v2.0
lib/membrane/telemetry.ex
Outdated
@type event_value :: %{ | ||
value: | ||
buffer_event_value() | ||
| queue_len_event_value() | ||
| stream_format_event_value() | ||
| incoming_event_value() | ||
| store_event_value() | ||
| integer() | ||
} | ||
|
||
@typedoc """ | ||
Metadata included with each telemetry event | ||
""" | ||
@type event_metadata :: %{ | ||
event: event_type(), | ||
component_path: ComponentPath.path(), | ||
component_metadata: any() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you have any idea what prefix could be added to names of these two types, to avoid confusion if it has anything to do with newly added callback events?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wende are you planning on making any major changes to the documentation?
%{ | ||
callback: callback, | ||
callback_args: args, | ||
callback_context: context, | ||
component_path: ComponentPath.get(), | ||
component_type: state.module, | ||
internal_state_before: state.internal_state, | ||
internal_state_after: nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's move creating this map into Telemetry.span_component_callback/4
lib/membrane/telemetry.ex
Outdated
@type callback_span_metadata :: %{ | ||
callback: atom(), | ||
callback_args: [any()], | ||
callback_context: callback_context(), | ||
component_path: ComponentPath.path(), | ||
component_type: component_type(), | ||
internal_state_before: Element.state() | Bin.state() | Pipeline.state(), | ||
internal_state_after: Element.state() | Bin.state() | Pipeline.state() | ||
internal_state_after: Element.state() | Bin.state() | Pipeline.state() | nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's rename fields internal_state_before
and internal_state_after
to state_before_callback
and state_after_callback
.
The name internal_state
occurs only inside membrane_core
private modules and nowhere beyond them. In every other place it is called just state
config/config.exs
Outdated
# config to test legacy version of telemetry | ||
# config :membrane_core, :telemetry_flags, [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would remove the legacy config
lib/membrane/core/telemetry.ex
Outdated
atom(), | ||
Telemetry.callback_span_metadata() | ||
) :: CallbackHandler.callback_return() | no_return() | ||
def span_component_callback(f, component_module, callback, meta) do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def span_component_callback(f, component_module, callback, meta) do | |
def apply_callback_within_span(callback_lambda, component_module, callback, meta) do |
IMO name of this function is little weird
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It uses span as a verb, but I'll find a better name for it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think of "track_callback_handler" it has less ambiguous verb, it still specifies it's for a callback and also adds handler to be more descriptive and consistent with "callback_tracked?/1" function name
Co-authored-by: Feliks Pobiedziński <38541925+FelonEkonom@users.noreply.github.com>
Not unless they're needed. I want to have this merged as soon as possible and the documentation can always be improved upon without breaking changes |
bin: :all, | ||
pipeline: :all | ||
], | ||
datapoints: :all |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIRC, this enables the old, deprecated metrics. Why do we add a new config key for a deprecated feature?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not for deprecated metrics. It's for those that were renewed. So:
link, buffer, queue_len, stream_format, event, store and take (no bitrate, demand, init or terminate)
And it's only for those to be tested
end | ||
|> report_telemetry(callback, args, state, context) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The API of Telemetry
module should allow on sth like
|> report_telemetry(callback, args, state, context) | |
|> Telemetry.track_callback_handler(callback, args, context, state) |
and the creation of event metadata should be fully hidden inside Telemetry
module
defp report_telemetry(f, callback, args, state, context) do | ||
Telemetry.track_callback_handler( | ||
f, | ||
state.__struct__, | ||
callback, | ||
Telemetry.callback_meta( | ||
state, | ||
callback, | ||
args, | ||
context | ||
) | ||
) | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shouldn't be necessary, take a look at the comment to the code above
defp report_telemetry(f, callback, args, state, context) do | |
Telemetry.track_callback_handler( | |
f, | |
state.__struct__, | |
callback, | |
Telemetry.callback_meta( | |
state, | |
callback, | |
args, | |
context | |
) | |
) | |
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix these two things and it should be fine
* `:bitrate` - total number of bits carried by the buffers mentioned above | ||
* `:queue_len` - number of messages in element's message queue during GenServer's `handle_info` invocation | ||
* `:stream_format` - indicates that given element has received new stream format, value always equals '1' | ||
* `:event` - indicates that given element has received a new event, value always equals '1' | ||
* `:store` - reports the current size of a input buffer when storing a new buffer | ||
* `:take_and_demand` - reports the current size of a input buffer when taking buffers and making a new demand |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a mention, that events will be removed in v2.0
Undone
Co-authored-by: Feliks Pobiedziński <38541925+FelonEkonom@users.noreply.github.com>
Relates to #905