diff --git a/.github/workflows/announce-release.yml b/.github/workflows/announce-release.yml index 15238702..50f34b82 100644 --- a/.github/workflows/announce-release.yml +++ b/.github/workflows/announce-release.yml @@ -12,7 +12,7 @@ permissions: jobs: post_to_mastodon: - if: ${{ contains(github.event.head_commit.message, 'chore(main): release') }} + if: "${{ contains(github.event.head_commit.message, 'chore(main): release') }}" runs-on: ubuntu-latest steps: - name: Post to Mastodon diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 502ee289..d60eea60 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -98,7 +98,7 @@ jobs: poetry install - name: Run pytest - run: poetry run coverage run -m pytest tests/tests.py -sv + run: poetry run coverage run -m pytest tests/tests.py -sv --tb=short --disable-warnings - name: Run Coverage run: poetry run coverage report -m diff --git a/pyproject.toml b/pyproject.toml index ff9b8905..e4767327 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ throttler = "^1.2.2" black = "^23.7.0" flake8 = "^6.1.0" coverage = "^7.3.1" -pytest = "^7.4.2" +pytest = "^8.3.5" snakemake = "^8.20.0" [tool.coverage.run] diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 5711f932..ad38951d 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -26,9 +26,9 @@ JobExecutorInterface, ) from snakemake_interface_common.exceptions import WorkflowError -from snakemake_executor_plugin_slurm_jobstep import get_cpu_setting from .utils import delete_slurm_environment, delete_empty_dirs, set_gres_string +from .submit_string import get_submit_command @dataclass @@ -135,9 +135,10 @@ class ExecutorSettings(ExecutorSettingsBase): # Required: # Implementation of your executor class Executor(RemoteExecutor): - def __post_init__(self): + def __post_init__(self, test_mode: bool = False): # run check whether we 
are running in a SLURM job context self.warn_on_jobcontext() + self.test_mode = test_mode self.run_uuid = str(uuid.uuid4()) self.logger.info(f"SLURM run ID: {self.run_uuid}") self._fallback_account_arg = None @@ -225,31 +226,28 @@ def run_job(self, job: JobExecutorInterface): comment_str = f"rule_{job.name}" else: comment_str = f"rule_{job.name}_wildcards_{wildcard_str}" - call = ( - f"sbatch " - f"--parsable " - f"--job-name {self.run_uuid} " - f"--output '{slurm_logfile}' " - f"--export=ALL " - f"--comment '{comment_str}'" - ) + # check whether the 'slurm_extra' parameter is used correctly + # prior to putatively setting in the sbatch call + if job.resources.get("slurm_extra"): + self.check_slurm_extra(job) - if not self.workflow.executor_settings.no_account: - call += self.get_account_arg(job) + job_params = { + "run_uuid": self.run_uuid, + "slurm_logfile": slurm_logfile, + "comment_str": comment_str, + "account": self.get_account_arg(job), + "partition": self.get_partition_arg(job), + "workdir": self.workflow.workdir_init, + } - call += self.get_partition_arg(job) + call = get_submit_command(job, job_params) if self.workflow.executor_settings.requeue: call += " --requeue" call += set_gres_string(job) - if job.resources.get("clusters"): - call += f" --clusters {job.resources.clusters}" - - if job.resources.get("runtime"): - call += f" -t {job.resources.runtime}" - else: + if not job.resources.get("runtime"): self.logger.warning( "No wall time information given. This might or might not " "work on your cluster. " @@ -257,28 +255,12 @@ def run_job(self, job: JobExecutorInterface): "default via --default-resources." 
) - if job.resources.get("constraint"): - call += f" -C '{job.resources.constraint}'" - if job.resources.get("mem_mb_per_cpu"): - call += f" --mem-per-cpu {job.resources.mem_mb_per_cpu}" - elif job.resources.get("mem_mb"): - call += f" --mem {job.resources.mem_mb}" - else: + if not job.resources.get("mem_mb_per_cpu") and not job.resources.get("mem_mb"): self.logger.warning( "No job memory information ('mem_mb' or 'mem_mb_per_cpu') is given " "- submitting without. This might or might not work on your cluster." ) - if job.resources.get("nodes", False): - call += f" --nodes={job.resources.get('nodes', 1)}" - - # fixes #40 - set ntasks regardless of mpi, because - # SLURM v22.05 will require it for all jobs - gpu_job = job.resources.get("gpu") or "gpu" in job.resources.get("gres", "") - if gpu_job: - call += f" --ntasks-per-gpu={job.resources.get('tasks', 1)}" - else: - call += f" --ntasks={job.resources.get('tasks', 1)}" # MPI job if job.resources.get("mpi", False): if not job.resources.get("tasks_per_node") and not job.resources.get( @@ -290,19 +272,8 @@ def run_job(self, job: JobExecutorInterface): "Probably not what you want." 
) - # we need to set cpus-per-task OR cpus-per-gpu, the function - # will return a string with the corresponding value - call += f" {get_cpu_setting(job, gpu_job)}" - if job.resources.get("slurm_extra"): - self.check_slurm_extra(job) - call += f" {job.resources.slurm_extra}" - exec_job = self.format_job_exec(job) - # ensure that workdir is set correctly - # use short argument as this is the same in all slurm versions - # (see https://github.com/snakemake/snakemake/issues/2014) - call += f" -D {self.workflow.workdir_init}" # and finally the job to execute with all the snakemake parameters call += f' --wrap="{exec_job}"' diff --git a/snakemake_executor_plugin_slurm/submit_string.py b/snakemake_executor_plugin_slurm/submit_string.py new file mode 100644 index 00000000..b7c1e160 --- /dev/null +++ b/snakemake_executor_plugin_slurm/submit_string.py @@ -0,0 +1,69 @@ +from snakemake_executor_plugin_slurm_jobstep import get_cpu_setting +from types import SimpleNamespace + + +def get_submit_command(job, params): + """ + Return the submit command for the job. 
+ """ + # Convert params dict to a SimpleNamespace for attribute-style access + params = SimpleNamespace(**params) + + call = ( + f"sbatch " + f"--parsable " + f"--job-name {params.run_uuid} " + f'--output "{params.slurm_logfile}" ' + f"--export=ALL " + f'--comment "{params.comment_str}"' + ) + + # check whether an account is set + if hasattr(params, "account"): + call += f" --account={params.account}" + # check whether a partition is set + if hasattr(params, "partition"): + call += f" --partition={params.partition}" + + if job.resources.get("clusters"): + call += f" --clusters {job.resources.clusters}" + + if job.resources.get("runtime"): + call += f" -t {job.resources.runtime}" + + if job.resources.get("constraint") or isinstance( + job.resources.get("constraint"), str + ): + call += f" -C '{job.resources.get('constraint')}'" + + if job.resources.get("qos") or isinstance(job.resources.get("qos"), str): + call += f" --qos='{job.resources.qos}'" + + if job.resources.get("mem_mb_per_cpu"): + call += f" --mem-per-cpu {job.resources.mem_mb_per_cpu}" + elif job.resources.get("mem_mb"): + call += f" --mem {job.resources.mem_mb}" + + if job.resources.get("nodes", False): + call += f" --nodes={job.resources.get('nodes', 1)}" + + # fixes #40 - set ntasks regardless of mpi, because + # SLURM v22.05 will require it for all jobs + gpu_job = job.resources.get("gpu") or "gpu" in job.resources.get("gres", "") + if gpu_job: + call += f" --ntasks-per-gpu={job.resources.get('tasks', 1)}" + else: + call += f" --ntasks={job.resources.get('tasks', 1)}" + + # we need to set cpus-per-task OR cpus-per-gpu, the function + # will return a string with the corresponding value + call += f" {get_cpu_setting(job, gpu_job)}" + if job.resources.get("slurm_extra"): + call += f" {job.resources.slurm_extra}" + + # ensure that workdir is set correctly + # use short argument as this is the same in all slurm versions + # (see https://github.com/snakemake/snakemake/issues/2014) + call += f" -D 
'{params.workdir}'" + + return call diff --git a/tests/tests.py b/tests/tests.py index 63a7dcf3..63e289bc 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1,11 +1,12 @@ from typing import Optional import snakemake.common.tests from snakemake_interface_executor_plugins.settings import ExecutorSettingsBase -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest from snakemake_executor_plugin_slurm import ExecutorSettings from snakemake_executor_plugin_slurm.utils import set_gres_string +from snakemake_executor_plugin_slurm.submit_string import get_submit_command from snakemake_interface_common.exceptions import WorkflowError @@ -43,6 +44,10 @@ def _create_job(**resources): mock_job = MagicMock() mock_job.resources = mock_resources + mock_job.name = "test_job" + mock_job.wildcards = {} + mock_job.is_group.return_value = False + mock_job.jobid = 1 return mock_job return _create_job @@ -50,66 +55,354 @@ def _create_job(**resources): def test_no_gres_or_gpu(self, mock_job): """Test with no GPU or GRES resources specified.""" job = mock_job() + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + assert set_gres_string(job) == "" def test_valid_gres_simple(self, mock_job): """Test with valid GRES format (simple).""" job = mock_job(gres="gpu:1") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + assert set_gres_string(job) == " --gres=gpu:1" def test_valid_gres_with_model(self, mock_job): """Test with valid 
GRES format including GPU model.""" job = mock_job(gres="gpu:tesla:2") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + assert set_gres_string(job) == " --gres=gpu:tesla:2" def test_invalid_gres_format(self, mock_job): """Test with invalid GRES format.""" job = mock_job(gres="gpu") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock with pytest.raises(WorkflowError, match="Invalid GRES format"): set_gres_string(job) def test_invalid_gres_format_missing_count(self, mock_job): """Test with invalid GRES format (missing count).""" job = mock_job(gres="gpu:tesla:") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + with pytest.raises(WorkflowError, match="Invalid GRES format"): set_gres_string(job) def test_valid_gpu_number(self, mock_job): """Test with valid GPU number.""" job = mock_job(gpu="2") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + assert set_gres_string(job) == " --gpus=2" def test_valid_gpu_with_name(self, 
mock_job): """Test with valid GPU name and number.""" job = mock_job(gpu="tesla:2") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + assert set_gres_string(job) == " --gpus=tesla:2" def test_gpu_with_model(self, mock_job): """Test GPU with model specification.""" job = mock_job(gpu="2", gpu_model="tesla") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + assert set_gres_string(job) == " --gpus=tesla:2" def test_invalid_gpu_model_format(self, mock_job): """Test with invalid GPU model format.""" job = mock_job(gpu="2", gpu_model="invalid:model") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + with pytest.raises(WorkflowError, match="Invalid GPU model format"): set_gres_string(job) def test_gpu_model_without_gpu(self, mock_job): """Test GPU model without GPU number.""" job = mock_job(gpu_model="tesla") - with pytest.raises( - WorkflowError, match="GPU model is set, but no GPU number is given" - ): - set_gres_string(job) + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + 
process_mock.returncode = 0 + mock_popen.return_value = process_mock + + # test whether the resource setting raises the correct error + with pytest.raises( + WorkflowError, match="GPU model is set, but no GPU number is given" + ): + set_gres_string(job) def test_both_gres_and_gpu_set(self, mock_job): """Test error case when both GRES and GPU are specified.""" job = mock_job(gres="gpu:1", gpu="2") - with pytest.raises( - WorkflowError, match="GRES and GPU are set. Please only set one of them." - ): - set_gres_string(job) + + # Patch subprocess.Popen to simulate job submission + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to simulate successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + + # Ensure the error is raised when both GRES and GPU are set + with pytest.raises( + WorkflowError, match="GRES and GPU are set. Please only set one" + ): + set_gres_string(job) + + +class TestSLURMResources(TestWorkflows): + """ + Test workflows using job resources passed as part of the job configuration. + This test suite uses the `get_submit_command` function to generate the + sbatch command and validates the inclusion of resources. 
+ """ + + @pytest.fixture + def mock_job(self): + """Create a mock job with configurable resources.""" + + def _create_job(**resources): + mock_resources = MagicMock() + # Configure get method to return values from resources dict + mock_resources.get.side_effect = lambda key, default=None: resources.get( + key, default + ) + # Add direct attribute access for certain resources + for key, value in resources.items(): + setattr(mock_resources, key, value) + + mock_job = MagicMock() + mock_job.resources = mock_resources + mock_job.name = "test_job" + mock_job.wildcards = {} + mock_job.is_group.return_value = False + mock_job.jobid = 1 + return mock_job + + return _create_job + + def test_constraint_resource(self, mock_job): + """ + Test that the constraint resource is correctly + added to the sbatch command. + """ + # Create a job with a constraint resource + job = mock_job(constraint="haswell") + params = { + "run_uuid": "test_run", + "slurm_logfile": "test_logfile", + "comment_str": "test_comment", + "account": None, + "partition": None, + "workdir": ".", + "constraint": "haswell", + } + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + + assert " -C 'haswell'" in get_submit_command(job, params) + + def test_qos_resource(self, mock_job): + """Test that the qos resource is correctly added to the sbatch command.""" + # Create a job with a qos resource + job = mock_job(qos="normal") + params = { + "run_uuid": "test_run", + "slurm_logfile": "test_logfile", + "comment_str": "test_comment", + "account": None, + "partition": None, + "workdir": ".", + "qos": "normal", + } + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return 
successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + + assert " --qos='normal'" in get_submit_command(job, params) + + def test_both_constraint_and_qos(self, mock_job): + """Test that both constraint and qos resources can be used together.""" + # Create a job with both constraint and qos resources + job = mock_job(constraint="haswell", qos="high") + params = { + "run_uuid": "test_run", + "slurm_logfile": "test_logfile", + "comment_str": "test_comment", + "account": None, + "partition": None, + "workdir": ".", + "constraint": "haswell", + "qos": "high", + } + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + + # Assert both resources are correctly included + sbatch_command = get_submit_command(job, params) + assert " --qos='high'" in sbatch_command + assert " -C 'haswell'" in sbatch_command + + def test_no_resources(self, mock_job): + """ + Test that no constraint or qos flags are added + when resources are not specified. 
+ """ + # Create a job without constraint or qos resources + job = mock_job() + params = { + "run_uuid": "test_run", + "slurm_logfile": "test_logfile", + "comment_str": "test_comment", + "account": None, + "partition": None, + "workdir": ".", + } + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + + # Assert neither resource is included + sbatch_command = get_submit_command(job, params) + assert "-C " not in sbatch_command + assert "--qos " not in sbatch_command + + def test_empty_constraint(self, mock_job): + """Test that an empty constraint is still included in the command.""" + # Create a job with an empty constraint + job = mock_job(constraint="") + params = { + "run_uuid": "test_run", + "slurm_logfile": "test_logfile", + "comment_str": "test_comment", + "account": None, + "partition": None, + "workdir": ".", + "constraint": "", + } + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + + # Assert the constraint is included (even if empty) + assert "-C ''" in get_submit_command(job, params) + + def test_empty_qos(self, mock_job): + """Test that an empty qos is still included in the command.""" + # Create a job with an empty qos + job = mock_job(qos="") + params = { + "run_uuid": "test_run", + "slurm_logfile": "test_logfile", + "comment_str": "test_comment", + "account": None, + "partition": None, + "workdir": ".", + "qos": "", + } + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # 
Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + # Assert the qos is included (even if empty) + assert "--qos=''" in get_submit_command(job, params) class TestWildcardsWithSlashes(snakemake.common.tests.TestWorkflowsLocalStorageBase):