Skip to content

Commit

Permalink
Merge pull request #12254 from amaltaro/fix-12039-take2-wma
Browse files Browse the repository at this point in the history
Make SiteListPoller more verbose - wmagent version
  • Loading branch information
amaltaro authored Feb 7, 2025
2 parents ab7f349 + 841650b commit cdc8b18
Showing 1 changed file with 46 additions and 27 deletions.
73 changes: 46 additions & 27 deletions src/python/WMComponent/WorkflowUpdater/SiteListPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
# system modules
import logging
import threading

from pprint import pformat
# WMCore modules
from Utils.Timers import timeFunction
from WMCore.DAOFactory import DAOFactory
from WMCore.Lexicon import sanitizeURL
from WMCore.Services.WorkQueue.WorkQueue import WorkQueue
from WMCore.Services.WMStatsServer.WMStatsServer import WMStatsServer
from WMCore.WMSpec.WMWorkload import WMWorkloadHelper
Expand All @@ -31,7 +32,8 @@ def __init__(self, config):
# get wmstats parameters
self.wmstatsUrl = getattr(config.WorkflowUpdater, "wmstatsUrl")
self.wmstatsSrv = WMStatsServer(self.wmstatsUrl, logger=self.logger)
self.states = getattr(config.WorkflowUpdater, "states", ['running-open', 'acquired'])
self.reqStates = getattr(config.WorkflowUpdater, "states", ['running-open', 'acquired'])
self.wqeStates = getattr(config.WorkflowUpdater, "wqeStates", ['Available'])

# provide access to WMBS in local WMAgent
self.daoFactory = DAOFactory(package="WMCore.WMBS",
Expand All @@ -48,18 +50,30 @@ def __init__(self, config):
def getActiveWorkflows(self):
"""
Provide list of active requests within WMAgent
:return: dict of workflows names vs pickle files
:return: list of workflow names
"""
self.logger.info("Fetching active workflows in the agent")
# get list of active workflows in WMAgent
wflowSpecs = self.listActiveWflows.execute()

# construct dictionary of workflow names and their pickle files
wmaSpecs = {}
for wflowSpec in wflowSpecs:
name = wflowSpec['name'] # this is the name of workflow
pklFileName = wflowSpec['spec'] # the "spec" in WMBS table (wmbs_workflow.spec) is pkl file name
wmaSpecs[name] = pklFileName
return wmaSpecs
return [wflowSpec['name'] for wflowSpec in wflowSpecs]

def findUnacquiredWorkflows(self):
"""
Using a non-customized CouchDB view, find out workflows with elements
in the statuses that we would like to update (e.g. Available).
This is required because not all workflows might have been acquired by WMBS.
:return: a flat list of workflow names
"""
self.logger.info("Finding not yet active workflows in local workqueue")
response = []
# get list of active workflows in WMAgent by looking into local elements in Available status
summary = self.localWQ.getElementsCountAndJobsByWorkflow()
for wflowName, innerDict in summary.items():
for status in innerDict:
if status in self.wqeStates:
response.append(wflowName)
return response

def wmstatsDict(self, requests):
"""
Expand All @@ -71,10 +85,12 @@ def wmstatsDict(self, requests):
# get list of workflows from wmstats
outputMask = ['SiteWhitelist', 'SiteBlacklist']
wdict = {}
for state in self.states:
for state in self.reqStates:
inputConditions = {"RequestStatus": state}
self.logger.info("Fetch site info from WMStats for condition: %s and mask %s", inputConditions, outputMask)
data = self.wmstatsSrv.getFilteredActiveData(inputConditions, outputMask)
self.logger.info("Found %d workflows in WMStats with status %s", len(data), state)
self.logger.debug("Data from wmstats for status %s: %s", state, pformat(data))
for rdict in data:
# rdict here has the following structure:
# {"RequestName": "bla", "SiteWhitelist":[], "SiteBlacklist": []}
Expand All @@ -89,6 +105,7 @@ def algorithm(self, parameters=None):
"""
Perform the following logic:
- obtain list of current active workflows from the agent
- obtain list of workflows not yet active but already in local workqueue
- requests their specs from upstream wmstats server
- update site lists of active workflows
- push new specs to the agent local WorkQueue and update pickle spec file
Expand All @@ -97,11 +114,21 @@ def algorithm(self, parameters=None):
"""
# get list of active workflows from the agent, the returned dict
# is composed by workflow names and associated pickle file (data comes from WMBS)
wmaSpecs = self.getActiveWorkflows()
wflows = wmaSpecs.keys()
wflows = self.getActiveWorkflows()
self.logger.info("This agent has %d active workflows", len(wflows))
self.logger.debug("Active workflows in the agent are: %s", wflows)

# find workflows not yet active but already in local workqueue
# note that some workflows here might already be active as well (multi WQEs)
unacquiredWflows = self.findUnacquiredWorkflows()
self.logger.info("This agent has %d not yet active workflows in local workqueue", len(unacquiredWflows))

# now join all active and potentially not-yet-active workflows
wflows = list(set(wflows + unacquiredWflows))

# obtain workflow records from wmstats server
wdict = self.wmstatsDict(wflows)
self.logger.info("There is a total of %d common active workflows in the agent and wmstats", len(wdict))

# iterate over the list of active workflows which is smaller than list from wmstats
for wflow in wflows:
Expand All @@ -110,9 +137,6 @@ def algorithm(self, parameters=None):
siteWhiteList = wdict[wflow]['SiteWhitelist']
siteBlackList = wdict[wflow]['SiteBlacklist']

# get the name of pkl file from wma spec
pklFileName = wmaSpecs[wflow]

# get the local Workqueue url for the workflow's spec
specUrl = self.localWQ.hostWithAuth + "/%s/%s/spec" % (self.localWQ.db.name, wflow)

Expand All @@ -131,17 +155,12 @@ def algorithm(self, parameters=None):
try:
# update local WorkQueue first
params = {'SiteWhitelist': siteWhiteList, 'SiteBlacklist': siteBlackList}
self.localWQ.updateElementsByWorkflow(wHelper, params, status=['Available'])
self.logger.info("successfully updated workqueue elements for workflow %s", wflow)
self.localWQ.updateElementsByWorkflow(wHelper, params, status=self.wqeStates)
msg = f"Successfully updated elements for workflow '{wflow}', "
msg += f"under WQ states: {self.wqeStates} and spec at: {sanitizeURL(specUrl).get('url')}"
self.logger.info(msg)
except Exception as ex:
logging.exception("Unexpected exception while updating elements in local workqueue Details:\n%s", str(ex))
continue

try:
# persist the change at the pkl file
self.logger.info("Updating %s with new site lists within pkl file %s", wflow, pklFileName)
# save back pickle file
wHelper.save(pklFileName)
except Exception as ex:
logging.exception("Caught unexpected exception in SiteListPoller. Details:\n%s", str(ex))
continue
else:
self.logger.info("No site list changes found for workflow %s", wflow)

0 comments on commit cdc8b18

Please sign in to comment.