Hc 376 publish datasets (#51)
* added bulk indexing of datasets to the grq2 v2 REST API

* fixed bulk indexing in datasets endpoint

removed comments and TODOs

* pinned werkzeug

* bump version

Co-authored-by: dustinlo <dustin.k.lo@jpl.nasa.gov>
DustinKLo and dustinlo authored Aug 11, 2022
1 parent 3e48da0 commit c74a154
Showing 2 changed files with 112 additions and 18 deletions.
1 change: 1 addition & 0 deletions grq2/lib/dataset.py
@@ -60,6 +60,7 @@ def update(update_json):
     """Update GRQ metadata and urls for a product."""
     version = update_json['version']  # get version
 
+    # TODO: maybe set the default value to "dataset" instead of None
     dataset = update_json.get('dataset', None)  # determine index name
     index_suffix = dataset
 
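The TODO above matters for index naming: downstream, this suffix feeds an index name of the form grq_<version>_<dataset>, mirroring the get_es_index helper added to the v0.2 API below. A minimal sketch, using a hypothetical index_name_for helper (not part of this diff), shows why a missing 'dataset' key is awkward under the None default:

```python
# Hypothetical helper illustrating the None-default behavior flagged by the TODO.
def index_name_for(update_json, grq_index='grq'):
    version = update_json['version']
    dataset = update_json.get('dataset', None)  # current default
    return ('%s_%s_%s' % (grq_index, version, dataset)).lower()

print(index_name_for({'version': 'v2.0', 'dataset': 'area_of_interest'}))  # grq_v2.0_area_of_interest
print(index_name_for({'version': 'v2.0'}))  # grq_v2.0_none -- why defaulting to "dataset" looks safer
```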
129 changes: 111 additions & 18 deletions grq2/services/api_v02/datasets.py
@@ -12,9 +12,92 @@
 from flask import request
 from flask_restx import Resource, fields
 
-from grq2 import app
+from shapely.geometry import shape
+from elasticsearch.exceptions import ElasticsearchException
+
+from grq2 import app, grq_es
 from .service import grq_ns
-from grq2.lib.dataset import update as update_dataset
+from grq2.lib.dataset import update as update_dataset, map_geojson_type
 
 from grq2.lib.geonames import get_cities, get_nearest_cities, get_continents
 from grq2.lib.time_utils import getTemporalSpanInDays as get_ts
+
+
+_POINT = 'Point'
+_MULTIPOINT = 'MultiPoint'
+_LINESTRING = 'LineString'
+_MULTILINESTRING = 'MultiLineString'
+_POLYGON = 'Polygon'
+_MULTIPOLYGON = 'MultiPolygon'
+
+GEOJSON_TYPES = {_POINT, _MULTIPOINT, _LINESTRING, _MULTILINESTRING, _POLYGON, _MULTIPOLYGON}
+
+
+def get_es_index(prod_json):
+    """
+    get ES index name for dataset
+    :param prod_json: Dict[any]
+    :return: str
+    """
+    version = prod_json['version']  # get version
+    dataset = prod_json.get('dataset', "dataset")  # determine index name
+
+    index = '%s_%s_%s' % (app.config['GRQ_INDEX'], version, dataset)  # get default index
+
+    aliases = []
+    if 'index' in prod_json:
+        if 'suffix' in prod_json['index']:
+            index = '%s_%s' % (app.config['GRQ_INDEX'], prod_json['index']['suffix'])
+        aliases.extend(prod_json['index'].get('aliases', []))
+        del prod_json['index']
+    return index.lower(), aliases
+
+
+def reverse_geolocation(prod_json):
+    """
+    retrieves the dataset's city, the nearest cities and continent
+    :param prod_json: Dict[any]; dataset metadata
+    """
+    if 'location' in prod_json:
+        location = {**prod_json['location']}  # copy location, used to create a shapely geometry object
+        loc_type = location['type']
+
+        geo_json_type = map_geojson_type(loc_type)
+        location['type'] = geo_json_type  # set proper GeoJSON type, e.g. multipolygon -> MultiPolygon
+        prod_json['location']['type'] = geo_json_type.lower()
+
+        # add center if missing
+        if 'center' not in prod_json:
+            geo_shape = shape(location)
+            centroid = geo_shape.centroid
+            prod_json['center'] = {
+                'type': 'point',
+                'coordinates': [centroid.x, centroid.y]
+            }
+
+        # extract coordinates from center
+        lon, lat = prod_json['center']['coordinates']
+
+        # add cities
+        if geo_json_type in (_POLYGON, _MULTIPOLYGON):
+            mp = True if geo_json_type == _MULTIPOLYGON else False
+            coords = location['coordinates'][0]
+            prod_json['city'] = get_cities(coords, multipolygon=mp)
+        elif geo_json_type in (_POINT, _MULTIPOINT, _LINESTRING, _MULTILINESTRING):
+            prod_json['city'] = get_nearest_cities(lon, lat)
+        else:
+            raise TypeError('%s is not a valid (or supported) GeoJSON type: %s' % (geo_json_type, GEOJSON_TYPES))
+
+        # add closest continent
+        continents = get_continents(lon, lat)
+        prod_json['continent'] = continents[0]['name'] if len(continents) > 0 else None
+
+    # set temporal_span
+    if prod_json.get('starttime', None) is not None and prod_json.get('endtime', None) is not None:
+        if isinstance(prod_json['starttime'], str) and isinstance(prod_json['endtime'], str):
+            start_time = prod_json['starttime']
+            end_time = prod_json['endtime']
+            prod_json['temporal_span'] = get_ts(start_time, end_time)
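For reference, a usage sketch of get_es_index (not part of this diff), under the assumption that app.config['GRQ_INDEX'] is 'grq' and with purely illustrative document values. An explicit index.suffix overrides the version/dataset default, and the helper pops the 'index' block from the document:

```python
# Hypothetical product document; values are illustrative only.
prod = {
    "id": "dummy_dataset-20220811",
    "version": "v2.0.0",
    "dataset": "dummy_dataset",
    "index": {"suffix": "dummy_suffix", "aliases": ["dummy_alias"]},
}

index, aliases = get_es_index(prod)  # assuming app.config['GRQ_INDEX'] == 'grq'
print(index, aliases)                # grq_dummy_suffix ['dummy_alias']

# Without an "index" block, the version/dataset default applies:
index, _ = get_es_index({"version": "v2.0.0", "dataset": "dummy_dataset"})
print(index)                         # grq_v2.0.0_dummy_dataset
```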
@grq_ns.route('/dataset/index', endpoint='dataset_index')
@@ -35,27 +35,37 @@ class IndexDataset(Resource):
     @grq_ns.marshal_with(resp_model)
     @grq_ns.expect(parser, validate=True)
     def post(self):
-        info = request.form.get('dataset_info', request.args.get('dataset_info', None))
-        if info is None:
-            return {'success': False, 'message': 'dataset_info must be supplied'}, 400
+        datasets = json.loads(request.json)
 
-        try:
-            info = json.loads(info)
-        except Exception as e:
-            message = "Failed to parse dataset info JSON."
-            app.logger.debug(message)
-            return {
-                'success': False,
-                'message': message,
-                'job_id': None
-            }, 500
+        docs_bulk = []
+        for ds in datasets:
+            _id = ds["id"]
+            index, aliases = get_es_index(ds)
+            reverse_geolocation(ds)
+            docs_bulk.append({"index": {"_index": index, "_id": _id}})
+            docs_bulk.append(ds)
 
         try:
-            return update_dataset(info)
-        except Exception as e:
+            response = grq_es.es.bulk(body=docs_bulk)
+            if response["errors"] is True:
+                app.logger.error(response)
+                delete_docs = []
+                for doc in docs_bulk:
+                    if "index" in doc:
+                        delete_docs.append({"delete": doc["index"]})
+                grq_es.es.bulk(delete_docs)
+                return {
+                    "success": False,
+                    "message": response["items"]
+                }, 400
+            return {
+                "success": True,
+                "message": response
+            }
+        except ElasticsearchException as e:
             message = "Failed to index dataset. {0}:{1}\n{2}".format(type(e), e, traceback.format_exc())
             app.logger.error(message)
             return {
                 'success': False,
                 'message': message
-            }, 500
+            }, 400
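Taken together, the new handler expects the request body to be a JSON-encoded string containing a list of dataset documents (it calls json.loads(request.json)), bulk-indexes them, and bulk-deletes whatever it just indexed if Elasticsearch reports per-item errors. A hypothetical client call might look like the sketch below; the host, port, and URL prefix are assumptions, not taken from this diff:

```python
# Hypothetical client for the new bulk endpoint; the base URL is an assumption.
import json
import requests

datasets = [{
    "id": "dummy_dataset-20220811",
    "version": "v2.0",
    "dataset": "dummy_dataset",
    "location": {
        "type": "polygon",  # reverse_geolocation() maps this to "Polygon"
        "coordinates": [[[-119.0, 44.0], [-110.0, 44.0], [-110.0, 38.0],
                         [-119.0, 38.0], [-119.0, 44.0]]],
    },
    "starttime": "2022-08-11T00:00:00",
    "endtime": "2022-08-11T01:00:00",
}]

resp = requests.post(
    "http://localhost:8878/api/v0.2/grq/dataset/index",  # hypothetical URL
    json=json.dumps(datasets),  # double-encoded: the handler json.loads()es request.json
)
print(resp.status_code, resp.json())
```

On a partial failure the endpoint rolls back the documents it indexed and returns 400 with the per-item bulk results in the message field.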
