diff --git a/src/python/WMComponent/WorkflowUpdater/SiteListPoller.py b/src/python/WMComponent/WorkflowUpdater/SiteListPoller.py index 854f3a98ef..5bfbc4f52a 100755 --- a/src/python/WMComponent/WorkflowUpdater/SiteListPoller.py +++ b/src/python/WMComponent/WorkflowUpdater/SiteListPoller.py @@ -8,10 +8,11 @@ # system modules import logging import threading - +from pprint import pformat # WMCore modules from Utils.Timers import timeFunction from WMCore.DAOFactory import DAOFactory +from WMCore.Lexicon import sanitizeURL from WMCore.Services.WorkQueue.WorkQueue import WorkQueue from WMCore.Services.WMStatsServer.WMStatsServer import WMStatsServer from WMCore.WMSpec.WMWorkload import WMWorkloadHelper @@ -31,7 +32,8 @@ def __init__(self, config): # get wmstats parameters self.wmstatsUrl = getattr(config.WorkflowUpdater, "wmstatsUrl") self.wmstatsSrv = WMStatsServer(self.wmstatsUrl, logger=self.logger) - self.states = getattr(config.WorkflowUpdater, "states", ['running-open', 'acquired']) + self.reqStates = getattr(config.WorkflowUpdater, "states", ['running-open', 'acquired']) + self.wqeStates = getattr(config.WorkflowUpdater, "wqeStates", ['Available']) # provide access to WMBS in local WMAgent self.daoFactory = DAOFactory(package="WMCore.WMBS", @@ -48,18 +50,30 @@ def __init__(self, config): def getActiveWorkflows(self): """ Provide list of active requests within WMAgent - :return: dict of workflows names vs pickle files + :return: list of workflow names """ + self.logger.info("Fetching active workflows in the agent") # get list of active workflows in WMAgent wflowSpecs = self.listActiveWflows.execute() - # construct dictionary of workflow names and their pickle files - wmaSpecs = {} - for wflowSpec in wflowSpecs: - name = wflowSpec['name'] # this is the name of workflow - pklFileName = wflowSpec['spec'] # the "spec" in WMBS table (wmbs_workflow.spec) is pkl file name - wmaSpecs[name] = pklFileName - return wmaSpecs + return [wflowSpec['name'] for wflowSpec in wflowSpecs] + + def findUnacquiredWorkflows(self): + """ + Using a non-customized CouchDB view, find out workflows with elements + in the statuses that we would like to update (e.g. Available). + This is required because not all workflows might have been acquired by WMBS. + :return: a flat list of workflow names + """ + self.logger.info("Finding not yet active workflows in local workqueue") + response = [] + # get list of active workflows in WMAgent by looking into local elements in Available status + summary = self.localWQ.getElementsCountAndJobsByWorkflow() + for wflowName, innerDict in summary.items(): + for status in innerDict: + if status in self.wqeStates: + response.append(wflowName) + return response def wmstatsDict(self, requests): """ @@ -71,10 +85,12 @@ def wmstatsDict(self, requests): # get list of workflows from wmstats outputMask = ['SiteWhitelist', 'SiteBlacklist'] wdict = {} - for state in self.states: + for state in self.reqStates: inputConditions = {"RequestStatus": state} self.logger.info("Fetch site info from WMStats for condition: %s and mask %s", inputConditions, outputMask) data = self.wmstatsSrv.getFilteredActiveData(inputConditions, outputMask) + self.logger.info("Found %d workflows in WMStats with status %s", len(data), state) + self.logger.debug("Data from wmstats for status %s: %s", state, pformat(data)) for rdict in data: # rdict here has the following structure: # {"RequestName": "bla", "SiteWhitelist":[], "SiteBlacklist": []} @@ -89,6 +105,7 @@ def algorithm(self, parameters=None): """ Perform the following logic: - obtain list of current active workflows from the agent + - obtain list of workflows not yet active but already in local workqueue - requests their specs from upstream wmstats server - update site lists of active workflows - push new specs to the agent local WorkQueue and update pickle spec file @@ -97,11 +114,21 @@ def algorithm(self, parameters=None): """ # get list of active workflows from the agent, the returned dict # is composed by workflow names and associated pickle file (data comes from WMBS) - wmaSpecs = self.getActiveWorkflows() - wflows = wmaSpecs.keys() + wflows = self.getActiveWorkflows() + self.logger.info("This agent has %d active workflows", len(wflows)) + self.logger.debug("Active workflows in the agent are: %s", wflows) + + # find workflows not yet active but already in local workqueue + # note that some workflows here might already be active as well (multi WQEs) + unacquiredWflows = self.findUnacquiredWorkflows() + self.logger.info("This agent has %d not yet active workflows in local workqueue", len(unacquiredWflows)) + + # now join all active and potentially not-yet-active workflows + wflows = list(set(wflows + unacquiredWflows)) # obtain workflow records from wmstats server wdict = self.wmstatsDict(wflows) + self.logger.info("There is a total of %d common active workflows in the agent and wmstats", len(wdict)) # iterate over the list of active workflows which is smaller than list from wmstats for wflow in wflows: @@ -110,9 +137,6 @@ def algorithm(self, parameters=None): siteWhiteList = wdict[wflow]['SiteWhitelist'] siteBlackList = wdict[wflow]['SiteBlacklist'] - # get the name of pkl file from wma spec - pklFileName = wmaSpecs[wflow] - # get the local Workqueue url for the workflow's spec specUrl = self.localWQ.hostWithAuth + "/%s/%s/spec" % (self.localWQ.db.name, wflow) @@ -131,17 +155,12 @@ def algorithm(self, parameters=None): try: # update local WorkQueue first params = {'SiteWhitelist': siteWhiteList, 'SiteBlacklist': siteBlackList} - self.localWQ.updateElementsByWorkflow(wHelper, params, status=['Available']) - self.logger.info("successfully updated workqueue elements for workflow %s", wflow) + self.localWQ.updateElementsByWorkflow(wHelper, params, status=self.wqeStates) + msg = f"Successfully updated elements for workflow '{wflow}', " + msg += f"under WQ states: {self.wqeStates} and spec at: {sanitizeURL(specUrl).get('url')}" + self.logger.info(msg) except Exception as ex: logging.exception("Unexpected exception while updating elements in local workqueue Details:\n%s", str(ex)) continue - - try: - # persist the change at the pkl file - self.logger.info("Updating %s with new site lists within pkl file %s", wflow, pklFileName) - # save back pickle file - wHelper.save(pklFileName) - except Exception as ex: - logging.exception("Caught unexpected exception in SiteListPoller. Details:\n%s", str(ex)) - continue + else: + self.logger.info("No site list changes found for workflow %s", wflow)