Skip to content
This repository was archived by the owner on Dec 3, 2023. It is now read-only.

Commit

Permalink
Fix Qquery Dedup key issue (#3)
Browse files Browse the repository at this point in the history
* introducing pds to qquery

* qquery to support opds

* qquery changes for opendataset

* add the pds flag

* edit queue names for autoscaling

* expose queue name for ease of naming opds autoscaling groups

* variable renaming!

* better deduplication for several cases

* fix deduplicate key mix-up

* throttle queries to request only by month

* correct list appending

* Different query strategy for OpenDataset vs other AOIs due to size

* fix wrong keys in deduplication
  • Loading branch information
shitong01 authored Apr 2, 2019
1 parent 131be5c commit 67dbef0
Showing 1 changed file with 23 additions and 13 deletions.
36 changes: 23 additions & 13 deletions qquery/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
from hysds.celery import app
REDIS_URL = get_redis_endpoint()
# Shared redis connection pool; created lazily on first use (see deduplicate).
POOL = None
# Redis set names used for sling deduplication, read from the qquery config:
# one set for granules slung to our own bucket, one for the PDS bucket.
DEDUP_KEY = config()['dedup_redis_key']
DEDUP_KEY_PDS = config()['dedup_redis_key_pds']



Expand Down Expand Up @@ -61,27 +61,29 @@ def search(clazz,path):
continue
return clazz

def deduplicate(self, title, target_dedup_key):
    '''
    Prevents the re-adding of sling downloads.

    @param title - name of granule to check if downloading
    @param target_dedup_key - redis set to record the granule in; either
        DEDUP_KEY (slinging to our own bucket) or DEDUP_KEY_PDS (slinging
        to the PDS bucket)
    @return True if the granule was already recorded (skip the sling),
        False if it was newly added
    '''
    global POOL
    # Lazily initialize the shared redis connection pool.
    if POOL is None:
        POOL = ConnectionPool.from_url(REDIS_URL)
    r = StrictRedis(connection_pool=POOL)

    if target_dedup_key == DEDUP_KEY:
        # If sling to own bucket, checks both typical and pds lists if it is deduped
        # If it is found in PDS bucket / own bucket, skip the sling
        deduped = r.sismember(DEDUP_KEY, title) or r.sismember(DEDUP_KEY_PDS, title)
        print("%s is deduped: %s." % (title, deduped))

        # Only record the granule when it was not already present in either set.
        if not deduped:
            r.sadd(target_dedup_key, title)

        return deduped
    else:
        # If sling to pds bucket, just check pds lists if it is deduped.
        # SADD returns 0 when the member already existed, i.e. it is a duplicate.
        return r.sadd(target_dedup_key, title) == 0


@backoff.on_exception(backoff.expo, requests.exceptions.RequestException,
Expand Down Expand Up @@ -117,15 +119,23 @@ def run(self, aoi, input_qtype, dns_list_str, rtag=None, pds_queue=None):
# break queries up by month so it does not exceed 10000 requests
results = []
print("querying %s for %s products from %s to %s" % (input_qtype, product, start_time, end_time))
monthsplit_daterange = split_daterange_by_month(start_time, end_time)

try:

for month in monthsplit_daterange:
month_results = self.query_results(month.get("start"),month.get("end"),aoi,mapping=product)
if not pds_queue:
# Query the whole start to end if it is not OpenDatasetAOI
results = self.query_results(start_time, end_time, aoi, mapping=product)
print("returned %s results for the range %s to %s" %
(str(len(month_results)), month.get("start"), month.get("end")))
results = results + month_results
(str(len(results)), start_time, end_time))

else:
# Query month-wise if it is OpenDatasetAOI as AOI is too large and returns > 10000 results
monthsplit_daterange = split_daterange_by_month(start_time, end_time)

for month in monthsplit_daterange:
month_results = self.query_results(month.get("start"),month.get("end"),aoi,mapping=product)
print("returned %s results for the range %s to %s" %
(str(len(month_results)), month.get("start"), month.get("end")))
results = results + month_results

for title, link in results:
# rotate dns in dns_list by replacing dns in link
Expand Down

0 comments on commit 67dbef0

Please sign in to comment.