diff --git a/prometheus_client/metrics.py b/prometheus_client/metrics.py index b9f25ffc..1bae4beb 100644 --- a/prometheus_client/metrics.py +++ b/prometheus_client/metrics.py @@ -13,7 +13,7 @@ from .metrics_core import Metric from .registry import Collector, CollectorRegistry, REGISTRY from .samples import Exemplar, Sample -from .utils import floatToGoString, INF +from .utils import _getMultiprocDir, floatToGoString, INF from .validation import ( _validate_exemplar, _validate_labelnames, _validate_metric_name, ) @@ -108,6 +108,7 @@ def __init__(self: T, unit: str = '', registry: Optional[CollectorRegistry] = REGISTRY, _labelvalues: Optional[Sequence[str]] = None, + multiprocess_dir: Optional[str] = '' ) -> None: self._name = _build_full_name(self._type, name, namespace, subsystem, unit) self._labelnames = _validate_labelnames(self, labelnames) @@ -115,6 +116,7 @@ def __init__(self: T, self._kwargs: Dict[str, Any] = {} self._documentation = documentation self._unit = unit + self._multiprocess_dir = multiprocess_dir _validate_metric_name(self._name) @@ -182,12 +184,13 @@ def labels(self: T, *labelvalues: Any, **labelkwargs: Any) -> T: labelnames=self._labelnames, unit=self._unit, _labelvalues=labelvalues, + multiprocess_dir=self._multiprocess_dir, **self._kwargs ) return self._metrics[labelvalues] def remove(self, *labelvalues: Any) -> None: - if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ: + if _getMultiprocDir(): warnings.warn( "Removal of labels has not been implemented in multi-process mode yet.", UserWarning) @@ -205,7 +208,7 @@ def remove(self, *labelvalues: Any) -> None: def clear(self) -> None: """Remove all labelsets from the metric""" - if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ: + if _getMultiprocDir(): warnings.warn( "Clearing labels has not been implemented in multi-process mode yet", UserWarning) @@ -269,7 +272,7 @@ def f(): # Count only one type of exception with c.count_exceptions(ValueError): pass - + You can also reset the counter to zero in case your logical "process" restarts without restarting the actual python process. @@ -280,7 +283,7 @@ def f(): def _metric_init(self) -> None: self._value = values.ValueClass(self._type, self._name, self._name + '_total', self._labelnames, - self._labelvalues, self._documentation) + self._labelvalues, self._documentation, multiprocess_dir=self._multiprocess_dir) self._created = time.time() def inc(self, amount: float = 1, exemplar: Optional[Dict[str, str]] = None) -> None: @@ -369,6 +372,7 @@ def __init__(self, registry: Optional[CollectorRegistry] = REGISTRY, _labelvalues: Optional[Sequence[str]] = None, multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'] = 'all', + multiprocess_dir: Optional[str] = '' ): self._multiprocess_mode = multiprocess_mode if multiprocess_mode not in self._MULTIPROC_MODES: @@ -382,6 +386,7 @@ def __init__(self, unit=unit, registry=registry, _labelvalues=_labelvalues, + multiprocess_dir=multiprocess_dir ) self._kwargs['multiprocess_mode'] = self._multiprocess_mode self._is_most_recent = self._multiprocess_mode in self._MOST_RECENT_MODES @@ -389,7 +394,7 @@ def __init__(self, def _metric_init(self) -> None: self._value = values.ValueClass( self._type, self._name, self._name, self._labelnames, self._labelvalues, - self._documentation, multiprocess_mode=self._multiprocess_mode + self._documentation, multiprocess_mode=self._multiprocess_mode, multiprocess_dir=self._multiprocess_dir ) def inc(self, amount: float = 1) -> None: @@ -488,8 +493,9 @@ def create_response(request): def _metric_init(self) -> None: self._count = values.ValueClass(self._type, self._name, self._name + '_count', self._labelnames, - self._labelvalues, self._documentation) - self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation) + self._labelvalues, self._documentation, multiprocess_dir=self._multiprocess_dir) + self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, + self._labelvalues, self._documentation, multiprocess_dir=self._multiprocess_dir) self._created = time.time() def observe(self, amount: float) -> None: @@ -572,6 +578,7 @@ def __init__(self, registry: Optional[CollectorRegistry] = REGISTRY, _labelvalues: Optional[Sequence[str]] = None, buckets: Sequence[Union[float, str]] = DEFAULT_BUCKETS, + multiprocess_dir: Optional[str] = '' ): self._prepare_buckets(buckets) super().__init__( @@ -583,6 +590,7 @@ def __init__(self, unit=unit, registry=registry, _labelvalues=_labelvalues, + multiprocess_dir=multiprocess_dir ) self._kwargs['buckets'] = buckets @@ -602,7 +610,7 @@ def _metric_init(self) -> None: self._buckets: List[values.ValueClass] = [] self._created = time.time() bucket_labelnames = self._labelnames + ('le',) - self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation) + self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation, multiprocess_dir=self._multiprocess_dir) for b in self._upper_bounds: self._buckets.append(values.ValueClass( self._type, @@ -610,7 +618,8 @@ def _metric_init(self) -> None: self._name + '_bucket', bucket_labelnames, self._labelvalues + (floatToGoString(b),), - self._documentation) + self._documentation, + multiprocess_dir=self._multiprocess_dir) ) def observe(self, amount: float, exemplar: Optional[Dict[str, str]] = None) -> None: @@ -717,6 +726,7 @@ def __init__(self, registry: Optional[CollectorRegistry] = REGISTRY, _labelvalues: Optional[Sequence[str]] = None, states: Optional[Sequence[str]] = None, + multiprocess_dir: Optional[str] = '' ): super().__init__( name=name, @@ -727,6 +737,7 @@ def __init__(self, unit=unit, registry=registry, _labelvalues=_labelvalues, + multiprocess_dir=multiprocess_dir ) if name in labelnames: raise ValueError(f'Overlapping labels for Enum metric: {name}') diff --git a/prometheus_client/multiprocess.py b/prometheus_client/multiprocess.py index 2682190a..79b7b0e9 100644 --- a/prometheus_client/multiprocess.py +++ b/prometheus_client/multiprocess.py @@ -2,13 +2,12 @@ import glob import json import os -import warnings from .metrics import Gauge from .metrics_core import Metric from .mmap_dict import MmapedDict from .samples import Sample -from .utils import floatToGoString +from .utils import _getMultiprocDir, floatToGoString try: # Python3 FileNotFoundError @@ -16,16 +15,13 @@ FileNotFoundError = IOError + class MultiProcessCollector: """Collector for files for multi-process mode.""" def __init__(self, registry, path=None): if path is None: - # This deprecation warning can go away in a few releases when removing the compatibility - if 'prometheus_multiproc_dir' in os.environ and 'PROMETHEUS_MULTIPROC_DIR' not in os.environ: - os.environ['PROMETHEUS_MULTIPROC_DIR'] = os.environ['prometheus_multiproc_dir'] - warnings.warn("prometheus_multiproc_dir variable has been deprecated in favor of the upper case naming PROMETHEUS_MULTIPROC_DIR", DeprecationWarning) - path = os.environ.get('PROMETHEUS_MULTIPROC_DIR') + path = _getMultiprocDir() if not path or not os.path.isdir(path): raise ValueError('env PROMETHEUS_MULTIPROC_DIR is not set or not a directory') self._path = path @@ -164,7 +160,7 @@ def collect(self): def mark_process_dead(pid, path=None): """Do bookkeeping for when one process dies in a multi-process setup.""" if path is None: - path = os.environ.get('PROMETHEUS_MULTIPROC_DIR', os.environ.get('prometheus_multiproc_dir')) + path = _getMultiprocDir() for mode in _LIVE_GAUGE_MULTIPROCESS_MODES: for f in glob.glob(os.path.join(path, f'gauge_{mode}_{pid}.db')): os.remove(f) diff --git a/prometheus_client/utils.py b/prometheus_client/utils.py index 0d2b0948..e36f4444 100644 --- a/prometheus_client/utils.py +++ b/prometheus_client/utils.py @@ -1,4 +1,5 @@ import math +import os INF = float("inf") MINUS_INF = float("-inf") @@ -22,3 +23,8 @@ def floatToGoString(d): mantissa = f'{s[0]}.{s[1:dot]}{s[dot + 1:]}'.rstrip('0.') return f'{mantissa}e+0{dot - 1}' return s + + + +def _getMultiprocDir() -> str: + return os.environ.get('PROMETHEUS_MULTIPROC_DIR', '') or os.environ.get('prometheus_multiproc_dir', '') diff --git a/prometheus_client/values.py b/prometheus_client/values.py index 6ff85e3b..17d08c60 100644 --- a/prometheus_client/values.py +++ b/prometheus_client/values.py @@ -1,8 +1,8 @@ import os from threading import Lock -import warnings from .mmap_dict import mmap_key, MmapedDict +from .utils import _getMultiprocDir class MutexValue: @@ -57,30 +57,27 @@ class MmapedValue: _multiprocess = True - def __init__(self, typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode='', **kwargs): - self._params = typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode - # This deprecation warning can go away in a few releases when removing the compatibility - if 'prometheus_multiproc_dir' in os.environ and 'PROMETHEUS_MULTIPROC_DIR' not in os.environ: - os.environ['PROMETHEUS_MULTIPROC_DIR'] = os.environ['prometheus_multiproc_dir'] - warnings.warn("prometheus_multiproc_dir variable has been deprecated in favor of the upper case naming PROMETHEUS_MULTIPROC_DIR", DeprecationWarning) + def __init__(self, typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode='', multiprocess_dir='', **kwargs): + self._params = typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode, multiprocess_dir or _getMultiprocDir() with lock: self.__check_for_pid_change() self.__reset() values.append(self) def __reset(self): - typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode = self._params + typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode, multiprocess_dir = self._params if typ == 'gauge': - file_prefix = typ + '_' + multiprocess_mode + file_name = '{}_{}.db'.format('_'.join([typ, multiprocess_mode]), pid['value']) else: - file_prefix = typ - if file_prefix not in files: - filename = os.path.join( - os.environ.get('PROMETHEUS_MULTIPROC_DIR'), - '{}_{}.db'.format(file_prefix, pid['value'])) - - files[file_prefix] = MmapedDict(filename) - self._file = files[file_prefix] + file_name = '{}_{}.db'.format(typ, pid['value']) + file_identifier = '_'.join([multiprocess_dir, file_name]) + if file_identifier not in files: + os_file = os.path.join( + multiprocess_dir, + file_name + ) + files[file_identifier] = MmapedDict(os_file) + self._file = files[file_identifier] self._key = mmap_key(metric_name, name, labelnames, labelvalues, help_text) self._value, self._timestamp = self._file.read_value(self._key) @@ -130,7 +127,7 @@ def get_value_class(): # This needs to be chosen before the first metric is constructed, # and as that may be in some arbitrary library the user/admin has # no control over we use an environment variable. - if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ: + if _getMultiprocDir(): return MultiProcessValue() else: return MutexValue diff --git a/tests/test_multiprocess.py b/tests/test_multiprocess.py index 77fd3d81..31715062 100644 --- a/tests/test_multiprocess.py +++ b/tests/test_multiprocess.py @@ -17,7 +17,7 @@ ) -class TestMultiProcessDeprecation(unittest.TestCase): +class TestMultiProcessLowercase(unittest.TestCase): def setUp(self): self.tempdir = tempfile.mkdtemp() @@ -27,19 +27,6 @@ def tearDown(self): values.ValueClass = MutexValue shutil.rmtree(self.tempdir) - def test_deprecation_warning(self): - os.environ['prometheus_multiproc_dir'] = self.tempdir - with warnings.catch_warnings(record=True) as w: - values.ValueClass = get_value_class() - registry = CollectorRegistry() - collector = MultiProcessCollector(registry) - Counter('c', 'help', registry=None) - - assert os.environ['PROMETHEUS_MULTIPROC_DIR'] == self.tempdir - assert len(w) == 1 - assert issubclass(w[-1].category, DeprecationWarning) - assert "PROMETHEUS_MULTIPROC_DIR" in str(w[-1].message) - def test_mark_process_dead_respects_lowercase(self): os.environ['prometheus_multiproc_dir'] = self.tempdir # Just test that this does not raise with a lowercase env var. The @@ -50,6 +37,7 @@ def test_mark_process_dead_respects_lowercase(self): class TestMultiProcess(unittest.TestCase): def setUp(self): self.tempdir = tempfile.mkdtemp() + self.tempdir_multiproc = tempfile.mkdtemp() os.environ['PROMETHEUS_MULTIPROC_DIR'] = self.tempdir values.ValueClass = MultiProcessValue(lambda: 123) self.registry = CollectorRegistry() @@ -62,6 +50,7 @@ def _value_class(self): def tearDown(self): del os.environ['PROMETHEUS_MULTIPROC_DIR'] shutil.rmtree(self.tempdir) + shutil.rmtree(self.tempdir_multiproc) values.ValueClass = MutexValue def test_counter_adds(self): @@ -397,6 +386,37 @@ def test_remove_clear_warning(self): assert issubclass(w[-1].category, UserWarning) assert "Clearing labels has not been implemented" in str(w[-1].message) + def test_multiproc_dir(self): + pid = 0 + values.ValueClass = MultiProcessValue(lambda: pid) + + def files(path): + fs = os.listdir(path) + fs.sort() + return fs + + s1 = Summary('s1', 's1', registry=None) + c1 = Counter('c1', 'c1', registry=None, multiprocess_dir=self.tempdir_multiproc) + + self.assertEqual(files(self.tempdir), ['summary_0.db']) + self.assertEqual(files(self.tempdir_multiproc), ['counter_0.db']) + + pid = 1 + c2 = Counter('c2', 'c2', registry=None, multiprocess_dir=self.tempdir_multiproc) + + self.assertEqual(files(self.tempdir_multiproc), ['counter_0.db', 'counter_1.db']) + + c1.inc(1) + + metrics = {m.name: m for m in self.collector.collect()} + self.assertNotIn('c1', metrics) + + multiproc_collector = MultiProcessCollector(self.registry, path=self.tempdir_multiproc) + multiproc_metrics = {m.name: m for m in multiproc_collector.collect()} + self.assertEqual( + multiproc_metrics['c1'].samples, [Sample('c1_total', {}, 1.0)] + ) + class TestMmapedDict(unittest.TestCase): def setUp(self):