Skip to content

Commit

Permalink
fix conversion via API
Browse files Browse the repository at this point in the history
  • Loading branch information
maaikelimper committed Feb 27, 2024
1 parent cc7a8d6 commit 59327ec
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 35 deletions.
13 changes: 9 additions & 4 deletions wis2box-management/wis2box/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import click
import logging
import requests

from time import sleep

from wis2box import cli_helpers
from wis2box.api.backend import load_backend
Expand Down Expand Up @@ -72,10 +75,9 @@ def execute_api_process(process_name: str, payload: dict) -> dict:

headers_json = dict(response.headers)
location = headers_json['Location']
location.replace(API_URL, DOCKER_API_URL)
location = location.replace(API_URL, DOCKER_API_URL)

status = 'accepted'
response_json = None
while status == 'accepted' or status == 'running':
# get the job status
headers = {
Expand All @@ -84,9 +86,12 @@ def execute_api_process(process_name: str, payload: dict) -> dict:
}
response = requests.get(location, headers=headers)
response_json = response.json()
status = response_json['status']
if 'status' in response_json:
status = response_json['status']
sleep(0.1)
return response_json
# get result from location/results?f=json
response = requests.get(f'{location}/results?f=json', headers=headers) # noqa
return response.json()


def setup_collection(meta: dict = {}) -> bool:
Expand Down
1 change: 1 addition & 0 deletions wis2box-management/wis2box/data/bufr4.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def transform(self, input_data: Union[Path, bytes],
LOGGER.warning(warning)
except KeyError:
LOGGER.error(f'KeyError in result={result}')
return False

if 'data_items' not in result:
LOGGER.error(f'file={filename} failed to convert to BUFR4 (result={result})') # noqa
Expand Down
17 changes: 10 additions & 7 deletions wis2box-management/wis2box/data/csv2bufr.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,16 @@ def transform(self, input_data: Union[Path, bytes],
process_name = 'wis2box-csv2bufr'
result = execute_api_process(process_name, payload)

# check for errors
for error in result['errors']:
LOGGER.error(error)

# check for warnings
for warning in result['warnings']:
LOGGER.warning(warning)
try:
# check for errors
for error in result['errors']:
LOGGER.error(f'input={filename} error={error}')
# check for warnings
for warning in result['warnings']:
LOGGER.warning(f'input={filename} warning={warning}')
except KeyError:
LOGGER.error(f'file={filename} failed to convert to BUFR4, result={result}') # noqa
return False

if 'data_items' not in result:
LOGGER.error(f'file={filename} failed to convert to BUFR4')
Expand Down
17 changes: 10 additions & 7 deletions wis2box-management/wis2box/data/synop2bufr.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,16 @@ def transform(self, input_data: Union[Path, bytes],
process_name = 'wis2box-synop2bufr'
result = execute_api_process(process_name, payload)

# check for errors
for error in result['errors']:
LOGGER.error(error)

# check for warnings
for warning in result['warnings']:
LOGGER.warning(warning)
try:
# check for errors
for error in result['errors']:
LOGGER.error(error)
# check for warnings
for warning in result['warnings']:
LOGGER.warning(warning)
except KeyError:
LOGGER.error(f'file={filename} failed to convert to BUFR4, result={result}') # noqa
return False

if 'data_items' not in result:
LOGGER.error(f'file={filename} failed to convert to BUFR4')
Expand Down
2 changes: 2 additions & 0 deletions wis2box-management/wis2box/pubsub/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ def pub(self, topic: str, message: str, qos: int = 1) -> bool:
LOGGER.debug(f'Topic: {topic}')
LOGGER.debug(f'Message: {message}')

self.conn.loop_start()
result = self.conn.publish(topic, message, qos)
self.conn.loop_stop()

# TODO: investigate implication
# result.wait_for_publish()
Expand Down
28 changes: 11 additions & 17 deletions wis2box-management/wis2box/pubsub/subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@

from wis2box import cli_helpers
import wis2box.data as data_
from wis2box.api import remove_collection, setup_collection
from wis2box.api import (remove_collection, setup_collection,
upsert_collection_item)
from wis2box.data_mappings import get_data_mappings
from wis2box.data.message import MessageData
from wis2box.env import (BROKER_HOST, BROKER_PORT, BROKER_USERNAME,
Expand Down Expand Up @@ -105,46 +106,39 @@ def on_message_handler(self, client, userdata, msg):
topic = msg.topic
message = json.loads(msg.payload)
LOGGER.info(f'Incoming message on topic {topic}')
filepath = None
target = None
args = None
if topic == 'wis2box/notifications':
LOGGER.info(f'Notification: {message}')
# store notification in messages collection
upsert_collection_item('messages', message)
return
elif (topic == 'wis2box/storage' and
message.get('EventName', '') == 's3:ObjectCreated:Put'):
LOGGER.debug('Storing data')
key = str(message['Key'])
filepath = f'{STORAGE_SOURCE}/{key}'
if key.startswith(STORAGE_ARCHIVE):
LOGGER.info(f'Do not process archived-data: {key}')
return
# start a new process to handle the received data
while len(mp.active_children()) == mp.cpu_count():
sleep(0.1)
mp.Process(target=self.handle, args=(filepath,)).start()
elif (topic == 'wis2box/data/publication'):
LOGGER.debug('Publishing data')
self.handle_publish(message)
elif topic == 'wis2box/data_mappings/refresh':
LOGGER.debug('Refreshing data mappings')
LOGGER.info('Refreshing data mappings')
self.data_mappings = get_data_mappings()
return
LOGGER.info(f'Data mappings: {self.data_mappings}')
elif topic == 'wis2box/dataset/publication':
LOGGER.debug('Publishing dataset')
metadata = message
discovery_metadata.publish_discovery_metadata(metadata)
data_.add_collection_data(metadata)
return
elif topic.startswith('wis2box/dataset/unpublication'):
LOGGER.debug('Unpublishing dataset')
identifier = topic.split('/')[-1]
remove_collection(identifier)
return
else:
LOGGER.debug('Ignoring message')
return

if filepath:
while len(mp.active_children()) == mp.cpu_count():
sleep(0.1)
p = mp.Process(target=target, args=args)
p.start()


@click.command()
Expand Down

0 comments on commit 59327ec

Please sign in to comment.