Skip to content

Commit

Permalink
Merge pull request #1381 from rafaelweingartner/improve-gnocchi-ceph-…
Browse files Browse the repository at this point in the history
…read-IOPs-usage

Address high IOPs usage of the Gnocchi Ceph pool
  • Loading branch information
tobias-urdin authored May 31, 2024
2 parents cbb3836 + 8a0dcec commit 88ee87d
Showing 1 changed file with 42 additions and 6 deletions.
48 changes: 42 additions & 6 deletions gnocchi/storage/ceph.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import daiquiri

from oslo_config import cfg

Expand Down Expand Up @@ -42,6 +43,11 @@

rados = ceph.rados

LOG = daiquiri.getLogger(__name__)

DEFAULT_RADOS_BUFFER_SIZE = 8192
MAP_UNAGGREGATED_METRIC_NAME_BY_SIZE = {}


class CephStorage(storage.StorageDriver):
WRITE_FULL = False
Expand Down Expand Up @@ -88,6 +94,13 @@ def _store_metric_splits(self, metrics_keys_aggregations_data_offset,
for key, agg, data, offset in keys_aggregations_data_offset:
name = self._get_object_name(
metric, key, agg.method, version)
metric_size = len(data)

if metric_size > DEFAULT_RADOS_BUFFER_SIZE:
MAP_UNAGGREGATED_METRIC_NAME_BY_SIZE[name] = metric_size
LOG.debug(
"Storing time series size [%s] for metric [%s].",
metric_size, name)
if offset is None:
self.ioctx.write_full(name, data)
else:
Expand Down Expand Up @@ -153,7 +166,14 @@ def _get_splits_unbatched(self, metric, key, aggregation, version=3):
try:
name = self._get_object_name(
metric, key, aggregation.method, version)
return self._get_object_content(name)

metric_size = MAP_UNAGGREGATED_METRIC_NAME_BY_SIZE.get(
name, DEFAULT_RADOS_BUFFER_SIZE)

LOG.debug("Reading metric [%s] with buffer size of [%s].",
name, metric_size)

return self._get_object_content(name, buffer_size=metric_size)
except rados.ObjectNotFound:
return

Expand Down Expand Up @@ -206,9 +226,16 @@ def _build_unaggregated_timeserie_path(metric, version):

def _get_or_create_unaggregated_timeseries_unbatched(
self, metric, version=3):
metric_name = self._build_unaggregated_timeserie_path(metric, version)
metric_size = MAP_UNAGGREGATED_METRIC_NAME_BY_SIZE.get(
metric_name, DEFAULT_RADOS_BUFFER_SIZE)

LOG.debug("Reading unaggregated metric [%s] with buffer size of [%s].",
metric_name, metric_size)

try:
contents = self._get_object_content(
self._build_unaggregated_timeserie_path(metric, version))
metric_name, buffer_size=metric_size)
except rados.ObjectNotFound:
self._create_metric(metric)
else:
Expand All @@ -218,14 +245,23 @@ def _get_or_create_unaggregated_timeseries_unbatched(

def _store_unaggregated_timeseries_unbatched(
self, metric, data, version=3):
self.ioctx.write_full(
self._build_unaggregated_timeserie_path(metric, version), data)

def _get_object_content(self, name):
metric_name = self._build_unaggregated_timeserie_path(metric, version)
metric_size = len(data)

if metric_size > DEFAULT_RADOS_BUFFER_SIZE:
MAP_UNAGGREGATED_METRIC_NAME_BY_SIZE[metric_name] = metric_size
LOG.debug(
"Storing unaggregated time series size [%s] for metric [%s]",
metric_size, metric_name)
self.ioctx.write_full(metric_name, data)

def _get_object_content(self, name, buffer_size=DEFAULT_RADOS_BUFFER_SIZE):
offset = 0
content = b''

while True:
data = self.ioctx.read(name, offset=offset)
data = self.ioctx.read(name, length=buffer_size, offset=offset)
if not data:
break
content += data
Expand Down

0 comments on commit 88ee87d

Please sign in to comment.