Skip to content

supporting custom multiprocess metric path #1094

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions prometheus_client/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from .metrics_core import Metric
from .registry import Collector, CollectorRegistry, REGISTRY
from .samples import Exemplar, Sample
from .utils import floatToGoString, INF
from .utils import _getMultiprocDir, floatToGoString, INF
from .validation import (
_validate_exemplar, _validate_labelnames, _validate_metric_name,
)
Expand Down Expand Up @@ -108,13 +108,15 @@ def __init__(self: T,
unit: str = '',
registry: Optional[CollectorRegistry] = REGISTRY,
_labelvalues: Optional[Sequence[str]] = None,
multiprocess_dir: Optional[str] = ''
) -> None:
self._name = _build_full_name(self._type, name, namespace, subsystem, unit)
self._labelnames = _validate_labelnames(self, labelnames)
self._labelvalues = tuple(_labelvalues or ())
self._kwargs: Dict[str, Any] = {}
self._documentation = documentation
self._unit = unit
self._multiprocess_dir = multiprocess_dir

_validate_metric_name(self._name)

Expand Down Expand Up @@ -182,12 +184,13 @@ def labels(self: T, *labelvalues: Any, **labelkwargs: Any) -> T:
labelnames=self._labelnames,
unit=self._unit,
_labelvalues=labelvalues,
multiprocess_dir=self._multiprocess_dir,
**self._kwargs
)
return self._metrics[labelvalues]

def remove(self, *labelvalues: Any) -> None:
if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
if _getMultiprocDir():
warnings.warn(
"Removal of labels has not been implemented in multi-process mode yet.",
UserWarning)
Expand All @@ -205,7 +208,7 @@ def remove(self, *labelvalues: Any) -> None:

def clear(self) -> None:
"""Remove all labelsets from the metric"""
if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
if _getMultiprocDir():
warnings.warn(
"Clearing labels has not been implemented in multi-process mode yet",
UserWarning)
Expand Down Expand Up @@ -269,7 +272,7 @@ def f():
# Count only one type of exception
with c.count_exceptions(ValueError):
pass

You can also reset the counter to zero in case your logical "process" restarts
without restarting the actual python process.

Expand All @@ -280,7 +283,7 @@ def f():

def _metric_init(self) -> None:
self._value = values.ValueClass(self._type, self._name, self._name + '_total', self._labelnames,
self._labelvalues, self._documentation)
self._labelvalues, self._documentation, multiprocess_dir=self._multiprocess_dir)
self._created = time.time()

def inc(self, amount: float = 1, exemplar: Optional[Dict[str, str]] = None) -> None:
Expand Down Expand Up @@ -369,6 +372,7 @@ def __init__(self,
registry: Optional[CollectorRegistry] = REGISTRY,
_labelvalues: Optional[Sequence[str]] = None,
multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'] = 'all',
multiprocess_dir: Optional[str] = ''
):
self._multiprocess_mode = multiprocess_mode
if multiprocess_mode not in self._MULTIPROC_MODES:
Expand All @@ -382,14 +386,15 @@ def __init__(self,
unit=unit,
registry=registry,
_labelvalues=_labelvalues,
multiprocess_dir=multiprocess_dir
)
self._kwargs['multiprocess_mode'] = self._multiprocess_mode
self._is_most_recent = self._multiprocess_mode in self._MOST_RECENT_MODES

def _metric_init(self) -> None:
self._value = values.ValueClass(
self._type, self._name, self._name, self._labelnames, self._labelvalues,
self._documentation, multiprocess_mode=self._multiprocess_mode
self._documentation, multiprocess_mode=self._multiprocess_mode, multiprocess_dir=self._multiprocess_dir
)

def inc(self, amount: float = 1) -> None:
Expand Down Expand Up @@ -488,8 +493,9 @@ def create_response(request):

def _metric_init(self) -> None:
self._count = values.ValueClass(self._type, self._name, self._name + '_count', self._labelnames,
self._labelvalues, self._documentation)
self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation)
self._labelvalues, self._documentation, multiprocess_dir=self._multiprocess_dir)
self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames,
self._labelvalues, self._documentation, multiprocess_dir=self._multiprocess_dir)
self._created = time.time()

def observe(self, amount: float) -> None:
Expand Down Expand Up @@ -572,6 +578,7 @@ def __init__(self,
registry: Optional[CollectorRegistry] = REGISTRY,
_labelvalues: Optional[Sequence[str]] = None,
buckets: Sequence[Union[float, str]] = DEFAULT_BUCKETS,
multiprocess_dir: Optional[str] = ''
):
self._prepare_buckets(buckets)
super().__init__(
Expand All @@ -583,6 +590,7 @@ def __init__(self,
unit=unit,
registry=registry,
_labelvalues=_labelvalues,
multiprocess_dir=multiprocess_dir
)
self._kwargs['buckets'] = buckets

Expand All @@ -602,15 +610,16 @@ def _metric_init(self) -> None:
self._buckets: List[values.ValueClass] = []
self._created = time.time()
bucket_labelnames = self._labelnames + ('le',)
self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation)
self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation, multiprocess_dir=self._multiprocess_dir)
for b in self._upper_bounds:
self._buckets.append(values.ValueClass(
self._type,
self._name,
self._name + '_bucket',
bucket_labelnames,
self._labelvalues + (floatToGoString(b),),
self._documentation)
self._documentation,
multiprocess_dir=self._multiprocess_dir)
)

def observe(self, amount: float, exemplar: Optional[Dict[str, str]] = None) -> None:
Expand Down Expand Up @@ -717,6 +726,7 @@ def __init__(self,
registry: Optional[CollectorRegistry] = REGISTRY,
_labelvalues: Optional[Sequence[str]] = None,
states: Optional[Sequence[str]] = None,
multiprocess_dir: Optional[str] = ''
):
super().__init__(
name=name,
Expand All @@ -727,6 +737,7 @@ def __init__(self,
unit=unit,
registry=registry,
_labelvalues=_labelvalues,
multiprocess_dir=multiprocess_dir
)
if name in labelnames:
raise ValueError(f'Overlapping labels for Enum metric: {name}')
Expand Down
12 changes: 4 additions & 8 deletions prometheus_client/multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,26 @@
import glob
import json
import os
import warnings

from .metrics import Gauge
from .metrics_core import Metric
from .mmap_dict import MmapedDict
from .samples import Sample
from .utils import floatToGoString
from .utils import _getMultiprocDir, floatToGoString

try: # Python3
FileNotFoundError
except NameError: # Python >= 2.5
FileNotFoundError = IOError



class MultiProcessCollector:
"""Collector for files for multi-process mode."""

def __init__(self, registry, path=None):
if path is None:
# This deprecation warning can go away in a few releases when removing the compatibility
if 'prometheus_multiproc_dir' in os.environ and 'PROMETHEUS_MULTIPROC_DIR' not in os.environ:
os.environ['PROMETHEUS_MULTIPROC_DIR'] = os.environ['prometheus_multiproc_dir']
warnings.warn("prometheus_multiproc_dir variable has been deprecated in favor of the upper case naming PROMETHEUS_MULTIPROC_DIR", DeprecationWarning)
path = os.environ.get('PROMETHEUS_MULTIPROC_DIR')
path = _getMultiprocDir()
if not path or not os.path.isdir(path):
raise ValueError('env PROMETHEUS_MULTIPROC_DIR is not set or not a directory')
self._path = path
Expand Down Expand Up @@ -164,7 +160,7 @@ def collect(self):
def mark_process_dead(pid, path=None):
"""Do bookkeeping for when one process dies in a multi-process setup."""
if path is None:
path = os.environ.get('PROMETHEUS_MULTIPROC_DIR', os.environ.get('prometheus_multiproc_dir'))
path = _getMultiprocDir()
for mode in _LIVE_GAUGE_MULTIPROCESS_MODES:
for f in glob.glob(os.path.join(path, f'gauge_{mode}_{pid}.db')):
os.remove(f)
6 changes: 6 additions & 0 deletions prometheus_client/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import math
import os

INF = float("inf")
MINUS_INF = float("-inf")
Expand All @@ -22,3 +23,8 @@ def floatToGoString(d):
mantissa = f'{s[0]}.{s[1:dot]}{s[dot + 1:]}'.rstrip('0.')
return f'{mantissa}e+0{dot - 1}'
return s



def _getMultiprocDir() -> str:
return os.environ.get('PROMETHEUS_MULTIPROC_DIR', '') or os.environ.get('prometheus_multiproc_dir', '')
33 changes: 15 additions & 18 deletions prometheus_client/values.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import os
from threading import Lock
import warnings

from .mmap_dict import mmap_key, MmapedDict
from .utils import _getMultiprocDir


class MutexValue:
Expand Down Expand Up @@ -57,30 +57,27 @@ class MmapedValue:

_multiprocess = True

def __init__(self, typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode='', **kwargs):
self._params = typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode
# This deprecation warning can go away in a few releases when removing the compatibility
if 'prometheus_multiproc_dir' in os.environ and 'PROMETHEUS_MULTIPROC_DIR' not in os.environ:
os.environ['PROMETHEUS_MULTIPROC_DIR'] = os.environ['prometheus_multiproc_dir']
warnings.warn("prometheus_multiproc_dir variable has been deprecated in favor of the upper case naming PROMETHEUS_MULTIPROC_DIR", DeprecationWarning)
def __init__(self, typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode='', multiprocess_dir='', **kwargs):
self._params = typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode, multiprocess_dir or _getMultiprocDir()
with lock:
self.__check_for_pid_change()
self.__reset()
values.append(self)

def __reset(self):
typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode = self._params
typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode, multiprocess_dir = self._params
if typ == 'gauge':
file_prefix = typ + '_' + multiprocess_mode
file_name = '{}_{}.db'.format('_'.join([typ, multiprocess_mode]), pid['value'])
else:
file_prefix = typ
if file_prefix not in files:
filename = os.path.join(
os.environ.get('PROMETHEUS_MULTIPROC_DIR'),
'{}_{}.db'.format(file_prefix, pid['value']))

files[file_prefix] = MmapedDict(filename)
self._file = files[file_prefix]
file_name = '{}_{}.db'.format(typ, pid['value'])
file_identifier = '_'.join([multiprocess_dir, file_name])
if file_identifier not in files:
os_file = os.path.join(
multiprocess_dir,
file_name
)
files[file_identifier] = MmapedDict(os_file)
self._file = files[file_identifier]
self._key = mmap_key(metric_name, name, labelnames, labelvalues, help_text)
self._value, self._timestamp = self._file.read_value(self._key)

Expand Down Expand Up @@ -130,7 +127,7 @@ def get_value_class():
# This needs to be chosen before the first metric is constructed,
# and as that may be in some arbitrary library the user/admin has
# no control over we use an environment variable.
if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
if _getMultiprocDir():
return MultiProcessValue()
else:
return MutexValue
Expand Down
48 changes: 34 additions & 14 deletions tests/test_multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
)


class TestMultiProcessDeprecation(unittest.TestCase):
class TestMultiProcessLowercase(unittest.TestCase):
def setUp(self):
self.tempdir = tempfile.mkdtemp()

Expand All @@ -27,19 +27,6 @@ def tearDown(self):
values.ValueClass = MutexValue
shutil.rmtree(self.tempdir)

def test_deprecation_warning(self):
os.environ['prometheus_multiproc_dir'] = self.tempdir
with warnings.catch_warnings(record=True) as w:
values.ValueClass = get_value_class()
registry = CollectorRegistry()
collector = MultiProcessCollector(registry)
Counter('c', 'help', registry=None)

assert os.environ['PROMETHEUS_MULTIPROC_DIR'] == self.tempdir
assert len(w) == 1
assert issubclass(w[-1].category, DeprecationWarning)
assert "PROMETHEUS_MULTIPROC_DIR" in str(w[-1].message)

def test_mark_process_dead_respects_lowercase(self):
os.environ['prometheus_multiproc_dir'] = self.tempdir
# Just test that this does not raise with a lowercase env var. The
Expand All @@ -50,6 +37,7 @@ def test_mark_process_dead_respects_lowercase(self):
class TestMultiProcess(unittest.TestCase):
def setUp(self):
self.tempdir = tempfile.mkdtemp()
self.tempdir_multiproc = tempfile.mkdtemp()
os.environ['PROMETHEUS_MULTIPROC_DIR'] = self.tempdir
values.ValueClass = MultiProcessValue(lambda: 123)
self.registry = CollectorRegistry()
Expand All @@ -62,6 +50,7 @@ def _value_class(self):
def tearDown(self):
del os.environ['PROMETHEUS_MULTIPROC_DIR']
shutil.rmtree(self.tempdir)
shutil.rmtree(self.tempdir_multiproc)
values.ValueClass = MutexValue

def test_counter_adds(self):
Expand Down Expand Up @@ -397,6 +386,37 @@ def test_remove_clear_warning(self):
assert issubclass(w[-1].category, UserWarning)
assert "Clearing labels has not been implemented" in str(w[-1].message)

def test_multiproc_dir(self):
pid = 0
values.ValueClass = MultiProcessValue(lambda: pid)

def files(path):
fs = os.listdir(path)
fs.sort()
return fs

s1 = Summary('s1', 's1', registry=None)
c1 = Counter('c1', 'c1', registry=None, multiprocess_dir=self.tempdir_multiproc)

self.assertEqual(files(self.tempdir), ['summary_0.db'])
self.assertEqual(files(self.tempdir_multiproc), ['counter_0.db'])

pid = 1
c2 = Counter('c2', 'c2', registry=None, multiprocess_dir=self.tempdir_multiproc)

self.assertEqual(files(self.tempdir_multiproc), ['counter_0.db', 'counter_1.db'])

c1.inc(1)

metrics = {m.name: m for m in self.collector.collect()}
self.assertNotIn('c1', metrics)

multiproc_collector = MultiProcessCollector(self.registry, path=self.tempdir_multiproc)
multiproc_metrics = {m.name: m for m in multiproc_collector.collect()}
self.assertEqual(
multiproc_metrics['c1'].samples, [Sample('c1_total', {}, 1.0)]
)


class TestMmapedDict(unittest.TestCase):
def setUp(self):
Expand Down