Skip to content

Commit 65a0b4e

Browse files
committed
support custom multiproc metric path
Signed-off-by: iostream96 <jianing0412@hotmail.com>
1 parent 46eae7b commit 65a0b4e

File tree

4 files changed

+50
-36
lines changed

4 files changed

+50
-36
lines changed

prometheus_client/metrics.py

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from .metrics_core import Metric
1414
from .registry import Collector, CollectorRegistry, REGISTRY
1515
from .samples import Exemplar, Sample
16-
from .utils import floatToGoString, INF
16+
from .utils import floatToGoString, getMultiprocDir, INF
1717
from .validation import (
1818
_validate_exemplar, _validate_labelnames, _validate_metric_name,
1919
)
@@ -108,13 +108,15 @@ def __init__(self: T,
108108
unit: str = '',
109109
registry: Optional[CollectorRegistry] = REGISTRY,
110110
_labelvalues: Optional[Sequence[str]] = None,
111+
multiprocess_dir: Optional[str] = ''
111112
) -> None:
112113
self._name = _build_full_name(self._type, name, namespace, subsystem, unit)
113114
self._labelnames = _validate_labelnames(self, labelnames)
114115
self._labelvalues = tuple(_labelvalues or ())
115116
self._kwargs: Dict[str, Any] = {}
116117
self._documentation = documentation
117118
self._unit = unit
119+
self._multiprocess_dir = multiprocess_dir
118120

119121
_validate_metric_name(self._name)
120122

@@ -182,12 +184,13 @@ def labels(self: T, *labelvalues: Any, **labelkwargs: Any) -> T:
182184
labelnames=self._labelnames,
183185
unit=self._unit,
184186
_labelvalues=labelvalues,
187+
multiprocess_dir=self._multiprocess_dir,
185188
**self._kwargs
186189
)
187190
return self._metrics[labelvalues]
188191

189192
def remove(self, *labelvalues: Any) -> None:
190-
if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
193+
if getMultiprocDir():
191194
warnings.warn(
192195
"Removal of labels has not been implemented in multi-process mode yet.",
193196
UserWarning)
@@ -205,7 +208,7 @@ def remove(self, *labelvalues: Any) -> None:
205208

206209
def clear(self) -> None:
207210
"""Remove all labelsets from the metric"""
208-
if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
211+
if getMultiprocDir():
209212
warnings.warn(
210213
"Clearing labels has not been implemented in multi-process mode yet",
211214
UserWarning)
@@ -269,7 +272,7 @@ def f():
269272
# Count only one type of exception
270273
with c.count_exceptions(ValueError):
271274
pass
272-
275+
273276
You can also reset the counter to zero in case your logical "process" restarts
274277
without restarting the actual python process.
275278
@@ -280,7 +283,7 @@ def f():
280283

281284
def _metric_init(self) -> None:
282285
self._value = values.ValueClass(self._type, self._name, self._name + '_total', self._labelnames,
283-
self._labelvalues, self._documentation)
286+
self._labelvalues, self._documentation, multiprocess_dir=self._multiprocess_dir)
284287
self._created = time.time()
285288

286289
def inc(self, amount: float = 1, exemplar: Optional[Dict[str, str]] = None) -> None:
@@ -369,6 +372,7 @@ def __init__(self,
369372
registry: Optional[CollectorRegistry] = REGISTRY,
370373
_labelvalues: Optional[Sequence[str]] = None,
371374
multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'] = 'all',
375+
multiprocess_dir: Optional[str] = ''
372376
):
373377
self._multiprocess_mode = multiprocess_mode
374378
if multiprocess_mode not in self._MULTIPROC_MODES:
@@ -382,14 +386,15 @@ def __init__(self,
382386
unit=unit,
383387
registry=registry,
384388
_labelvalues=_labelvalues,
389+
multiprocess_dir=multiprocess_dir
385390
)
386391
self._kwargs['multiprocess_mode'] = self._multiprocess_mode
387392
self._is_most_recent = self._multiprocess_mode in self._MOST_RECENT_MODES
388393

389394
def _metric_init(self) -> None:
390395
self._value = values.ValueClass(
391396
self._type, self._name, self._name, self._labelnames, self._labelvalues,
392-
self._documentation, multiprocess_mode=self._multiprocess_mode
397+
self._documentation, multiprocess_mode=self._multiprocess_mode, multiprocess_dir=self._multiprocess_dir
393398
)
394399

395400
def inc(self, amount: float = 1) -> None:
@@ -488,8 +493,9 @@ def create_response(request):
488493

489494
def _metric_init(self) -> None:
490495
self._count = values.ValueClass(self._type, self._name, self._name + '_count', self._labelnames,
491-
self._labelvalues, self._documentation)
492-
self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation)
496+
self._labelvalues, self._documentation, multiprocess_dir=self._multiprocess_dir)
497+
self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames,
498+
self._labelvalues, self._documentation, multiprocess_dir=self._multiprocess_dir)
493499
self._created = time.time()
494500

495501
def observe(self, amount: float) -> None:
@@ -572,6 +578,7 @@ def __init__(self,
572578
registry: Optional[CollectorRegistry] = REGISTRY,
573579
_labelvalues: Optional[Sequence[str]] = None,
574580
buckets: Sequence[Union[float, str]] = DEFAULT_BUCKETS,
581+
multiprocess_dir: Optional[str] = ''
575582
):
576583
self._prepare_buckets(buckets)
577584
super().__init__(
@@ -583,6 +590,7 @@ def __init__(self,
583590
unit=unit,
584591
registry=registry,
585592
_labelvalues=_labelvalues,
593+
multiprocess_dir=multiprocess_dir
586594
)
587595
self._kwargs['buckets'] = buckets
588596

@@ -602,15 +610,16 @@ def _metric_init(self) -> None:
602610
self._buckets: List[values.ValueClass] = []
603611
self._created = time.time()
604612
bucket_labelnames = self._labelnames + ('le',)
605-
self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation)
613+
self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation, multiprocess_dir=self._multiprocess_dir)
606614
for b in self._upper_bounds:
607615
self._buckets.append(values.ValueClass(
608616
self._type,
609617
self._name,
610618
self._name + '_bucket',
611619
bucket_labelnames,
612620
self._labelvalues + (floatToGoString(b),),
613-
self._documentation)
621+
self._documentation,
622+
multiprocess_dir=self._multiprocess_dir)
614623
)
615624

616625
def observe(self, amount: float, exemplar: Optional[Dict[str, str]] = None) -> None:
@@ -717,6 +726,7 @@ def __init__(self,
717726
registry: Optional[CollectorRegistry] = REGISTRY,
718727
_labelvalues: Optional[Sequence[str]] = None,
719728
states: Optional[Sequence[str]] = None,
729+
multiprocess_dir: Optional[str] = ''
720730
):
721731
super().__init__(
722732
name=name,
@@ -727,6 +737,7 @@ def __init__(self,
727737
unit=unit,
728738
registry=registry,
729739
_labelvalues=_labelvalues,
740+
multiprocess_dir=multiprocess_dir
730741
)
731742
if name in labelnames:
732743
raise ValueError(f'Overlapping labels for Enum metric: {name}')

prometheus_client/multiprocess.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,26 @@
22
import glob
33
import json
44
import os
5-
import warnings
65

76
from .metrics import Gauge
87
from .metrics_core import Metric
98
from .mmap_dict import MmapedDict
109
from .samples import Sample
11-
from .utils import floatToGoString
10+
from .utils import floatToGoString, getMultiprocDir
1211

1312
try: # Python3
1413
FileNotFoundError
1514
except NameError: # Python >= 2.5
1615
FileNotFoundError = IOError
1716

1817

18+
1919
class MultiProcessCollector:
2020
"""Collector for files for multi-process mode."""
2121

2222
def __init__(self, registry, path=None):
2323
if path is None:
24-
# This deprecation warning can go away in a few releases when removing the compatibility
25-
if 'prometheus_multiproc_dir' in os.environ and 'PROMETHEUS_MULTIPROC_DIR' not in os.environ:
26-
os.environ['PROMETHEUS_MULTIPROC_DIR'] = os.environ['prometheus_multiproc_dir']
27-
warnings.warn("prometheus_multiproc_dir variable has been deprecated in favor of the upper case naming PROMETHEUS_MULTIPROC_DIR", DeprecationWarning)
28-
path = os.environ.get('PROMETHEUS_MULTIPROC_DIR')
24+
path = getMultiprocDir()
2925
if not path or not os.path.isdir(path):
3026
raise ValueError('env PROMETHEUS_MULTIPROC_DIR is not set or not a directory')
3127
self._path = path
@@ -164,7 +160,7 @@ def collect(self):
164160
def mark_process_dead(pid, path=None):
165161
"""Do bookkeeping for when one process dies in a multi-process setup."""
166162
if path is None:
167-
path = os.environ.get('PROMETHEUS_MULTIPROC_DIR', os.environ.get('prometheus_multiproc_dir'))
163+
path = getMultiprocDir()
168164
for mode in _LIVE_GAUGE_MULTIPROCESS_MODES:
169165
for f in glob.glob(os.path.join(path, f'gauge_{mode}_{pid}.db')):
170166
os.remove(f)

prometheus_client/utils.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import math
2+
import os
3+
import warnings
24

35
INF = float("inf")
46
MINUS_INF = float("-inf")
@@ -22,3 +24,11 @@ def floatToGoString(d):
2224
mantissa = f'{s[0]}.{s[1:dot]}{s[dot + 1:]}'.rstrip('0.')
2325
return f'{mantissa}e+0{dot - 1}'
2426
return s
27+
28+
29+
30+
def getMultiprocDir() -> str:
31+
if 'prometheus_multiproc_dir' in os.environ and 'PROMETHEUS_MULTIPROC_DIR' not in os.environ:
32+
os.environ['PROMETHEUS_MULTIPROC_DIR'] = os.environ['prometheus_multiproc_dir']
33+
warnings.warn("prometheus_multiproc_dir variable has been deprecated in favor of the upper case naming PROMETHEUS_MULTIPROC_DIR", DeprecationWarning)
34+
return os.environ.get('PROMETHEUS_MULTIPROC_DIR', '')

prometheus_client/values.py

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import os
22
from threading import Lock
3-
import warnings
43

54
from .mmap_dict import mmap_key, MmapedDict
5+
from .utils import getMultiprocDir
66

77

88
class MutexValue:
@@ -57,30 +57,27 @@ class MmapedValue:
5757

5858
_multiprocess = True
5959

60-
def __init__(self, typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode='', **kwargs):
61-
self._params = typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode
62-
# This deprecation warning can go away in a few releases when removing the compatibility
63-
if 'prometheus_multiproc_dir' in os.environ and 'PROMETHEUS_MULTIPROC_DIR' not in os.environ:
64-
os.environ['PROMETHEUS_MULTIPROC_DIR'] = os.environ['prometheus_multiproc_dir']
65-
warnings.warn("prometheus_multiproc_dir variable has been deprecated in favor of the upper case naming PROMETHEUS_MULTIPROC_DIR", DeprecationWarning)
60+
def __init__(self, typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode='', multiprocess_dir='', **kwargs):
61+
self._params = typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode, multiprocess_dir or getMultiprocDir()
6662
with lock:
6763
self.__check_for_pid_change()
6864
self.__reset()
6965
values.append(self)
7066

7167
def __reset(self):
72-
typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode = self._params
68+
typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode, multiprocess_dir = self._params
7369
if typ == 'gauge':
74-
file_prefix = typ + '_' + multiprocess_mode
70+
file_name = '{}_{}.db'.format('_'.join([typ, multiprocess_mode]), pid['value'])
7571
else:
76-
file_prefix = typ
77-
if file_prefix not in files:
78-
filename = os.path.join(
79-
os.environ.get('PROMETHEUS_MULTIPROC_DIR'),
80-
'{}_{}.db'.format(file_prefix, pid['value']))
81-
82-
files[file_prefix] = MmapedDict(filename)
83-
self._file = files[file_prefix]
72+
file_name = '{}_{}.db'.format(typ, pid['value'])
73+
file_identifier = '_'.join([multiprocess_dir, file_name])
74+
if file_identifier not in files:
75+
os_file = os.path.join(
76+
multiprocess_dir,
77+
file_name
78+
)
79+
files[file_identifier] = MmapedDict(os_file)
80+
self._file = files[file_identifier]
8481
self._key = mmap_key(metric_name, name, labelnames, labelvalues, help_text)
8582
self._value, self._timestamp = self._file.read_value(self._key)
8683

@@ -130,7 +127,7 @@ def get_value_class():
130127
# This needs to be chosen before the first metric is constructed,
131128
# and as that may be in some arbitrary library the user/admin has
132129
# no control over we use an environment variable.
133-
if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
130+
if getMultiprocDir():
134131
return MultiProcessValue()
135132
else:
136133
return MutexValue

0 commit comments

Comments
 (0)