diff --git a/config/settings.cfg.tmpl b/config/settings.cfg.tmpl
index f84b3d9..9a6df65 100644
--- a/config/settings.cfg.tmpl
+++ b/config/settings.cfg.tmpl
@@ -17,6 +17,10 @@ GEONAMES_INDEX = "geonames"
 
 # timeout value for ElasticSearch bulk requests (defaults to 10)
 BULK_REQUEST_TIMEOUT = 30
 
+# Elasticsearch imposes a 100mb limit on payload size when making requests
+# this breaks the data into smaller chunks before using the bulk API
+BULK_LIMIT = 1e+8 # 100,000,000 bytes (100MB)
+
 # Redis URL
 REDIS_URL = "redis://{{ MOZART_REDIS_PVT_IP }}:6379/0"
diff --git a/grq2/__init__.py b/grq2/__init__.py
index 6ab942e..b1bd254 100644
--- a/grq2/__init__.py
+++ b/grq2/__init__.py
@@ -112,3 +112,11 @@ def resource_not_found(e):
 
 from grq2.services.api_v02.service import services as api_v02_services
 app.register_blueprint(api_v02_services)
+
+
+if __name__ != '__main__':
+    import logging
+
+    gunicorn_logger = logging.getLogger('gunicorn.error')
+    app.logger.handlers = gunicorn_logger.handlers
+    app.logger.setLevel(gunicorn_logger.level)
diff --git a/grq2/lib/geonames.py b/grq2/lib/geonames.py
index 9f28e37..71e9dd7 100644
--- a/grq2/lib/geonames.py
+++ b/grq2/lib/geonames.py
@@ -170,7 +170,7 @@ def get_nearest_cities(lon, lat, size=5):
     index = app.config['GEONAMES_INDEX']
     # query for results
     try:
         res = grq_es.search(index=index, body=query)
-        app.logger.debug("get_continents(): %s" % json.dumps(query, indent=2))
+        app.logger.debug("get_continents(): %s" % json.dumps(query))
         results = []
         for hit in res['hits']['hits']:
@@ -250,7 +250,7 @@ def get_continents(lon, lat):
     index = app.config['GEONAMES_INDEX']
     # query for results
     try:
         res = grq_es.search(index=index, body=query)
-        app.logger.debug("get_continents(): %s" % json.dumps(query, indent=2))
+        app.logger.debug("get_continents(): %s" % json.dumps(query))
         results = []
         for hit in res['hits']['hits']:
diff --git a/grq2/services/api_v02/datasets.py b/grq2/services/api_v02/datasets.py
index 35b8c1e..6c2a675 100644
--- a/grq2/services/api_v02/datasets.py
+++ b/grq2/services/api_v02/datasets.py
@@ -105,6 +105,37 @@ def reverse_geolocation(prod_json):
     prod_json['temporal_span'] = get_ts(start_time, end_time)
 
 
+def split_array_chunk(data):
+    """
+    Elasticsearch/Opensearch has a 100mb size limit when making API calls
+    function breaks each array into chunks
+    :param data: List[Dict]
+    :return: List[Dict]
+    """
+    bulk_limit = app.config.get("BULK_LIMIT", 1e+8)
+
+    main_data = []
+    batch = []
+    cur_byte_count = 0
+    for i in range(0, len(data), 2):
+        action = data[i]
+        doc = data[i+1]
+
+        action_size = len(str.encode(json.dumps(action)))
+        doc_size = len(str.encode(json.dumps(doc)))
+
+        if cur_byte_count + action_size + doc_size + 8 < bulk_limit:
+            batch.extend([action, doc])
+            cur_byte_count = cur_byte_count + action_size + doc_size + 8
+        else:
+            main_data.append(batch)
+            batch = [action, doc]
+            cur_byte_count = action_size + doc_size + 8
+
+    main_data.append(batch)
+    return main_data
+
+
 @grq_ns.route('/dataset/index', endpoint='dataset_index')
 @grq_ns.doc(responses={200: "Success", 500: "Execution failed"}, description="Dataset index.")
 class IndexDataset(Resource):
@@ -137,21 +168,35 @@ def post(self):
                 docs_bulk.append({"index": {"_index": index, "_id": _id}})
                 docs_bulk.append(ds)
 
-            response = grq_es.es.bulk(body=docs_bulk, request_timeout=bulk_request_timeout)
-            if response["errors"] is True:
-                app.logger.error(response)
-                delete_docs = []
-                for doc in docs_bulk:
-                    if "index" in doc:
-                        delete_docs.append({"delete": doc["index"]})
-                grq_es.es.bulk(delete_docs, request_timeout=bulk_request_timeout)
+            errors = False
+            error_list = []
+            _delete_docs = []  # keep track of docs if they need to be rolled back
+            data_chunks = split_array_chunk(docs_bulk)  # splitting the data into 100MB chunks
+            app.logger.info("data split into %d chunk(s)" % len(data_chunks))
+
+            for chunk in data_chunks:
+                resp = grq_es.es.bulk(body=chunk, request_timeout=bulk_request_timeout)
+                for item in resp["items"]:
+                    doc_info = item["index"]
+                    _delete_docs.append({"delete": {"_index": doc_info["_index"], "_id": doc_info["_id"]}})
+                if resp["errors"] is True:
+                    errors = True
+                    error_list.extend(list(filter(lambda x: "error" in x["index"], resp["items"])))
+                    break
+
+            if errors is True:
+                app.logger.error("ERROR indexing documents in Elasticsearch, rolling back...")
+                app.logger.error(error_list)
+                grq_es.es.bulk(_delete_docs, request_timeout=bulk_request_timeout)
                 return {
                     "success": False,
-                    "message": response["items"]
+                    "message": error_list,
                 }, 400
+
+            app.logger.info("successfully indexed %d documents" % len(datasets))
             return {
                 "success": True,
-                "message": response
+                "message": "successfully indexed %d documents" % len(datasets),
             }
         except ElasticsearchException as e:
             message = "Failed index dataset. {0}:{1}\n{2}".format(type(e), e, traceback.format_exc())
diff --git a/setup.py b/setup.py
index 88f1258..b36c310 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,7 @@
 
 setup(
     name='grq2',
-    version='2.0.22',
+    version='2.0.23',
     long_description='GeoRegionQuery REST API using ElasticSearch backend',
     packages=find_packages(),
    include_package_data=True,
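
Below is a rough, standalone sketch of the chunking behavior introduced by split_array_chunk() in the patch above. It mirrors the helper's logic outside of Flask (no app.config), uses a toy 1 KB limit in place of the 100 MB BULK_LIMIT default, and checks that every emitted batch serializes to less than that limit. The function name chunk_bulk_payload, the index name, and the sample documents are illustrative assumptions only and are not part of the patch.

    import json

    BULK_LIMIT = 1024  # toy limit for illustration; the patch defaults to 1e+8 (100 MB)

    def chunk_bulk_payload(data, bulk_limit=BULK_LIMIT):
        """Split an interleaved [action, doc, action, doc, ...] list into batches
        whose serialized size stays below bulk_limit (mirrors split_array_chunk)."""
        main_data, batch, cur_byte_count = [], [], 0
        for i in range(0, len(data), 2):
            action, doc = data[i], data[i + 1]
            # 8 extra bytes of per-pair headroom, matching the patch's accounting
            pair_size = len(json.dumps(action).encode()) + len(json.dumps(doc).encode()) + 8
            if cur_byte_count + pair_size < bulk_limit:
                batch.extend([action, doc])
                cur_byte_count += pair_size
            else:
                main_data.append(batch)
                batch = [action, doc]
                cur_byte_count = pair_size
        main_data.append(batch)
        return main_data

    # build 50 fake action/document pairs, then verify each chunk stays under the limit
    docs_bulk = []
    for n in range(50):
        docs_bulk.append({"index": {"_index": "grq_v1.1_dataset", "_id": "doc-%d" % n}})
        docs_bulk.append({"id": "doc-%d" % n, "payload": "x" * 100})

    chunks = chunk_bulk_payload(docs_bulk)
    for chunk in chunks:
        ndjson = "\n".join(json.dumps(d) for d in chunk) + "\n"
        assert len(ndjson.encode()) < BULK_LIMIT
    print("%d docs split into %d chunk(s)" % (len(docs_bulk) // 2, len(chunks)))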