Skip to content

Commit f4f77f7

Browse files
committed
Fixes #20
Signed-off-by: Philipp Hossner <philipp.hossner@posteo.de>
1 parent ef53c83 commit f4f77f7

File tree

7 files changed

+277
-27
lines changed

7 files changed

+277
-27
lines changed

README.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,8 @@ For details of each metric type, see [Prometheus documentation](http://prometheu
286286
- `type`: metric type (required)
287287
- `desc`: description of this metric (required)
288288
- `key`: key name of record for instrumentation (**optional**)
289+
- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention)
290+
- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention)
289291
- `<labels>`: additional labels for this metric (optional). See [Labels](#labels)
290292

291293
If key is empty, the metric values is treated as 1, so the counter increments by 1 on each record regardless of contents of the record.
@@ -310,6 +312,8 @@ If key is empty, the metric values is treated as 1, so the counter increments by
310312
- `type`: metric type (required)
311313
- `desc`: description of metric (required)
312314
- `key`: key name of record for instrumentation (required)
315+
- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention)
316+
- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention)
313317
- `<labels>`: additional labels for this metric (optional). See [Labels](#labels)
314318

315319
### summary type
@@ -332,6 +336,8 @@ If key is empty, the metric values is treated as 1, so the counter increments by
332336
- `type`: metric type (required)
333337
- `desc`: description of metric (required)
334338
- `key`: key name of record for instrumentation (required)
339+
- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention)
340+
- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention)
335341
- `<labels>`: additional labels for this metric (optional). See [Labels](#labels)
336342

337343
### histogram type
@@ -356,6 +362,8 @@ If key is empty, the metric values is treated as 1, so the counter increments by
356362
- `desc`: description of metric (required)
357363
- `key`: key name of record for instrumentation (required)
358364
- `buckets`: buckets of record for instrumentation (optional)
365+
- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention)
366+
- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention)
359367
- `<labels>`: additional labels for this metric (optional). See [Labels](#labels)
360368

361369
## Labels
@@ -430,6 +438,33 @@ Prometheus output/filter plugin can have multiple metric section. Top-level labe
430438

431439
In this case, `message_foo_counter` has `tag`, `hostname`, `key` and `data_type` labels.
432440

441+
## Retention
442+
443+
By default metrics with all encountered label combinations are preserved until the next restart of fluentd.
444+
Even if a label combination did not receive any update for a long time.
445+
That behavior is not always desirable e.g. when the contents of of fields change for good and the metric becomes idle.
446+
For these metrics you can set `retention` and `retention_check_interval` like this:
447+
448+
```
449+
<metric>
450+
name message_foo_counter
451+
type counter
452+
desc The total number of foo in message.
453+
key foo
454+
retention 3600 # 1h
455+
retention_check_interval 1800 # 30m
456+
<labels>
457+
bar ${bar}
458+
</labels>
459+
</metric>
460+
```
461+
462+
If `${bar}` was `baz` one time but after that no records with that value were processed, then after one hour the metric
463+
`foo{bar="baz"}` might be removed.
464+
When this actually happens depends on `retention_check_interval` (default 60).
465+
It causes a background thread to check every 30 minutes for expired metrics.
466+
So worst case the metrics are removed 30 minutes after expiration.
467+
You can set this value as low as `1`, but that may put more stress on your CPU.
433468

434469
## Try plugin with nginx
435470

lib/fluent/plugin/filter_prometheus.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ class PrometheusFilter < Fluent::Plugin::Filter
77
include Fluent::Plugin::PrometheusLabelParser
88
include Fluent::Plugin::Prometheus
99

10+
helpers :thread
11+
1012
def initialize
1113
super
1214
@registry = ::Prometheus::Client.registry
@@ -22,6 +24,17 @@ def configure(conf)
2224
@metrics = Fluent::Plugin::Prometheus.parse_metrics_elements(conf, @registry, labels)
2325
end
2426

27+
def start
28+
super
29+
Fluent::Plugin::Prometheus.start_retention_threads(
30+
@metrics,
31+
@registry,
32+
method(:thread_create),
33+
method(:thread_current_running?),
34+
@log
35+
)
36+
end
37+
2538
def filter(tag, time, record)
2639
instrument_single(tag, time, record, @metrics)
2740
record

lib/fluent/plugin/out_prometheus.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ class PrometheusOutput < Fluent::Plugin::Output
77
include Fluent::Plugin::PrometheusLabelParser
88
include Fluent::Plugin::Prometheus
99

10+
helpers :thread
11+
1012
def initialize
1113
super
1214
@registry = ::Prometheus::Client.registry
@@ -22,6 +24,17 @@ def configure(conf)
2224
@metrics = Fluent::Plugin::Prometheus.parse_metrics_elements(conf, @registry, labels)
2325
end
2426

27+
def start
28+
super
29+
Fluent::Plugin::Prometheus.start_retention_threads(
30+
@metrics,
31+
@registry,
32+
method(:thread_create),
33+
method(:thread_current_running?),
34+
@log
35+
)
36+
end
37+
2538
def process(tag, es)
2639
instrument(tag, es, @metrics)
2740
end

lib/fluent/plugin/prometheus.rb

Lines changed: 121 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,24 @@ def self.parse_metrics_elements(conf, registry, labels = {})
8181
metrics
8282
end
8383

84+
def self.start_retention_threads(metrics, registry, thread_create, thread_running, log)
85+
metrics.select { |metric| metric.has_retention? }.each do |metric|
86+
thread_create.call("prometheus_retention_#{metric.name}".to_sym) do
87+
while thread_running.call()
88+
metric.remove_expired_metrics(registry, log)
89+
sleep(metric.retention_check_interval)
90+
end
91+
end
92+
end
93+
end
94+
8495
def self.placeholder_expander(log)
8596
Fluent::Plugin::Prometheus::ExpandBuilder.new(log: log)
8697
end
8798

8899
def stringify_keys(hash_to_stringify)
89100
# Adapted from: https://www.jvt.me/posts/2019/09/07/ruby-hash-keys-string-symbol/
90-
hash_to_stringify.map do |k,v|
101+
hash_to_stringify.map do |k, v|
91102
value_or_hash = if v.instance_of? Hash
92103
stringify_keys(v)
93104
else
@@ -151,6 +162,9 @@ class Metric
151162
attr_reader :name
152163
attr_reader :key
153164
attr_reader :desc
165+
attr_reader :retention
166+
attr_reader :retention_check_interval
167+
attr_reader :last_modified_store
154168

155169
def initialize(element, registry, labels)
156170
['name', 'desc'].each do |key|
@@ -162,6 +176,11 @@ def initialize(element, registry, labels)
162176
@name = element['name']
163177
@key = element['key']
164178
@desc = element['desc']
179+
@retention = element['retention'].to_i
180+
@retention_check_interval = element.fetch('retention_check_interval', 60).to_i
181+
if has_retention?
182+
@last_modified_store = LastModifiedStore.new
183+
end
165184

166185
@base_labels = Fluent::Plugin::Prometheus.parse_labels_elements(element)
167186
@base_labels = labels.merge(@base_labels)
@@ -192,6 +211,76 @@ def self.get(registry, name, type, docstring)
192211

193212
metric
194213
end
214+
215+
def set_value?(value)
216+
if value
217+
return true
218+
end
219+
false
220+
end
221+
222+
def instrument(record, expander)
223+
value = self.value(record)
224+
if self.set_value?(value)
225+
labels = labels(record, expander)
226+
set_value(value, labels)
227+
if has_retention?
228+
@last_modified_store.set_last_updated(labels)
229+
end
230+
end
231+
end
232+
233+
def has_retention?
234+
@retention > 0
235+
end
236+
237+
def remove_expired_metrics(registry, log)
238+
if has_retention?
239+
require 'fluent/plugin/prometheus/patch_metric_store' # require locally to patch only if necessary
240+
241+
metric = registry.get(@name)
242+
243+
expiration_time = Time.now - @retention
244+
expired_label_sets = @last_modified_store.get_labels_not_modified_since(expiration_time)
245+
246+
expired_label_sets.each { |expired_label_set|
247+
log.debug "Metric #{@name} with labels #{expired_label_set} expired. Removing..."
248+
metric.remove(expired_label_set) # this method is supplied by the require at the top of this method
249+
@last_modified_store.remove(expired_label_set)
250+
}
251+
else
252+
log.warn('remove_expired_metrics should not be called when retention is not set for this metric!')
253+
end
254+
end
255+
256+
class LastModifiedStore
257+
def initialize
258+
@internal_store = Hash.new
259+
@lock = Monitor.new
260+
end
261+
262+
def synchronize
263+
@lock.synchronize { yield }
264+
end
265+
266+
def set_last_updated(labels)
267+
synchronize do
268+
@internal_store[labels] = Time.now
269+
end
270+
end
271+
272+
def remove(labels)
273+
synchronize do
274+
@internal_store.delete(labels)
275+
end
276+
end
277+
278+
def get_labels_not_modified_since(time)
279+
synchronize do
280+
@internal_store.select { |k, v| v < time }.keys
281+
end
282+
end
283+
end
195284
end
196285

197286
class Gauge < Metric
@@ -208,16 +297,17 @@ def initialize(element, registry, labels)
208297
end
209298
end
210299

211-
def instrument(record, expander)
300+
def value(record)
212301
if @key.is_a?(String)
213-
value = record[@key]
302+
record[@key]
214303
else
215-
value = @key.call(record)
216-
end
217-
if value
218-
@gauge.set(value, labels: labels(record, expander))
304+
@key.call(record)
219305
end
220306
end
307+
308+
def set_value(value, labels)
309+
@gauge.set(value, labels: labels)
310+
end
221311
end
222312

223313
class Counter < Metric
@@ -230,20 +320,22 @@ def initialize(element, registry, labels)
230320
end
231321
end
232322

233-
def instrument(record, expander)
234-
# use record value of the key if key is specified, otherwise just increment
323+
def value(record)
235324
if @key.nil?
236-
value = 1
325+
1
237326
elsif @key.is_a?(String)
238-
value = record[@key]
327+
record[@key]
239328
else
240-
value = @key.call(record)
329+
@key.call(record)
241330
end
331+
end
242332

243-
# ignore if record value is nil
244-
return if value.nil?
333+
def set_value?(value)
334+
!value.nil?
335+
end
245336

246-
@counter.increment(by: value, labels: labels(record, expander))
337+
def set_value(value, labels)
338+
@counter.increment(by: value, labels: labels)
247339
end
248340
end
249341

@@ -261,16 +353,17 @@ def initialize(element, registry, labels)
261353
end
262354
end
263355

264-
def instrument(record, expander)
356+
def value(record)
265357
if @key.is_a?(String)
266-
value = record[@key]
358+
record[@key]
267359
else
268-
value = @key.call(record)
269-
end
270-
if value
271-
@summary.observe(value, labels: labels(record, expander))
360+
@key.call(record)
272361
end
273362
end
363+
364+
def set_value(value, labels)
365+
@summary.observe(value, labels: labels)
366+
end
274367
end
275368

276369
class Histogram < Metric
@@ -294,16 +387,17 @@ def initialize(element, registry, labels)
294387
end
295388
end
296389

297-
def instrument(record, expander)
390+
def value(record)
298391
if @key.is_a?(String)
299-
value = record[@key]
392+
record[@key]
300393
else
301-
value = @key.call(record)
302-
end
303-
if value
304-
@histogram.observe(value, labels: labels(record, expander))
394+
@key.call(record)
305395
end
306396
end
397+
398+
def set_value(value, labels)
399+
@histogram.observe(value, labels: labels)
400+
end
307401
end
308402
end
309403
end
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# The default Prometheus client data store has no means of removing values.
2+
# For the "retention" feature we need to be able to remove metrics with specific labels after some time of inactivity.
3+
# By patching the MetricStore and Metric classes we implement that missing feature.
4+
module Prometheus
5+
module Client
6+
module DataStores
7+
class Synchronized
8+
class MetricStore
9+
def remove(labels:)
10+
synchronize do
11+
@internal_store.delete(labels)
12+
end
13+
end
14+
end
15+
end
16+
end
17+
18+
class Metric
19+
def remove(labels)
20+
label_set = label_set_for(labels)
21+
@store.remove(labels: label_set)
22+
end
23+
end
24+
end
25+
end

0 commit comments

Comments
 (0)