
Add per-input metrics to libbeat pipeline client #42618

Draft: wants to merge 1 commit into base: main
Conversation

AndersonQ
Member

@AndersonQ AndersonQ commented Feb 6, 2025

Proposed commit message

add per-input metrics to libbeat pipeline client

Aggregates per-input metrics in libbeat/publisher/pipeline.client. New metrics:
`events_pipeline_total`, `events_pipeline_filtered_total`, and
`events_pipeline_published_total`.

v2.Context includes a metric registry for context-specific metrics, created
under `beat.Info.Monitoring.Namespace` and named after the context ID. If the
context lacks an ID, a 'discard' metric registry, associated with no namespace,
is used, preventing metric publication.

v2.Input passes the v2.Context metric registry when creating the pipeline 
client.

Introduces a new `inputmon` API, enabling inputs to publish their metrics via
the HTTP monitoring endpoint, replacing the now deprecated
`inputmon.NewInputRegistry`. Inputs must still register metrics using
`inputmon.RegisterMetrics` for them to be published, and call
`inputmon.UnregisterMetrics` to release resources on close.
The deprecated `inputmon.NewInputRegistry` remains functional.

Integration tests added for `filestream`, `httpjson`, and `cel` to verify new
metrics.
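
For illustration, a minimal sketch of the registry selection described above, using the github.com/elastic/elastic-agent-libs/monitoring package; the helper name and exact calls are assumptions, not the actual implementation:

// Hypothetical sketch of how the v2.Context registry could be chosen; only
// beat.Info.Monitoring.Namespace and the 'discard' behaviour come from the
// commit message, the helper name and wiring are illustrative.
func contextRegistry(info beat.Info, ctxID string) *monitoring.Registry {
	if ctxID == "" {
		// No context ID: a standalone 'discard' registry attached to no
		// namespace, so nothing written to it is ever published.
		return monitoring.NewRegistry()
	}
	// Registry named after the context ID, created under the Beat's
	// monitoring namespace so its metrics can be published.
	return info.Monitoring.Namespace.GetRegistry().NewRegistry(ctxID)
}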
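
And a hedged sketch of the input-side flow; only the names inputmon.RegisterMetrics and inputmon.UnregisterMetrics come from the commit message, their signatures here are assumptions:

// Hypothetical input Run function; RegisterMetrics/UnregisterMetrics
// signatures are assumptions, only the names appear in the commit message.
func run(ctx v2.Context) error {
	// Register the input's metrics on the registry carried by the context
	// so the /inputs/ monitoring endpoint publishes them.
	inputmon.RegisterMetrics(ctx.ID, "filestream", ctx.MetricsRegistry)
	// Unregister on close to release resources.
	defer inputmon.UnregisterMetrics(ctx.ID)

	// ... read data and publish events ...
	return nil
}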

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • [ ] I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

Disruptive User Impact

  • None

How to test this PR locally

Check the input metrics looking for:

  • events_pipeline_filtered_total
  • events_pipeline_published_total
  • events_pipeline_total

Adjust the instructions for your OS/platform and the paths as needed

standalone filebeat

  • compile x-pack/filebeat and run it with the following configuration:
http:
  enabled: true
#  port: 5067

filebeat.inputs:
# ========================= filestream inputs
- type: filestream
  id: my-filestream-id
  enabled: true
  paths:
    - /tmp/log.log
  processors:
    - drop_event:
        when:
          contains:
            message: "PUT"

- type: filestream
  enabled: true
  paths:
    - /tmp/log.log2

# ========================= benchmark inputs
- type: benchmark
  id: my-benchmark-id
  enabled: true
  message: "test message"
  threads: 1
  eps: 5

- type: benchmark
  enabled: true
  message: "another benchmark message"
  threads: 1
  eps: 5

# ========================= httpjson inputs
- type: httpjson
  id: my-httpjson-id
  interval: 1s
  request.url: https://api.ipify.org/?format=json

- type: httpjson
  interval: 1s
  request.url: https://api.ipify.org/?format=json

# ========================= cel inputs
- type: cel
  id: my-cel-id
  interval: 1s
  resource.url: https://api.ipify.org/?format=json
  program: bytes(get(state.url).Body).as(body,{"events":[body.decode_json()]})

- type: cel
  interval: 1s
  resource.url: https://api.ipify.org/?format=json
  program: bytes(get(state.url).Body).as(body,{"events":[body.decode_json()]})

# ========================= log input, an input which does not publish metrics
- type: log
  id: log-input-id
  paths:
    - /home/ainsoph/tmp/*.log
  allow_deprecated_use: true

output.file:
  path: /tmp/
  filename: "output-file"
  rotate_every_kb: 10000

logging.level: info
logging.metrics:
  enabled: true
  namespaces: [stats, dataset]
  • create 2 log files:
    • flog -t log -o /tmp/log.log -w -f json -l -p 1048576 -d 1s
    • flog -t log -o /tmp/log.log2 -w -f json -l -p 1048576 -d 1s
  • run filebeat: ./filebeat -e -c ./filebeat.yml 2>&1 | grep -E 'message'
  • check the input metrics: curl 'http://localhost:5066/inputs/':
[{"batch_processing_time":{"histogram":{"count":153,"max":3887208,"mean":51739.62745098039,"median":25493,"min":8096,"p75":32687.5,"p95":48809.799999999945,"p99":1827684.7200000302,"p999":3887208,"stddev":311294.0642671061}},"batches_published_total":153,"batches_received_total":153,"cel_executions":153,"cel_processing_time":{"histogram":{"count":153,"max":186709251,"mean":164538269.32026145,"median":164547517,"min":143200202,"p75":170071079.5,"p95":177281553.5,"p99":186409732.44,"p999":186709251,"stddev":8318454.118578345}},"events_pipeline_filtered_total":0,"events_pipeline_published_total":153,"events_pipeline_total":153,"events_published_total":153,"events_received_total":153,"http_request_body_bytes":{"histogram":{"count":153,"max":0,"mean":0,"median":0,"min":0,"p75":0,"p95":0,"p99":0,"p999":0,"stddev":0}},"http_request_body_bytes_total":0,"http_request_delete_total":0,"http_request_errors_total":0,"http_request_get_total":153,"http_request_head_total":0,"http_request_options_total":0,"http_request_patch_total":0,"http_request_post_total":0,"http_request_put_total":0,"http_request_total":153,"http_response_1xx_total":0,"http_response_2xx_total":153,"http_response_3xx_total":0,"http_response_4xx_total":0,"http_response_5xx_total":0,"http_response_body_bytes":{"histogram":{"count":153,"max":23,"mean":23,"median":23,"min":23,"p75":23,"p95":23,"p99":23,"p999":23,"stddev":0}},"http_response_body_bytes_total":3519,"http_response_errors_total":0,"http_response_total":153,"http_round_trip_time":{"histogram":{"count":153,"max":186469501,"mean":164352040.05228758,"median":164177021,"min":143044222,"p75":169912158.5,"p95":177098617,"p99":186182690.8,"p999":186469501,"stddev":8314959.896409168}},"id":"my-cel-id::https://api.ipify.org/?format=json","input":"cel","resource":"https://api.ipify.org/?format=json"},{"events_pipeline_filtered_total":0,"events_pipeline_published_total":760,"events_pipeline_total":760,"events_published_total":760,"id":"my-benchmark-id","input":"benchmark","publishing_time":{"histogram":{"count":760,"max":142873,"mean":15755.582894736843,"median":8368.5,"min":2017,"p75":21385.5,"p95":50887.899999999994,"p99":81388.73999999995,"p999":142873,"stddev":17699.705136808647}}},{"events_pipeline_filtered_total":0,"events_pipeline_published_total":760,"events_pipeline_total":760,"events_published_total":760,"id":"755D5DF325A7FDBF","input":"benchmark","publishing_time":{"histogram":{"count":760,"max":175042,"mean":15433.890789473684,"median":8095,"min":2177,"p75":20586.25,"p95":52056.14999999997,"p99":83584.28999999998,"p999":175042,"stddev":18215.08020807916}}},{"events_pipeline_filtered_total":0,"events_pipeline_published_total":153,"events_pipeline_total":153,"events_published_total":0,"http_request_body_bytes":{"histogram":{"count":153,"max":0,"mean":0,"median":0,"min":0,"p75":0,"p95":0,"p99":0,"p999":0,"stddev":0}},"http_request_body_bytes_total":0,"http_request_delete_total":0,"http_request_errors_total":0,"http_request_get_total":153,"http_request_head_total":0,"http_request_options_total":0,"http_request_patch_total":0,"http_request_post_total":0,"http_request_put_total":0,"http_request_total":153,"http_response_1xx_total":0,"http_response_2xx_total":153,"http_response_3xx_total":0,"http_response_4xx_total":0,"http_response_5xx_total":0,"http_response_body_bytes":{"histogram":{"count":153,"max":23,"mean":23,"median":23,"min":23,"p75":23,"p95":23,"p99":23,"p999":23,"stddev":0}},"http_response_body_bytes_total":3519,"http_response_errors_total":0,"http_response_total":153,"http
_round_trip_time":{"histogram":{"count":153,"max":328353252,"mean":166115255.65359476,"median":165613920,"min":145535979,"p75":168939270,"p95":181150658.29999998,"p99":253904712.9000011,"p999":328353252,"stddev":15133493.12118082}},"httpjson_interval_errors_total":0,"httpjson_interval_execution_time":{"histogram":{"count":153,"max":328685360,"mean":166373788.74509802,"median":165916593,"min":145690086,"p75":169239897.5,"p95":181800487,"p99":254136953.30000108,"p999":328685360,"stddev":15139992.12741519}},"httpjson_interval_pages":{"histogram":{"count":0,"max":0,"mean":0,"median":0,"min":0,"p75":0,"p95":0,"p99":0,"p999":0,"stddev":0}},"httpjson_interval_pages_execution_time":{"histogram":{"count":153,"max":568165,"mean":69280.75816993463,"median":59288,"min":16768,"p75":73949.5,"p95":152598.4999999999,"p99":361156.54000000306,"p999":568165,"stddev":49886.95807418775}},"httpjson_interval_total":153,"id":"my-httpjson-id","input":"httpjson","pages_published_total":153},{"batch_processing_time":{"histogram":{"count":153,"max":3872931,"mean":52324.94117647059,"median":25255,"min":7964,"p75":32306.5,"p95":56138.89999999999,"p99":1825094.4000000302,"p999":3872931,"stddev":310146.3571651302}},"batches_published_total":153,"batches_received_total":153,"cel_executions":153,"cel_processing_time":{"histogram":{"count":153,"max":335157016,"mean":166408238.50980392,"median":165266373,"min":146165396,"p75":170722524,"p95":179548976.9,"p99":288916196.0800007,"p999":335157016,"stddev":17682013.96172901}},"events_pipeline_filtered_total":0,"events_pipeline_published_total":153,"events_pipeline_total":153,"events_published_total":153,"events_received_total":153,"http_request_body_bytes":{"histogram":{"count":153,"max":0,"mean":0,"median":0,"min":0,"p75":0,"p95":0,"p99":0,"p999":0,"stddev":0}},"http_request_body_bytes_total":0,"http_request_delete_total":0,"http_request_errors_total":0,"http_request_get_total":153,"http_request_head_total":0,"http_request_options_total":0,"http_request_patch_total":0,"http_request_post_total":0,"http_request_put_total":0,"http_request_total":153,"http_response_1xx_total":0,"http_response_2xx_total":153,"http_response_3xx_total":0,"http_response_4xx_total":0,"http_response_5xx_total":0,"http_response_body_bytes":{"histogram":{"count":153,"max":23,"mean":23,"median":23,"min":23,"p75":23,"p95":23,"p99":23,"p999":23,"stddev":0}},"http_response_body_bytes_total":3519,"http_response_errors_total":0,"http_response_total":153,"http_round_trip_time":{"histogram":{"count":153,"max":335030877,"mean":166227826.7908497,"median":165048060,"min":146015539,"p75":170549274,"p95":179342099.6,"p99":288685097.8200007,"p999":335030877,"stddev":17679426.681866124}},"id":"3472F81E3F68759D::https://api.ipify.org/?format=json","input":"cel","resource":"https://api.ipify.org/?format=json"},{"events_pipeline_filtered_total":0,"events_pipeline_published_total":153,"events_pipeline_total":153,"events_published_total":0,"http_request_body_bytes":{"histogram":{"count":153,"max":0,"mean":0,"median":0,"min":0,"p75":0,"p95":0,"p99":0,"p999":0,"stddev":0}},"http_request_body_bytes_total":0,"http_request_delete_total":0,"http_request_errors_total":0,"http_request_get_total":153,"http_request_head_total":0,"http_request_options_total":0,"http_request_patch_total":0,"http_request_post_total":0,"http_request_put_total":0,"http_request_total":153,"http_response_1xx_total":0,"http_response_2xx_total":153,"http_response_3xx_total":0,"http_response_4xx_total":0,"http_response_5xx_total":0,"http_response_body_bytes":{"hi
stogram":{"count":153,"max":23,"mean":23,"median":23,"min":23,"p75":23,"p95":23,"p99":23,"p999":23,"stddev":0}},"http_response_body_bytes_total":3519,"http_response_errors_total":0,"http_response_total":153,"http_round_trip_time":{"histogram":{"count":153,"max":254715442,"mean":165821151.11764705,"median":164748952,"min":145475708,"p75":169973688.5,"p95":178754770.29999998,"p99":218265792.46000054,"p999":254715442,"stddev":10491581.299251346}},"httpjson_interval_errors_total":0,"httpjson_interval_execution_time":{"histogram":{"count":153,"max":255000523,"mean":166075323.74509802,"median":164963605,"min":145756796,"p75":170136302,"p95":178968159.89999998,"p99":218616293.92000052,"p999":255000523,"stddev":10500393.631722212}},"httpjson_interval_pages":{"histogram":{"count":0,"max":0,"mean":0,"median":0,"min":0,"p75":0,"p95":0,"p99":0,"p999":0,"stddev":0}},"httpjson_interval_pages_execution_time":{"histogram":{"count":153,"max":654194,"mean":68283.91503267974,"median":60530,"min":23805,"p75":73008.5,"p95":113680.4999999998,"p99":406049.9600000037,"p999":654194,"stddev":53728.922333904244}},"httpjson_interval_total":153,"id":"CB808954C3AA5FA3","input":"httpjson","pages_published_total":153},{"bytes_processed_total":6989961,"events_pipeline_filtered_total":4220,"events_pipeline_published_total":21053,"events_pipeline_total":25273,"events_processed_total":25273,"files_active":2,"files_closed_total":0,"files_opened_total":2,"id":"my-filestream-id","input":"filestream","messages_read_total":25273,"messages_truncated_total":0,"processing_errors_total":0,"processing_time":{"histogram":{"count":25273,"max":11424032,"mean":34211.57421875,"median":10663,"min":2699,"p75":14096.25,"p95":37287,"p99":74415,"p999":11375368.150000043,"stddev":463067.376129514}}}]
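
Note how the new counters add up for the filestream entry above: events_pipeline_total (25273) equals events_pipeline_filtered_total (4220, the events dropped by the drop_event processor in the configuration) plus events_pipeline_published_total (21053).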

managed (under agent) beat

  • compile an Elastic Agent with this branch version of agentbeat:
    • in the agent folder:
      • rm -rf ../beats/x-pack/agentbeat/build
      • DEV=true SNAPSHOT=true EXTERNAL=false PACKAGES="tar.gz" PLATFORMS=linux/amd64 mage -v package
  • enroll the agent into Fleet. A new policy with the default options will have a filestream input collecting the monitoring logs
  • get a diagnostic from the agent: /opt/Elastic/Agent/elastic-agent diagnostics
  • inspect the input_metrics.json for filestream in the diagnostics:
    • DIAGNOSTICS-FOLDER/components/filestream-monitoring/input_metrics.json
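
The filestream entry should contain the new counters. An illustrative shape (the field names come from this PR; the id and values are made-up examples):

{
  "id": "filestream-monitoring-agent",
  "input": "filestream",
  "events_pipeline_total": 1090,
  "events_pipeline_filtered_total": 0,
  "events_pipeline_published_total": 1090
}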

Related issues

Use cases

The new metrics will help to understand how specific inputs are performing, especially regarding filtered events.

Screenshots

  • N/A

Logs

  • N/A

@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Feb 6, 2025
Contributor

mergify bot commented Feb 6, 2025

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @AndersonQ? 🙏.
To do so, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fix this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8.\d is the label to automatically backport to the 8.\d branch, where \d is a digit
  • backport-active-all is the label that automatically backports to all active branches.
  • backport-active-8 is the label that automatically backports to all active minor branches for the 8 major.
  • backport-active-9 is the label that automatically backports to all active minor branches for the 9 major.

@AndersonQ AndersonQ force-pushed the 423225-input-event-report branch 3 times, most recently from f6ba6dc to 75929e5 Compare February 7, 2025 09:26
@AndersonQ AndersonQ added the Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team label Feb 7, 2025
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Feb 7, 2025
@AndersonQ AndersonQ force-pushed the 423225-input-event-report branch 4 times, most recently from 3f24545 to 4c32ff3 Compare February 11, 2025 17:10
@AndersonQ AndersonQ force-pushed the 423225-input-event-report branch 2 times, most recently from 4358017 to f48ee59 Compare February 17, 2025 11:26
@AndersonQ AndersonQ added the backport-active-8 Automated backport with mergify to all the active 8.[0-9]+ branches label Feb 17, 2025
@AndersonQ AndersonQ changed the title WIP - just to run all the tests Add per-input metrics to libbeat pipeline client Feb 17, 2025
@AndersonQ AndersonQ marked this pull request as ready for review February 17, 2025 15:34
@AndersonQ AndersonQ requested review from a team as code owners February 17, 2025 15:34
@AndersonQ AndersonQ requested review from belimawr and rdner February 17, 2025 15:34
@elasticmachine
Collaborator

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@AndersonQ AndersonQ requested a review from faec February 17, 2025 15:34
@AndersonQ AndersonQ marked this pull request as draft February 17, 2025 15:40
@AndersonQ AndersonQ marked this pull request as ready for review February 17, 2025 16:00
@pierrehilbert pierrehilbert added Team:obs-ds-hosted-services Label for the Observability Hosted Services team Team:Security-Linux Platform Linux Platform Team in Security Solution Team:Security-Windows Platform Windows Platform Team in Security Solution labels Feb 18, 2025
@AndersonQ AndersonQ added windows-2022 macos-m1 Enable stages in the CI for the MacOS M1 windows-11 Enable builds in the CI for windows-10 labels Mar 3, 2025
@AndersonQ AndersonQ marked this pull request as ready for review March 3, 2025 10:47
@AndersonQ AndersonQ requested a review from a team as a code owner March 3, 2025 10:47
@AndersonQ
Member Author

@andrewkroh I made the changes you requested. Now the v2.Context carries a registry for the input metrics, which is passed to the pipeline client. So if the input registers its metrics with the inputmon package, the new metrics get published.

@faec, @belimawr as I ended up changing the approach, could you review it again?
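
In code, the wiring looks roughly like this (simplified sketch; the actual field is the InputMetricsRegistry shown in the ClientConfig diff below):

// pipelineConnector is the beat.PipelineConnector handed to the input's Run.
// The input connects to the pipeline passing the metrics registry carried by
// its v2.Context; error handling and other ClientConfig fields elided.
client, err := pipelineConnector.ConnectWith(beat.ClientConfig{
	InputMetricsRegistry: ctx.MetricsRegistry,
})
if err != nil {
	return err
}
defer client.Close()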

@AndersonQ AndersonQ force-pushed the 423225-input-event-report branch from de923c7 to 8bb9bf1 Compare March 3, 2025 11:09
@AndersonQ AndersonQ added macOS Enable builds in the CI for darwin testing macOSArm Enable build and removed macos-m1 Enable stages in the CI for the MacOS M1 labels Mar 3, 2025
@AndersonQ
Member Author

/test

@@ -1977,7 +1986,7 @@ func TestInput(t *testing.T) {
t.Errorf("unexpected number of cursors events: got:%d want at least:%d", len(client.cursors), len(test.wantCursor))
test.wantCursor = test.wantCursor[:len(client.published)]
}
client.published = client.published[:len(test.want)]
client.cursors = client.cursors[:len(test.wantCursor)]
Member Author

I talked to the team responsible for this input and they agreed this fix is correct.

@@ -67,6 +67,7 @@ Dropped fields: `syslog.priority` and `syslog.facility` while keeping their dupl
already present as `container.id` and `container.log.tag` is dropped because it is already present as
`log.syslog.appname`. The field `container.partial` is replaced by the tag `partial_message` if it was `true`,
otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
- Add per-input metrics to filestream, cel and httpjson inputs {pull}42618[42618] {issue}42761[42761]
Member Author

filestream, cel and httpjson are the ones I added tests for. Any v2 input which exposes input metrics should have the new metrics. I'm checking if I can list them all without having to check the code for each input.

@@ -67,6 +67,7 @@ Dropped fields: `syslog.priority` and `syslog.facility` while keeping their dupl
already present as `container.id` and `container.log.tag` is dropped because it is already present as
`log.syslog.appname`. The field `container.partial` is replaced by the tag `partial_message` if it was `true`,
otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
- Add per-input pipeline metrics to filestream, cel and httpjson inputs. {pull}42618[42618] {issue}42761[42761]
Member Author

@AndersonQ AndersonQ Mar 3, 2025

Actually, all v2 inputs which publish metrics will have the per-input pipeline metrics. Those 3 are the ones I added integration tests for.

I couldn't test every single one, so I'm wondering what is best to add here.

Looking at the docs for each v1 input, the only one which has a Metrics section is gcppubsub. So apart from having a test for every affected input, we could say all inputs exposing metrics on the HTTP monitoring endpoint have the new metrics, with only one exception: gcppubsub.

Here is what I came up with looking at the code:

v2 inputs (with the per-input pipeline metrics)

  • OSS:
filestream
journald
kafka
tcp
udp
unix
winlog
  • x-pack:
awscloudwatch
awss3
azureblobstorage
azureeventhub
benchmark
cel
cloudfoundry
entityanalytics
etw
gcs
http_endpoint
httpjson
lumberjack
netflow
o365audit
salesforce
streaming
unifiedlogs

v1 (without the new metrics)

  • OSS:
container
log
mqtt
redis
stdin
syslog
  • x-pack:
cometd
gcppubsub

Contributor

The log input is a V1 input, see how it is registered:

err := input.Register("log", NewInput)

Only the inputs that import/use "github.com/elastic/beats/v7/filebeat/input/v2" are V2 inputs.

For the release note, I believe the best is to list all V2 inputs that have the metrics.

Because the inputs are so different, I'm fine with only adding tests for the main ones and relying on the current tests to ensure everything is still working.

Member Author

I know, something went wrong in the copy and paste. I fixed it now.

Member Author

An option when documenting that all v2 inputs have the new metrics would be to add it to the Configure an HTTP endpoint for metrics > Inputs section, instead of adding it to each one's metrics section of the docs.

Contributor

I like the idea of adding the list to the HTTP metrics docs.

Contributor

Maybe mention it in the release notes instead of listing the inputs there.

Member Author

@nimarezainia @flexitrev any thoughts on the best way to make those new metrics public?

Right now I'm leaning towards adding a description of the per-input pipeline metrics to the Configure an HTTP endpoint for metrics > Inputs section in a generic way, stating that not all inputs have them. Then later we might go input by input, checking how much value they bring and documenting as we/the teams owning the inputs see fit. Ideally there would be a test for each input which adds those metrics to its documentation.

Also, they might not bring immediate value for all inputs. For example, the awss3 dashboard does not have a direct mention of events. So I believe for these new metrics to be valuable they need some explanation of how the current metrics relate to events. Again, that's why I think a generic explanation of the input metrics in the docs might be good for now, and then each team decides how to handle it for their inputs.

cc: @pierrehilbert, @cmacknz

Member

Why do we want to make the metrics public? Remember, if we make them part of our public API as GA, we can't break or deprecate them easily. Are you 100% sure these metrics are perfect and that you won't want to stop publishing some of them right now? Are they better made available for internal use initially?

Member Author

Why do we want to make the metrics public? Remember, if we make them part of our public API as GA, we can't break or deprecate them easily.

That's why I'm asking :)

Are they better made available for internal use initially?

I'm happy with that: not documenting them for now, seeing how we use them, and only later making them public.


@@ -187,6 +187,10 @@ observe the activity of the input. Note that metrics from processors are not inc
| `messages_truncated_total` | Total number of messages truncated.
| `bytes_processed_total` | Total number of bytes processed.
| `events_processed_total` | Total number of events processed.
| `events_pipeline_total` | Total number of events processed by the publishing pipeline.
| `events_pipeline_failed_total` | Total number of events failed at the publishing pipeline.
Contributor

I won't block on this but I'd push back a little on including the "failed" number (here and throughout). It will always be zero and it will only confuse people who think it represents something it doesn't.

Member Author

I wanted to keep parity with the pipeline metrics, and later better explain the pipeline metrics.

Let's see how we resolve https://github.com/elastic/beats/pull/42618/files#r1980164988. Then we change it here or omit it altogether.

Contributor

Well, I'd rather not aim for parity for its own sake. "failed" is already a broken metric that we'd prefer to remove, and I think adding more copies of it would make that worse and make misunderstandings more common.

// metrics.
// Consequently, inputs are not required to immediately adopt the new
// registry for their metrics.
MetricsRegistry *monitoring.Registry
Contributor

I don't think I understand the contract with the inputs here... can inputs create their own type-specific metrics by adding things to this registry? Is it supposed to be used at all other than that? Why does it include a cancel function, if the pipeline is able to cancel the input-specific registry automatically when the input shuts down? Right now I don't see a reason for these to be provided through the Context, but if it's important then the comments should document not just the high-level data flow but also how input code is expected to use these fields.

Member Author

I don't think I understand the contract with the inputs here... can inputs create their own type-specific metrics by adding things to this registry?

yes.

It's because having the registry passed to the pipeline client created a chicken-and-egg problem. The pipeline client is created before the inputs create their metric registry. Also, that isn't the case 100% of the time; filestream is an example of the exception. In addition, depending on the input, the name it uses for the metrics is different from its actual name. The stateless httpjson input uses only httpjson in the metrics. Another input, which I don't recall right now, does the opposite: it adds a suffix to its name when setting the input type in the metrics.

Given that, moving the inputs' call to inputmon.NewInputRegistry (which creates the registry) to when the context is created won't work, or at least it would still require the input to later adjust the registry, like fixing the input type. Also, creating the registry by calling inputmon.NewInputRegistry during the v2.Context creation would make the metrics be published even if the input does not register any metrics. So, to leave the input in full control of whether metrics are published, the input still needs to call inputmon.NewInputRegistry, which adds the id and input type to the registry; those being present is required for the metrics in the registry to be published.

So this is the registry the pipeline client and later the ES output will use to add the per-input metrics. If the input has an ID, but does not register metrics, the pipeline and output per-input metrics won't be published.

Contributor

I think from this comment that I'm misunderstanding some of the problem specification / desired behavior. I think I have some suggestions to simplify this approach, but I want to make sure they're relevant... let's catch up offline tomorrow to make sure we're both talking about the same things.

Contributor

Can I hijack this call? I'm also confused by this whole process.

@@ -45,6 +46,11 @@ type Client interface {
// ClientConfig defines common configuration options one can pass to
// Pipeline.ConnectWith to control the clients behavior and provide ACK support.
type ClientConfig struct {
// InputMetricsRegistry is the metrics registry used to aggregate pipeline metrics
// per input.
InputMetricsRegistry *monitoring.Registry
Contributor

Adding the registry and cancellation function here confuses me, too... why would an input need to provide a different registry than the one already associated with it by the pipeline? Why is a cancellation function included when cancellation is linked to input shutdown, not to closing a particular client (since inputs can use multiple clients)? Aren't these fields shared by all clients for a particular input anyway? It seems to me like the pipeline should be handling all this bookkeeping unless there is a specific need for inputs to be able to customize / override the baseline behavior. (But same as previous, if this is necessary then the semantics should be documented here.)

Member Author

The cancellation is much more a precaution in case the input for some reason does not register metrics and therefore does not cancel/unregister them. I didn't want to assume every input will register metrics and, during its shutdown, cancel/unregister them.

why would an input need to provide a different registry than the one already associated with it by the pipeline?

It shouldn't. It could only happen if for some odd reason the ID it uses to publish its metrics is different from the v2.Context.ID. In that case the pipeline metrics would not be published.

[...] inputs can use multiple clients [...]

Do you know any input using multiple clients? The only example I saw (analysing only the cel, filestream and httpjson inputs) is cel, which creates derived contexts, so for each context there is a new pipeline client, and therefore there is still a 1-1 relationship between the context and the pipeline client.

There is my original idea of providing a different namespace and metrics registry for the pipeline client, and then merging them with the input-specific metrics when publishing. This lets each component control the life-cycle of its metrics, and when both are present, they're merged.

Contributor

The cancellation is much more a precaution in case the input for some reason does not register metrics and therefore does not cancel/unregister them.

But then I'm still missing something... it looked like the pipeline itself could still cancel the registry when the input shuts down, is that true? If so then I don't see why it matters if the input itself does it, whether it registered its own metrics or not.

It shouldn't. It could only happen if for some odd reason the ID it uses to publish its metrics is different from the v2.Context.ID. In that case the pipeline metrics would not be published.

Then why is the input required to pass the registry through at all? Can't that be handled by the pipeline, since it already knows the registry that will be used?

Do you know any input using multiple clients?

awss3 for one, and I believe there are many others. It can be important e.g. for ACK tracking when the input has multiple independent workers.

@AndersonQ AndersonQ dismissed faec’s stale review March 5, 2025 17:14

it needs a new review.

// The returned cancel function *must* be called when the input stops to
// unregister the metrics and prevent resource leaks.
//
// This function might panic.
Contributor

Don't Panic! - The Hitchhiker's Guide to the Galaxy

Jokes aside, why the risk of panic now? I recall some issues in the past with duplicated IDs causing panics, but as far as I can tell, they were fixed.

Has this risk increased?

Contributor

Agree, and if there is an unavoidable panic path, the conditions that could lead to it should be detailed so that it can be avoided. See documentation in reflect for this kind of thing, e.g. "If CanAddr returns false, calling Value.Addr will panic."

Member Author

I can remove it, but this is the original behaviour; I just kept it as it was.

Comment on lines +45 to +137
defer func() {
	if r := recover(); r != nil {
		panic(fmt.Errorf("inoutmon.NewInputRegistry panic: %+v", r))
	}
}()
Contributor

I'm very confused here... You're recovering from the panic just to panic again?

Member Author

I kept the original behaviour; I just added more context.

Comment on lines 57 to 59
id string
// generatedID is true when the config does not include an ID and therefore
// the runner uses a hash from the config as ID.
generatedID bool
Contributor

I'm a bit confused here: why is it needed to have a flag indicating the ID was auto-generated rather than user-provided?

Contributor

I believe it would be good to add why it is needed in the comments here.

}
} else {
// when the input has no ID, no metric is published, therefore, we
// use a 'discard' registry.
Contributor

What does a 'discard' registry mean here? Is it actually a noop registry, or just a registry that collects metrics which are never reported?

Comment on lines 99 to 108
// The inputs still need to call inputmon.NewInputRegistry to further
// populate the registry by adding `id` and `input` (the input type) so the
// metrics will be published on the `/inputs/` monitoring endpoint.
// This allows the inputs to decide to publish or not metrics, regardless of
// the input having an ID. A valid metrics registry which will be passed
// down to the pipeline client and output. Hoerver if the input does not
// call inputmon.NewInputRegistry for the same ID to further populate the
// Also, calling inputmon.NewInputRegistry allows inputs to set their type,
// the `input` field, as they will. Not all inputs use their hard-coded
// name as their type, thus the input itself needs to set it.
Contributor

It seems some of the lines are out of place, see L105-106:

	// call inputmon.NewInputRegistry for the same ID to further populate the
	// Also, calling inputmon.NewInputRegistry allows inputs to set their type,

Could you review/clarify this comment?


fmt.Sprintf("End of file reached: %s; Backoff now.", logFilePath),
10*time.Second, "Filebeat did not close the file")

// 5. Now that the reader has been closed, we can make the assertions.
Contributor

It's a detail, but End of file reached does not mean the reader has been closed, it only means the file was read until the end.

The reader closed log:

log.Infof("Reader was closed. Closing. Path='%s'", path)

@AndersonQ AndersonQ marked this pull request as draft March 6, 2025 19:35
@AndersonQ
Member Author

After talking to Fae and Tiago I'm putting it back to draft; I'll change it and split it up.

@AndersonQ AndersonQ force-pushed the 423225-input-event-report branch 7 times, most recently from 502015d to 40f57e8 Compare March 7, 2025 15:22
@AndersonQ AndersonQ force-pushed the 423225-input-event-report branch from 40f57e8 to a481f4c Compare March 7, 2025 18:59
Labels
backport-active-8 Automated backport with mergify to all the active 8.[0-9]+ branches macOS Enable builds in the CI for darwin testing macOSArm Enable build Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team Team:obs-ds-hosted-services Label for the Observability Hosted Services team Team:Security-Linux Platform Linux Platform Team in Security Solution Team:Security-Windows Platform Windows Platform Team in Security Solution windows-11 Enable builds in the CI for windows-10 windows-2019 windows-2022
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Report per input metrics for filtered events
8 participants