Skip to content
This repository was archived by the owner on Dec 3, 2023. It is now read-only.

Commit

Permalink
Qquery changes for slings across multiple scihub accounts with DNS Alias (#1)
Browse files Browse the repository at this point in the history

* qquery changes for dns_alias

* correct bug

* job_spec changes for dns_alias

* fix bug with input params

* qquery changes for dns_alias

* qquery changes for dns_alias

* qquery changes for dns_alias fix bugs

* whatsup here

* for debugging and staging

* qquery changes for dns_alias

* BUGS

* changes to job-spec for separate queues (up to 5)

* updates to cron.py for more endpoints on scihub

* moar changes for new queues!

* sorry, my bad

* this as well!

* use query_endpoint instead of qtype

* change the bash file too!

* i am blind

* better distinction for queries in sling

* let qquery handle the splitting up of slings into scihubxx queues

* we will not change the plugins!

* bug in param name

* revert unnecessary changes

* revert unnecessary changes

* fix bug

* remove debugs

* 1 queued for ASF to sling

* add "dns_list" to other jobs too in dockerfiles!

* print more information
  • Loading branch information
shitong01 authored Sep 18, 2018
1 parent c81481f commit f7710d8
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 14 deletions.
8 changes: 7 additions & 1 deletion docker/hold_hysds-io.json.sling_acquisition
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,20 @@
"enumerables":
[
"factotum-job_worker-scihub_throttled",
"factotum-job_worker-scihub1_throttled",
"factotum-job_worker-scihub2_throttled",
"factotum-job_worker-scihub3_throttled",
"factotum-job_worker-scihub4_throttled",
"factotum-job_worker-scihub5_throttled",
"factotum-job_worker-apihub_throttled",
"factotum-job_worker-asf_throttled",
"factotum-job_worker-asf1_throttled",
"factotum-job_worker-small",
"factotum-job_worker-crawl",
"urgent-response-job_worker-large",
"urgent-response-job_worker-small"
],
"default": "factotum-job_worker-scihub_throttled"
"default": "factotum-job_worker-scihub1_throttled"

},
{
Expand Down
5 changes: 5 additions & 0 deletions docker/hysds-io.json.cron
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
{
"name": "endpoint",
"from": "submitter"
},
{
"name": "dns_list",
"from": "submitter",
"type": "text"
}
]
}
5 changes: 5 additions & 0 deletions docker/hysds-io.json.qquery
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
"from": "submitter",
"type": "jobspec_version",
"version_regex": "sling"
},
{
"name": "dns_list",
"from": "submitter",
"type": "text"
}
]
}
5 changes: 5 additions & 0 deletions docker/hysds-io.json.qquery_iterate
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
"from": "submitter",
"type": "jobspec_version",
"version_regex": "sling"
},
{
"name": "dns_list",
"from": "submitter",
"type": "text"
}
]
}
4 changes: 4 additions & 0 deletions docker/job-spec.json.cron
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
{
"name": "endpoint",
"destination": "positional"
},
{
"name": "dns_list",
"destination": "positional"
}
]
}
16 changes: 15 additions & 1 deletion docker/job-spec.json.qquery
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
{
"command":"/home/ops/verdi/ops/qquery/qquery/query.sh",
"imported_worker_files":{"/home/ops/.netrc":"/home/ops/.netrc"},
"required-queues": ["factotum-job_worker-apihub_throttled","factotum-job_worker-asf_throttled","factotum-job_worker-scihub_throttled","factotum-job_worker-unavco_throttled"],
"required-queues": [
"factotum-job_worker-apihub_throttled",
"factotum-job_worker-asf_throttled",
"factotum-job_worker-asf1_throttled",
"factotum-job_worker-scihub_throttled",
"factotum-job_worker-scihub1_throttled",
"factotum-job_worker-scihub2_throttled",
"factotum-job_worker-scihub3_throttled",
"factotum-job_worker-scihub4_throttled",
"factotum-job_worker-scihub5_throttled",
"factotum-job_worker-unavco_throttled"],
"soft_time_limit": 3600,
"time_limit": 3600,
"disk_usage":"10GB",
Expand All @@ -17,6 +27,10 @@
{
"name": "sling_version",
"destination": "positional"
},
{
"name": "dns_list",
"destination": "positional"
}
]
}
16 changes: 15 additions & 1 deletion docker/job-spec.json.qquery_iterate
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
{
"command":"/home/ops/verdi/ops/qquery/qquery/query.sh",
"imported_worker_files":{"/home/ops/.netrc":"/home/ops/.netrc"},
"required-queues": ["factotum-job_worker-apihub_throttled","factotum-job_worker-asf_throttled","factotum-job_worker-scihub_throttled","factotum-job_worker-unavco_throttled"],
"required-queues": [
"factotum-job_worker-apihub_throttled",
"factotum-job_worker-asf_throttled",
"factotum-job_worker-asf1_throttled",
"factotum-job_worker-scihub_throttled",
"factotum-job_worker-scihub1_throttled",
"factotum-job_worker-scihub2_throttled",
"factotum-job_worker-scihub3_throttled",
"factotum-job_worker-scihub4_throttled",
"factotum-job_worker-scihub5_throttled",
"factotum-job_worker-unavco_throttled"],
"disk_usage":"10GB",
"params" : [
{
Expand All @@ -15,6 +25,10 @@
{
"name": "sling_version",
"destination": "positional"
},
{
"name": "dns_list",
"destination": "positional"
}
]
}
10 changes: 8 additions & 2 deletions qquery/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("qtype", help="query endpoint, e.g. (asf|scihub|unavco)")
parser.add_argument("--dns_list", help="dns list for qtype to use from .netrc, comma separated", required=True)
parser.add_argument("--tag", help="PGE docker image tag (release, version, " +
"or branch) to propagate",
default="master", required=False)
Expand All @@ -32,6 +33,7 @@
args = parser.parse_args()

query_endpoint = args.qtype
dns_list = args.dns_list
qquery_rtag = args.tag
sling_rtag = qquery_rtag if args.sling_tag is None else args.sling_tag

Expand All @@ -45,7 +47,7 @@
#if the region is inactive, skip
print("AOI {0} marked as inactive. Skipping".format(region["id"]))
continue

#skip regions without a query types map
if "query" not in region["metadata"].keys():
continue
Expand All @@ -57,7 +59,7 @@

#determine qquery job submission branch
job_spec = 'job-qquery:'+qquery_rtag

#determine the repo to query from the types_map in the aoi
for qtype in region["metadata"]["query"].keys(): #list of endpoints to query
if qtype != query_endpoint:
Expand All @@ -84,6 +86,10 @@
"from": "value",
"value": "{}".format(qtype),
},
{"name": "dns_list",
"from": "value",
"value": "{}".format(dns_list),
},
{ "name": "sling_version",
"from": "value",
"value": "{}".format(sling_rtag),
Expand Down
36 changes: 28 additions & 8 deletions qquery/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ def query_results(self, start_time, end_time, aoi, mapping=None):
'''
return self.query(start_time, end_time, aoi, mapping=mapping)

def run(self, aoi, input_qtype, rtag=None):
def run(self, aoi, input_qtype, dns_list_str, rtag=None):
'''
Run the overall query. Should not be overridden.
'''
#determine config parameters
query_params = self.parse_params(aoi, input_qtype)
query_params = self.parse_params(aoi, input_qtype, dns_list_str)
if query_params == None:
print("Failed to parse params properly")
print("aoi:",aoi)
Expand All @@ -91,15 +91,28 @@ def run(self, aoi, input_qtype, rtag=None):
start_time = query_params["starttime"]
end_time = query_params["endtime"]
products = query_params["products"]
dns_list = query_params["dns_list"]

#counter for rotating between DNS and queues for sling downloads, so as to overcome per account download limits
num_queue = 0
num_dns = len(dns_list)

#each products contains a specific mapping for the query
for product in products:
print("querying %s for %s products from %s to %s" % (input_qtype, product, start_time, end_time))
try:
results = self.query_results(start_time,end_time,aoi,mapping=product)
print("returned %s results" % str(len(results)))
for title,link in results:
print("submitting sling for endpoint: %s, url: %s" % (input_qtype, link))
self.submit_sling_job(aoi, query_params, input_qtype, title, link, rtag)
# rotate dns in dns_list by replacing dns in link
num_queue += 1
queue_grp = (num_queue % num_dns) + 1
new_dns_link = re.sub('(?<=https:\/\/).*?(?=\/)', dns_list[queue_grp-1], link)

print("submitting sling for endpoint: %s, queuegrp:%s, url: %s aliased to %s"
% (input_qtype, queue_grp, link, new_dns_link))
self.submit_sling_job(aoi, query_params, input_qtype, queue_grp, title, new_dns_link, rtag)

except QueryBadResponseException as qe:
print("Error: Failed to query properly. {0}".format(str(qe)),file=sys.stderr)
raise qe
Expand All @@ -108,7 +121,7 @@ def run(self, aoi, input_qtype, rtag=None):
self.saveStamp(stamp,self.stampKeyname(aoi,input_qtype))


def submit_sling_job(self, aoi, query_params, qtype, title, link, rtag=None):
def submit_sling_job(self, aoi, query_params, qtype, queue_grp, title, link, rtag=None):
#Query for all products, and return a list of (Title,URL)
cfg = config() #load settings.json
priority = query_params["priority"]
Expand Down Expand Up @@ -140,7 +153,7 @@ def submit_sling_job(self, aoi, query_params, qtype, title, link, rtag=None):
job_type = "job:spyddder-sling_%s" % qtype
job_name = "spyddder-sling_%s-%s-%s.%s" % (qtype,aoi['id'],title,self.getFileType())
oauth_url = None
queue = "factotum-job_worker-%s_throttled" % qtype # job submission queue
queue = "factotum-job_worker-%s_throttled" % (qtype+str(queue_grp)) # job submission queue

#set sling job spec release/branch
if rtag is None:
Expand Down Expand Up @@ -206,7 +219,7 @@ def submit_sling_job(self, aoi, query_params, qtype, title, link, rtag=None):



def parse_params(self, aoi, input_qtype):
def parse_params(self, aoi, input_qtype, dns_list_str):
'''
parses the parameters from the aoi, determines proper start/end times and returns a dict object containing params
'''
Expand Down Expand Up @@ -298,12 +311,16 @@ def parse_params(self, aoi, input_qtype):
for tag in tags:
tags.append(aoi["metadata"][qtype]["tag"])

# parses dns comma separated string to array
dns_list = [x.strip() for x in dns_list_str.split(',')]

#fill parameters
params["starttime"] = start_time
params["endtime"] = end_time
params["priority"] = priority
params["products"] = products
params["tag"] = tags
params["dns_list"] = dns_list
return params


Expand Down Expand Up @@ -464,6 +481,8 @@ def parser():
parse.add_argument("-r","--region", required=True, help="Region to submit the query for", dest="region")
parse.add_argument("-t","--query-type", required=True, help="Query type to find correct query handler", dest="qtype")
parse.add_argument("--tag", help="PGE docker image tag (release, version, or branch) to propagate", required=False)
parse.add_argument("--dns_list", help="List of DNS to use as endpoint for this query, comma separated")

return parse

if __name__ == "__main__":
Expand All @@ -472,6 +491,7 @@ def parser():
cfg = config()
# Detect region
region = None

#skip to the aoi we want
for aoi in get_aois(cfg):
if aoi["id"] == args.region:
Expand All @@ -484,7 +504,7 @@ def parser():
try:
print("Finding handler: {0}".format(args.qtype))
handler = AbstractQuery.getQueryHandler(args.qtype)
handler.run(aoi, args.qtype, args.tag)
handler.run(aoi, args.qtype, args.dns_list, args.tag)
#a = AbstractQuery()
#a.run(aoi)
except Exception as e:
Expand Down
7 changes: 6 additions & 1 deletion qquery/query.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,10 @@ if [ -z "${3}" ]
echo "No sling release version specified"
exit 1
fi
if [ -z "${4}" ]
then
echo "No dns list specified"
exit 1
fi

${CODE_DIR}/query.py --region ${1} --query-type ${2} --tag ${3}
${CODE_DIR}/query.py --region ${1} --query-type ${2} --tag ${3} --dns_list ${4}

0 comments on commit f7710d8

Please sign in to comment.