diff --git a/docker-compose.yml b/docker-compose.yml
index d97727f1..4b95a912 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -36,7 +36,7 @@ services:
   wis2box-api:
     container_name: wis2box-api
-    image: ghcr.io/wmo-im/wis2box-api:1.0b7
+    image: ghcr.io/wmo-im/wis2box-api:latest
     restart: always
     env_file:
      - wis2box.env
diff --git a/grafana/dashboards/home.json b/grafana/dashboards/home.json
index 79ba3cb8..bd600aaf 100644
--- a/grafana/dashboards/home.json
+++ b/grafana/dashboards/home.json
@@ -588,9 +588,42 @@
           "legendFormat": "",
           "queryType": "range",
           "refId": "B"
+        },
+        {
+          "datasource": {
+            "type": "loki",
+            "uid": "P55348B596EBB51C3"
+          },
+          "editorMode": "builder",
+          "expr": "{compose_service=\"wis2box-management\"} |= `WARNING`",
+          "hide": false,
+          "queryType": "range",
+          "refId": "A"
+        },
+        {
+          "datasource": {
+            "type": "loki",
+            "uid": "P55348B596EBB51C3"
+          },
+          "editorMode": "builder",
+          "expr": "{compose_service=\"wis2box-api\"} |= `ERROR`",
+          "hide": false,
+          "queryType": "range",
+          "refId": "C"
+        },
+        {
+          "datasource": {
+            "type": "loki",
+            "uid": "P55348B596EBB51C3"
+          },
+          "editorMode": "builder",
+          "expr": "{compose_service=\"wis2box-api\"} |= `WARNING` != `int() argument must be a string`",
+          "hide": false,
+          "queryType": "range",
+          "refId": "D"
+        }
       ],
-      "title": "wis2box ERRORs",
+      "title": "wis2box ERRORs and WARNINGs",
       "type": "logs"
     }
   ],
diff --git a/tests/data/data-mappings.yml b/tests/data/data-mappings.yml
deleted file mode 100644
index 40821847..00000000
--- a/tests/data/data-mappings.yml
+++ /dev/null
@@ -1,65 +0,0 @@
-data:
-    cd-brazza_met_centre.data.core.weather.surface-based-observations.synop:
-        plugins:
-            txt:
-                - plugin: wis2box.data.synop2bufr.ObservationDataSYNOP2BUFR
-                  notify: true
-                  file-pattern: '^.*_(\d{4})(\d{2}).*\.txt$'
-            bufr4:
-                - plugin: wis2box.data.bufr2geojson.ObservationDataBUFR2GeoJSON
-                  buckets:
-                    - ${WIS2BOX_STORAGE_PUBLIC}
-                  file-pattern: '^.*\.bufr4$'
-    ro-rnimh.data.core.weather.surface-based-observations.synop:
-        plugins:
-            txt:
-                - plugin: wis2box.data.synop2bufr.ObservationDataSYNOP2BUFR
-                  notify: true
-                  file-pattern: '^A_SMR.*EDZW_(\d{4})(\d{2}).*.txt$'
-            csv:
-                - plugin: wis2box.data.csv2bufr.ObservationDataCSV2BUFR
-                  template: aws-template
-                  notify: true
-                  file-pattern: '^.*\.csv$'
-            bufr4:
-                - plugin: wis2box.data.bufr2geojson.ObservationDataBUFR2GeoJSON
-                  file-pattern: '^A_SMR.*EDZW_(\d{4})(\d{2}).*.bufr4$'
-    mw-mw_met_centre.data.core.weather.surface-based-observations.synop:
-        plugins:
-            csv:
-                - plugin: wis2box.data.csv2bufr.ObservationDataCSV2BUFR
-                  template: synop_bufr
-                  notify: true
-                  file-pattern: '^WIGOS_(\d-\d+-\d+-\w+)_.*\.csv$'
-            bufr4:
-                - plugin: wis2box.data.bufr2geojson.ObservationDataBUFR2GeoJSON
-                  file-pattern: '^WIGOS_(\d-\d+-\d+-\w+)_.*\.bufr4$'
-    it-roma_met_centre.data.core.weather.surface-based-observations.synop:
-        plugins:
-            bin:
-                - plugin: wis2box.data.bufr4.ObservationDataBUFR
-                  notify: true
-                  file-pattern: '^.*\.bin$'
-            bufr4:
-                - plugin: wis2box.data.bufr2geojson.ObservationDataBUFR2GeoJSON
-                  file-pattern: '^WIGOS_(\d-\d+-\d+-\w+)_.*\.bufr4$'
-    dz-alger_met_centre.data.core.weather.surface-based-observations.synop:
-        plugins:
-            bufr4:
-                - plugin: wis2box.data.bufr4.ObservationDataBUFR
-                  notify: true
-                  buckets:
-                    - ${WIS2BOX_STORAGE_INCOMING}
-                  file-pattern: '^.*\.bufr4$'
-                - plugin: wis2box.data.bufr2geojson.ObservationDataBUFR2GeoJSON
-                  buckets:
-                    - ${WIS2BOX_STORAGE_PUBLIC}
-                  file-pattern: '^WIGOS_(\d-\d+-\d+-\w+)_.*\.bufr4$'
-    cn-cma.data.core.weather.prediction.forecast.medium-range.probabilistic.global:
-        plugins:
-            grib2:
-                - plugin: wis2box.data.universal.UniversalData
-                  notify: true
-                  buckets:
-                    - ${WIS2BOX_STORAGE_INCOMING}
-                  file-pattern: '^.*_(\d{8})\d{2}.*\.grib2$'
\ No newline at end of file
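The deleted fixture above doubles as a compact reference for how wis2box data mappings work: each topic hierarchy maps file suffixes to ingest plugins, optionally constrained by storage buckets and a `file-pattern` regex. A minimal sketch of that dispatch logic, under stated assumptions (the inlined YAML is a trimmed copy of the fixture; `match_plugins()` is a hypothetical helper, not wis2box API):

```python
# Illustrative only: pairing file-pattern regexes with ingest plugins.
import re

import yaml

MAPPINGS = r"""
data:
    mw-mw_met_centre.data.core.weather.surface-based-observations.synop:
        plugins:
            csv:
                - plugin: wis2box.data.csv2bufr.ObservationDataCSV2BUFR
                  template: synop_bufr
                  notify: true
                  file-pattern: '^WIGOS_(\d-\d+-\d+-\w+)_.*\.csv$'
"""


def match_plugins(filename: str, suffix: str, topic: str) -> list:
    """Return the plugin definitions whose file-pattern matches filename."""
    plugins = yaml.safe_load(MAPPINGS)['data'][topic]['plugins'].get(suffix, [])
    return [p['plugin'] for p in plugins if re.match(p['file-pattern'], filename)]


topic = 'mw-mw_met_centre.data.core.weather.surface-based-observations.synop'
print(match_plugins('WIGOS_0-454-2-AWSLOBI_2021-07-07.csv', 'csv', topic))
# -> ['wis2box.data.csv2bufr.ObservationDataCSV2BUFR']
```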
diff --git a/tests/data/metadata/discovery/mw-surface-weather-observations.yml b/tests/data/metadata/discovery/mw-surface-weather-observations.yml
index 987cfd58..bcb14c48 100644
--- a/tests/data/metadata/discovery/mw-surface-weather-observations.yml
+++ b/tests/data/metadata/discovery/mw-surface-weather-observations.yml
@@ -7,7 +7,7 @@ wis2box:
     plugins:
         csv:
             - plugin: wis2box.data.csv2bufr.ObservationDataCSV2BUFR
-              template: synop_bufr
+              template: CampbellAfrica-v1-template
               notify: true
               file-pattern: '^WIGOS_(\d-\d+-\d+-\w+)_.*\.csv$'
         bufr4:
diff --git a/tests/integration/test_workflow.py b/tests/integration/test_workflow.py
index 53634e92..44a27d58 100644
--- a/tests/integration/test_workflow.py
+++ b/tests/integration/test_workflow.py
@@ -251,8 +251,8 @@ def test_message_api():
         'mw_met_centre': 25,
         'roma_met_centre': 33,
         'alger_met_centre': 29,
-        'rnimh': 116,
-        'brazza_met_centre': 15,
+        'rnimh': 111,
+        'brazza_met_centre': 14,
         'wmo-test': 151
     }
     for key, value in counts.items():
@@ -291,7 +291,7 @@ def test_message_api():
     assert not props['data_id'].startswith('wis2')
     assert not props['data_id'].startswith('origin/a/wis2')
     assert props['data_id'].startswith('cd')
-    assert props['content']['size'] == 257
+    assert props['content']['size'] == 253
     assert props['content']['encoding'] == 'base64'
     assert props['content']['value'] is not None
diff --git a/wis2box-management/Dockerfile b/wis2box-management/Dockerfile
index c5102d0c..c1ae855e 100644
--- a/wis2box-management/Dockerfile
+++ b/wis2box-management/Dockerfile
@@ -19,14 +19,14 @@
 #
 ###############################################################################

-FROM ghcr.io/wmo-im/dim_eccodes_baseimage:latest
+FROM ubuntu:focal

-LABEL maintainer="tomkralidis@gmail.com"
+LABEL maintainer="tomkralidis@gmail.com; mlimper@wmo.int"

 ARG WIS2BOX_PIP3_EXTRA_PACKAGES

 ENV TZ="Etc/UTC" \
     DEBIAN_FRONTEND="noninteractive" \
-    DEBIAN_PACKAGES="cron bash vim curl git libffi-dev python3-cryptography libssl-dev libudunits2-0 python3-paho-mqtt python3-dateparser python3-tz python3-setuptools unzip"
+    DEBIAN_PACKAGES="cron bash vim curl git libffi-dev python3-cryptography libssl-dev libudunits2-0 python3 python3-pip curl python3-paho-mqtt python3-dateparser python3-tz python3-setuptools unzip"

 RUN if [ "$WIS2BOX_PIP3_EXTRA_PACKAGES" = "None" ]; \
     then export WIS2BOX_PIP3_EXTRA_PACKAGES=echo; \
     fi

@@ -39,11 +39,7 @@ RUN apt-get update -y && apt-get install -y ${DEBIAN_PACKAGES} \
     # install wis2box data pipeline dependencies
     && pip3 install --no-cache-dir \
-    https://github.com/wmo-im/csv2bufr/archive/refs/tags/v0.7.4.zip \
-    https://github.com/wmo-im/bufr2geojson/archive/refs/tags/v0.5.1.zip \
-    https://github.com/wmo-im/pymetdecoder/archive/refs/tags/v0.1.10.zip \
-    https://github.com/wmo-cop/pyoscar/archive/master.zip \
-    https://github.com/wmo-im/synop2bufr/archive/refs/tags/v0.6.2.zip \
+    https://github.com/wmo-cop/pyoscar/archive/refs/tags/0.6.4.zip \
     https://github.com/geopython/pygeometa/archive/master.zip \
     https://github.com/wmo-im/pywis-topics/archive/refs/tags/0.2.0.zip \
     # install shapely
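The adjusted totals reflect the converters now running inside wis2box-api rather than the management container. A hedged sketch of the kind of per-centre check `test_message_api()` performs (the endpoint path, query parameters and `numberMatched` field here are assumptions, not lines taken from the test module):

```python
# Hypothetical illustration of asserting message counts per centre.
import requests

API_URL = 'http://localhost/oapi'  # assumed wis2box API base URL

counts = {
    'mw_met_centre': 25,
    'roma_met_centre': 33,
    'alger_met_centre': 29,
    'rnimh': 111,
    'brazza_met_centre': 14,
    'wmo-test': 151
}

for key, value in counts.items():
    r = requests.get(f'{API_URL}/collections/messages/items',
                     params={'q': key, 'limit': 1}).json()
    assert r['numberMatched'] == value  # assumed response field
```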
diff --git a/wis2box-management/wis2box/api/__init__.py b/wis2box-management/wis2box/api/__init__.py
index e656d75e..3312667f 100644
--- a/wis2box-management/wis2box/api/__init__.py
+++ b/wis2box-management/wis2box/api/__init__.py
@@ -21,15 +21,17 @@

 import click
 import logging

+import requests
+
+from time import sleep
+
 from wis2box import cli_helpers
 from wis2box.api.backend import load_backend
 from wis2box.api.config import load_config
 from wis2box.env import (BROKER_HOST, BROKER_USERNAME, BROKER_PASSWORD,
-                         BROKER_PORT)
+                         BROKER_PORT, DOCKER_API_URL, API_URL)
 from wis2box.plugin import load_plugin, PLUGINS

-
 LOGGER = logging.getLogger(__name__)

@@ -44,6 +46,56 @@ def refresh_data_mappings():
     local_broker.pub('wis2box/data_mappings/refresh', '{}', qos=0)


+def execute_api_process(process_name: str, payload: dict) -> dict:
+    """
+    Executes a process on the API
+
+    :param process_name: process name
+    :param payload: payload to send to process
+
+    :returns: `dict` with execution-result
+    """
+
+    LOGGER.debug('Posting data to wis2box-api')
+    headers = {
+        'accept': 'application/json',
+        'Content-Type': 'application/json',
+        'prefer': 'respond-async'
+    }
+    url = f'{DOCKER_API_URL}/processes/{process_name}/execution'
+
+    response = requests.post(url, headers=headers, json=payload)
+    if response.status_code >= 400:
+        msg = f'Failed to post data to wis2box-api: {response.status_code}'  # noqa
+        if response.text:
+            msg += f'\nError message: {response.text}'
+        LOGGER.error(msg)
+        raise ValueError(msg)
+
+    if response.status_code == 200:
+        return response.json()
+
+    headers_json = dict(response.headers)
+    location = headers_json['Location']
+    location = location.replace(API_URL, DOCKER_API_URL)
+
+    status = 'accepted'
+    while status in ['accepted', 'running']:
+        # get the job status
+        headers = {
+            'accept': 'application/json',
+            'Content-Type': 'application/json'
+        }
+        response = requests.get(location, headers=headers)
+        response_json = response.json()
+        if 'status' in response_json:
+            status = response_json['status']
+        sleep(0.1)
+    # get result from location/results?f=json
+    response = requests.get(f'{location}/results?f=json', headers=headers)  # noqa
+    return response.json()
+
+
 def setup_collection(meta: dict = {}) -> bool:
     """
     Add collection to api backend and configuration
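`execute_api_process()` wraps the OGC API - Processes asynchronous execution pattern: POST with `Prefer: respond-async`, poll the job at the returned `Location` header until the status leaves `accepted`/`running`, then fetch `<location>/results?f=json`. A hedged usage sketch; the payload shape mirrors the `bufr2geojson` call later in this diff, and the base64 string is a placeholder, not real data:

```python
# Sketch of calling a wis2box-api process, assuming the management
# container environment (DOCKER_API_URL/API_URL) is configured.
from wis2box.api import execute_api_process

payload = {
    'inputs': {
        'data': 'QlVGUi4uLjc3Nzc='  # base64-encoded BUFR4 (placeholder)
    }
}

result = execute_api_process('bufr2geojson', payload)
for item in result.get('items', []):
    print(item['id'])
```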
diff --git a/wis2box-management/wis2box/data/base.py b/wis2box-management/wis2box/data/base.py
index 33048951..05e0eab3 100644
--- a/wis2box-management/wis2box/data/base.py
+++ b/wis2box-management/wis2box/data/base.py
@@ -18,7 +18,7 @@
 # under the License.
 #
 ###############################################################################
-
+import base64
 import json
 import logging
 from pathlib import Path
@@ -54,10 +54,10 @@ def __init__(self, defs: dict) -> None:
         self.filename = None
         self.incoming_filepath = None
         self.topic_hierarchy = TopicHierarchy(defs['topic_hierarchy'])
-        self.template = defs['template']
-        self.file_filter = defs['pattern']
-        self.enable_notification = defs['notify']
-        self.buckets = defs['buckets']
+        self.template = defs.get('template', None)
+        self.file_filter = defs.get('pattern', '.*')
+        self.enable_notification = defs.get('notify', False)
+        self.buckets = defs.get('buckets', ())
         self.output_data = {}
         self.discovery_metadata = {}
         # if discovery_metadata:
@@ -73,7 +73,7 @@ def publish_failure_message(self, description, wsi=None):
         # load plugin for local broker
         defs = {
             'codepath': PLUGINS['pubsub']['mqtt']['plugin'],
-            'url': f"mqtt://{BROKER_USERNAME}:{BROKER_PASSWORD}@{BROKER_HOST}:{BROKER_PORT}",  # noqa
+            'url': f'mqtt://{BROKER_USERNAME}:{BROKER_PASSWORD}@{BROKER_HOST}:{BROKER_PORT}',  # noqa
             'client_type': 'failure-publisher'
         }
         local_broker = load_plugin('pubsub', defs)
@@ -169,7 +169,7 @@ def notify(self, identifier: str, storage_path: str,
         # load plugin for local broker
         defs_local = {
             'codepath': PLUGINS['pubsub']['mqtt']['plugin'],
-            'url': f"mqtt://{BROKER_USERNAME}:{BROKER_PASSWORD}@{BROKER_HOST}:{BROKER_PORT}",  # noqa
+            'url': f'mqtt://{BROKER_USERNAME}:{BROKER_PASSWORD}@{BROKER_HOST}:{BROKER_PORT}',  # noqa
             'client_type': 'notify-publisher'
         }
         local_broker = load_plugin('pubsub', defs_local)
@@ -312,7 +312,13 @@ def get_public_filepath(self):

     @staticmethod
     def as_bytes(input_data):
-        """Get data as bytes"""
+        """Return input data as bytes
+
+        :param input_data: `str`, `bytes` or `Path` of data
+
+        :returns: `bytes` of data
+        """
+
         LOGGER.debug(f'input data is type: {type(input_data)}')
         if isinstance(input_data, bytes):
             return input_data
@@ -325,5 +331,34 @@
             LOGGER.warning('Invalid data type')
             return None

+    @staticmethod
+    def as_string(input_data, base64_encode=False):
+        """Return input data as string
+
+        :param input_data: `str`, `bytes` or `Path` of data
+        :param base64_encode: `bool` whether to base64-encode the data
+
+        :returns: `str` of data
+        """
+
+        LOGGER.debug(f'input data is type: {type(input_data)}')
+        if isinstance(input_data, bytes):
+            if base64_encode:
+                return base64.b64encode(input_data).decode('utf-8')
+            else:
+                return input_data.decode('utf-8')
+        elif isinstance(input_data, str):
+            return input_data
+        elif isinstance(input_data, Path):
+            if base64_encode:
+                with input_data.open('rb') as fh:
+                    return base64.b64encode(fh.read()).decode('utf-8')
+            else:
+                with input_data.open('r') as fh:
+                    return fh.read()
+        else:
+            LOGGER.warning('Invalid data type')
+            return None
+
     def __repr__(self):
         return '<BaseAbstractData>'
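`as_string()` complements `as_bytes()` and lets plugins build JSON-safe process payloads without touching the storage layer. A quick illustration; both helpers are staticmethods, so no instance is needed, and the local file `sample.bufr4` is an assumption of the sketch:

```python
# Round-trip sketch for the two BaseAbstractData helpers.
from pathlib import Path

from wis2box.data.base import BaseAbstractData

raw = BaseAbstractData.as_bytes(Path('sample.bufr4'))
b64 = BaseAbstractData.as_string(Path('sample.bufr4'), base64_encode=True)

assert isinstance(raw, bytes)
assert isinstance(b64, str)  # JSON-safe, ready for a process payload
```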
diff --git a/wis2box-management/wis2box/data/bufr2geojson.py b/wis2box-management/wis2box/data/bufr2geojson.py
index 2d5f4c37..a9b81141 100644
--- a/wis2box-management/wis2box/data/bufr2geojson.py
+++ b/wis2box-management/wis2box/data/bufr2geojson.py
@@ -19,12 +19,14 @@
 #
 ###############################################################################

+
+import base64
 import logging
+
 from pathlib import Path
 from typing import Union

-from bufr2geojson import transform as as_geojson
-
+from wis2box.api import execute_api_process
 from wis2box.data.geojson import ObservationDataGeoJSON

 LOGGER = logging.getLogger(__name__)

@@ -33,47 +35,51 @@ class ObservationDataBUFR2GeoJSON(ObservationDataGeoJSON):
     """Observation data"""

-    def transform(
-        self, input_data: Union[Path, bytes], filename: str = ''
-    ) -> bool:
+    def transform(self, input_data: Union[Path, bytes],
+                  filename: str = '') -> bool:

         LOGGER.debug('Procesing BUFR data')
-        input_bytes = self.as_bytes(input_data)
-
-        LOGGER.debug('Generating GeoJSON features')
-        results = as_geojson(input_bytes, serialize=False)
-
-        LOGGER.debug('Processing GeoJSON features')
-        for collection in results:
-            # results is an iterator, for each iteration we have:
-            # - dict['id']
-            # - dict['id']['_meta']
-            # - dict['id']
-            for id, item in collection.items():
-                LOGGER.debug(f'Processing feature: {id}')
-
-                LOGGER.debug('Parsing feature datetime')
-                data_date = item['_meta']['data_date']
-                if '/' in data_date:
-                    # date is range/period, split and get end date/time
-                    data_date = data_date.split('/')[1]
-
-                LOGGER.debug('Parsing feature fields')
-                items_to_remove = [
-                    key for key in item if key not in ('geojson', '_meta')
-                ]
-                for key in items_to_remove:
-                    LOGGER.debug(f'Removing unexpected key: {key}')
-                    item.pop(key)
-
-                LOGGER.debug('Populating output data for publication')
-                self.output_data[id] = item
-
-                self.output_data[id]['_meta'][
-                    'relative_filepath'
-                ] = self.get_local_filepath(data_date)
-
-        LOGGER.debug('Successfully finished transforming BUFR data')
+        process_name = 'bufr2geojson'
+
+        # check if input_data is Path object
+        if isinstance(input_data, Path):
+            payload = {
+                'inputs': {
+                    'data_url': input_data.as_posix()
+                }
+            }
+        else:
+            input_bytes = self.as_bytes(input_data)
+            payload = {
+                'inputs': {
+                    'data': base64.b64encode(input_bytes).decode('utf-8')
+                }
+            }
+
+        result = execute_api_process(process_name, payload)
+
+        # check for errors
+        if result.get('error') not in [None, '']:
+            LOGGER.error(result['error'])
+
+        if 'items' not in result:
+            LOGGER.error(f'file={filename} failed to convert to GeoJSON')
+            return False
+
+        # loop over items in response
+        for item in result['items']:
+            id = item['id']
+
+            data_date = item['properties']['resultTime']
+            self.output_data[id] = {
+                '_meta': {
+                    'identifier': id,
+                    'data_date': data_date,
+                    'relative_filepath': self.get_local_filepath(data_date),
+                },
+                'geojson': item
+            }
+
         return True

     def get_local_filepath(self, date_):
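The new `transform()` delegates BUFR decoding to the wis2box-api `bufr2geojson` process and only re-shapes the response. An illustrative response shape, inferred from the fields the code reads (`items`, `id`, `properties.resultTime`); the identifier and coordinates below are invented, and the real process may return additional keys:

```python
# Invented sample of the process output consumed by transform().
result = {
    'items': [{
        'id': 'WIGOS_0-454-2-AWSLOBI_20210707T145500-25',
        'type': 'Feature',
        'geometry': {'type': 'Point', 'coordinates': [35.27, -15.84]},
        'properties': {'resultTime': '2021-07-07T14:55:00+00:00'}
    }]
}

for item in result['items']:
    print(item['id'], item['properties']['resultTime'])
```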
"bufrHeaderCentre", - "bufrHeaderSubCentre", "updateSequenceNumber", "dataCategory", - "internationalDataSubCategory", "dataSubCategory", - "masterTablesVersionNumber", "localTablesVersionNumber", - "typicalYear", "typicalMonth", "typicalDay", "typicalHour", - "typicalMinute", "typicalSecond", - "numberOfSubsets", "observedData", "compressedData"] class ObservationDataBUFR(BaseAbstractData): """Observation data""" - - def transform( - self, input_data: Union[Path, bytes], filename: str = '' - ) -> bool: - - LOGGER.debug('Procesing BUFR data') - input_bytes = self.as_bytes(input_data) - - # FIXME: figure out how to pass a bytestring to ecCodes BUFR reader - tmp = tempfile.NamedTemporaryFile() - with open(tmp.name, 'wb') as f: - f.write(input_bytes) - - # workflow - # check for multiple messages - # split messages and process - data = {} - with open(tmp.name, 'rb') as fh: - while data is not None: - data = codes_bufr_new_from_file(fh) - if data is not None: - self.transform_message(data) - codes_release(data) - - def transform_message(self, bufr_in: int) -> None: - """ - Parse single BUFR message - :param bufr_in: `int` of ecCodes pointer to BUFR message - :returns: `None` + def __init__(self, defs: dict) -> None: """ - # workflow - # check for multiple subsets - # add necessary components for WSI in BUFR - # split subsets into individual messages and process - try: - codes_set(bufr_in, 'unpack', True) - except Exception as err: - LOGGER.error(f'Error unpacking message: {err}') - raise err - - # get descriptors present in the file - descriptors = codes_get_array(bufr_in, "expandedDescriptors").tolist() - - # prepare the headers for the new messages - headers = {} - for header in HEADERS: - headers[header] = codes_get(bufr_in, header) - # original to be split by subset, so set the number of subsets to 1 - headers['numberOfSubsets'] = 1 - # set the master table version number - table_version = max( - 28, codes_get(bufr_in, 'masterTablesVersionNumber') - ) - headers['masterTablesVersionNumber'] = table_version - # set the unexpanded descriptors - out_ue = codes_get_array(bufr_in, 'unexpandedDescriptors').tolist() - if 301150 not in out_ue: - out_ue.insert(0, 301150) - headers['unexpandedDescriptors'] = out_ue - - # loop over the subsets, create a new message for each - num_subsets = codes_get(bufr_in, 'numberOfSubsets') - LOGGER.debug(f'Found {num_subsets} subsets') - for i in range(num_subsets): - idx = i + 1 - LOGGER.debug(f'Processing subset {idx}') - LOGGER.debug('Extracting subset') - codes_set(bufr_in, 'extractSubset', idx) - codes_set(bufr_in, 'doExtractSubsets', 1) - # copy the replication factors - if 31000 in descriptors: - try: - short_replication_factors = codes_get_array(bufr_in, "shortDelayedDescriptorReplicationFactor").tolist() # noqa - except Exception as e: - short_replication_factors = [] - LOGGER.error(e.__class__.__name__) - if 31001 in descriptors: - try: - replication_factors = codes_get_array(bufr_in, "delayedDescriptorReplicationFactor").tolist() # noqa - except Exception as e: - replication_factors = [] - LOGGER.error(e.__class__.__name__) - if 31002 in descriptors: - try: - extended_replication_factors = codes_get_array(bufr_in, "extendedDelayedDescriptorReplicationFactor").tolist() # noqa - except Exception as e: - extended_replication_factors = [] - LOGGER.error(e.__class__.__name__) - LOGGER.debug('Copying template BUFR') - subset_out = codes_clone(TEMPLATE) - - # set the replication factors, this needs to be done before - # setting the unexpanded descriptors - if 
(31000 in descriptors) and (len(short_replication_factors) > 0): # noqa - codes_set_array(subset_out, "inputShortDelayedDescriptorReplicationFactor", short_replication_factors) # noqa - if (31001 in descriptors) and (len(replication_factors) > 0): - codes_set_array(subset_out, "inputDelayedDescriptorReplicationFactor", replication_factors) # noqa - if (31002 in descriptors) and (len(extended_replication_factors) > 0): # noqa - codes_set_array(subset_out, "inputExtendedDelayedDescriptorReplicationFactor", extended_replication_factors) # noqa + ObservationDataBUFR data initializer - # we need to copy all the headers, not just the - # unexpandedDescriptors and MT number + :param def: `dict` object of resource mappings - for k, v in headers.items(): - if isinstance(v, list): - codes_set_array(subset_out, k, v) - else: - codes_set(subset_out, k, v) - - LOGGER.debug('Cloning subset to new message') - subset = codes_clone(bufr_in) - self.transform_subset(subset, subset_out) - codes_release(subset) - codes_release(subset_out) - - def transform_subset(self, subset: int, subset_out: int) -> None: - """ - Parse single BUFR message subset - :param subset: `int` of ecCodes pointer to input BUFR - :param subset_out: `int` of ecCodes pointer to output BUFR :returns: `None` """ - # workflow - # - check for WSI, - # - if None, lookup using tsi - # - check for location, - # - if None, use geometry from station report - # - check for time, - # - if temporal extent, use end time - # - set times in header - # - write a separate BUFR - parser = BUFRParser(raise_on_error=True) - LOGGER.debug('Parsing subset') - try: - parser.as_geojson(subset, id='') - except Exception as err: - LOGGER.warning(err) - try: - temp_wsi = parser.get_wsi() - temp_tsi = parser.get_tsi() - except Exception as err: - LOGGER.warning(err) + super().__init__(defs) - try: - location = parser.get_location() - if location is None or None in location['coordinates']: - raise Exception("Missing location in BUFR") - except Exception: - LOGGER.error(f"Error parsing location from subset with wsi={temp_wsi}, use coordinates from station metadata") # noqa + def transform(self, input_data: Union[Path, bytes], + filename: str = '') -> bool: - try: - data_date = parser.get_time() - except Exception: - LOGGER.error(f"Error parsing time from subset with wsi={temp_wsi}, skip this subset") # noqa - self.publish_failure_message( - description="Invalid date in BUFR data", - wsi=temp_wsi) - return + LOGGER.debug('Processing BUFR4') + data = self.as_string(input_data, base64_encode=True) - del parser + payload = { + 'inputs': { + 'channel': self.topic_hierarchy.dirpath, + 'notify': False, + 'data': data + } + } - LOGGER.debug(f'Processing temp_wsi: {temp_wsi}, temp_tsi: {temp_tsi}') - wsi = get_valid_wsi(wsi=temp_wsi, tsi=temp_tsi) - if wsi is None: - msg = 'Station not in station list: ' - msg += f'wsi={temp_wsi} tsi={temp_tsi}; skipping' - LOGGER.error(msg) - self.publish_failure_message( - description="Station not in station list", - wsi=temp_wsi) - return + process_name = 'wis2box-bufr2bufr' + result = execute_api_process(process_name, payload) try: - LOGGER.debug('Copying wsi to BUFR') - [series, issuer, number, tsi] = wsi.split('-') - codes_set(subset_out, '#1#wigosIdentifierSeries', int(series)) - codes_set(subset_out, '#1#wigosIssuerOfIdentifier', int(issuer)) - codes_set(subset_out, '#1#wigosIssueNumber', int(number)) - codes_set(subset_out, '#1#wigosLocalIdentifierCharacter', tsi) - codes_bufr_copy_data(subset, subset_out) - - if location is None or 
-            if location is None or None in location['coordinates']:
-                msg = 'Missing coordinates in BUFR, setting from station report'  # noqa
-                LOGGER.warning(msg)
-                location = get_geometry(wsi)
-                LOGGER.debug(f'New coordinates: {location}')
-            long, lat, elev = location.get('coordinates')
-            codes_set(subset_out, '#1#longitude', long)
-            codes_set(subset_out, '#1#latitude', lat)
-            codes_set(subset_out, '#1#heightOfStationGroundAboveMeanSeaLevel', elev)  # noqa
-
-            if '/' in data_date:
-                data_date = data_date.split('/')[1]
-
-            isodate = datetime.strptime(data_date, '%Y-%m-%dT%H:%M:%SZ')
-
-            for (name, p) in zip(TIME_NAMES, TIME_PATTERNS):
-                codes_set(subset_out, name, int(isodate.strftime(p)))
-
-            isodate_str = isodate.strftime('%Y%m%dT%H%M%S')
-
-            rmk = f"WIGOS_{wsi}_{isodate_str}"
-            LOGGER.info(f'Publishing with identifier: {rmk}')
-
-            LOGGER.debug('Writing bufr4')
-            bufr4 = codes_get_message(subset_out)
+            # check for errors
+            for error in result['errors']:
+                LOGGER.error(error)
+            # check for warnings
+            for warning in result['warnings']:
+                LOGGER.warning(warning)
+        except KeyError:
+            LOGGER.error(f'KeyError in result={result}')
+            return False
+
+        if 'data_items' not in result:
+            LOGGER.error(f'file={filename} failed to convert to BUFR4 (result={result})')  # noqa
+            return False
+
+        # loop over data_items in response
+        for data_item in result['data_items']:
+            filename = data_item['filename']
+            suffix = filename.split('.')[-1]
+            rmk = filename.split('.')[0]
+            # convert data_item['data'] to bytes
+            input_bytes = base64.b64decode(data_item['data'].encode('utf-8'))
+            # define _meta
+            _meta = data_item['_meta']
+            # convert isoformat to datetime
+            _meta['data_date'] = datetime.fromisoformat(_meta['data_date'])
+            # add relative filepath to _meta
+            _meta['relative_filepath'] = self.get_local_filepath(_meta['data_date'])  # noqa
+            # add to output_data
             self.output_data[rmk] = {
-                'bufr4': bufr4,
-                '_meta': {
-                    'identifier': rmk,
-                    'wigos_station_identifier': wsi,
-                    'data_date': isodate,
-                    'geometry': location,
-                    'relative_filepath': self.get_local_filepath(isodate)
-                }
+                suffix: input_bytes,
+                '_meta': _meta
             }
-            LOGGER.debug('Finished processing subset')
-        except Exception as err:
-            LOGGER.error(f'Failed processing subset: {err}')
-            self.publish_failure_message(
-                description='Failed processing subset',
-                wsi=wsi)
+
+        return True

     def get_local_filepath(self, date_):
         yyyymmdd = date_.strftime('%Y-%m-%d')
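This `data_items` handling recurs almost verbatim in the csv2bufr and synop2bufr plugins below: each item carries a filename, a base64-encoded payload and a `_meta` block with an ISO-format `data_date`. A standalone sketch of the decode step, with invented sample values:

```python
# Decoding one data_item as the plugins above and below do.
import base64
from datetime import datetime

data_item = {
    'filename': 'WIGOS_0-454-2-AWSLOBI_20210707T145500.bufr4',
    'data': base64.b64encode(b'BUFR...7777').decode('utf-8'),
    '_meta': {'data_date': '2021-07-07T14:55:00'}
}

suffix = data_item['filename'].split('.')[-1]  # 'bufr4' -> output key
rmk = data_item['filename'].split('.')[0]      # identifier for output_data
input_bytes = base64.b64decode(data_item['data'].encode('utf-8'))
data_date = datetime.fromisoformat(data_item['_meta']['data_date'])

print(rmk, suffix, len(input_bytes), data_date)
```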
diff --git a/wis2box-management/wis2box/data/csv2bufr.py b/wis2box-management/wis2box/data/csv2bufr.py
index 0742dc4b..1823f3cd 100644
--- a/wis2box-management/wis2box/data/csv2bufr.py
+++ b/wis2box-management/wis2box/data/csv2bufr.py
@@ -19,16 +19,16 @@
 #
 ###############################################################################

-import json
+import base64
 import logging
+
+from datetime import datetime
 from pathlib import Path
 from typing import Union

-from csv2bufr import transform as transform_csv
-import csv2bufr.templates as c2bt
-
+from wis2box.api import execute_api_process
 from wis2box.data.base import BaseAbstractData
-from wis2box.metadata.station import get_valid_wsi
+

 LOGGER = logging.getLogger(__name__)

@@ -46,20 +46,6 @@ def __init__(self, defs: dict) -> None:

         super().__init__(defs)

-        self.mappings = {}
-
-        LOGGER.debug(f'Loading template {self.template}')
-        if self.template.startswith('/'):
-            mapping_bufr4 = Path(self.template)
-            with mapping_bufr4.open() as fh1:
-                self.mappings['bufr4'] = json.load(fh1)
-        else:
-            if self.template not in c2bt.list_templates():
-                raise Exception(f'Unknown template: {self.template}, options are: {c2bt.list_templates()}')  # noqa
-            self.mappings['bufr4'] = c2bt.load_template(self.template)
-
-        self.station_metadata = None
-
     def transform(self, input_data: Union[Path, bytes],
                   filename: str = '') -> bool:

@@ -75,37 +61,53 @@ def transform(self, input_data: Union[Path, bytes],
             raise ValueError(msg)

         LOGGER.debug('Generating BUFR4')
-        input_bytes = self.as_bytes(input_data)
-
-        LOGGER.debug('Transforming data')
-        results = transform_csv(input_bytes.decode(),
-                                self.mappings['bufr4'])
-
-        LOGGER.debug('Iterating over BUFR messages')
-        for item in results:
-            wsi = item['_meta']['properties']['wigos_station_identifier']
-            if 'result' in item['_meta']:
-                if item['_meta']['result']['code'] != 1:
-                    msg = item['_meta']['result']['message']
-                    LOGGER.error(f'Transform returned {msg} for wsi={wsi}')
-                    self.publish_failure_message(
-                        description='csv2bufr transform error',
-                        wsi=wsi)
-                    continue
-            if get_valid_wsi(wsi) is None:
-                msg = f'Station {wsi} not in station list; skipping'
-                LOGGER.error(msg)
-                self.publish_failure_message(
-                    description='Station not in station list',
-                    wsi=wsi)
-                continue
-            LOGGER.debug('Setting obs date for filepath creation')
-            identifier = item['_meta']['id']
-            data_date = item['_meta']['properties']['datetime']
-
-            self.output_data[identifier] = item
-            self.output_data[identifier]['_meta']['relative_filepath'] = \
-                self.get_local_filepath(data_date)
+        data = self.as_string(input_data)
+
+        payload = {
+            'inputs': {
+                'channel': self.topic_hierarchy.dirpath,
+                'template': self.template,
+                'notify': False,
+                'data': data
+            }
+        }
+
+        process_name = 'wis2box-csv2bufr'
+        result = execute_api_process(process_name, payload)
+
+        try:
+            # check for errors
+            for error in result['errors']:
+                LOGGER.error(f'input={filename} error={error}')
+            # check for warnings
+            for warning in result['warnings']:
+                LOGGER.warning(f'input={filename} warning={warning}')
+        except KeyError:
+            LOGGER.error(f'file={filename} failed to convert to BUFR4, result={result}')  # noqa
+            return False
+
+        if 'data_items' not in result:
+            LOGGER.error(f'file={filename} failed to convert to BUFR4')
+            return False
+
+        # loop over data_items in response
+        for data_item in result['data_items']:
+            filename = data_item['filename']
+            suffix = filename.split('.')[-1]
+            rmk = filename.split('.')[0]
+            # convert data_item['data'] to bytes
+            input_bytes = base64.b64decode(data_item['data'].encode('utf-8'))
+            # define _meta
+            _meta = data_item['_meta']
+            # convert isoformat to datetime
+            _meta['data_date'] = datetime.fromisoformat(_meta['data_date'])
+            # add relative filepath to _meta
+            _meta['relative_filepath'] = self.get_local_filepath(_meta['data_date'])  # noqa
+            # add to output_data
+            self.output_data[rmk] = {
+                suffix: input_bytes,
+                '_meta': _meta
+            }

         return True
diff --git a/wis2box-management/wis2box/data/message.py b/wis2box-management/wis2box/data/message.py
new file mode 100644
index 00000000..ce75c837
--- /dev/null
+++ b/wis2box-management/wis2box/data/message.py
@@ -0,0 +1,79 @@
+###############################################################################
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+###############################################################################
+
+import logging
+
+from pathlib import Path
+from datetime import datetime
+from typing import Union
+
+from wis2box.data.base import BaseAbstractData
+
+LOGGER = logging.getLogger(__name__)
+
+
+class MessageData(BaseAbstractData):
+    """
+    DataPublish:
+
+    transform sets output_data to input_data
+    using metadata received by the plugin
+    """
+
+    def __init__(self, defs: dict) -> None:
+        try:
+            self._meta = defs['_meta']
+        except KeyError:
+            error = f'No _meta in defs: {defs}'
+            LOGGER.error(error)
+            raise KeyError(error)
+        super().__init__(defs)
+
+    def transform(self, input_data: Union[Path, bytes],
+                  filename: str = '') -> bool:
+        """
+        Transform input_data to output_data
+
+        :param input_data: input data
+        :param filename: filename of input data
+        :param _meta: metadata of input data
+
+        :returns: `bool` of result
+        """
+
+        suffix = filename.split('.')[-1]
+        rmk = filename.split('.')[0]
+        input_bytes = self.as_bytes(input_data)
+
+        # convert isoformat to datetime
+        self._meta['data_date'] = datetime.fromisoformat(self._meta['data_date'])  # noqa
+        # add relative filepath to _meta
+        self._meta['relative_filepath'] = self.get_local_filepath(self._meta['data_date'])  # noqa
+
+        self.output_data[rmk] = {
+            suffix: input_bytes,
+            '_meta': self._meta
+        }
+
+        return True
+
+    def get_local_filepath(self, date_):
+        yyyymmdd = date_.strftime('%Y-%m-%d')
+        return Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath
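`MessageData` lets the subscriber republish data that arrives fully formed on `wis2box/data/publication` (see `handle_publish()` later in this diff). A hedged usage sketch; the channel string and `_meta` values are made-up examples, and `publish()` assumes the full wis2box-management environment (broker, storage) is available:

```python
# Illustrative use of the new MessageData plugin.
import base64

from wis2box.data.message import MessageData

defs = {
    'topic_hierarchy': 'mw-mw_met_centre.data.core.weather.surface-based-observations.synop',  # noqa
    '_meta': {'data_date': '2021-07-07T14:55:00'},
    'notify': True
}

plugin = MessageData(defs=defs)
plugin.transform(input_data=base64.b64decode('QlVGUi4uLjc3Nzc='),
                 filename='WIGOS_0-454-2-AWSLOBI_20210707T145500.bufr4')
plugin.publish()
```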
diff --git a/wis2box-management/wis2box/data/synop2bufr.py b/wis2box-management/wis2box/data/synop2bufr.py
index 5bdcf147..5b9d317b 100644
--- a/wis2box-management/wis2box/data/synop2bufr.py
+++ b/wis2box-management/wis2box/data/synop2bufr.py
@@ -19,14 +19,15 @@
 #
 ###############################################################################

+import base64
 import logging
+
+from datetime import datetime
 from pathlib import Path
 from typing import Union

-from synop2bufr import transform as transform_synop
-
+from wis2box.api import execute_api_process
 from wis2box.data.base import BaseAbstractData
-from wis2box.metadata.station import get_valid_wsi, get_stations_csv

 LOGGER = logging.getLogger(__name__)

@@ -44,12 +45,8 @@ def __init__(self, defs: dict) -> None:

         super().__init__(defs)

-        self.mappings = {}
-
-        self.station_metadata = get_stations_csv()
-
     def transform(self, input_data: Union[Path, bytes],
-                  filename: str = '') -> bool:
+                  filename: str = '', _meta: dict = None) -> bool:

         LOGGER.debug('Processing SYNOP ASCII data')

@@ -65,7 +62,7 @@ def transform(self, input_data: Union[Path, bytes],
             raise ValueError(msg)

         LOGGER.debug('Generating BUFR4')
-        input_bytes = self.as_bytes(input_data)
+        data = self.as_string(input_data)

         try:
             year = int(file_match.group(1))
@@ -75,43 +72,53 @@ def transform(self, input_data: Union[Path, bytes],
             LOGGER.error(msg)
             raise ValueError(msg)

-        LOGGER.debug('Transforming data')
-        bufr_generator = transform_synop(input_bytes.decode(),
-                                         self.station_metadata,
-                                         year, month)
-        results = []
+        # post data to wis2box-api/oapi/processes/synop2bufr
+        payload = {
+            'inputs': {
+                'channel': self.topic_hierarchy.dirpath,
+                'year': year,
+                'month': month,
+                'notify': False,
+                'data': data  # noqa
+            }
+        }
+
+        process_name = 'wis2box-synop2bufr'
+        result = execute_api_process(process_name, payload)

         try:
-            for item in bufr_generator:
-                results.append(item)
-        except Exception as err:
-            LOGGER.error(f'Error in bufr_generator: {err}')
-
-        LOGGER.debug('Iterating over BUFR messages')
-        for item in results:
-            wsi = item['_meta']['properties']['wigos_station_identifier']
-            if 'result' in item['_meta']:
-                if item['_meta']['result']['code'] != 1:
-                    msg = item['_meta']['result']['message']
-                    LOGGER.error(f'Transform returned {msg} for wsi={wsi}')
-                    self.publish_failure_message(
-                        description='synop2bufr transform error',
-                        wsi=wsi)
-                    continue
-            if get_valid_wsi(wsi) is None:
-                msg = f'Station not in station list: wsi={wsi}; skipping'
-                LOGGER.error(msg)
-                self.publish_failure_message(
-                    description='Station not in station list',
-                    wsi=wsi)
-                continue
-            LOGGER.debug('Setting obs date for filepath creation')
-            identifier = item['_meta']['id']
-            data_date = item['_meta']['properties']['datetime']
-
-            self.output_data[identifier] = item
-            self.output_data[identifier]['_meta']['relative_filepath'] = \
-                self.get_local_filepath(data_date)
+            # check for errors
+            for error in result['errors']:
+                LOGGER.error(error)
+            # check for warnings
+            for warning in result['warnings']:
+                LOGGER.warning(warning)
+        except KeyError:
+            LOGGER.error(f'file={filename} failed to convert to BUFR4, result={result}')  # noqa
+            return False
+
+        if 'data_items' not in result:
+            LOGGER.error(f'file={filename} failed to convert to BUFR4')
+            return False
+
+        # loop over data_items in response
+        for data_item in result['data_items']:
+            filename = data_item['filename']
+            suffix = filename.split('.')[-1]
+            rmk = filename.split('.')[0]
+            # convert data_item['data'] to bytes
+            input_bytes = base64.b64decode(data_item['data'].encode('utf-8'))
+            # define _meta
+            _meta = data_item['_meta']
+            # convert isoformat to datetime
+            _meta['data_date'] = datetime.fromisoformat(_meta['data_date'])
+            # add relative filepath to _meta
+            _meta['relative_filepath'] = self.get_local_filepath(_meta['data_date'])  # noqa
+            # add to output_data
+            self.output_data[rmk] = {
+                suffix: input_bytes,
+                '_meta': _meta
+            }

         return True
diff --git a/wis2box-management/wis2box/pubsub/mqtt.py b/wis2box-management/wis2box/pubsub/mqtt.py
index 3f9f0085..76b9c1b2 100644
--- a/wis2box-management/wis2box/pubsub/mqtt.py
+++ b/wis2box-management/wis2box/pubsub/mqtt.py
@@ -82,7 +82,9 @@ def pub(self, topic: str, message: str, qos: int = 1) -> bool:
         LOGGER.debug(f'Topic: {topic}')
         LOGGER.debug(f'Message: {message}')

+        self.conn.loop_start()
         result = self.conn.publish(topic, message, qos)
+        self.conn.loop_stop()

         # TODO: investigate implication
         # result.wait_for_publish()
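The `loop_start()`/`loop_stop()` pair matters because paho-mqtt only queues a QoS>0 message on `publish()`; a running network loop is needed to actually flush it to the broker. The same pattern in a standalone sketch (assumes paho-mqtt 1.x and a broker on localhost):

```python
# Minimal paho-mqtt publish with an explicit network loop.
import paho.mqtt.client as mqtt

client = mqtt.Client('wis2box-sketch')
client.connect('localhost', 1883)

client.loop_start()                # background network thread
info = client.publish('wis2box/notifications', '{}', qos=1)
info.wait_for_publish()            # block until the broker has the message
client.loop_stop()
```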
diff --git a/wis2box-management/wis2box/pubsub/subscribe.py b/wis2box-management/wis2box/pubsub/subscribe.py
index eb08a30d..5646e014 100644
--- a/wis2box-management/wis2box/pubsub/subscribe.py
+++ b/wis2box-management/wis2box/pubsub/subscribe.py
@@ -19,6 +19,7 @@
 #
 ###############################################################################

+import base64
 import json
 import logging
 import multiprocessing as mp
@@ -27,11 +28,12 @@

 import click

-from wis2box.api import upsert_collection_item
 from wis2box import cli_helpers
 import wis2box.data as data_
-from wis2box.api import remove_collection, setup_collection
+from wis2box.api import (remove_collection, setup_collection,
+                         upsert_collection_item)
 from wis2box.data_mappings import get_data_mappings
+from wis2box.data.message import MessageData
 from wis2box.env import (BROKER_HOST, BROKER_PORT, BROKER_USERNAME,
                          BROKER_PASSWORD, STORAGE_SOURCE, STORAGE_ARCHIVE)
 from wis2box.handler import Handler, NotHandledError
@@ -39,6 +41,7 @@
 from wis2box.plugin import load_plugin, PLUGINS
 from wis2box.pubsub.message import gcm

+
 LOGGER = logging.getLogger(__name__)

@@ -50,9 +53,9 @@ def __init__(self, broker):
         self.broker.bind('on_message', self.on_message_handler)
         self.broker.sub('wis2box/#')

-    def handle(self, filepath, message):
+    def handle(self, filepath):
         try:
-            LOGGER.info(f'Processing {message} for {filepath}')
+            LOGGER.info(f'Processing {filepath}')
             # load handler
             handler = Handler(filepath=filepath,
                               data_mappings=self.data_mappings)
@@ -71,18 +74,42 @@ def handle(self, filepath):
             msg = f'handle() error: {err}'
             raise err

+    def handle_publish(self, message):
+        LOGGER.debug('Loading MessageData plugin to publish data from message')  # noqa
+        defs = {
+            'topic_hierarchy': message['channel'].replace('origin/a/wis2/', ''),  # noqa
+            '_meta': message['_meta'],
+            'notify': True
+        }
+        plugin = MessageData(defs=defs)
+        try:
+            input_bytes = base64.b64decode(message['data'].encode('utf-8'))
+            plugin.transform(
+                input_data=input_bytes,
+                filename=message['filename']
+            )
+        except Exception as err:
+            msg = f'MessageData-transform failed: {err}'
+            LOGGER.error(msg, exc_info=True)
+            return False
+        try:
+            plugin.publish()
+        except Exception as err:
+            msg = f'MessageData-publish failed: {err}'
+            LOGGER.error(msg, exc_info=True)
+            return False
+
     def on_message_handler(self, client, userdata, msg):
         LOGGER.debug(f'Raw message: {msg.payload}')

         topic = msg.topic
         message = json.loads(msg.payload)
         LOGGER.info(f'Incoming message on topic {topic}')
-        filepath = None
         if topic == 'wis2box/notifications':
             LOGGER.info(f'Notification: {message}')
             # store notification in messages collection
             upsert_collection_item('messages', message)
-            return
         elif (topic == 'wis2box/storage' and
               message.get('EventName', '') == 's3:ObjectCreated:Put'):
             LOGGER.debug('Storing data')
@@ -90,31 +117,28 @@ def on_message_handler(self, client, userdata, msg):
             filepath = f'{STORAGE_SOURCE}/{key}'
             if key.startswith(STORAGE_ARCHIVE):
                 LOGGER.info(f'Do not process archived-data: {key}')
-                return
+            # start a new process to handle the received data
+            while len(mp.active_children()) == mp.cpu_count():
+                sleep(0.1)
+            mp.Process(target=self.handle, args=(filepath,)).start()
+        elif topic == 'wis2box/data/publication':
+            LOGGER.debug('Publishing data')
+            self.handle_publish(message)
         elif topic == 'wis2box/data_mappings/refresh':
-            LOGGER.debug('Refreshing data mappings')
+            LOGGER.info('Refreshing data mappings')
             self.data_mappings = get_data_mappings()
-            return
+            LOGGER.info(f'Data mappings: {self.data_mappings}')
         elif topic == 'wis2box/dataset/publication':
             LOGGER.debug('Publishing dataset')
             metadata = message
             discovery_metadata.publish_discovery_metadata(metadata)
             data_.add_collection_data(metadata)
-            return
         elif topic.startswith('wis2box/dataset/unpublication'):
             LOGGER.debug('Unpublishing dataset')
             identifier = topic.split('/')[-1]
             remove_collection(identifier)
-            return
         else:
             LOGGER.debug('Ignoring message')
-            return
-
-        if filepath:
-            while len(mp.active_children()) == mp.cpu_count():
-                sleep(0.1)
-            p = mp.Process(target=self.handle, args=(filepath, message))
-            p.start()


 @click.command()
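The storage branch now throttles itself by refusing to fork while the number of active children equals the CPU count. The same back-pressure pattern in isolation; the handler body below is a stand-in for `Handler(...).handle()`:

```python
# Cap worker concurrency at the number of CPU cores.
import multiprocessing as mp
from time import sleep


def handle(filepath: str) -> None:
    sleep(1)  # simulate processing an incoming file


if __name__ == '__main__':
    for i in range(16):
        # wait for a slot to free up before forking another worker
        while len(mp.active_children()) == mp.cpu_count():
            sleep(0.1)
        mp.Process(target=handle, args=(f'/data/file-{i}.bufr4',)).start()
```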