HC-470: Batch bulk API in grq2 (#61)
* split the data into 100 MB chunks

increased bulk_request_timeout default to 15 seconds

* increased buffer size

* moved bulk_limit to configuration

lowered bulk_limit default to 50 MB
lowered request timeout to 10 seconds (the Elasticsearch default)

* increased bulk limit to 100 MB

* fixed the missing INFO log issue

added additional logging after chunking the data to index into Elasticsearch

* removed comment

* moved import logging inside the if statement

* removed indent in get_continents()

* additional logging

---------

Co-authored-by: dustinlo <dustin.k.lo@jpl.nasa.gov>
DustinKLo and dustinlo authored May 30, 2023
1 parent dcc4cb1 commit 861f780
Showing 5 changed files with 70 additions and 13 deletions.
4 changes: 4 additions & 0 deletions config/settings.cfg.tmpl
@@ -17,6 +17,10 @@ GEONAMES_INDEX = "geonames"
# timeout value for ElasticSearch bulk requests (defaults to 10)
BULK_REQUEST_TIMEOUT = 30

# Elasticsearch imposes a 100mb limit on payload size when making requests
# this breaks the data into smaller chunks before using the bulk API
BULK_LIMIT = 1e+8 # 100,000,000 bytes (100MB)

# Redis URL
REDIS_URL = "redis://{{ MOZART_REDIS_PVT_IP }}:6379/0"

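For orientation, the new setting is read through Flask's app.config in the dataset indexing code further down in this diff. A minimal sketch of that lookup (the BULK_LIMIT read matches the usage shown in datasets.py below; the BULK_REQUEST_TIMEOUT read is an assumption that mirrors the template's documented default of 10 seconds):

    # Sketch of how the rendered template values are consumed at request time;
    # app is the Flask instance created in grq2/__init__.py.
    bulk_limit = app.config.get("BULK_LIMIT", 1e+8)                    # 100 MB payload cap
    bulk_request_timeout = app.config.get("BULK_REQUEST_TIMEOUT", 10)  # seconds (assumed lookup)
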
8 changes: 8 additions & 0 deletions grq2/__init__.py
@@ -112,3 +112,11 @@ def resource_not_found(e):

from grq2.services.api_v02.service import services as api_v02_services
app.register_blueprint(api_v02_services)


if __name__ != '__main__':
import logging

gunicorn_logger = logging.getLogger('gunicorn.error')
app.logger.handlers = gunicorn_logger.handlers
app.logger.setLevel(gunicorn_logger.level)
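
This block is what the "fixed the missing INFO log issue" bullet refers to: when the app is served by gunicorn rather than the Flask dev server, app.logger does not pick up gunicorn's handlers or log level, so INFO-level messages (including the new chunking logs in datasets.py) are typically filtered out at the default WARNING threshold. A minimal standalone sketch of the same pattern, assuming a bare Flask app run under gunicorn (whose application/error logger is named 'gunicorn.error'):

    import logging
    from flask import Flask

    app = Flask(__name__)

    if __name__ != '__main__':  # true when imported by a WSGI server such as gunicorn
        gunicorn_logger = logging.getLogger('gunicorn.error')
        app.logger.handlers = gunicorn_logger.handlers  # reuse gunicorn's handlers
        app.logger.setLevel(gunicorn_logger.level)      # e.g. INFO with --log-level info

    app.logger.info("now visible in the gunicorn error log")
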
4 changes: 2 additions & 2 deletions grq2/lib/geonames.py
@@ -170,7 +170,7 @@ def get_nearest_cities(lon, lat, size=5):
index = app.config['GEONAMES_INDEX'] # query for results
try:
res = grq_es.search(index=index, body=query)
app.logger.debug("get_continents(): %s" % json.dumps(query, indent=2))
app.logger.debug("get_continents(): %s" % json.dumps(query))

results = []
for hit in res['hits']['hits']:
@@ -250,7 +250,7 @@ def get_continents(lon, lat):
index = app.config['GEONAMES_INDEX'] # query for results
try:
res = grq_es.search(index=index, body=query)
app.logger.debug("get_continents(): %s" % json.dumps(query, indent=2))
app.logger.debug("get_continents(): %s" % json.dumps(query))

results = []
for hit in res['hits']['hits']:
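
The only change in this file is dropping indent=2 from the two debug statements, presumably so each query is logged as compact single-line JSON. For example (the query body here is hypothetical):

    import json
    query = {"query": {"term": {"continent": "AF"}}}  # hypothetical geonames query
    json.dumps(query, indent=2)  # pretty-printed across multiple log lines
    json.dumps(query)            # '{"query": {"term": {"continent": "AF"}}}' on one line
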
65 changes: 55 additions & 10 deletions grq2/services/api_v02/datasets.py
@@ -105,6 +105,37 @@ def reverse_geolocation(prod_json):
prod_json['temporal_span'] = get_ts(start_time, end_time)


def split_array_chunk(data):
"""
Elasticsearch/Opensearch has a 100mb size limit when making API calls
function breaks each array into chunks
:param data: List[Dict]
:return: List[Dict]
"""
bulk_limit = app.config.get("BULK_LIMIT", 1e+8)

main_data = []
batch = []
cur_byte_count = 0
for i in range(0, len(data), 2):
action = data[i]
doc = data[i+1]

action_size = len(str.encode(json.dumps(action)))
doc_size = len(str.encode(json.dumps(doc)))

if cur_byte_count + action_size + doc_size + 8 < bulk_limit:
batch.extend([action, doc])
cur_byte_count = cur_byte_count + action_size + doc_size + 8
else:
main_data.append(batch)
batch = [action, doc]
cur_byte_count = action_size + doc_size + 8

main_data.append(batch)
return main_data


@grq_ns.route('/dataset/index', endpoint='dataset_index')
@grq_ns.doc(responses={200: "Success", 500: "Execution failed"}, description="Dataset index.")
class IndexDataset(Resource):
@@ -137,21 +168,35 @@ def post(self):
docs_bulk.append({"index": {"_index": index, "_id": _id}})
docs_bulk.append(ds)

response = grq_es.es.bulk(body=docs_bulk, request_timeout=bulk_request_timeout)
if response["errors"] is True:
app.logger.error(response)
delete_docs = []
for doc in docs_bulk:
if "index" in doc:
delete_docs.append({"delete": doc["index"]})
grq_es.es.bulk(delete_docs, request_timeout=bulk_request_timeout)
errors = False
error_list = []
_delete_docs = [] # keep track of docs if they need to be rolled back
data_chunks = split_array_chunk(docs_bulk) # splitting the data into 100MB chunks
app.logger.info("data split into %d chunk(s)" % len(data_chunks))

for chunk in data_chunks:
resp = grq_es.es.bulk(body=chunk, request_timeout=bulk_request_timeout)
for item in resp["items"]:
doc_info = item["index"]
_delete_docs.append({"delete": {"_index": doc_info["_index"], "_id": doc_info["_id"]}})
if resp["errors"] is True:
errors = True
error_list.extend(list(filter(lambda x: "error" in x["index"], resp["items"])))
break

if errors is True:
app.logger.error("ERROR indexing documents in Elasticsearch, rolling back...")
app.logger.error(error_list)
grq_es.es.bulk(_delete_docs, request_timeout=bulk_request_timeout)
return {
"success": False,
"message": response["items"]
"message": error_list,
}, 400

app.logger.info("successfully indexed %d documents" % len(datasets))
return {
"success": True,
"message": response
"message": "successfully indexed %d documents" % len(datasets),
}
except ElasticsearchException as e:
message = "Failed index dataset. {0}:{1}\n{2}".format(type(e), e, traceback.format_exc())
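
To see the chunking behavior in isolation, here is a standalone sketch of the same logic as split_array_chunk above, with the byte limit passed as a parameter instead of read from app.config; the index name, document shape, and limit in the toy example are made up for illustration:

    import json

    def split_into_chunks(bulk_lines, limit_bytes):
        """Standalone sketch of split_array_chunk: bulk_lines is a flat
        [action, doc, action, doc, ...] list; pairs are grouped so each chunk's
        serialized size (plus a small per-pair overhead) stays under limit_bytes."""
        chunks, batch, size = [], [], 0
        for i in range(0, len(bulk_lines), 2):
            action, doc = bulk_lines[i], bulk_lines[i + 1]
            pair_size = len(json.dumps(action).encode()) + len(json.dumps(doc).encode()) + 8
            if size + pair_size < limit_bytes:
                batch.extend([action, doc])
                size += pair_size
            else:  # current batch would exceed the limit; start a new one
                chunks.append(batch)
                batch = [action, doc]
                size = pair_size
        chunks.append(batch)
        return chunks

    # Toy example with a deliberately tiny limit so the split is visible:
    docs_bulk = []
    for n in range(4):
        docs_bulk.append({"index": {"_index": "grq_v1.0_dataset", "_id": str(n)}})
        docs_bulk.append({"id": str(n), "payload": "x" * 50})
    print([len(c) // 2 for c in split_into_chunks(docs_bulk, limit_bytes=300)])
    # prints [2, 2]: two action/doc pairs fit under the 300-byte budget per chunk
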
2 changes: 1 addition & 1 deletion setup.py
@@ -6,7 +6,7 @@

setup(
name='grq2',
version='2.0.22',
version='2.0.23',
long_description='GeoRegionQuery REST API using ElasticSearch backend',
packages=find_packages(),
include_package_data=True,
