diff --git a/wis2-gdc-management/Dockerfile b/wis2-gdc-management/Dockerfile index 2340c81..1ff8ffe 100644 --- a/wis2-gdc-management/Dockerfile +++ b/wis2-gdc-management/Dockerfile @@ -37,7 +37,7 @@ RUN apt-get update -y && \ # install dependencies apt-get install -y ${DEBIAN_PACKAGES} && \ pip3 install -U https://github.com/wmo-im/pywis-pubsub/archive/main.zip && \ - pip3 install --no-cache-dir -r /app/requirements.txt elasticsearch && \ + pip3 install --no-cache-dir -r /app/requirements-backend.txt && \ # install wis2-gdc cd /app && \ pip3 install -e . && \ diff --git a/wis2-gdc-management/requirements-backend.txt b/wis2-gdc-management/requirements-backend.txt index b2066d6..c1cfe6b 100644 --- a/wis2-gdc-management/requirements-backend.txt +++ b/wis2-gdc-management/requirements-backend.txt @@ -1,2 +1,2 @@ elasticsearch -owslib +OWSLib diff --git a/wis2-gdc-management/wis2_gdc/backend/base.py b/wis2-gdc-management/wis2_gdc/backend/base.py index 22656e5..26631de 100644 --- a/wis2-gdc-management/wis2_gdc/backend/base.py +++ b/wis2-gdc-management/wis2_gdc/backend/base.py @@ -61,5 +61,17 @@ def save(self, record: dict) -> None: raise NotImplementedError() + @abstractmethod + def exists(self, identifier: str) -> bool: + """ + Querying whether a record exists in a backend + + :param identifier: `str` of record identifier + + :returns: `bool` of whether record exists in backend + """ + + raise NotImplementedError() + def __repr__(self): return '' diff --git a/wis2-gdc-management/wis2_gdc/backend/elastic.py b/wis2-gdc-management/wis2_gdc/backend/elastic.py index 0455270..6785a2e 100644 --- a/wis2-gdc-management/wis2_gdc/backend/elastic.py +++ b/wis2-gdc-management/wis2_gdc/backend/elastic.py @@ -22,7 +22,7 @@ import logging from urllib.parse import urlparse -from elasticsearch import Elasticsearch +from elasticsearch import Elasticsearch, NotFoundError from wis2_gdc.backend.base import BaseBackend @@ -158,5 +158,13 @@ def save(self, record: dict) -> None: 
LOGGER.debug(f"Indexing record {record['id']}") self.es.index(index=self.index_name, id=record['id'], body=record) + def exists(self, identifier: str) -> bool: + LOGGER.debug(f'Querying GDC for id {identifier}') + try: + _ = self.es.get(index=self.index_name, id=identifier) + return True + except NotFoundError: + return False + def __repr__(self): return '' diff --git a/wis2-gdc-management/wis2_gdc/backend/ogcapi_records.py b/wis2-gdc-management/wis2_gdc/backend/ogcapi_records.py index 6864671..7542c3c 100644 --- a/wis2-gdc-management/wis2_gdc/backend/ogcapi_records.py +++ b/wis2-gdc-management/wis2_gdc/backend/ogcapi_records.py @@ -20,6 +20,9 @@ ############################################################################### import logging +import json + +from owslib.ogcapi.records import Records from wis2_gdc import env from wis2_gdc.backend.base import BaseBackend @@ -29,18 +32,18 @@ class OGCAPIRecordsBackend(BaseBackend): - def save(self): + def __init__(self, defs): + super().__init__(defs) - import json + self.conn = Records(env.API_URL) + self.collection = 'discovery-metadata' - from owslib.ogcapi import Records + def save(self): - oarec = Records(env.API_URL) - collection = 'discovery-metadata' ttype = 'create' try: - _ = oarec.get_collection_item(self.metadata['id']) + _ = self.conn.get_collection_item(self.metadata['id']) ttype = 'update' except Exception: pass @@ -49,10 +52,18 @@ def save(self): if ttype == 'create': LOGGER.debug('Adding new record to catalogue') - _ = oarec.get_collection_create(collection, payload) + _ = self.conn.get_collection_create(self.collection, payload) elif ttype == 'update': LOGGER.debug('Updating existing record in catalogue') - _ = oarec.get_collection_update(collection, payload) + _ = self.conn.get_collection_update(self.collection, payload) + + def exists(self, identifier: str) -> bool: + LOGGER.debug(f'Querying GDC for id {identifier}') + try: + _ = self.conn.collection_item(self.collection, identifier) + return True + 
except RuntimeError: + return False def __repr__(self): return '' diff --git a/wis2-gdc-management/wis2_gdc/registrar.py b/wis2-gdc-management/wis2_gdc/registrar.py index 8a2a53f..7aadc26 100644 --- a/wis2-gdc-management/wis2_gdc/registrar.py +++ b/wis2-gdc-management/wis2_gdc/registrar.py @@ -23,6 +23,7 @@ import json import logging from pathlib import Path +from typing import Union import click import requests @@ -50,6 +51,8 @@ def __init__(self): self.broker = None self.metadata = None + self.backend = BACKENDS[BACKEND_TYPE]( + {'connection': BACKEND_CONNECTION}) if PUBLISH_REPORTS: self.broker = MQTTPubSubClient(BROKER_URL) @@ -105,24 +108,24 @@ def register(self, metadata: dict) -> None: if ets_results['ets-report']['summary']['FAILED'] > 0: LOGGER.warning('ETS errors; metadata not published') return - except KeyError: # validation error + except KeyError: LOGGER.debug('Validation errors; metadata not published') - self.broker.pub('wis2-gdc/metrics/failed_total', - json.dumps(centre_id_labels)) + self._process_record_metric( + self.metadata['id'], 'failed_total', centre_id_labels) return - self.broker.pub('wis2-gdc/metrics/passed_total', - json.dumps(centre_id_labels)) + self._process_record_metric( + self.metadata['id'], 'passed_total', centre_id_labels) data_policy = self.metadata['properties']['wmo:dataPolicy'] - self.broker.pub(f'wis2-gdc/metrics/{data_policy}_total', - json.dumps(centre_id_labels)) + self._process_record_metric( + self.metadata['id'], f'{data_policy}_total', centre_id_labels) LOGGER.info('Updating links') self.update_record_links() - LOGGER.info(f'Publishing metadata to {BACKEND_TYPE} ({BACKEND_CONNECTION})') # noqa + LOGGER.info('Publishing metadata to backend') self._publish() if RUN_KPI: @@ -135,6 +138,44 @@ def register(self, metadata: dict) -> None: LOGGER.info('Publishing KPI report to broker') self.broker.pub(topic, json.dumps(kpi_results)) + kpi_labels = [self.metadata['id']] + centre_id_labels + + self._process_record_metric( + 
self.metadata['id'], 'kpi_percentage_total', + kpi_labels, kpi_results['summary']['percentage']) + + def _process_record_metric(self, identifier: str, metric_name: str, + labels: list, + value: Union[str, int, float] = None) -> None: + """ + Helper function to process record metric + + :param identifier: identifier of metadata record + :param metric_name: `str` of name of metric + :param labels: `list` of labels to apply + :param value: optional value(s) to set + + :returns: `None` + """ + + publish_metric = True + + message_payload = { + 'labels': labels + } + + if value is not None: + message_payload['value'] = value + + if self.backend.exists(identifier) and len(labels) == 2: + LOGGER.debug('Record exists; not publishing metric') + publish_metric = False + + if publish_metric: + LOGGER.debug('Record does not exist; publishing metric') + self.broker.pub(f'wis2-gdc/metrics/{metric_name}', + json.dumps(message_payload)) + def _run_ets(self) -> dict: """ Helper function to run ETS @@ -169,9 +210,8 @@ def _publish(self): :returns: `None` """ - backend = BACKENDS[BACKEND_TYPE]({'connection': BACKEND_CONNECTION}) - LOGGER.info('Saving metadata to backend') - backend.save(self.metadata) + LOGGER.info(f'Saving to {BACKEND_TYPE} ({BACKEND_CONNECTION})') + self.backend.save(self.metadata) def update_record_links(self) -> None: """ diff --git a/wis2-gdc-metrics-collector/metrics_collector.py b/wis2-gdc-metrics-collector/metrics_collector.py index 0d48fba..f08647d 100644 --- a/wis2-gdc-metrics-collector/metrics_collector.py +++ b/wis2-gdc-metrics-collector/metrics_collector.py @@ -78,7 +78,7 @@ ['centre_id', 'report_by'] ) -METRIC_KPI_PERCENTAGE_TOTAL = Counter( +METRIC_KPI_PERCENTAGE_TOTAL = Gauge( 'wmo_wis2_gdc_kpi_percentage_total', 'KPI percentage for a single metadata record (metadata_id equals WCMP2 id)', # noqa ['metadata_id', 'centre_id', 'report_by'] @@ -90,13 +90,13 @@ ['centre_id', 'report_by'] ) -METRIC_KPI_PERCENTAGE_OVER80_TOTAL = Counter( 
+METRIC_KPI_PERCENTAGE_OVER80_TOTAL = Gauge( 'wmo_wis2_gdc_kpi_percentage_over80_total', 'Number of metadata records with KPI percentage over 80', ['centre_id', 'report_by'] ) -METRIC_SEARCH_TOTAL = Counter( +METRIC_SEARCH_TOTAL = Gauge( 'wmo_wis2_gdc_search_total', 'Number of search requests (during last monitoring period)', ['centre_id', 'report_by'] @@ -129,9 +129,12 @@ def _sub_connect(client, userdata, flags, rc): def _sub_message(client, userdata, msg): LOGGER.debug('Processing message') topic = msg.topic - labels = json.loads(msg.payload) + payload = json.loads(msg.payload) + labels = payload['labels'] + value = payload.get('value') LOGGER.debug(f'Topic: {topic}') - LOGGER.debug(f'Labels: {labels}') + LOGGER.debug(f"Labels: {payload['labels']}") + LOGGER.debug(f"Value: {payload.get('value')}") if topic == 'wis2-gdc/metrics/passed_total': METRIC_PASSED_TOTAL.labels(*labels).inc() @@ -141,6 +144,8 @@ def _sub_message(client, userdata, msg): METRIC_CORE_TOTAL.labels(*labels).inc() elif topic == 'wis2-gdc/metrics/recommended_total': METRIC_RECOMMENDED_TOTAL.labels(*labels).inc() + elif topic == 'wis2-gdc/metrics/kpi_percentage_total': + METRIC_KPI_PERCENTAGE_TOTAL.labels(*labels).set(value) url = urlparse(BROKER_URL)