diff --git a/wis2box-management/wis2box/api/__init__.py b/wis2box-management/wis2box/api/__init__.py index 36250473c..239754d67 100644 --- a/wis2box-management/wis2box/api/__init__.py +++ b/wis2box-management/wis2box/api/__init__.py @@ -21,6 +21,9 @@ import click import logging +import requests + +from time import sleep from wis2box import cli_helpers from wis2box.api.backend import load_backend @@ -72,10 +75,9 @@ def execute_api_process(process_name: str, payload: dict) -> dict: headers_json = dict(response.headers) location = headers_json['Location'] - location.replace(API_URL, DOCKER_API_URL) + location = location.replace(API_URL, DOCKER_API_URL) status = 'accepted' - response_json = None while status == 'accepted' or status == 'running': # get the job status headers = { @@ -84,9 +86,12 @@ def execute_api_process(process_name: str, payload: dict) -> dict: } response = requests.get(location, headers=headers) response_json = response.json() - status = response_json['status'] + if 'status' in response_json: + status = response_json['status'] sleep(0.1) - return response_json + # get result from location/results?f=json + response = requests.get(f'{location}/results?f=json', headers=headers) # noqa + return response.json() def setup_collection(meta: dict = {}) -> bool: diff --git a/wis2box-management/wis2box/data/bufr4.py b/wis2box-management/wis2box/data/bufr4.py index e8b509118..d15203703 100644 --- a/wis2box-management/wis2box/data/bufr4.py +++ b/wis2box-management/wis2box/data/bufr4.py @@ -71,6 +71,7 @@ def transform(self, input_data: Union[Path, bytes], LOGGER.warning(warning) except KeyError: LOGGER.error(f'KeyError in result={result}') + return False if 'data_items' not in result: LOGGER.error(f'file={filename} failed to convert to BUFR4 (result={result})') # noqa diff --git a/wis2box-management/wis2box/data/csv2bufr.py b/wis2box-management/wis2box/data/csv2bufr.py index b21645464..1823f3cda 100644 --- a/wis2box-management/wis2box/data/csv2bufr.py +++ b/wis2box-management/wis2box/data/csv2bufr.py @@ -75,13 +75,16 @@ def transform(self, input_data: Union[Path, bytes], process_name = 'wis2box-csv2bufr' result = execute_api_process(process_name, payload) - # check for errors - for error in result['errors']: - LOGGER.error(error) - - # check for warnings - for warning in result['warnings']: - LOGGER.warning(warning) + try: + # check for errors + for error in result['errors']: + LOGGER.error(f'input={filename} error={error}') + # check for warnings + for warning in result['warnings']: + LOGGER.warning(f'input={filename} warning={warning}') + except KeyError: + LOGGER.error(f'file={filename} failed to convert to BUFR4, result={result}') # noqa + return False if 'data_items' not in result: LOGGER.error(f'file={filename} failed to convert to BUFR4') diff --git a/wis2box-management/wis2box/data/synop2bufr.py b/wis2box-management/wis2box/data/synop2bufr.py index 84f42e712..5b9d317be 100644 --- a/wis2box-management/wis2box/data/synop2bufr.py +++ b/wis2box-management/wis2box/data/synop2bufr.py @@ -86,13 +86,16 @@ def transform(self, input_data: Union[Path, bytes], process_name = 'wis2box-synop2bufr' result = execute_api_process(process_name, payload) - # check for errors - for error in result['errors']: - LOGGER.error(error) - - # check for warnings - for warning in result['warnings']: - LOGGER.warning(warning) + try: + # check for errors + for error in result['errors']: + LOGGER.error(error) + # check for warnings + for warning in result['warnings']: + LOGGER.warning(warning) + except KeyError: + LOGGER.error(f'file={filename} failed to convert to BUFR4, result={result}') # noqa + return False if 'data_items' not in result: LOGGER.error(f'file={filename} failed to convert to BUFR4') diff --git a/wis2box-management/wis2box/pubsub/mqtt.py b/wis2box-management/wis2box/pubsub/mqtt.py index 3f9f00852..76b9c1b2d 100644 --- a/wis2box-management/wis2box/pubsub/mqtt.py +++ b/wis2box-management/wis2box/pubsub/mqtt.py @@ -82,7 +82,9 @@ def pub(self, topic: str, message: str, qos: int = 1) -> bool: LOGGER.debug(f'Topic: {topic}') LOGGER.debug(f'Message: {message}') + self.conn.loop_start() result = self.conn.publish(topic, message, qos) + self.conn.loop_stop() # TODO: investigate implication # result.wait_for_publish() diff --git a/wis2box-management/wis2box/pubsub/subscribe.py b/wis2box-management/wis2box/pubsub/subscribe.py index b31b26f88..6ea532040 100644 --- a/wis2box-management/wis2box/pubsub/subscribe.py +++ b/wis2box-management/wis2box/pubsub/subscribe.py @@ -30,7 +30,8 @@ from wis2box import cli_helpers import wis2box.data as data_ -from wis2box.api import remove_collection, setup_collection +from wis2box.api import (remove_collection, setup_collection, + upsert_collection_item) from wis2box.data_mappings import get_data_mappings from wis2box.data.message import MessageData from wis2box.env import (BROKER_HOST, BROKER_PORT, BROKER_USERNAME, @@ -105,14 +106,10 @@ def on_message_handler(self, client, userdata, msg): topic = msg.topic message = json.loads(msg.payload) LOGGER.info(f'Incoming message on topic {topic}') - filepath = None - target = None - args = None if topic == 'wis2box/notifications': LOGGER.info(f'Notification: {message}') # store notification in messages collection upsert_collection_item('messages', message) - return elif (topic == 'wis2box/storage' and message.get('EventName', '') == 's3:ObjectCreated:Put'): LOGGER.debug('Storing data') @@ -120,31 +117,28 @@ def on_message_handler(self, client, userdata, msg): filepath = f'{STORAGE_SOURCE}/{key}' if key.startswith(STORAGE_ARCHIVE): LOGGER.info(f'Do not process archived-data: {key}') - return + # start a new process to handle the received data + while len(mp.active_children()) == mp.cpu_count(): + sleep(0.1) + mp.Process(target=self.handle, args=(filepath,)).start() + elif (topic == 'wis2box/data/publication'): + LOGGER.debug('Publishing data') + self.handle_publish(message) elif topic == 'wis2box/data_mappings/refresh': - LOGGER.debug('Refreshing data mappings') + LOGGER.info('Refreshing data mappings') self.data_mappings = get_data_mappings() - return + LOGGER.info(f'Data mappings: {self.data_mappings}') elif topic == 'wis2box/dataset/publication': LOGGER.debug('Publishing dataset') metadata = message discovery_metadata.publish_discovery_metadata(metadata) data_.add_collection_data(metadata) - return elif topic.startswith('wis2box/dataset/unpublication'): LOGGER.debug('Unpublishing dataset') identifier = topic.split('/')[-1] remove_collection(identifier) - return else: LOGGER.debug('Ignoring message') - return - - if filepath: - while len(mp.active_children()) == mp.cpu_count(): - sleep(0.1) - p = mp.Process(target=target, args=args) - p.start() @click.command()