Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics updates #8

Merged
merged 4 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion wis2-gdc-management/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ RUN apt-get update -y && \
# install dependencies
apt-get install -y ${DEBIAN_PACKAGES} && \
pip3 install -U https://github.com/wmo-im/pywis-pubsub/archive/main.zip && \
pip3 install --no-cache-dir -r /app/requirements.txt elasticsearch && \
pip3 install --no-cache-dir -r /app/requirements-backend.txt && \
# install wis2-gdc
cd /app && \
pip3 install -e . && \
Expand Down
2 changes: 1 addition & 1 deletion wis2-gdc-management/requirements-backend.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
elasticsearch
owslib
OWSLib
12 changes: 12 additions & 0 deletions wis2-gdc-management/wis2_gdc/backend/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,17 @@ def save(self, record: dict) -> None:

raise NotImplementedError()

@abstractmethod
def exists(self, identifier: str) -> bool:
"""
Querying whether a record exists in a backend

:param identifier: `str` of record identifier

:returns: `bool` of whether record exists in backend
"""

raise NotImplementedError()

def __repr__(self):
return '<BaseBackend>'
10 changes: 9 additions & 1 deletion wis2-gdc-management/wis2_gdc/backend/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import logging
from urllib.parse import urlparse

from elasticsearch import Elasticsearch
from elasticsearch import Elasticsearch, NotFoundError

from wis2_gdc.backend.base import BaseBackend

Expand Down Expand Up @@ -158,5 +158,13 @@ def save(self, record: dict) -> None:
LOGGER.debug(f"Indexing record {record['id']}")
self.es.index(index=self.index_name, id=record['id'], body=record)

def exists(self, identifier: str) -> bool:
LOGGER.debug(f'Querying GDC for id {identifier}')
try:
_ = self.es.get(index=self.index_name, id=identifier)
return True
except NotFoundError:
return False

def __repr__(self):
return '<ElasticsearchBackend>'
27 changes: 19 additions & 8 deletions wis2-gdc-management/wis2_gdc/backend/ogcapi_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
###############################################################################

import logging
import json

from owslib.ogcapi.records import Records

from wis2_gdc import env
from wis2_gdc.backend.base import BaseBackend
Expand All @@ -29,18 +32,18 @@

class OGCAPIRecordsBackend(BaseBackend):

def save(self):
def __init__(self, defs):
super().__init__(defs)

import json
self.conn = Records(env.API_URL)
self.collection = 'discovery-metadata'

from owslib.ogcapi import Records
def save(self):

oarec = Records(env.API_URL)
collection = 'discovery-metadata'
ttype = 'create'

try:
_ = oarec.get_collection_item(self.metadata['id'])
_ = self.conn.get_collection_item(self.metadata['id'])
ttype = 'update'
except Exception:
pass
Expand All @@ -49,10 +52,18 @@ def save(self):

if ttype == 'create':
LOGGER.debug('Adding new record to catalogue')
_ = oarec.get_collection_create(collection, payload)
_ = self.conn.get_collection_create(self.collection, payload)
elif ttype == 'update':
LOGGER.debug('Updating existing record in catalogue')
_ = oarec.get_collection_update(collection, payload)
_ = self.conn.get_collection_update(self.collection, payload)

def exists(self, identifier: str) -> bool:
LOGGER.debug(f'Querying GDC for id {identifier}')
try:
_ = self.conn.collection_item(self.collection, identifier)
return True
except RuntimeError:
return False

def __repr__(self):
return '<OGCAPIRecordsBackend>'
62 changes: 51 additions & 11 deletions wis2-gdc-management/wis2_gdc/registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import json
import logging
from pathlib import Path
from typing import Union

import click
import requests
Expand Down Expand Up @@ -50,6 +51,8 @@ def __init__(self):

self.broker = None
self.metadata = None
self.backend = BACKENDS[BACKEND_TYPE](
{'connection': BACKEND_CONNECTION})

if PUBLISH_REPORTS:
self.broker = MQTTPubSubClient(BROKER_URL)
Expand Down Expand Up @@ -105,24 +108,24 @@ def register(self, metadata: dict) -> None:
if ets_results['ets-report']['summary']['FAILED'] > 0:
LOGGER.warning('ETS errors; metadata not published')
return
except KeyError: # validation error
except KeyError:
LOGGER.debug('Validation errors; metadata not published')
self.broker.pub('wis2-gdc/metrics/failed_total',
json.dumps(centre_id_labels))
self._process_record_metric(
self.metadata['id'], 'failed_total', centre_id_labels)
return

self.broker.pub('wis2-gdc/metrics/passed_total',
json.dumps(centre_id_labels))
self._process_record_metric(
self.metadata['id'], 'passed_total', centre_id_labels)

data_policy = self.metadata['properties']['wmo:dataPolicy']

self.broker.pub(f'wis2-gdc/metrics/{data_policy}_total',
json.dumps(centre_id_labels))
self._process_record_metric(
self.metadata['id'], f'{data_policy}_total', centre_id_labels)

LOGGER.info('Updating links')
self.update_record_links()

LOGGER.info(f'Publishing metadata to {BACKEND_TYPE} ({BACKEND_CONNECTION})') # noqa
LOGGER.info('Publishing metadata to backend')
self._publish()

if RUN_KPI:
Expand All @@ -135,6 +138,44 @@ def register(self, metadata: dict) -> None:
LOGGER.info('Publishing KPI report to broker')
self.broker.pub(topic, json.dumps(kpi_results))

kpi_labels = [self.metadata['id']] + centre_id_labels

self._process_record_metric(
self.metadata['id'], 'kpi_percentage_total',
kpi_labels, kpi_results['summary']['percentage'])

def _process_record_metric(self, identifier: str, metric_name: str,
labels: list,
value: Union[str, int, float] = None) -> None:
"""
Helper function to process record metric

:param identifier: identifier of metadata record
:param metric_name: `str` of name of metric
:param labels: `list` of labels to apply
:param value: optional value(s) to set

:returns: `None`
"""

publish_metric = True

message_payload = {
'labels': labels
}

if value is not None:
message_payload['value'] = value

if self.backend.exists(identifier) and len(labels) == 2:
LOGGER.debug('Record exists; not publishing metric')
publish_metric = False

if publish_metric:
LOGGER.debug('Record does not exist; publishing metric')
self.broker.pub(f'wis2-gdc/metrics/{metric_name}',
json.dumps(message_payload))

def _run_ets(self) -> dict:
"""
Helper function to run ETS
Expand Down Expand Up @@ -169,9 +210,8 @@ def _publish(self):
:returns: `None`
"""

backend = BACKENDS[BACKEND_TYPE]({'connection': BACKEND_CONNECTION})
LOGGER.info('Saving metadata to backend')
backend.save(self.metadata)
LOGGER.info(f'Saving to {BACKEND_TYPE} ({BACKEND_CONNECTION})')
self.backend.save(self.metadata)

def update_record_links(self) -> None:
"""
Expand Down
15 changes: 10 additions & 5 deletions wis2-gdc-metrics-collector/metrics_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
['centre_id', 'report_by']
)

METRIC_KPI_PERCENTAGE_TOTAL = Counter(
METRIC_KPI_PERCENTAGE_TOTAL = Gauge(
'wmo_wis2_gdc_kpi_percentage_total',
'KPI percentage for a single metadata record (metadata_id equals WCMP2 id)', # noqa
['metadata_id', 'centre_id', 'report_by']
Expand All @@ -90,13 +90,13 @@
['centre_id', 'report_by']
)

METRIC_KPI_PERCENTAGE_OVER80_TOTAL = Counter(
METRIC_KPI_PERCENTAGE_OVER80_TOTAL = Gauge(
'wmo_wis2_gdc_kpi_percentage_over80_total',
'Number of metadata records with KPI percentage over 80',
['centre_id', 'report_by']
)

METRIC_SEARCH_TOTAL = Counter(
METRIC_SEARCH_TOTAL = Gauge(
'wmo_wis2_gdc_search_total',
'Number of search requests (during last monitoring period)',
['centre_id', 'report_by']
Expand Down Expand Up @@ -129,9 +129,12 @@ def _sub_connect(client, userdata, flags, rc):
def _sub_message(client, userdata, msg):
LOGGER.debug('Processing message')
topic = msg.topic
labels = json.loads(msg.payload)
payload = json.loads(msg.payload)
labels = payload['labels']
value = payload.get('value')
LOGGER.debug(f'Topic: {topic}')
LOGGER.debug(f'Labels: {labels}')
LOGGER.debug(f"Labels: {payload['labels']}")
LOGGER.debug(f"Value: {payload.get('labels')}")

if topic == 'wis2-gdc/metrics/passed_total':
METRIC_PASSED_TOTAL.labels(*labels).inc()
Expand All @@ -141,6 +144,8 @@ def _sub_message(client, userdata, msg):
METRIC_CORE_TOTAL.labels(*labels).inc()
elif topic == 'wis2-gdc/metrics/recommended_total':
METRIC_RECOMMENDED_TOTAL.labels(*labels).inc()
elif topic == 'wis2-gdc/metrics/kpi_percentage_total':
METRIC_KPI_PERCENTAGE_TOTAL.labels(*labels).set(value)

url = urlparse(BROKER_URL)

Expand Down
Loading