Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatically detect deleted resources #1386

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
74 changes: 74 additions & 0 deletions gnocchi/chef.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import hashlib

import daiquiri
import random

from collections import defaultdict
from gnocchi import carbonara
from gnocchi import indexer
from gnocchi import utils

LOG = daiquiri.getLogger(__name__)

Expand Down Expand Up @@ -51,6 +54,77 @@ def __init__(self, coord, incoming, index, storage):
self.index = index
self.storage = storage

def resource_ended_at_normalization(self, metric_inactive_after):
    """Mark resources as ended at if needed.

    This method will check all metrics that have not received new
    datapoints after a given period. The period is defined by the
    'metric_inactive_after' parameter. If all metrics of a resource are
    in inactive state, we mark the ended_at field with a timestamp.
    Therefore, we consider that the resource has ceased existing.

    In this process we will handle only metrics that are considered as
    inactive, according to the `metric_inactive_after` parameter.
    Therefore, we do not need to lock these metrics while processing, as
    they are inactive, and chances are that they will not receive
    measures anymore. Moreover, we are only touching metadata, and not
    the actual data.

    :param metric_inactive_after: number of seconds without new measures
        after which a metric is considered inactive.
    """
    moment_now = utils.utcnow()
    moment = moment_now - datetime.timedelta(
        seconds=metric_inactive_after)

    # Only metrics whose resource has not already been marked as ended
    # are of interest here (resource_policy_filter).
    inactive_metrics = self.index.list_metrics(
        attribute_filter={"<": {
            "last_measure_timestamp": moment}},
        resource_policy_filter={"==": {"ended_at": None}}
    )

    LOG.debug("Inactive metrics found for processing: [%s].",
              inactive_metrics)

    inactive_metrics_by_resource_id = defaultdict(list)
    for metric in inactive_metrics:
        inactive_metrics_by_resource_id[metric.resource_id].append(metric)

    for resource_id, resource_inactive_metrics in (
            inactive_metrics_by_resource_id.items()):
        if resource_id is None:
            LOG.debug("We do not need to process inactive metrics that do "
                      "not have resource. Therefore, these metrics [%s] "
                      "will be considered inactive, but there is nothing "
                      "else we can do in this process.",
                      resource_inactive_metrics)
            continue

        resource = self.index.get_resource(
            "generic", resource_id, with_metrics=True)
        resource_metrics = resource.metrics

        # The resource is ended only when every one of its metrics is
        # inactive; a single active metric keeps it alive.
        all_metrics_are_inactive = True
        for m in resource_metrics:
            if m not in resource_inactive_metrics:
                all_metrics_are_inactive = False
                LOG.debug("Not all metrics of resource [%s] are inactive. "
                          "Metric [%s] is not inactive. The inactive "
                          "metrics are [%s].",
                          resource, m, resource_inactive_metrics)
                break

        if all_metrics_are_inactive:
            # NOTE: the string pieces below are separated with explicit
            # trailing spaces; without them the concatenated log message
            # runs words together.
            LOG.info("All metrics [%s] of resource [%s] are inactive. "
                     "Therefore, we will mark it as finished with an "
                     "ended at timestamp.", resource_metrics, resource)
            if resource.ended_at is not None:
                LOG.debug(
                    "Resource [%s] already has an ended at value.", resource)
            else:
                LOG.info("Marking ended at timestamp for resource "
                         "[%s] because all of its metrics are inactive.",
                         resource)
                self.index.update_resource(
                    "generic", resource_id, ended_at=moment_now)

def clean_raw_data_inactive_metrics(self):
"""Cleans metrics raw data if they are inactive.

Expand Down
11 changes: 11 additions & 0 deletions gnocchi/cli/metricd.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,17 @@ def _run_job(self):
LOG.debug("Finished the cleaning of raw data points for metrics that "
"are no longer receiving measures.")

if (self.conf.metricd.metric_inactive_after and
self.conf.metricd.metric_inactive_after > 0):
LOG.debug("Starting resource ended at field normalization.")
self.chef.resource_ended_at_normalization(
self.conf.metricd.metric_inactive_after)
LOG.debug("Finished resource ended at field normalization.")
else:
LOG.debug("Resource ended at field normalization is not "
"activated. See 'metric_inactive_after' parameter if "
"you wish to activate it.")


class MetricdServiceManager(cotyledon.ServiceManager):
def __init__(self, conf):
Expand Down
6 changes: 5 additions & 1 deletion gnocchi/indexer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,11 @@ def update_backwindow_changed_for_metrics_archive_policy(
raise exceptions.NotImplementedError

@staticmethod
def update_needs_raw_data_truncation(metric_id):
def update_needs_raw_data_truncation(metric_id, value):
raise exceptions.NotImplementedError

@staticmethod
def update_last_measure_timestamp(metric_id):
    """Refresh the metric's last measure timestamp to the current time.

    Abstract method; concrete indexer drivers must override it.

    :param metric_id: ID of the metric to update.
    """
    raise exceptions.NotImplementedError

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@
# under the License.
#

"""create metric truncation status column
"""Create metric truncation status column

Revision ID: 18fff4509e3e
Revises: 04eba72e4f90
Create Date: 2024-04-24 09:16:00

"""
import datetime

from alembic import op
from sqlalchemy.sql import func

import sqlalchemy

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

"""Create last measure push timestamp column
Revision ID: f89ed2e3c2ec
Revises: 94544ca86c7e
Create Date: 2024-04-24 09:16:00
"""

import datetime
from alembic import op

import sqlalchemy

# revision identifiers, used by Alembic.
revision = 'f89ed2e3c2ec'
down_revision = '94544ca86c7e'
branch_labels = None
depends_on = None


def upgrade():
    """Add the 'last_measure_timestamp' column to the 'metric' table.

    The column records when a metric last received measures. The server
    default covers rows inserted outside SQLAlchemy, while the
    Python-side default covers ORM inserts.
    """
    sql_column = sqlalchemy.Column(
        "last_measure_timestamp", sqlalchemy.DateTime,
        # NOTE: pass the callable itself, NOT utcnow() -- calling it here
        # would freeze the default to this module's import time instead of
        # evaluating it at each INSERT.
        nullable=False, default=datetime.datetime.utcnow,
        server_default=sqlalchemy.sql.func.current_timestamp())
    op.add_column(
        "metric", sql_column)
7 changes: 7 additions & 0 deletions gnocchi/indexer/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,13 @@ def update_needs_raw_data_truncation(self, metrid_id, value=False):
if session.execute(stmt).rowcount == 0:
raise indexer.NoSuchMetric(metrid_id)

def update_last_measure_timestamp(self, metrid_id):
    """Set the metric's 'last_measure_timestamp' to the current UTC time.

    :param metrid_id: ID of the metric to refresh.
    :raises indexer.NoSuchMetric: if no metric row matches the given ID.
    """
    now = datetime.datetime.utcnow()
    stmt = update(Metric).filter(
        Metric.id == metrid_id).values(last_measure_timestamp=now)
    with self.facade.writer() as session:
        if session.execute(stmt).rowcount == 0:
            raise indexer.NoSuchMetric(metrid_id)

def update_backwindow_changed_for_metrics_archive_policy(
self, archive_policy_name):
with self.facade.writer() as session:
Expand Down
15 changes: 13 additions & 2 deletions gnocchi/indexer/sqlalchemy_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.

import datetime
from oslo_db.sqlalchemy import models
import sqlalchemy
from sqlalchemy.ext import declarative
Expand Down Expand Up @@ -114,6 +115,14 @@ class Metric(Base, GnocchiBase, indexer.Metric):
nullable=False, default=True,
server_default=sqlalchemy.sql.true())

# Timestamp that represents when the last measure push was received for the
# given metric. This allows us to identify when a metric ceased receiving
# measurements; thus, if all metrics of a resource are in this situation,
# chances are that the resource ceased existing in the backend.
last_measure_timestamp = sqlalchemy.Column(
    "last_measure_timestamp", sqlalchemy.DateTime,
    # Pass the callable (no parentheses) so the default is evaluated at
    # each INSERT; calling utcnow() here would freeze the default to the
    # time this class was first imported.
    default=datetime.datetime.utcnow, nullable=False,
    server_default=sqlalchemy.sql.func.current_timestamp())

def jsonify(self):
d = {
"id": self.id,
Expand Down Expand Up @@ -257,7 +266,8 @@ def type(cls):
creator = sqlalchemy.Column(sqlalchemy.String(255))
started_at = sqlalchemy.Column(types.TimestampUTC, nullable=False,
default=lambda: utils.utcnow())
revision_start = sqlalchemy.Column(types.TimestampUTC, nullable=False,
revision_start = sqlalchemy.Column(types.TimestampUTC,
nullable=False,
default=lambda: utils.utcnow())
ended_at = sqlalchemy.Column(types.TimestampUTC)
user_id = sqlalchemy.Column(sqlalchemy.String(255))
Expand Down Expand Up @@ -299,7 +309,8 @@ class ResourceHistory(ResourceMixin, Base, GnocchiBase):
ondelete="CASCADE",
name="fk_rh_id_resource_id"),
nullable=False)
revision_end = sqlalchemy.Column(types.TimestampUTC, nullable=False,
revision_end = sqlalchemy.Column(types.TimestampUTC,
nullable=False,
default=lambda: utils.utcnow())
metrics = sqlalchemy.orm.relationship(
Metric, primaryjoin="Metric.resource_id == ResourceHistory.id",
Expand Down
12 changes: 10 additions & 2 deletions gnocchi/opts.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ def __getitem__(self, key):
for opt in _INCOMING_OPTS:
opt.default = '${storage.%s}' % opt.name


API_OPTS = (
cfg.HostAddressOpt('host',
default="0.0.0.0",
Expand Down Expand Up @@ -177,7 +176,16 @@ def list_opts():
default=10000,
min=1,
help="Number of metrics that should be deleted "
"simultaneously by one janitor.")
"simultaneously by one janitor."),
cfg.IntOpt('metric_inactive_after',
default=0,
help="Number of seconds to wait before we consider a "
"metric inactive. An inactive metric is a metric "
"that has not received new measurements for a "
"given period. If all metrics of a resource are "
"inactive, we mark the resource with the "
"'ended_at' timestamp. The default is 0 (zero), "
"which means that we never execute process.")
)),
("api", (
cfg.StrOpt('paste_config',
Expand Down
20 changes: 20 additions & 0 deletions gnocchi/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,26 @@ def _map_compute_splits_operations(bound_timeserie):
if metric.needs_raw_data_truncation:
indexer_driver.update_needs_raw_data_truncation(metric.id)

# Mark when the metric receives its latest measures
indexer_driver.update_last_measure_timestamp(metric.id)

resource_id = metric.resource_id
if resource_id:
resource = indexer_driver.get_resource('generic', resource_id)
LOG.debug("Checking if resource [%s] of metric [%s] with "
"resource ID [%s] needs to be restored.",
resource, metric.id, resource_id)
if resource.ended_at is not None:
LOG.info("Resource [%s] was marked with a timestamp for the "
"'ended_at' field. However, it received a "
"measurement for metric [%s]. Therefore, restoring "
"it.", resource, metric)
indexer_driver.update_resource(
"generic", resource_id, ended_at=None)
else:
LOG.debug("Metric [%s] does not have a resource "
"assigned to it.", metric)

with self.statistics.time("splits delete"):
self._delete_metric_splits(splits_to_delete)
self.statistics["splits delete"] += len(splits_to_delete)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ defaults:
# User foobar
authorization: "basic Zm9vYmFyOg=="
content-type: application/json
poll:
count: 3
delay: 1

tests:
- name: create archive policy
Expand Down
58 changes: 51 additions & 7 deletions gnocchi/tests/indexer/sqlalchemy/test_migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import fixtures
import oslo_db.exception
from oslo_db.sqlalchemy import test_migrations
import sqlalchemy.schema

import sqlalchemy as sa
import sqlalchemy_utils

from unittest import mock

from gnocchi import indexer
from gnocchi.indexer import sqlalchemy
from gnocchi.indexer import sqlalchemy_base
from gnocchi.indexer import sqlalchemy as gnocchi_sqlalchemy
from gnocchi.indexer import sqlalchemy_base as gnocchi_sqlalchemy_base
from gnocchi.tests import base


Expand All @@ -41,7 +43,7 @@ def setUp(self):
self.db = mock.Mock()
self.conf.set_override(
'url',
sqlalchemy.SQLAlchemyIndexer._create_new_database(
gnocchi_sqlalchemy.SQLAlchemyIndexer._create_new_database(
self.conf.indexer.url),
'indexer')
self.index = indexer.get_driver(self.conf)
Expand All @@ -56,10 +58,10 @@ def setUp(self):
# NOTE(sileht): load it in sqlalchemy metadata
self.index._RESOURCE_TYPE_MANAGER.get_classes(rt)

for table in sqlalchemy_base.Base.metadata.sorted_tables:
for table in gnocchi_sqlalchemy_base.Base.metadata.sorted_tables:
if (table.name.startswith("rt_") and
table.name not in valid_resource_type_tables):
sqlalchemy_base.Base.metadata.remove(table)
gnocchi_sqlalchemy_base.Base.metadata.remove(table)
self.index._RESOURCE_TYPE_MANAGER._cache.pop(
table.name.replace('_history', ''), None)

Expand All @@ -72,7 +74,49 @@ def _drop_database(self):

@staticmethod
def get_metadata():
return sqlalchemy_base.Base.metadata
return gnocchi_sqlalchemy_base.Base.metadata

def get_engine(self):
    """Return the SQLAlchemy engine used by the indexer under test."""
    engine = self.index.get_engine()
    return engine

def compare_server_default(self, ctxt, ins_col, meta_col, insp_def,
                           meta_def, rendered_meta_def):
    """Compare default values between model and db table.

    Return True if the defaults are different, False if not, or None to
    allow the default implementation to compare these defaults.

    :param ctxt: alembic MigrationContext instance
    :param ins_col: reflected column
    :param insp_def: reflected column default value
    :param meta_col: column from model
    :param meta_def: column default value from model
    :param rendered_meta_def: rendered column default value (from model)

    When the column has server_default=sqlalchemy.sql.func.now(), the
    diff includes entries such as::

        [[('modify_default',
           None,
           'metric',
           'last_measure_timestamp',
           {'existing_comment': None,
            'existing_nullable': False,
            'existing_type': DATETIME()},
           DefaultClause(<TextClause ...>, for_update=False),
           DefaultClause(<now ...>, for_update=False))]]
    """
    method_return = super(ModelsMigrationsSync, self).compare_server_default(
        ctxt, ins_col, meta_col, insp_def, meta_def, rendered_meta_def)

    # The model declares func.current_timestamp() while reflection hands
    # back a textual clause; detect that specific combination.
    meta_default_is_timestamp = meta_def is not None and isinstance(
        meta_def.arg, sa.sql.functions.current_timestamp)
    reflected_default_is_text = (
        ins_col is not None and
        ins_col.server_default is not None and
        isinstance(ins_col.server_default.arg, sa.sql.elements.TextClause))

    if not (meta_default_is_timestamp and reflected_default_is_text):
        return method_return

    # If it is different from "CURRENT_TIMESTAMP", then we must return
    # True, so the test flow continues.
    return rendered_meta_def != "CURRENT_TIMESTAMP"
Loading