-
Notifications
You must be signed in to change notification settings - Fork 30
feat: measuring compute efficiency per job #221
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 105 commits
b4235fb
27b8610
7e6b369
ed1beaf
c50d28e
7c0e434
d947665
bd3d900
15f7892
f61ab7c
f55f8b0
2861539
474ead0
6da5674
7a51cf9
0a3c17b
9c8ffc0
8faafba
b78baeb
7f49ee5
c53937c
b3f80d8
9f8ed3e
dba70d8
d69a0b1
be45e7b
bf18d2b
8f5de00
bb4c85e
bc1f024
8d0f20f
11862ed
4b671e3
9361226
3cac42c
317a2af
f59d03d
641beb3
8baba42
92c3531
8321d29
5c21a1b
5686272
8007365
4cfac88
aca969c
7cf6c86
bfbc3a6
545f00d
2cc48a3
c4a1385
dd6e897
a9f1ceb
c38d11e
548eba6
acee2a6
a2ce419
212143d
1f31fc0
22f8715
a48eea5
22c3244
4ff426f
a3a1f34
a265e8c
bd4c173
1e06187
266aa41
05019fb
135c47a
af84f85
ebeb08b
2d1b041
bf3e88f
082e01a
4e99e32
67b0671
a1c5ba2
86ff31a
0cb4b7f
b433010
efd6398
7267dab
2c334ad
a26e395
02a855d
ee59544
db9116a
a0a0618
96551c6
7c8e3d6
4167460
07d663a
55e7d58
2dbc6a8
aadbf8c
820a9db
b56bf06
2b72aa4
8b2e100
fb826d1
930b70e
cb3db0c
91815df
f983322
66f0dc3
602db18
b7c80ef
012ce8f
761b562
067fac7
ece91e1
856601e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,6 @@ | |
__email__ = "johannes.koester@uni-due.de" | ||
__license__ = "MIT" | ||
|
||
import atexit | ||
import csv | ||
from io import StringIO | ||
import os | ||
|
@@ -16,6 +15,9 @@ | |
from datetime import datetime, timedelta | ||
from typing import List, Generator, Optional | ||
import uuid | ||
|
||
import pandas as pd | ||
|
||
from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo | ||
from snakemake_interface_executor_plugins.executors.remote import RemoteExecutor | ||
from snakemake_interface_executor_plugins.settings import ( | ||
|
@@ -27,7 +29,12 @@ | |
) | ||
from snakemake_interface_common.exceptions import WorkflowError | ||
|
||
from .utils import delete_slurm_environment, delete_empty_dirs, set_gres_string | ||
from .utils import ( | ||
delete_slurm_environment, | ||
delete_empty_dirs, | ||
set_gres_string, | ||
) | ||
from .efficiency_report import time_to_seconds, parse_maxrss, parse_reqmem | ||
from .submit_string import get_submit_command | ||
|
||
|
||
|
@@ -106,6 +113,37 @@ class ExecutorSettings(ExecutorSettingsBase): | |
"required": False, | ||
}, | ||
) | ||
efficiency_report: bool = field( | ||
default=False, | ||
metadata={ | ||
"help": "Generate an efficiency report at the end of the workflow. " | ||
"This flag has no effect, if not set.", | ||
"env_var": False, | ||
"required": False, | ||
}, | ||
) | ||
efficiency_report_path: Optional[Path] = field( | ||
default=None, | ||
metadata={ | ||
"help": "Path to the efficiency report file. " | ||
"If not set, the report will be written to " | ||
"the current working directory with the name " | ||
"'efficiency_report_<run_uuid>.csv'. " | ||
"This flag has no effect, if not set.", | ||
"env_var": False, | ||
"required": False, | ||
}, | ||
) | ||
efficiency_threshold: Optional[float] = field( | ||
default=0.8, | ||
metadata={ | ||
"help": "The efficiency threshold for the efficiency report. " | ||
"Jobs with an efficiency below this threshold will be reported. " | ||
"This flag has no effect, if not set.", | ||
"env_var": False, | ||
"required": False, | ||
}, | ||
) | ||
|
||
|
||
# Required: | ||
|
@@ -149,7 +187,21 @@ def __post_init__(self, test_mode: bool = False): | |
if self.workflow.executor_settings.logdir | ||
else Path(".snakemake/slurm_logs").resolve() | ||
) | ||
atexit.register(self.clean_old_logs) | ||
|
||
def shutdown(self) -> None: | ||
""" | ||
Shutdown the executor. | ||
This method is overloaded, to include the cleaning of old log files | ||
and to optionally create an efficiency report. | ||
""" | ||
# First, we invoke the original shutdown method | ||
super().shutdown() | ||
|
||
# Next, clean up old log files, unconditionally. | ||
self.clean_old_logs() | ||
# If the efficiency report is enabled, create it. | ||
if self.workflow.executor_settings.efficiency_report: | ||
self.create_efficiency_report() | ||
|
||
def clean_old_logs(self) -> None: | ||
"""Delete files older than specified age from the SLURM log directory.""" | ||
|
@@ -160,20 +212,23 @@ def clean_old_logs(self) -> None: | |
return | ||
cutoff_secs = age_cutoff * 86400 | ||
current_time = time.time() | ||
self.logger.info(f"Cleaning up log files older than {age_cutoff} day(s)") | ||
self.logger.info(f"Cleaning up log files older than {age_cutoff} day(s).") | ||
|
||
for path in self.slurm_logdir.rglob("*.log"): | ||
if path.is_file(): | ||
try: | ||
file_age = current_time - path.stat().st_mtime | ||
if file_age > cutoff_secs: | ||
path.unlink() | ||
except (OSError, FileNotFoundError) as e: | ||
self.logger.warning(f"Could not delete logfile {path}: {e}") | ||
self.logger.error(f"Could not delete logfile {path}: {e}") | ||
# we need a 2nd iteration to remove putatively empty directories | ||
try: | ||
delete_empty_dirs(self.slurm_logdir) | ||
except (OSError, FileNotFoundError) as e: | ||
self.logger.warning(f"Could not delete empty directory {path}: {e}") | ||
self.logger.error( | ||
f"Could not delete empty directories in {self.slurm_logdir}: {e}" | ||
) | ||
|
||
def warn_on_jobcontext(self, done=None): | ||
if not done: | ||
|
@@ -730,10 +785,137 @@ def check_slurm_extra(self, job): | |
jobname = re.compile(r"--job-name[=?|\s+]|-J\s?") | ||
if re.search(jobname, job.resources.slurm_extra): | ||
raise WorkflowError( | ||
"The --job-name option is not allowed in the 'slurm_extra' " | ||
"parameter. The job name is set by snakemake and must not be " | ||
"overwritten. It is internally used to check the stati of the " | ||
"all submitted jobs by this workflow." | ||
"The --job-name option is not allowed in the 'slurm_extra' parameter. " | ||
"The job name is set by snakemake and must not be overwritten. " | ||
"It is internally used to check the stati of the all submitted jobs " | ||
"by this workflow." | ||
"Please consult the documentation if you are unsure how to " | ||
"query the status of your jobs." | ||
) | ||
|
||
def create_efficiency_report(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe move this into a separate module, in order to keep init smaller? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, this was the idea, initially. Yet, not possible with the |
||
""" | ||
Fetch sacct job data for a Snakemake workflow | ||
and compute efficiency metrics. | ||
""" | ||
cmd = f"sacct --name={self.run_uuid} --parsable2 --noheader" | ||
cmd += ( | ||
" --format=JobID,JobName,Comment,Elapsed,TotalCPU," | ||
"NNodes,NCPUS,MaxRSS,ReqMem" | ||
) | ||
e_threshold = self.workflow.executor_settings.efficiency_threshold | ||
|
||
try: | ||
result = subprocess.run( | ||
shlex.split(cmd), capture_output=True, text=True, check=True | ||
) | ||
lines = result.stdout.strip().split("\n") | ||
except subprocess.CalledProcessError: | ||
self.logger.error( | ||
f"Failed to retrieve job data for workflow {self.run_uuid}." | ||
) | ||
return None | ||
|
||
# Convert to DataFrame | ||
df = pd.DataFrame( | ||
(line.split("|") for line in lines), | ||
columns=[ | ||
"JobID", | ||
"JobName", | ||
"Comment", | ||
"Elapsed", | ||
"TotalCPU", | ||
"NNodes", | ||
"NCPUS", | ||
"MaxRSS", | ||
"ReqMem", | ||
], | ||
) | ||
|
||
# If the "Comment" column is empty, | ||
# a) delete the column | ||
# b) issue a warning | ||
if df["Comment"].isnull().all(): | ||
self.logger.warning( | ||
f"No comments found for workflow {self.run_uuid}. " | ||
"This field is used to store the rule name. " | ||
"Please ensure that the 'comment' field is set for your cluster. " | ||
"Administrators can set this up in the SLURM configuration." | ||
) | ||
df.drop(columns=["Comment"], inplace=True) | ||
# remember, that the comment column is not available | ||
nocomment = True | ||
# else: rename the column to 'RuleName' | ||
else: | ||
df.rename(columns={"Comment": "RuleName"}, inplace=True) | ||
nocomment = False | ||
# Convert types | ||
df["NNodes"] = pd.to_numeric(df["NNodes"], errors="coerce") | ||
df["NCPUS"] = pd.to_numeric(df["NCPUS"], errors="coerce") | ||
|
||
# Convert time fields | ||
df["Elapsed_sec"] = df["Elapsed"].apply(time_to_seconds) | ||
df["TotalCPU_sec"] = df["TotalCPU"].apply(time_to_seconds) | ||
|
||
# Compute CPU efficiency | ||
df["CPU Efficiency (%)"] = ( | ||
df["TotalCPU_sec"] / (df["Elapsed_sec"] * df["NCPUS"]) | ||
) * 100 | ||
df["CPU Efficiency (%)"] = df["CPU Efficiency (%)"].fillna(0).round(2) | ||
|
||
# Convert MaxRSS | ||
df["MaxRSS_MB"] = df["MaxRSS"].apply(parse_maxrss) | ||
|
||
# Convert ReqMem and calculate memory efficiency | ||
df["RequestedMem_MB"] = df.apply( | ||
lambda row: parse_reqmem(row["ReqMem"], row["NNodes"]), axis=1 | ||
) | ||
df["Memory Usage (%)"] = df.apply( | ||
lambda row: ( | ||
(row["MaxRSS_MB"] / row["RequestedMem_MB"] * 100) | ||
if row["RequestedMem_MB"] > 0 | ||
else 0 | ||
), | ||
axis=1, | ||
) | ||
|
||
df["Memory Usage (%)"] = df["Memory Usage (%)"].fillna(0).round(2) | ||
|
||
# Drop all rows containing "batch" or "extern" as job names | ||
df = df[~df["JobName"].str.contains("batch|extern")] | ||
|
||
# Log warnings for low efficiency | ||
for _, row in df.iterrows(): | ||
if row["CPU Efficiency (%)"] < e_threshold: | ||
if nocomment: | ||
self.logger.warning( | ||
f"Job {row['JobID']} ({row['JobName']}) " | ||
f"has low CPU efficiency: {row['CPU Efficiency (%)']}%." | ||
) | ||
else: | ||
# if the comment column is available, we can use it to | ||
# identify the rule name | ||
self.logger.warning( | ||
f"Job {row['JobID']} for rule '{row['RuleName']}' " | ||
f"({row['JobName']}) has low CPU efficiency: " | ||
f"{row['CPU Efficiency (%)']}%." | ||
) | ||
|
||
# we construct a path object to allow for a customi | ||
# logdir, if specified | ||
p = Path() | ||
|
||
# Save the report to a CSV file | ||
logfile = f"efficiency_report_{self.run_uuid}.csv" | ||
if self.workflow.executor_settings.efficiency_report_path: | ||
logfile = ( | ||
Path(self.workflow.executor_settings.efficiency_report_path) / logfile | ||
) | ||
else: | ||
logfile = p.cwd() / logfile | ||
df.to_csv(logfile) | ||
|
||
# write out the efficiency report at normal verbosity in any case | ||
self.logger.info( | ||
f"Efficiency report for workflow {self.run_uuid} saved to {logfile}." | ||
) |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,50 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import re | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import pandas as pd | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def time_to_seconds(time_str): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"""Convert SLURM time format to seconds.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if pd.isna(time_str) or time_str.strip() == "": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return 0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
parts = time_str.split(":") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if len(parts) == 3: # H:M:S | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return int(parts[0]) * 3600 + int(parts[1]) * 60 + float(parts[2]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
elif len(parts) == 2: # M:S | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return int(parts[0]) * 60 + float(parts[1]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
elif len(parts) == 1: # S | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return float(parts[0]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return 0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def parse_maxrss(maxrss): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"""Convert MaxRSS to MB.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if pd.isna(maxrss) or maxrss.strip() == "" or maxrss == "0": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return 0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
match = re.match(r"(\d+)([KMG]?)", maxrss) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if match: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
value, unit = match.groups() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
value = int(value) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
unit_multipliers = {"K": 1 / 1024, "M": 1, "G": 1024} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return value * unit_multipliers.get(unit, 1) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return 0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+29
to
+35
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion
SLURM occasionally reports memory with decimals. The current regex restricts to integers, silently returning 0 for such lines and skewing usage statistics. - match = re.match(r"(\d+)([KMG]?)", maxrss)
+ match = re.match(r"(\d+(?:\.\d+)?)([KMG]?)", maxrss)
...
- value = int(value)
+ value = float(value) 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def parse_reqmem(reqmem, number_of_nodes=1): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"""Convert requested memory to MB.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if pd.isna(reqmem) or reqmem.strip() == "": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return 0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
match = re.match( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
r"(\d+)([KMG])?(\S+)?", reqmem | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) # Handles "4000M" or "4G" or "2G/node" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if match: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
value, unit, per_unit = match.groups() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
value = int(value) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
unit_multipliers = {"K": 1 / 1024, "M": 1, "G": 1024} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
mem_mb = value * unit_multipliers.get(unit, 1) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if per_unit and "/node" in per_unit: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# the memory values is per node, hence we need to | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# multiply with the number of nodes | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return mem_mb * number_of_nodes | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return mem_mb # Default case (per CPU or total) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return 0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+38
to
+55
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion NaN propagation & ‘/cpu’ interpretation
Consider: - if per_unit and "/node" in per_unit:
- # the memory values is per node, hence we need to
- # multiply with the number of nodes
- return mem_mb * number_of_nodes
- return mem_mb # Default case (per CPU or total)
+ if per_unit:
+ if "/node" in per_unit:
+ return mem_mb * (number_of_nodes if pd.notna(number_of_nodes) else 1)
+ if "/cpu" in per_unit:
+ # multiply by CPUs per task later; keep per-cpu value for now
+ return mem_mb
+ return mem_mb 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
|
Uh oh!
There was an error while loading. Please reload this page.