async crawler for domain_objs
jreadey committed Jun 4, 2019
1 parent e5dcfe6 commit b990bda
Showing 1 changed file with 64 additions and 33 deletions.
97 changes: 64 additions & 33 deletions hsds/domain_sn.py
@@ -31,6 +31,65 @@
import hsds_logger as log
import config

class DomainCrawler:
    def __init__(self, app, root_id, include_attrs=True, max_tasks=40):
        log.info(f"DomainCrawler.__init__ root_id: {root_id}")
        self._app = app
        self._include_attrs = include_attrs
        self._max_tasks = max_tasks
        self._q = asyncio.Queue()  # object ids waiting to be fetched
        self._obj_dict = {}        # obj_id -> object json for every object visited
        self.seen_ids = set()
        self._q.put_nowait(root_id)  # seed the crawl with the root group

    async def crawl(self):
        workers = [asyncio.Task(self.work())
                   for _ in range(self._max_tasks)]
        # When all work is done, exit.
        log.info("DomainCrawler - await queue.join")
        await self._q.join()
        log.info("DomainCrawler - join complete")

        for w in workers:
            w.cancel()
        log.debug("DomainCrawler - workers canceled")

    async def work(self):
        while True:
            obj_id = await self._q.get()
            await self.fetch(obj_id)
            self._q.task_done()
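One subtlety in work(): task_done() is reached only if fetch() returns normally. If fetch() raised (say, a failed request to a data node), the item would never be marked done and crawl()'s queue.join() would wait forever while that worker died. A hedged variant, not part of this commit, that keeps the join accounting intact might look like:

    async def work(self):
        while True:
            obj_id = await self._q.get()
            try:
                await self.fetch(obj_id)
            except Exception as e:
                # hypothetical handling - log and move on so join() can complete
                log.error(f"DomainCrawler - fetch of {obj_id} failed: {e}")
            finally:
                self._q.task_done()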

    async def fetch(self, obj_id):
        log.debug(f"DomainCrawler - fetch for obj_id: {obj_id}")
        obj_json = await getObjectJson(self._app, obj_id, include_links=True, include_attrs=self._include_attrs)
        log.debug(f"DomainCrawler - for {obj_id} got json: {obj_json}")

        # including links, so don't need link count
        if "link_count" in obj_json:
            del obj_json["link_count"]
        self._obj_dict[obj_id] = obj_json
        if self._include_attrs:
            del obj_json["attributeCount"]

        # if this is a group, iterate through all the hard links and
        # add to the lookup ids set
        if getCollectionForId(obj_id) == "groups":
            links = obj_json["links"]
            log.debug(f"DomainCrawler links: {links}")
            for title in links:
                log.debug(f"DomainCrawler - got link: {title}")
                link_obj = links[title]
                if link_obj["class"] != 'H5L_TYPE_HARD':
                    continue
                link_id = link_obj["id"]
                if link_id not in self._obj_dict:
                    # haven't seen this object yet, get obj json
                    log.debug(f"DomainCrawler - adding link_id: {link_id}")
                    self._obj_dict[link_id] = {}  # placeholder for obj id
                    self._q.put_nowait(link_id)
        log.debug(f"DomainCrawler - fetch complete obj_id: {obj_id}")


async def get_collections(app, root_id):
""" Return the object ids for given root.
@@ -87,42 +146,14 @@ async def get_collections(app, root_id):
 async def getDomainObjects(app, root_id, include_attrs=False):
     """ Iterate through all objects in hierarchy and add to obj_dict keyed by obj id
     """

     log.info(f"getDomainObjects for root: {root_id}")
-    lookup_ids = set()
-    lookup_ids.add(root_id)
-    obj_dict = {}
-
-    while lookup_ids:
-        obj_id = lookup_ids.pop()
-        obj_json = await getObjectJson(app, obj_id, include_links=True, include_attrs=include_attrs)
-        log.debug(f"getDomainObjects for {obj_id}: {obj_json}")
+    crawler = DomainCrawler(app, root_id, include_attrs=include_attrs)
+    await crawler.crawl()
+    log.info(f"getDomainObjects returning: {len(crawler._obj_dict)} objects")

-        # including links, so don't need link count
-        if "link_count" in obj_json:
-            del obj_json["link_count"]
-        obj_dict[obj_id] = obj_json
-        if include_attrs:
-            del obj_json["attributeCount"]
-
-        # if this is a group, iterate through all the hard links and
-        # add to the lookup ids set
-        if getCollectionForId(obj_id) == "groups":
-            links = obj_json["links"]
-            log.debug(f"getDomainObjects links: {links}")
-            for title in links:
-                log.debug(f"getDomainObjects - got link: {title}")
-                link_obj = links[title]
-                if link_obj["class"] != 'H5L_TYPE_HARD':
-                    continue
-                link_id = link_obj["id"]
-                if link_id not in obj_dict:
-                    # haven't seen this object yet, get obj json
-                    log.debug(f"getDomainObjects - adding link_id: {link_id}")
-                    lookup_ids.add(link_id)
-
-    log.info(f"getDomainObjects returning: {len(obj_dict)} objects")
-    return obj_dict
+    return crawler._obj_dict
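With the crawler populating crawler._obj_dict, getDomainObjects now simply returns that map of obj_id to object JSON. A hedged caller-side sketch (hypothetical, assuming an async context where app and root_id are in scope, and that getCollectionForId returns "groups", "datasets", or "datatypes" as it does elsewhere in hsds) that splits the result by collection:

    domain_objs = await getDomainObjects(app, root_id, include_attrs=True)
    by_collection = {"groups": [], "datatypes": [], "datasets": []}
    for obj_id in domain_objs:
        by_collection[getCollectionForId(obj_id)].append(obj_id)
    log.info(f"crawled {len(domain_objs)} objects, "
             f"{len(by_collection['groups'])} of them groups")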

def getIdList(objs, marker=None, limit=None):
""" takes a map of ids to objs and returns ordered list
