9
9
from DIRAC .ConfigurationSystem .Client .Helpers .Operations import Operations
10
10
from DIRAC .Core .Security import Properties
11
11
from DIRAC .Core .Utilities .PrettyPrint import printDict
12
+ from DIRAC .FrameworkSystem .Client .Logger import setContextLogger
12
13
from DIRAC .ResourceStatusSystem .Client .SiteStatus import SiteStatus
13
14
from DIRAC .WorkloadManagementSystem .Client import JobStatus , PilotStatus
14
15
from DIRAC .WorkloadManagementSystem .Client .Limiter import Limiter
@@ -50,11 +51,7 @@ def __init__(self, pilotAgentsDB=None, jobDB=None, tqDB=None, jlDB=None, opsHelp
50
51
self .opsHelper = Operations ()
51
52
52
53
if pilotRef :
53
- self .log = gLogger .getSubLogger (f"[{ pilotRef } ]Matcher" )
54
- self .pilotAgentsDB .log = gLogger .getSubLogger (f"[{ pilotRef } ]Matcher" )
55
- self .jobDB .log = gLogger .getSubLogger (f"[{ pilotRef } ]Matcher" )
56
- self .tqDB .log = gLogger .getSubLogger (f"[{ pilotRef } ]Matcher" )
57
- self .jlDB .log = gLogger .getSubLogger (f"[{ pilotRef } ]Matcher" )
54
+ self .log = gLogger .getLocalSubLogger (f"[{ pilotRef } ]Matcher" )
58
55
else :
59
56
self .log = gLogger .getSubLogger ("Matcher" )
60
57
@@ -64,86 +61,86 @@ def __init__(self, pilotAgentsDB=None, jobDB=None, tqDB=None, jlDB=None, opsHelp
64
61
65
62
def selectJob(self, resourceDescription, credDict):
    """Pick a job from the task queues for the requesting resource.

    The whole selection runs inside ``setContextLogger(self.log)``.

    :param resourceDescription: description sent by the resource; it is handed to
        ``self._getResourceDict`` and probed for the optional ``MaxRAM`` and
        ``NumberOfProcessors`` entries (used only for the log message)
    :param credDict: credentials of the caller, forwarded to ``self._getResourceDict``
    :return: empty dict when the task queue reports no match; otherwise a dict holding
        ``JDL``, ``JobID``, any job optimizer parameters that could be retrieved,
        ``Owner``, ``Group`` and ``PilotInfoReportedFlag`` (always True)
    :raises RuntimeError: when matching fails, when the job attributes or the JDL
        cannot be retrieved, or when the matched job is not in ``Waiting`` state
    """
    with setContextLogger(self.log):
        startTime = time.time()

        resourceDict = self._getResourceDict(resourceDescription, credDict)

        # Assemble a log-friendly copy of the matching parameters
        printable = dict(resourceDict)
        for optionalKey in ("MaxRAM", "NumberOfProcessors"):
            if optionalKey in resourceDescription:
                printable[optionalKey] = resourceDescription[optionalKey]
        # Tags ending in "GB" or "Processors" are left out of the printed description
        shownTags = [
            tag for tag in resourceDict.get("Tag", []) if not tag.endswith(("GB", "Processors"))
        ]
        if shownTags:
            printable["Tag"] = shownTags
        else:
            printable.pop("Tag", None)
        self.log.info("Resource description for matching", printDict(printable))

        site = resourceDict["Site"]
        negativeCond = self.limiter.getNegativeCondForSite(site, resourceDict.get("GridCE"))
        matchRes = self.tqDB.matchAndGetJob(resourceDict, negativeCond=negativeCond)
        if not matchRes["OK"]:
            raise RuntimeError(matchRes["Message"])

        match = matchRes["Value"]
        if not match["matchFound"]:
            self.log.info("No match found")
            return {}
        jobID = match["jobId"]

        statusRes = self.jobDB.getJobAttributes(jobID, ["Status"])
        if not statusRes["OK"]:
            raise RuntimeError("Could not retrieve job attributes")
        if not statusRes["Value"]:
            raise RuntimeError("No attributes returned for job")
        if statusRes["Value"]["Status"] != "Waiting":
            # The task queue handed out a job that is not waiting: drop it from the queue
            self.log.error("Job matched by the TQ is not in Waiting state", str(jobID))
            deletion = self.tqDB.deleteJob(jobID)
            if not deletion["OK"]:
                raise RuntimeError(deletion["Message"])
            raise RuntimeError(f"Job { str (jobID )} is not in Waiting state")

        self._reportStatus(resourceDict, jobID)

        jdlRes = self.jobDB.getJobJDL(jobID)
        if not jdlRes["OK"]:
            raise RuntimeError("Failed to get the job JDL")
        resultDict = {"JDL": jdlRes["Value"], "JobID": jobID}

        elapsed = time.time() - startTime
        self.log.verbose("Match time", f"[{ str (elapsed )} ]")

        # Enrich the answer: optimizer parameters are optional, owner information is not
        optParams = self.jobDB.getJobOptParameters(jobID)
        if optParams["OK"]:
            resultDict.update(optParams["Value"])
        ownerRes = self.jobDB.getJobAttributes(jobID, ["Owner", "OwnerGroup"])
        if not ownerRes["OK"]:
            raise RuntimeError("Could not retrieve job attributes")
        if not ownerRes["Value"]:
            raise RuntimeError("No attributes returned for job")

        if self.opsHelper.getValue("JobScheduling/CheckMatchingDelay", True):
            self.limiter.updateDelayCounters(site, jobID)

        # Pilot information is pushed only when the flag is absent or falsy
        if not resourceDict.get("PilotInfoReportedFlag", False):
            self._updatePilotInfo(resourceDict)
        self._updatePilotJobMapping(resourceDict, jobID)

        owner = ownerRes["Value"]
        resultDict["Owner"] = owner["Owner"]
        resultDict["Group"] = owner["OwnerGroup"]
        resultDict["PilotInfoReportedFlag"] = True

        return resultDict
147
144
148
145
def _getResourceDict (self , resourceDescription , credDict ):
149
146
"""from resourceDescription to resourceDict (just various mods)"""
0 commit comments