Clean raw data metrics that do not receive measures #1385

Merged
106 changes: 100 additions & 6 deletions gnocchi/chef.py
@@ -18,9 +18,9 @@

import daiquiri

from gnocchi import carbonara
from gnocchi import indexer


LOG = daiquiri.getLogger(__name__)


@@ -45,9 +45,104 @@ class Chef(object):
def __init__(self, coord, incoming, index, storage):
self.coord = coord
self.incoming = incoming
        # This variable is an instance of the indexer,
        # i.e. the database connector.
self.index = index
self.storage = storage

def clean_raw_data_inactive_metrics(self):
"""Cleans metrics raw data if they are inactive.

The truncating of the raw metrics data is only done when new
measures are pushed. Therefore, if no new measures are pushed, and the
archive policy was updated to reduce the backwindow, the raw
datapoints for metrics that are not receiving new datapoints are never
truncated.

The goal of this method is to identify metrics that are in
"inactive state", meaning, not receiving new datapoints, and execute
their raw data points truncation. We check the column
"needs_raw_data_truncation", to determine if the archive policy was
updated, and no measure push was executed for the metric.

If the metric is not receiving new datapoints, the processing workflow
will not mark the column "needs_raw_data_truncation" to False;
therefore, that is how we identify such metrics.
"""

metrics_to_clean = self.index.list_metrics(
attribute_filter={"==": {
"needs_raw_data_truncation": True}}
)

LOG.debug("Metrics [%s] found to execute the raw data cleanup.",
metrics_to_clean)

for metric in metrics_to_clean:
LOG.debug("Executing the raw data cleanup for metric [%s].",
metric)
            metric_lock = None
            try:
                metric_id = metric.id
                # To properly generate the lock here, we need to use the
                # same process as in the measures processing. Therefore,
                # we have to use the sack to control the locks. See
                # 'process_new_measures_for_sack' for more details.
                sack_for_metric = self.incoming.sack_for_metric(metric_id)
metric_lock = self.get_sack_lock(sack_for_metric)

                if not metric_lock.acquire(blocking=False):
                    LOG.debug(
                        "Metric [%s] is locked, cannot clean it up now.",
                        metric.id)
                    # Do not release a lock we failed to acquire.
                    metric_lock = None
                    continue

agg_methods = list(metric.archive_policy.aggregation_methods)
block_size = metric.archive_policy.max_block_size
back_window = metric.archive_policy.back_window

                if any(m.startswith("rate:") for m in agg_methods):
back_window += 1

raw_measure = self.storage. \
_get_or_create_unaggregated_timeseries_unbatched(metric)

if raw_measure:
LOG.debug("Truncating metric [%s] for backwindow [%s].",
metric.id, back_window)

ts = carbonara.BoundTimeSerie.unserialize(raw_measure,
block_size,
back_window)
# Trigger the truncation process to remove the excess of
# raw data according to the updated back_window.
ts._truncate()

self.storage._store_unaggregated_timeseries_unbatched(
metric, ts.serialize())

self.index.update_needs_raw_data_truncation(metric.id)
else:
LOG.debug("No raw measures found for metric [%s] for "
"cleanup.", metric.id)
self.index.update_needs_raw_data_truncation(metric.id)
            except Exception:
                LOG.error("Unable to execute the raw data cleanup for "
                          "metric [%s].", metric, exc_info=True)
continue
finally:
if metric_lock:
metric_lock.release()
LOG.debug("Releasing lock [%s] for metric [%s].",
metric_lock, metric.id)
else:
LOG.debug(
"There is no lock for metric [%s] to be released.",
metric.id)

if metrics_to_clean:
LOG.debug("Cleaned up metrics [%s].", metrics_to_clean)

def expunge_metrics(self, cleanup_batch_size, sync=False):
"""Remove deleted metrics.

@@ -124,7 +219,7 @@ def refresh_metrics(self, metrics, timeout=None, sync=False):
metrics_by_id[metric_id]: measures
for metric_id, measures
in metrics_and_measures.items()
})
}, self.index)
LOG.debug("Measures for %d metrics processed",
len(metric_ids))
except Exception:
@@ -165,7 +260,7 @@ def process_new_measures_for_sack(self, sack, blocking=False, sync=False):
self.storage.add_measures_to_metrics({
metric: measures[metric.id]
for metric in metrics
})
}, self.index)
LOG.debug("Measures for %d metrics processed",
len(metrics))
return len(measures)
@@ -180,7 +275,6 @@ def process_new_measures_for_sack(self, sack, blocking=False, sync=False):
def get_sack_lock(self, sack):
# FIXME(jd) Some tooz drivers have a limitation on lock name length
# (e.g. MySQL). This should be handled by tooz, but it's not yet.
lock_name = hashlib.new(
'sha1',
('gnocchi-sack-%s-lock' % str(sack)).encode()).hexdigest().encode()
lock_name = ('gnocchi-sack-%s-lock' % str(sack)).encode()
lock_name = hashlib.new('sha1', lock_name).hexdigest().encode()
return self.coord.get_lock(lock_name)
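The get_sack_lock change above only splits the hashed lock-name construction into two statements; the behavior is unchanged. As a minimal standalone sketch of the naming scheme (pure standard library, names mirroring the diff):

import hashlib

def sack_lock_name(sack):
    # Hash the human-readable name down to a fixed-size SHA-1 digest,
    # since some tooz drivers (e.g. MySQL) limit lock-name length, as
    # noted in the FIXME above.
    lock_name = ('gnocchi-sack-%s-lock' % str(sack)).encode()
    return hashlib.new('sha1', lock_name).hexdigest().encode()

print(sack_lock_name(42))  # always a 40-character hex digest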
1 change: 1 addition & 0 deletions gnocchi/cli/api.py
@@ -38,6 +38,7 @@ def prepare_service(conf=None):

opts.set_defaults()
policy_opts.set_defaults(conf, 'policy.yaml')

conf = service.prepare_service(conf=conf)
return conf

14 changes: 11 additions & 3 deletions gnocchi/cli/metricd.py
@@ -72,9 +72,9 @@ def _configure(self):
self.conf.coordination_url)
self.store = storage.get_driver(self.conf)
self.incoming = incoming.get_driver(self.conf)
self.index = indexer.get_driver(self.conf)
self.indexer = indexer.get_driver(self.conf)
self.chef = chef.Chef(self.coord, self.incoming,
self.index, self.store)
self.indexer, self.store)

def run(self):
self._configure()
@@ -267,9 +267,17 @@ def __init__(self, worker_id, conf):
worker_id, conf, conf.metricd.metric_cleanup_delay)

def _run_job(self):
LOG.debug("Cleaning up deleted metrics with batch size [%s].",
self.conf.metricd.cleanup_batch_size)
self.chef.expunge_metrics(self.conf.metricd.cleanup_batch_size)
LOG.debug("Metrics marked for deletion removed from backend")

LOG.debug("Starting the cleaning of raw data points for metrics that "
"are no longer receiving measures.")
self.chef.clean_raw_data_inactive_metrics()
LOG.debug("Finished the cleaning of raw data points for metrics that "
"are no longer receiving measures.")


class MetricdServiceManager(cotyledon.ServiceManager):
def __init__(self, conf):
@@ -288,7 +296,7 @@ def __init__(self, conf):

def on_reload(self):
# NOTE(sileht): We do not implement reload() in Workers so all workers
# will received SIGHUP and exit gracefully, then their will be
        # will receive SIGHUP and exit gracefully, then they will be
# restarted with the new number of workers. This is important because
        # we use the number of workers to declare the capability in tooz and
        # to select the block of metrics to process.
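For context, the janitor job now chains both cleanups. A hedged sketch of the resulting periodic behavior (illustrative only: in the real service, cotyledon schedules _run_job every conf.metricd.metric_cleanup_delay seconds, and chef and conf are set up as in _configure above):

import time

def janitor_loop(chef, conf):
    # Mirrors MetricJanitor._run_job from the diff, as a plain loop.
    while True:
        chef.expunge_metrics(conf.metricd.cleanup_batch_size)
        chef.clean_raw_data_inactive_metrics()
        time.sleep(conf.metricd.metric_cleanup_delay)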
6 changes: 6 additions & 0 deletions gnocchi/incoming/file.py
@@ -197,9 +197,15 @@ def process_measures_for_sack(self, sack):
sack, metric_id)
processed_files[metric_id] = files
m = self._make_measures_array()

            total_files = len(files)
            for count, f in enumerate(files, start=1):
                abspath = self._build_measure_path(metric_id, f)
                with open(abspath, "rb") as e:
                    LOG.debug("(%s/%s) Reading metric file [%s].",
                              count, total_files, abspath)
m = numpy.concatenate((
m, self._unserialize_measures(f, e.read())))
measures[metric_id] = m
19 changes: 15 additions & 4 deletions gnocchi/indexer/__init__.py
@@ -74,12 +74,13 @@ def lastmodified(self):

class Metric(object):
def __init__(self, id, archive_policy, creator=None,
name=None, resource_id=None):
name=None, resource_id=None, needs_raw_data_truncation=False):
self.id = id
self.archive_policy = archive_policy
self.creator = creator
self.name = name
self.resource_id = resource_id
self.needs_raw_data_truncation = needs_raw_data_truncation

def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, self.id)
@@ -88,12 +89,13 @@ def __str__(self):
return str(self.id)

def __eq__(self, other):
return (isinstance(other, Metric)
and self.id == other.id
return (isinstance(other, Metric) and self.id == other.id
and self.archive_policy == other.archive_policy
and self.creator == other.creator
and self.name == other.name
and self.resource_id == other.resource_id)
and self.resource_id == other.resource_id
and self.needs_raw_data_truncation ==
other.needs_raw_data_truncation)

__hash__ = object.__hash__

@@ -438,6 +440,15 @@ def delete_resources(resource_type='generic',
def delete_metric(id):
raise exceptions.NotImplementedError

@staticmethod
def update_backwindow_changed_for_metrics_archive_policy(
archive_policy_name):
raise exceptions.NotImplementedError

@staticmethod
def update_needs_raw_data_truncation(metric_id):
raise exceptions.NotImplementedError

@staticmethod
def expunge_metric(id):
raise exceptions.NotImplementedError
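Because needs_raw_data_truncation now takes part in Metric.__eq__, two metrics that agree on every other attribute compare unequal when the flag differs. A small illustration (assuming the gnocchi package is importable; archive_policy=None is used only for brevity):

from gnocchi import indexer

m1 = indexer.Metric(1, None, needs_raw_data_truncation=True)
m2 = indexer.Metric(1, None, needs_raw_data_truncation=False)

assert m1 != m2  # differs only in the new flag, so no longer equal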
42 changes: 42 additions & 0 deletions gnocchi/indexer/alembic/versions/18fff4509e3e_create_metric_truncation_status_column.py
@@ -0,0 +1,42 @@
# Copyright 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

"""create metric truncation status column

Revision ID: 18fff4509e3e
Revises: 04eba72e4f90
Create Date: 2024-04-24 09:16:00

"""
import datetime

from alembic import op
from sqlalchemy.sql import func

import sqlalchemy

# revision identifiers, used by Alembic.
revision = '18fff4509e3e'
down_revision = '04eba72e4f90'
branch_labels = None
depends_on = None


def upgrade():
op.add_column(
"metric", sqlalchemy.Column(
"needs_raw_data_truncation", sqlalchemy.Boolean,
nullable=False, default=True,
server_default=sqlalchemy.sql.true()))
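This revision adds only an upgrade step. For reference, a hedged sketch of the inverse operation (not part of this patch, reusing the op import from the file above) and the roughly equivalent DDL, which varies by backend:

# Roughly what upgrade() emits:
#   ALTER TABLE metric
#       ADD COLUMN needs_raw_data_truncation BOOLEAN NOT NULL DEFAULT true;

def downgrade():
    # Hypothetical inverse, shown for illustration only.
    op.drop_column("metric", "needs_raw_data_truncation")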
19 changes: 19 additions & 0 deletions gnocchi/indexer/sqlalchemy.py
@@ -832,6 +832,7 @@ def list_metrics(self, details=False, status='active',
sort_keys=sort_keys,
marker=metric_marker,
sort_dirs=sort_dirs)

except ValueError as e:
raise indexer.InvalidPagination(e)
except exception.InvalidSortKey as e:
@@ -1395,6 +1396,24 @@ def delete_metric(self, id):
if session.execute(stmt).rowcount == 0:
raise indexer.NoSuchMetric(id)

    def update_needs_raw_data_truncation(self, metric_id, value=False):
        with self.facade.writer() as session:
            stmt = update(Metric).filter(Metric.id == metric_id).values(
                needs_raw_data_truncation=value)
            if session.execute(stmt).rowcount == 0:
                raise indexer.NoSuchMetric(metric_id)

def update_backwindow_changed_for_metrics_archive_policy(
self, archive_policy_name):
with self.facade.writer() as session:
stmt = update(Metric).filter(
Metric.archive_policy_name == archive_policy_name).values(
needs_raw_data_truncation=True)
if session.execute(stmt).rowcount == 0:
LOG.info("No metric was updated for archive_policy [%s]. "
"This might indicate that the archive policy is not "
"used by any metric.", archive_policy_name)

@staticmethod
def _build_sort_keys(sorts, unique_keys):
# transform the api-wg representation to the oslo.db one
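Together, the two new indexer methods implement the flag's lifecycle: it is raised for all metrics of an archive policy when the policy changes, and lowered per metric once the raw data has been truncated. A hedged sketch of the calling sequence (conf and metric_id are assumed to be defined; the API-side call site is not part of this diff):

from gnocchi import indexer

index = indexer.get_driver(conf)

# An archive-policy update shrinks the back window: flag all of its
# metrics for raw-data truncation.
index.update_backwindow_changed_for_metrics_archive_policy("low")

# Later, the metricd janitor truncates each flagged metric's raw data
# and clears the flag, as clean_raw_data_inactive_metrics does above.
index.update_needs_raw_data_truncation(metric_id, value=False)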
7 changes: 7 additions & 0 deletions gnocchi/indexer/sqlalchemy_base.py
@@ -19,6 +19,7 @@
import sqlalchemy
from sqlalchemy.ext import declarative
from sqlalchemy.orm import declarative_base

import sqlalchemy_utils

from gnocchi import archive_policy
@@ -99,13 +100,19 @@ class Metric(Base, GnocchiBase, indexer.Metric):
sqlalchemy.ForeignKey('resource.id',
ondelete="SET NULL",
name="fk_metric_resource_id_resource_id"))

name = sqlalchemy.Column(sqlalchemy.String(255))
unit = sqlalchemy.Column(sqlalchemy.String(31))
status = sqlalchemy.Column(sqlalchemy.Enum('active', 'delete',
name="metric_status_enum"),
nullable=False,
server_default='active')

needs_raw_data_truncation = sqlalchemy.Column(
"needs_raw_data_truncation", sqlalchemy.Boolean,
nullable=False, default=True,
server_default=sqlalchemy.sql.true())

def jsonify(self):
d = {
"id": self.id,
2 changes: 1 addition & 1 deletion gnocchi/opts.py
@@ -172,7 +172,7 @@ def list_opts():
default=10000,
min=1,
help="Number of metrics that should be deleted "
"simultaneously by one janitor."),
"simultaneously by one janitor.")
)),
("api", (
cfg.StrOpt('paste_config',
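The trailing-comma fix above is purely cosmetic; the cleanup_batch_size option itself is unchanged. A minimal sketch of reading it through oslo.config (option name, default, and help text taken from the diff; registration normally happens via gnocchi.opts.list_opts):

from oslo_config import cfg

conf = cfg.ConfigOpts()
conf.register_opts(
    [cfg.IntOpt('cleanup_batch_size', default=10000, min=1,
                help="Number of metrics that should be deleted "
                     "simultaneously by one janitor.")],
    group='metricd')
conf(args=[])  # parse no CLI arguments; defaults and config files apply

print(conf.metricd.cleanup_batch_size)  # 10000 unless overridden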