Skip to content

Commit

Permalink
minor updates plus function execute_api_process using async
Browse files Browse the repository at this point in the history
  • Loading branch information
maaikelimper committed Jan 25, 2024
1 parent 95f21dd commit 510314e
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 89 deletions.
52 changes: 51 additions & 1 deletion wis2box-management/wis2box/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,63 @@
import click
import logging

from time import sleep

import requests

from wis2box import cli_helpers
from wis2box.api.backend import load_backend
from wis2box.api.config import load_config
from wis2box import cli_helpers
from wis2box.env import DOCKER_API_URL

LOGGER = logging.getLogger(__name__)


def execute_api_process(process_name: str, payload: dict) -> dict:
"""
Executes a process on the API
:param process_name: process name
:param payload: payload to send to process
:returns: `dict` with execution-result
"""

LOGGER.debug('Posting data to wis2box-api')
headers = {
'accept': 'application/json',
'Content-Type': 'application/json',
'prefer': 'respond-async'
}
url = f'{DOCKER_API_URL}/processes/{process_name}/execution'

response = requests.post(url, headers=headers, json=payload)
if response.status_code >= 400:
msg = f'Failed to post data to wis2box-api: {response.status_code}'
LOGGER.error(msg)
raise ValueError(msg)

if response.status_code == 200:
return response.json()

headers_json = dict(response.headers)
location = headers_json['Location']

status = 'accepted'
response_json = None
while status == 'accepted' or status == 'running':
# get the job status
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
response = requests.get(location, headers=headers)
response_json = response.json()
status = response_json['status']
sleep(0.1)
return response_json


def setup_collection(meta: dict = {}) -> bool:
"""
Add collection to api backend and configuration
Expand Down
4 changes: 2 additions & 2 deletions wis2box-management/wis2box/data/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def publish_failure_message(self, description, wsi=None):
# load plugin for local broker
defs = {
'codepath': PLUGINS['pubsub']['mqtt']['plugin'],
'url': f"mqtt://{BROKER_USERNAME}:{BROKER_PASSWORD}@{BROKER_HOST}:{BROKER_PORT}", # noqa
'url': f'mqtt://{BROKER_USERNAME}:{BROKER_PASSWORD}@{BROKER_HOST}:{BROKER_PORT}', # noqa
'client_type': 'failure-publisher'
}
local_broker = load_plugin('pubsub', defs)
Expand Down Expand Up @@ -169,7 +169,7 @@ def notify(self, identifier: str, storage_path: str,
# load plugin for local broker
defs_local = {
'codepath': PLUGINS['pubsub']['mqtt']['plugin'],
'url': f"mqtt://{BROKER_USERNAME}:{BROKER_PASSWORD}@{BROKER_HOST}:{BROKER_PORT}", # noqa
'url': f'mqtt://{BROKER_USERNAME}:{BROKER_PASSWORD}@{BROKER_HOST}:{BROKER_PORT}', # noqa
'client_type': 'notify-publisher'
}
local_broker = load_plugin('pubsub', defs_local)
Expand Down
19 changes: 3 additions & 16 deletions wis2box-management/wis2box/data/bufr2geojson.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@

import base64
import logging
import requests

from pathlib import Path
from typing import Union

from wis2box.api import execute_api_process
from wis2box.data.geojson import ObservationDataGeoJSON
from wis2box.env import DOCKER_API_URL

LOGGER = logging.getLogger(__name__)

Expand All @@ -40,13 +39,7 @@ def transform(self, input_data: Union[Path, bytes],
filename: str = '') -> bool:

LOGGER.debug('Procesing BUFR data')

LOGGER.debug('Posting data to wis2box-api')
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
url = f'{DOCKER_API_URL}/processes/bufr2geojson/execution'
process_name = 'bufr2geojson'

# check if input_data is Path object
if isinstance(input_data, Path):
Expand All @@ -63,13 +56,7 @@ def transform(self, input_data: Union[Path, bytes],
}
}

response = requests.post(url, headers=headers, json=payload)
if response.status_code != 200:
msg = f'Failed to post data to wis2box-api: {response.status_code}'
LOGGER.error(msg)
raise ValueError(msg)

result = response.json()
result = execute_api_process(process_name, payload)

# check for errors
if 'error' in result and result['error'] != '':
Expand Down
28 changes: 7 additions & 21 deletions wis2box-management/wis2box/data/bufr4.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,14 @@
###############################################################################

import base64

import logging
import requests

from datetime import datetime
from pathlib import Path
from typing import Union

from wis2box.api import execute_api_process
from wis2box.data.base import BaseAbstractData
from wis2box.env import DOCKER_API_URL

LOGGER = logging.getLogger(__name__)

Expand All @@ -53,28 +51,16 @@ def transform(self, input_data: Union[Path, bytes],
LOGGER.debug('Processing BUFR4')
data = self.as_string(input_data, base64_encode=True)

LOGGER.debug('Posting data to wis2box-api')
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}

payload = {
"inputs": {
"channel": self.topic_hierarchy.dirpath,
"notify": False,
"data": data
'inputs': {
'channel': self.topic_hierarchy.dirpath,
'notify': False,
'data': data
}
}

url = f'{DOCKER_API_URL}/processes/wis2box-bufr2bufr/execution'
response = requests.post(url, headers=headers, json=payload)
if response.status_code != 200:
msg = f'Failed to post data to wis2box-api: {response.status_code}'
LOGGER.error(msg)
raise ValueError(msg)

result = response.json()
process_name = 'wis2box-bufr2bufr'
result = execute_api_process(process_name, payload)

try:
# check for errors
Expand Down
31 changes: 9 additions & 22 deletions wis2box-management/wis2box/data/csv2bufr.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@
###############################################################################

import base64

import logging
import requests

from datetime import datetime
from pathlib import Path
from typing import Union

from wis2box.api import execute_api_process
from wis2box.data.base import BaseAbstractData
from wis2box.env import DOCKER_API_URL


LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -64,29 +63,17 @@ def transform(self, input_data: Union[Path, bytes],
LOGGER.debug('Generating BUFR4')
data = self.as_string(input_data)

LOGGER.debug('Posting data to wis2box-api')
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}

payload = {
"inputs": {
"channel": self.topic_hierarchy.dirpath,
"template": self.template,
"notify": False,
"data": data
'inputs': {
'channel': self.topic_hierarchy.dirpath,
'template': self.template,
'notify': False,
'data': data
}
}

url = f'{DOCKER_API_URL}/processes/wis2box-csv2bufr/execution'
response = requests.post(url, headers=headers, json=payload)
if response.status_code != 200:
msg = f'Failed to post data to wis2box-api: {response.status_code}'
LOGGER.error(msg)
raise ValueError(msg)

result = response.json()
process_name = 'wis2box-csv2bufr'
result = execute_api_process(process_name, payload)

# check for errors
for error in result['errors']:
Expand Down
5 changes: 3 additions & 2 deletions wis2box-management/wis2box/data/message.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from datetime import datetime

import logging
from pathlib import Path

from pathlib import Path
from datetime import datetime
from typing import Union

from wis2box.data.base import BaseAbstractData
Expand Down
31 changes: 9 additions & 22 deletions wis2box-management/wis2box/data/synop2bufr.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,14 @@
###############################################################################

import base64

import logging
import requests

from datetime import datetime
from pathlib import Path
from typing import Union

from wis2box.api import execute_api_process
from wis2box.data.base import BaseAbstractData
from wis2box.env import DOCKER_API_URL

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -76,28 +74,17 @@ def transform(self, input_data: Union[Path, bytes],

# post data do wis2box-api/oapi/processes/synop2bufr
payload = {
"inputs": {
"channel": self.topic_hierarchy.dirpath,
"year": year,
"month": month,
"notify": False,
"data": data # noqa
'inputs': {
'channel': self.topic_hierarchy.dirpath,
'year': year,
'month': month,
'notify': False,
'data': data # noqa
}
}

LOGGER.debug('Posting data to wis2box-api')
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
url = f'{DOCKER_API_URL}/processes/wis2box-synop2bufr/execution'
response = requests.post(url, headers=headers, json=payload)
if response.status_code != 200:
msg = f'Failed to post data to wis2box-api: {response.status_code}'
LOGGER.error(msg)
raise ValueError(msg)

result = response.json()
process_name = 'wis2box-synop2bufr'
result = execute_api_process(process_name, payload)

# check for errors
for error in result['errors']:
Expand Down
7 changes: 4 additions & 3 deletions wis2box-management/wis2box/pubsub/subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#
###############################################################################

import base64
import json
import logging
import multiprocessing as mp
Expand All @@ -27,16 +28,18 @@

import click

from wis2box.api import upsert_collection_item
from wis2box import cli_helpers
from wis2box.api import setup_collection
from wis2box.api import upsert_collection_item
from wis2box.data_mappings import get_data_mappings
from wis2box.data.message import MessageData
from wis2box.env import (BROKER_HOST, BROKER_PORT, BROKER_USERNAME,
BROKER_PASSWORD, STORAGE_SOURCE, STORAGE_ARCHIVE)
from wis2box.handler import Handler, NotHandledError
from wis2box.plugin import load_plugin, PLUGINS
from wis2box.pubsub.message import gcm


LOGGER = logging.getLogger(__name__)


Expand Down Expand Up @@ -76,11 +79,9 @@ def handle_publish(self, message):
'_meta': message['_meta'],
'notify': True
}
from wis2box.data.message import MessageData
MessageData(defs=defs)
plugin = MessageData(defs=defs)
try:
import base64
input_bytes = base64.b64decode(message['data'].encode('utf-8'))
plugin.transform(
input_data=input_bytes,
Expand Down

0 comments on commit 510314e

Please sign in to comment.