Add per-input metrics to libbeat pipeline client #42618
base: main
Conversation
This pull request does not have a backport label. To fix up this pull request, you need to add the backport labels for the needed
Force-pushed f6ba6dc to 75929e5
Force-pushed 3f24545 to 4c32ff3
Force-pushed 4358017 to f48ee59
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)
@andrewkroh I made the changes you requested. Now the v2.Context carries a registry for the input metrics, which is passed to the pipeline client. So if the input registers its metrics with the inputmon package, the new metrics get published. @faec, @belimawr as I ended up changing the approach, could you review it again?
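For readers following along, here is a minimal sketch of that flow, assuming the names visible elsewhere in this PR (v2.Context, inputmon.NewInputRegistry) and a hypothetical myinput; it is not the PR's literal code:

package myinput

import (
	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/monitoring"
	"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
	"github.com/elastic/elastic-agent-libs/mapstr"
)

type myInput struct{}

// Run sketches the described flow: the runner hands the input a context,
// the input opts into metrics publication by registering with inputmon,
// and the pipeline client aggregates its per-input metrics alongside.
func (in *myInput) Run(ctx v2.Context, conn beat.PipelineConnector) error {
	// Opt in to publishing on /inputs/ (stamps id and input type).
	reg, cancel := inputmon.NewInputRegistry("myinput", ctx.ID, nil)
	defer cancel()

	// An input-specific counter living next to the pipeline metrics.
	eventsRead := monitoring.NewUint(reg, "events_read_total")

	client, err := conn.Connect()
	if err != nil {
		return err
	}
	defer client.Close()

	client.Publish(beat.Event{Fields: mapstr.M{"message": "hello"}})
	eventsRead.Inc()
	return nil
}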
Force-pushed de923c7 to 8bb9bf1
/test
@@ -1977,7 +1986,7 @@ func TestInput(t *testing.T) {
	t.Errorf("unexpected number of cursors events: got:%d want at least:%d", len(client.cursors), len(test.wantCursor))
	test.wantCursor = test.wantCursor[:len(client.published)]
}
client.published = client.published[:len(test.want)]
client.cursors = client.cursors[:len(test.wantCursor)]
I talked to the team responsible for this input and they agreed this fix is correct.
CHANGELOG.next.asciidoc
Outdated
@@ -67,6 +67,7 @@ Dropped fields: `syslog.priority` and `syslog.facility` while keeping their dupl
already present as `container.id` and `container.log.tag` is dropped because it is already present as
`log.syslog.appname`. The field `container.partial` is replaced by the tag `partial_message` if it was `true`,
otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
- Add per-input metrics to filestream, cel and httpjson inputs {pull}42618[42618] {issue}42761[42761]
filestream, cel and httpjson are the ones I added tests for. Any v2 input which exposes input metrics should have the new metrics. I'm checking if I can list them all without having to check the code for each one of the inputs.
CHANGELOG.next.asciidoc
Outdated
@@ -67,6 +67,7 @@ Dropped fields: `syslog.priority` and `syslog.facility` while keeping their dupl
already present as `container.id` and `container.log.tag` is dropped because it is already present as
`log.syslog.appname`. The field `container.partial` is replaced by the tag `partial_message` if it was `true`,
otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
- Add per-input pipeline metrics to filestream, cel and httpjson inputs. {pull}42618[42618] {issue}42761[42761]
Actually, all v2 inputs which publish metrics will have the per-input pipeline metrics. Those 3 are the ones I added integration tests for.
I couldn't test every single one, so I'm wondering what is best to add here.
Looking at the docs for each v1 input, the only one which has a Metrics section is gcppubsub. So apart from having a test for every affected input, we could say all inputs exposing metrics on the HTTP monitoring endpoint have the new metrics, with only one exception: gcppubsub.
Here is what I came up with looking at the code:
v2 inputs (with the pipeline metrics per input)
- OSS:
filestream
journald
kafka
tcp
udp
unix
winlog
- x-pack:
awscloudwatch
awss3
azureblobstorage
azureeventhub
benchmark
cel
cloudfoundry
entityanalytics
etw
gcs
http_endpoint
httpjson
lumberjack
netflow
o365audit
salesforce
streaming
unifiedlogs
v1 (without the new metrics)
- OSS:
container
log
mqtt
redis
stdin
syslog
- x-pack:
cometd
gcppubsub
The log input is a V1 input, see how it is registered:
beats/filebeat/input/log/input.go, line 64 at a964b89:
err := input.Register("log", NewInput)
Only the inputs that import/use "github.com/elastic/beats/v7/filebeat/input/v2" are V2 inputs.
For the release note, I believe the best is to list all V2 inputs that have the metrics.
Because the inputs are so different, I'm fine with only adding tests for the main ones and relying on the current tests to ensure everything is still working.
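For orientation, here is a rough sketch of the V2 shape: instead of the input.Register call quoted above, a V2 input exposes a v2.Plugin value that the v2 loader collects (the input name is hypothetical):

package myinput

import (
	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
	"github.com/elastic/beats/v7/libbeat/feature"
)

// Plugin is the V2 registration point: filebeat collects these values
// instead of having the input call input.Register itself.
func Plugin() v2.Plugin {
	return v2.Plugin{
		Name:      "myinput", // hypothetical
		Stability: feature.Stable,
		Manager:   nil, // a real input supplies its v2.InputManager here
	}
}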
I know, something went wrong in the copy and paste. I fixed it now.
An option when documenting that all v2 inputs have the new metrics would be to add it to the Configure an HTTP endpoint for metrics > Inputs page instead of adding it to each one's metrics section of the docs.
I like the idea of adding the list to the HTTP metrics docs.
Maybe mention it in the release notes instead of listing the inputs there.
@nimarezainia @flexitrev any thoughts on the best way to make those new metrics public?
Right now I'm leaning towards adding a description of the pipeline metrics per input to the Configure an HTTP endpoint for metrics > Inputs page in a generic way, stating not all inputs have them. Then later we might go input by input, checking how much value they bring and documenting as we/the teams owning the inputs see fit. Ideally there would be a test for each input which adds those metrics to its documentation.
Also they might not bring immediate value for all inputs. For example, the awss3 dashboard does not have a direct mention of events. So I believe for these new metrics to be valuable they need some explanation of how the current metrics relate to events. Again, that's why I think a generic explanation of the input metrics in the docs might be good for now, and then each team decides how to handle it for their inputs.
cc: @pierrehilbert, @cmacknz
Why do we want to make the metrics public? Remember if we make them part of our public API as GA we can't break or deprecate them easily. Are you 100% sure these metrics are perfect and you won't want to stop publishing some of them right now? Are they better made available for internal use initially?
Why do we want to make the metrics public? Remember if we make them part of our public API as GA we can't break or deprecate them easily.
That's why I'm asking :)
Are they better made available for internal use initially?
I'm happy with that: not documenting them for now, seeing how we use them, and only later making them public.
@@ -187,6 +187,10 @@ observe the activity of the input. Note that metrics from processors are not inc
| `messages_truncated_total` | Total number of messages truncated.
| `bytes_processed_total` | Total number of bytes processed.
| `events_processed_total` | Total number of events processed.
| `events_pipeline_total` | Total number of events processed by the publishing pipeline.
| `events_pipeline_failed_total` | Total number of events failed at the publishing pipeline.
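For orientation, a minimal sketch (not the PR's code) of how a pipeline client could maintain the two counters documented above, given a per-input registry:

package pipeline

import "github.com/elastic/beats/v7/libbeat/monitoring"

// inputPipelineMetrics holds the counters from the table above.
type inputPipelineMetrics struct {
	total  *monitoring.Uint // events_pipeline_total
	failed *monitoring.Uint // events_pipeline_failed_total
}

func newInputPipelineMetrics(reg *monitoring.Registry) *inputPipelineMetrics {
	return &inputPipelineMetrics{
		total:  monitoring.NewUint(reg, "events_pipeline_total"),
		failed: monitoring.NewUint(reg, "events_pipeline_failed_total"),
	}
}

// onPublish would be called for every event handed to the client.
func (m *inputPipelineMetrics) onPublish(failed bool) {
	m.total.Inc()
	if failed {
		m.failed.Inc()
	}
}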
I won't block on this but I'd push back a little on including the "failed" number (here and throughout). It will always be zero and it will only confuse people who think it represents something it doesn't.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to keep parity with the pipeline metrics, and later better explain the pipeline metrics.
Let's see how we resolve https://github.com/elastic/beats/pull/42618/files#r1980164988. Then we change it here or omit it altogether.
Well, I'd rather not aim for parity for its own sake. "failed" is already a broken metric that we'd prefer to remove, and I think adding more copies of it would make that worse and make misunderstandings more common.
filebeat/input/v2/input.go
Outdated
// metrics.
// Consequently, inputs are not required to immediately adopt the new
// registry for their metrics.
MetricsRegistry *monitoring.Registry
I don't think I understand the contract with the inputs here... can inputs create their own type-specific metrics by adding things to this registry? Is it supposed to be used at all other than that? Why does it include a cancel function, if the pipeline is able to cancel the input-specific registry automatically when the input shuts down? Right now I don't see a reason for these to be provided through the Context, but if it's important then the comments should document not just the high-level data flow but also how input code is expected to use these fields.
I don't think I understand the contract with the inputs here... can inputs create their own type-specific metrics by adding things to this registry?
Yes.
It's because having the registry passed to the pipeline client creates a chicken-and-egg problem. The pipeline client is created before the inputs create their metric registry. Also, that is not the case 100% of the time; filestream is an example of the exception. In addition to that, depending on the input, the name it uses for the metrics is different from its actual name. The httpjson-stateless input uses only httpjson in the metrics. Another input, which I don't recall right now, does the opposite: it adds a suffix to its name when setting the input type in the metrics.
Given that, moving the call the inputs make to inputmon.NewInputRegistry to create the registry to when the context is created won't work. Or at least it would still require the input to later adjust the registry, like fixing the input type. Also, creating the registry by calling inputmon.NewInputRegistry during the v2.Context creation would make the metrics be published even if the input does not register any metrics. So to leave the input in full control of publishing metrics or not, the input still needs to call inputmon.NewInputRegistry, which will add the id and input type to the registry; those being present is required for the metrics in the registry to be published.
So this is the registry the pipeline client and later the ES output will use to add the per-input metrics. If the input has an ID but does not register metrics, the pipeline and output per-input metrics won't be published.
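A tiny illustration of the gating described in this comment, assuming (per the text above) that inputmon.NewInputRegistry stamps `id` and `input` onto the registry and that only registries carrying both get served on the /inputs/ endpoint; the helper name and package are hypothetical:

package inputmonsketch

import "github.com/elastic/beats/v7/libbeat/monitoring"

// markPublishable mirrors what the comment says inputmon.NewInputRegistry
// does: stamp `id` and `input` (the input type) onto the registry, which
// is what makes the /inputs/ endpoint pick it up.
func markPublishable(reg *monitoring.Registry, id, inputType string) {
	monitoring.NewString(reg, "id").Set(id)
	monitoring.NewString(reg, "input").Set(inputType)
}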
I think from this comment that I'm misunderstanding some of the problem specification / desired behavior. I think I have some suggestions to simplify this approach, but I want to make sure they're relevant... let's catch up offline tomorrow to make sure we're both talking about the same things.
Can I hijack this call? I'm also confused by this whole process.
libbeat/beat/pipeline.go
Outdated
@@ -45,6 +46,11 @@ type Client interface {
// ClientConfig defines common configuration options one can pass to
// Pipeline.ConnectWith to control the clients behavior and provide ACK support.
type ClientConfig struct {
	// InputMetricsRegistry is the metrics registry used to aggregate pipeline metrics
	// per input.
	InputMetricsRegistry *monitoring.Registry
Adding the registry and cancellation function here confuses me, too... why would an input need to provide a different registry than the one already associated with it by the pipeline? Why is a cancellation function included when cancellation is linked to input shutdown, not to closing a particular client (since inputs can use multiple clients)? Aren't these fields shared by all clients for a particular input anyway? It seems to me like the pipeline should be handling all this bookkeeping unless there is a specific need for inputs to be able to customize / override the baseline behavior. (But same as previous, if this is necessary then the semantics should be documented here.)
The cancellation is much more a precaution in case the input for some reason does not register metrics and therefore does not cancel/unregister them. I didn't want to assume every input will register metrics and therefore, during its shutdown, cancel/unregister them.
why would an input need to provide a different registry than the one already associated with it by the pipeline?
It shouldn't. It could only happen if for some odd reason the ID it uses to publish its metrics is different from the v2.Context.ID. In that case the pipeline metrics would not be published.
[...] inputs can use multiple clients [...]
Do you know any input using multiple clients? The only example I saw (analysing only the cel, filestream and httpjson inputs) is cel, which creates derived contexts, so for each context there is a new pipeline client, and therefore there is still a 1:1 relationship between the context and the pipeline client.
There is my original idea of providing a different namespace and metrics registry for the pipeline client and then, when publishing the metrics, merging them with the input-specific metrics. This lets each component control the life-cycle of its metrics, and when both are present, they're merged.
The cancellation is much more a precaution in case the input for some reason does not register metrics and therefore does not cancel/unregister them.
But then I'm still missing something... it looked like the pipeline itself could still cancel the registry when the input shuts down, is that true? If so then I don't see why it matters if the input itself does it, whether it registered its own metrics or not.
It shouldn't. It could only happen if for some odd reason the ID it uses to publish its metrics is different from the v2.Context.ID. In that case the pipeline metrics would not be published.
Then why is the input required to pass the registry through at all? Can't that be handled by the pipeline, since it already knows the registry that will be used?
Do you know any input using multiple clients?
awss3, for one, and I believe there are many others. It can be important e.g. for ACK tracking when the input has multiple independent workers.
// The returned cancel function *must* be called when the input stops to
// unregister the metrics and prevent resource leaks.
//
// This function might panic.
Don't Panic! - The Hitchhiker's Guide to the Galaxy
Jokes aside, why the risk of panic now? I recall some issues in the past with duplicated IDs causing panics, but as far as I can tell, they were fixed.
Has this risk increased?
Agree, and if there is an unavoidable panic path, the conditions that could lead to it should be detailed so that it can be avoided. See documentation in reflect for this kind of thing, e.g. "If CanAddr returns false, calling Value.Addr will panic."
I can remove it, but this is the original behaviour; I just kept it as it was.
defer func() {
	if r := recover(); r != nil {
		// re-panic, wrapping the recovered value to add context
		panic(fmt.Errorf("inoutmon.NewInputRegistry panic: %+v", r))
	}
}()
I'm very confused here... You're recovering from the panic just to panic again?
I kept the original behaviour, I just added more context.
filebeat/input/v2/compat/compat.go
Outdated
id string
// generatedID is true when the config does not include an ID and therefore
// the runner uses a hash from the config as ID.
generatedID bool
I'm a bit confused here: why is it needed to have a flag indicating the ID was auto-generated rather than user-provided?
I believe it would be good to add why it is needed in the comments here.
filebeat/input/v2/compat/compat.go
Outdated
} | ||
} else { | ||
// when the input has no ID, no metric is published, therefore, we | ||
// use a 'discard' registry. |
What does a 'discard' registry mean here? Is it actually a noop registry, or just a registry that collects metrics which are never reported?
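For what it's worth, the proposed commit message later in the thread describes it as "a 'discard' metric registry, associated with no namespace", i.e. apparently a real registry that accepts metrics but is never reported. A sketch of that reading (assumed, not the PR's literal code):

package compatsketch

import "github.com/elastic/beats/v7/libbeat/monitoring"

// metricsRegistryFor sketches the described behaviour: inputs with an ID
// get a registry attached to the beat's monitoring namespace; inputs
// without one get a detached registry whose contents are never reported.
func metricsRegistryFor(id string, ns *monitoring.Namespace) *monitoring.Registry {
	if id == "" {
		return monitoring.NewRegistry() // detached from every namespace
	}
	return ns.GetRegistry().NewRegistry(id)
}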
filebeat/input/v2/input.go
Outdated
// The inputs still need to call inputmon.NewInputRegistry to further
// populate the registry by adding `id` and `input` (the input type) so the
// metrics will be published on the `/inputs/` monitoring endpoint.
// This allows the inputs to decide to publish or not metrics, regardless of
// the input having an ID. A valid metrics registry which will be passed
// down to the pipeline client and output. Hoerver if the input does not
// call inputmon.NewInputRegistry for the same ID to further populate the
// Also, calling inputmon.NewInputRegistry allows inputs to set their type,
// the `input` field, as they will. Not all inputs use their hard-coded
// name as their type, thus the input itself needs to set it.
It seems some of the lines are out of place, see L105-106:
// call inputmon.NewInputRegistry for the same ID to further populate the
// Also, calling inputmon.NewInputRegistry allows inputs to set their type,
Could you review/clarify this comment?
fmt.Sprintf("End of file reached: %s; Backoff now.", logFilePath),
	10*time.Second, "Filebeat did not close the file")

// 5. Now that the reader has been closed, we can make the assertions.
It's a detail, but End of file reached does not mean the reader has been closed, it only means the file was read until the end.
The reader-closed log:
beats/filebeat/input/filestream/input.go, line 368 at 1db0444:
log.Infof("Reader was closed. Closing. Path='%s'", path)
After talking to Fae and Tiago I'm putting it back to draft; I'll change it and split it up.
Force-pushed 502015d to 40f57e8
Aggregates per-input metrics in libbeat/publisher/pipeline.client. New metrics: `events_pipeline_total`, `events_pipeline_filtered_total`, and `events_pipeline_published_total`.

v2.Context includes a metric registry for context-specific metrics, created under `beat.Info.Monitoring.Namespace` and named after the context ID. If the context lacks an ID, a 'discard' metric registry, associated with no namespace, is used, preventing metric publication. v2.Input passes the v2.Context metric registry when creating the pipeline client.

Introduces a new `inputmon` API, enabling inputs to publish their metrics via the HTTP monitoring endpoint, replacing the now deprecated `inputmon.NewInputRegistry`. Inputs must still register metrics using `inputmon.RegisterMetrics` for them to be published, and call `inputmon.UnregisterMetrics` to release resources on close. The deprecated `inputmon.NewInputRegistry` remains functional.

Integration tests added for `filestream`, `httpjson`, and `cel` to verify the new metrics.
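A sketch of how an input might adopt the API named above; RegisterMetrics and UnregisterMetrics are the names given in the commit message, but their signatures are not spelled out there, so the ones below are guesses:

package myinput

import (
	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
	"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
)

// startMetrics pairs registration with teardown as the commit message
// describes. Both signatures are assumed, not taken from the PR.
func startMetrics(ctx v2.Context) (stop func()) {
	inputmon.RegisterMetrics(ctx.ID, "cel", ctx.MetricsRegistry) // assumed signature
	return func() {
		inputmon.UnregisterMetrics(ctx.ID) // assumed signature
	}
}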
Force-pushed 40f57e8 to a481f4c
Proposed commit message
Checklist
[ ] I have made corresponding changes to the default configuration files
[ ] I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.
Disruptive User Impact
How to test this PR locally
Check the input metrics looking for:
events_pipeline_filtered_total
events_pipeline_published_total
events_pipeline_total
Adjust the instructions for your OS/platform and the paths as needed
standalone filebeat
flog -t log -o /tmp/log.log -w -f json -l -p 1048576 -d 1s
flog -t log -o /tmp/log.log2 -w -f json -l -p 1048576 -d 1s
./filebeat -e -c ./filebeat.yml 2>&1 | grep -E 'message'
curl 'http://localhost:5066/inputs/'
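If the wiring works, the /inputs/ response should include the three counters next to the input's id and type. An illustrative, abridged shape (values made up; the real response carries many more fields):

[
  {
    "id": "my-filestream-id",
    "input": "filestream",
    "events_pipeline_total": 120,
    "events_pipeline_filtered_total": 0,
    "events_pipeline_published_total": 120
  }
]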
managed (under agent) beat
rm -rf ../beats/x-pack/agentbeat/build
DEV=true SNAPSHOT=true EXTERNAL=false PACKAGES="tar.gz" PLATFORMS=linux/amd64 mage -v package
/opt/Elastic/Agent/elastic-agent diagnostics
Check input_metrics.json for filestream in the diagnostics: DIAGNOSTICS-FOLDER/components/filestream-monitoring/input_metrics.json
Related issues
Use cases
The new metrics will help to understand how specific inputs are performing, especially regarding filtered events.
Screenshots
Logs