Qquery changes to support OpenDataset sling management (#2)
* introducing pds to qquery

* qquery to support opds

* qquery changes for opendataset

* add the pds flag

* edit queue names for autoscaling

* expose queue name for ease of naming opds autoscaling groups

* variable renaming!

* better deduplication for several cases

* fix deduplicate key mix-up

* throttle queries to request only by month

* correct list appending
shitong01 authored Mar 7, 2019
1 parent ca22f57 commit 131be5c
Showing 2 changed files with 143 additions and 76 deletions.
1 change: 1 addition & 0 deletions qquery/config/settings.json
@@ -1,6 +1,7 @@
{
"aoi_index": "grq_*_area_of_interest",
"dedup_redis_key":"granules-s1a_slc",
"dedup_redis_key_pds":"granules-s1a_slc-pds",
"window-size-days":1,
"repository-base":"https://aria-alt-dav.jpl.nasa.gov/incoming"
}
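
For illustration, a minimal sketch of how the new key resolves to a Redis set name (assuming config() in query.py returns this settings.json parsed into a dict; the file path is relative to the repo root):

    import json

    # Load the qquery settings and look up the two dedup set names.
    with open("qquery/config/settings.json") as f:
        cfg = json.load(f)
    own_set = cfg["dedup_redis_key"]       # "granules-s1a_slc"
    pds_set = cfg["dedup_redis_key_pds"]   # "granules-s1a_slc-pds"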
218 changes: 142 additions & 76 deletions qquery/query.py
@@ -24,6 +24,10 @@
from hysds.celery import app
REDIS_URL = get_redis_endpoint()
POOL = None
DEDUP_KEY = 'dedup_redis_key'
DEDUP_KEY_PDS = 'dedup_redis_key_pds'



class AbstractQuery(object):
'''
@@ -57,17 +61,28 @@ def search(clazz,path):
continue
return clazz

def deduplicate(self,title):
def deduplicate(self, title, dedup_key):
'''
Prevents re-adding sling downloads that have already been queued or processed
@params title - name of granule to check before downloading
@params dedup_key - settings.json key naming the Redis dedup set to target
'''
key = config()['dedup_redis_key']
redis_target_key = config()[dedup_key]
global POOL
if POOL is None:
POOL = ConnectionPool.from_url(REDIS_URL)
r = StrictRedis(connection_pool=POOL)
return r.sadd(key,title) == 0

if dedup_key == DEDUP_KEY:
# If slinging to our own bucket, check both the own-bucket and PDS dedup sets;
# if the granule is found in either set, skip the sling
deduped = r.sismember(config()[DEDUP_KEY], title) or r.sismember(config()[DEDUP_KEY_PDS], title)
if not deduped:
r.sadd(redis_target_key, title)
return deduped
else:
# If slinging to the PDS bucket, only the PDS set needs checking;
# sadd returns 0 when the member is already present
return r.sadd(redis_target_key, title) == 0
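
A standalone sketch of the dedup semantics above (assumes a Redis instance on localhost and the set names from settings.json; the granule name is hypothetical):

    from redis import StrictRedis

    r = StrictRedis()
    own_set, pds_set = "granules-s1a_slc", "granules-s1a_slc-pds"
    title = "S1A_IW_SLC__EXAMPLE.zip"

    # Own-bucket sling: a granule is a duplicate if either set contains it.
    deduped = r.sismember(own_set, title) or r.sismember(pds_set, title)
    if not deduped:
        r.sadd(own_set, title)

    # PDS sling: sadd returns 0 when the member already exists, so repeating
    # the add for the same granule reports it as a duplicate.
    is_dup = r.sadd(pds_set, title) == 0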


@backoff.on_exception(backoff.expo, requests.exceptions.RequestException,
max_tries=8, max_value=32)
@@ -77,7 +92,7 @@ def query_results(self, start_time, end_time, aoi, mapping=None):
'''
return self.query(start_time, end_time, aoi, mapping=mapping)

def run(self, aoi, input_qtype, dns_list_str, rtag=None):
def run(self, aoi, input_qtype, dns_list_str, rtag=None, pds_queue=None):
'''
Run the overall query. Should not be overridden.
'''
@@ -99,19 +114,29 @@ def run(self, aoi, input_qtype, dns_list_str, rtag=None):

#each products contains a specific mapping for the query
for product in products:
# break queries up by month so a single query does not exceed 10000 results
results = []
print("querying %s for %s products from %s to %s" % (input_qtype, product, start_time, end_time))
monthsplit_daterange = split_daterange_by_month(start_time, end_time)

try:
results = self.query_results(start_time,end_time,aoi,mapping=product)
print("returned %s results" % str(len(results)))
for title,link in results:

for month in monthsplit_daterange:
month_results = self.query_results(month.get("start"),month.get("end"),aoi,mapping=product)
print("returned %s results for the range %s to %s" %
(str(len(month_results)), month.get("start"), month.get("end")))
results = results + month_results

for title, link in results:
# rotate dns in dns_list by replacing dns in link
num_queue += 1
queue_grp = (num_queue % num_dns) + 1
new_dns_link = re.sub('(?<=https:\/\/).*?(?=\/)', dns_list[queue_grp-1], link)
new_dns_link = re.sub('(?<=https:\/\/).*?(?=\/)', dns_list[queue_grp - 1], link)

print("submitting sling for endpoint: %s, queuegrp:%s, url: %s aliased to %s"
% (input_qtype, queue_grp, link, new_dns_link))
self.submit_sling_job(aoi, query_params, input_qtype, queue_grp, title, new_dns_link, rtag)
self.submit_sling_job(aoi, query_params, input_qtype, queue_grp, title, new_dns_link, rtag,
pds_queue)

except QueryBadResponseException as qe:
print("Error: Failed to query properly. {0}".format(str(qe)),file=sys.stderr)
@@ -121,53 +146,98 @@ def run(self, aoi, input_qtype, dns_list_str, rtag=None):
self.saveStamp(stamp,self.stampKeyname(aoi,input_qtype))


def submit_sling_job(self, aoi, query_params, qtype, queue_grp, title, link, rtag=None):
def submit_sling_job(self, aoi, query_params, qtype, queue_grp, title, link, rtag=None, pds_queue=None):
#Query for all products, and return a list of (Title,URL)
cfg = config() #load settings.json
priority = query_params["priority"]
products = query_params["products"]
tags = query_params["tag"]

#build payload items for job submission
yr, mo, dy = self.getDataDateFromTitle(title) #date
md5 = hashlib.md5("{0}.{1}\n".format(title,self.getFileType())).hexdigest()
repo_url = "%s/%s/%s/%s/%s/%s.%s" % (cfg["repository-base"],md5[0:8],md5[8:16],md5[16:24],md5[24:32],title,self.getFileType())
location = {}
location['type'] = 'polygon'
location['aoi'] = aoi['id']
location['coordinates'] = aoi['location']['coordinates']
prod_met = {}
prod_met['source'] = qtype
prod_met['dataset_type'] = title[0:3]
prod_met['spatial_extent'] = location
prod_met['tag'] = tags

#required params for job submission
if hasattr(self, 'getOauthUrl'):
#sling via oauth
oauth_url = self.getOauthUrl()
job_type = "job:spyddder-sling-oauth_%s" % qtype
job_name = "spyddder-sling-oauth_%s-%s-%s.%s" % (qtype,aoi['id'],title,self.getFileType())
filename = title + "." + self.getFileType()

if not pds_queue:
# build payload items for job submission
tags = query_params["tag"]
md5 = hashlib.md5("{0}.{1}\n".format(title, self.getFileType())).hexdigest()
cfg = config() # load settings.json
repo_url = "%s/%s/%s/%s/%s/%s.%s" % (
cfg["repository-base"], md5[0:8], md5[8:16], md5[16:24], md5[24:32], title, self.getFileType())
location = {}
location['type'] = 'polygon'
location['aoi'] = aoi['id']
location['coordinates'] = aoi['location']['coordinates']
prod_met = {}
prod_met['source'] = qtype
prod_met['dataset_type'] = title[0:3]
prod_met['spatial_extent'] = location
prod_met['tag'] = tags
queue = "factotum-job_worker-%s_throttled" % (qtype+str(queue_grp)) # job submission queue
job_header = 'job-sling:'
dedup_key = DEDUP_KEY
params = [
{"name": "download_url",
"from": "value",
"value": link,
},
{"name": "repo_url",
"from": "value",
"value": repo_url,
},
{"name": "prod_name",
"from": "value",
"value": title,
},
{"name": "file_type",
"from": "value",
"value": self.getFileType(),
},
{"name": "prod_date",
"from": "value",
"value": "{}".format("%s-%s-%s" % (yr, mo, dy)),
},
{"name": "prod_met",
"from": "value",
"value": prod_met,
},
{"name": "options",
"from": "value",
"value": "--force_extract"
}
]
else:
#normal sling
job_type = "job:spyddder-sling_%s" % qtype
job_name = "spyddder-sling_%s-%s-%s.%s" % (qtype,aoi['id'],title,self.getFileType())
oauth_url = None
queue = "factotum-job_worker-%s_throttled" % (qtype+str(queue_grp)) # job submission queue
# queue = "opds-%s-job_worker-small" % (qtype)
queue = pds_queue # job submission queue, no queue group for autoscalers
job_header = 'job-sling-extract-opds:'
dedup_key = DEDUP_KEY_PDS
params = [
{"name": "download_url",
"from": "value",
"value": link,
},
{"name": "prod_name",
"from": "value",
"value": "%s-pds" % title,
},
{"name": "file",
"from": "value",
"value": filename,
},
{"name": "prod_date",
"from": "value",
"value": "{}".format("%s-%s-%s" % (yr, mo, dy)),
}
]

#set sling job spec release/branch
if rtag is None:
try:
with open('_context.json') as json_data:
context = json.load(json_data)
job_spec = 'job-sling:'+context['job_specification']['job-version']
job_spec = job_header + context['job_specification']['job-version']
except:
print('Failed on loading _context.json')
else: job_spec = 'job-sling:'+rtag
else: job_spec = job_header + rtag

rtime = datetime.datetime.utcnow()
job_name = "%s-%s-%s-%s-%s" % (job_spec,queue,title,rtime.strftime("%d_%b_%Y_%H:%M:%S"),aoi['id'])
job_name = job_name.lstrip('job-')
priority = query_params["priority"]

#Setup input arguments here
rule = {
@@ -176,47 +246,18 @@
"priority": priority,
"kwargs":'{}'
}
params = [
{ "name": "download_url",
"from": "value",
"value": link,
},
{ "name": "repo_url",
"from": "value",
"value": repo_url,
},
{ "name": "prod_name",
"from": "value",
"value": title,
},
{ "name": "file_type",
"from": "value",
"value": self.getFileType(),
},
{ "name": "prod_date",
"from": "value",
"value": "{}".format("%s-%s-%s" % (yr, mo, dy)),
},
{ "name": "prod_met",
"from": "value",
"value": prod_met,
},
{ "name": "options",
"from": "value",
"value": "--force_extract"
}
]

#check for dedup, if clear, submit job
if not self.deduplicate(title+"."+self.getFileType()):
if not self.deduplicate(filename, dedup_key):
submit_mozart_job({}, rule,
hysdsio={"id": "internal-temporary-wiring",
"params": params,
"job-specification": job_spec},
job_name=job_name)
else:
print("Will not submit sling job for {0}, already processed".format(title))


location = " to OpenDataset" if pds_queue else "to own bucket"
reason = "in OpenDataset" if pds_queue else "in OpenDataset or own bucket"
print("Will not submit sling job {0} to {1}, already processed {2}".format(title, location, reason))


def parse_params(self, aoi, input_qtype, dns_list_str):
Expand Down Expand Up @@ -430,6 +471,28 @@ def convert_to_dt(timestring1):
y,m,d,h,mn,sec = [int(i) for i in list(re.findall(dt_reg, timestring1)[0])]
return datetime.datetime(y,m,d,h,mn,sec)

def split_daterange_by_month(starttime, endtime):
'''Takes start and end datetime strings (YYYY-MM-DDTHH:mm:SS) and splits the period into month-long ranges'''
dt_start = convert_to_dt(starttime)
dt_end = convert_to_dt(endtime)
start_dates = [dt_start]
end_dates = []
today = dt_start
while today <= dt_end:
tomorrow = today + datetime.timedelta(1)
if tomorrow.month != today.month:
start_dates.append(tomorrow)
# extend each range end two days past the month boundary; adjacent
# ranges overlap slightly and deduplicate() filters repeated granules
end_dates.append(today + datetime.timedelta(2))
today = tomorrow
end_dates.append(dt_end)

out_fmt = "%Y-%m-%dT%H:%M:%S"
monthsplit_daterange = [{"start":start.strftime(out_fmt), "end":end.strftime(out_fmt)}
for start, end in zip(start_dates, end_dates)]

return monthsplit_daterange
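
A hedged usage example (dates invented; assumes convert_to_dt above parses the YYYY-MM-DDTHH:mm:SS strings). Note the slight overlap at each month boundary:

    ranges = split_daterange_by_month("2019-01-15T00:00:00", "2019-03-10T00:00:00")
    # [{'start': '2019-01-15T00:00:00', 'end': '2019-02-02T00:00:00'},
    #  {'start': '2019-02-01T00:00:00', 'end': '2019-03-02T00:00:00'},
    #  {'start': '2019-03-01T00:00:00', 'end': '2019-03-10T00:00:00'}]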


def parse_start_end(json_dict, event_time=None):
'''
Expand Down Expand Up @@ -482,6 +545,9 @@ def parser():
parse.add_argument("-t","--query-type", required=True, help="Query type to find correct query handler", dest="qtype")
parse.add_argument("--tag", help="PGE docker image tag (release, version, or branch) to propagate", required=False)
parse.add_argument("--dns_list", help="List of DNS to use as endpoint for this query, comma separated")
parse.add_argument("--pds_queue", help="Queue to send Opendataset slings jobs, specify if sling to Opendataset desired",
default=None, required=False)


return parse
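
A hypothetical invocation enabling OpenDataset slings (queue and DNS names are invented; the queue follows the opds-<qtype>-job_worker-small pattern commented in submit_sling_job, and any required flags elided from this diff must also be supplied):

    python qquery/query.py -t scihub --dns_list dav-1.example.gov,dav-2.example.gov --pds_queue opds-scihub-job_worker-small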

@@ -504,7 +570,7 @@ def parser():
try:
print("Finding handler: {0}".format(args.qtype))
handler = AbstractQuery.getQueryHandler(args.qtype)
handler.run(aoi, args.qtype, args.dns_list, args.tag)
handler.run(aoi, args.qtype, args.dns_list, args.tag, args.pds_queue)
#a = AbstractQuery()
#a.run(aoi)
except Exception as e:
