 from io import StringIO
 import os
 import re
+import shlex
 import subprocess
 import time
 from dataclasses import dataclass, field
@@ -136,6 +137,9 @@ def run_job(self, job: JobExecutorInterface):
         call += self.get_account_arg(job)
         call += self.get_partition_arg(job)
 
+        if job.resources.get("clusters"):
+            call += f" --clusters {job.resources.clusters}"
+
         if job.resources.get("runtime"):
             call += f" -t {job.resources.runtime}"
         else:
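For illustration only, a minimal sketch of how the new "clusters" resource is expected to end up on the sbatch command line; the resources dict and the cluster name below are made-up stand-ins for a real job's values.

# Sketch only: mirrors the diff's flag construction with stand-in values.
resources = {"clusters": "cluster_a"}

call = "sbatch --job-name example"
if resources.get("clusters"):
    call += f" --clusters {resources['clusters']}"

print(call)  # sbatch --job-name example --clusters cluster_a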
@@ -200,7 +204,11 @@ def run_job(self, job: JobExecutorInterface):
                 f"SLURM job submission failed. The error message was {e.output}"
             )
 
-        slurm_jobid = out.split(" ")[-1]
+        # Multicluster submissions yield submission info like
+        # "Submitted batch job <id> on cluster <name>".
+        # To extract the job id in this case, we match the first number in the
+        # message - a format which might change in future versions of SLURM.
+        slurm_jobid = re.search(r"\d+", out).group()
         slurm_logfile = slurm_logfile.replace("%j", slurm_jobid)
         self.logger.info(
             f"Job {job.jobid} has been submitted with SLURM jobid {slurm_jobid} "
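For illustration only, a small sketch (with made-up job ids and cluster name) of why the regex covers both the plain and the multicluster submission message:

import re

# Made-up examples of sbatch output, one plain and one multicluster.
for out in (
    "Submitted batch job 123456",
    "Submitted batch job 123456 on cluster cluster_a",
):
    print(re.search(r"\d+", out).group())  # prints 123456 in both cases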
@@ -264,15 +272,22 @@ async def check_active_jobs(
         # in line 218 - once v20.11 is definitively not in use any more,
         # the more readable version ought to be re-adapted
 
+        # -X: only show main job, no substeps
+        sacct_command = f"""sacct -X --parsable2 \
+                        --clusters all \
+                        --noheader --format=JobIdRaw,State \
+                        --starttime {sacct_starttime} \
+                        --endtime now --name {self.run_uuid}"""
+
+        # for better readability in verbose output
+        sacct_command = " ".join(shlex.split(sacct_command))
+
         # this code is inspired by the snakemake profile:
         # https://github.com/Snakemake-Profiles/slurm
         for i in range(status_attempts):
             async with self.status_rate_limiter:
                 (status_of_jobs, sacct_query_duration) = await self.job_stati(
-                    # -X: only show main job, no substeps
-                    f"sacct -X --parsable2 --noheader --format=JobIdRaw,State "
-                    f"--starttime {sacct_starttime} "
-                    f"--endtime now --name {self.run_uuid}"
+                    sacct_command
                 )
             if status_of_jobs is None and sacct_query_duration is None:
                 self.logger.debug(f"could not check status of job {self.run_uuid}")
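For illustration only, a stand-alone sketch of the normalization step: shlex.split() plus " ".join() collapses the line continuations and padding of the triple-quoted command into a single line for logging. The command below is a shortened stand-in, not the plugin's full sacct call.

import shlex

# Shortened stand-in for the triple-quoted sacct command.
sacct_command = """sacct -X --parsable2 \
    --clusters all \
    --noheader --format=JobIdRaw,State"""

# Collapses the continuation whitespace into single spaces.
print(" ".join(shlex.split(sacct_command)))
# sacct -X --parsable2 --clusters all --noheader --format=JobIdRaw,State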
@@ -364,8 +379,10 @@ def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
                 # about 30 sec, but can be longer in extreme cases.
                 # Under 'normal' circumstances, 'scancel' is executed in
                 # virtually no time.
+                scancel_command = f"scancel {jobids} --clusters=all"
+
                 subprocess.check_output(
-                    f"scancel {jobids}",
+                    scancel_command,
                     text=True,
                     shell=True,
                     timeout=60,
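For illustration only (the job ids are made up): the command string that this builds and hands to subprocess.check_output().

# Made-up job ids; shows the resulting scancel invocation.
jobids = "123456 123457"
scancel_command = f"scancel {jobids} --clusters=all"
print(scancel_command)  # scancel 123456 123457 --clusters=all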