From 95304e51bb15b2064fb04343ed14722e36ff5dab Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Sat, 15 Mar 2025 15:10:03 +0100 Subject: [PATCH 01/28] feat: added new 'qos' resource --- snakemake_executor_plugin_slurm/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 56880697..a95580b5 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -259,6 +259,8 @@ def run_job(self, job: JobExecutorInterface): if job.resources.get("constraint"): call += f" -C '{job.resources.constraint}'" + if jog.resources.get("qos"): + call += f" --qos '{job.resources.qos}'" if job.resources.get("mem_mb_per_cpu"): call += f" --mem-per-cpu {job.resources.mem_mb_per_cpu}" elif job.resources.get("mem_mb"): From 64aa03ee2c4fe19f2f3bbcd833503eecb609b67b Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Sat, 15 Mar 2025 15:38:41 +0100 Subject: [PATCH 02/28] fix: typo --- snakemake_executor_plugin_slurm/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index a95580b5..8f26d28a 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -259,7 +259,7 @@ def run_job(self, job: JobExecutorInterface): if job.resources.get("constraint"): call += f" -C '{job.resources.constraint}'" - if jog.resources.get("qos"): + if job.resources.get("qos"): call += f" --qos '{job.resources.qos}'" if job.resources.get("mem_mb_per_cpu"): call += f" --mem-per-cpu {job.resources.mem_mb_per_cpu}" From 55716c66e7e23fb890c89b99b228079430c5ae8a Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Sat, 15 Mar 2025 15:52:26 +0100 Subject: [PATCH 03/28] feat: new tests for the 'constraint' and 'qos' resources --- tests/tests.py | 189 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 188 insertions(+), 1 deletion(-) diff --git a/tests/tests.py b/tests/tests.py index 63a7dcf3..f9aca644 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1,7 +1,7 @@ from typing import Optional import snakemake.common.tests from snakemake_interface_executor_plugins.settings import ExecutorSettingsBase -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest from snakemake_executor_plugin_slurm import ExecutorSettings @@ -145,3 +145,190 @@ def test_wildcard_slash_replacement(self): # Verify no slashes remain in the wildcard string assert "/" not in wildcard_str + + +class TestSLURMResources: + """Test cases for the constraint and qos resources in the Executor.run_job method.""" + + @pytest.fixture + def mock_job(self): + """Create a mock job with configurable resources.""" + + def _create_job(**resources): + mock_resources = MagicMock() + # Configure get method to return values from resources dict + mock_resources.get.side_effect = lambda key, default=None: resources.get( + key, default + ) + # Add direct attribute access for certain resources + for key, value in resources.items(): + setattr(mock_resources, key, value) + + mock_job = MagicMock() + mock_job.resources = mock_resources + mock_job.name = "test_job" + mock_job.wildcards = {} + mock_job.is_group.return_value = False + mock_job.jobid = 1 + return mock_job + + return _create_job + + @pytest.fixture + def mock_executor(self): + """Create a mock executor for testing the run_job method.""" + from 
snakemake_executor_plugin_slurm import Executor + + # Create a mock workflow + mock_workflow = MagicMock() + mock_settings = MagicMock() + mock_settings.requeue = False + mock_settings.no_account = True + mock_settings.logdir = None + mock_workflow.executor_settings = mock_settings + mock_workflow.workdir_init = "/test/workdir" + + # Create an executor with the mock workflow + executor = Executor(mock_workflow, None) + + # Mock some executor methods to avoid external calls + executor.get_account_arg = MagicMock(return_value="") + executor.get_partition_arg = MagicMock(return_value="") + executor.report_job_submission = MagicMock() + + # Return the mocked executor + return executor + + def test_constraint_resource(self, mock_job, mock_executor): + """Test that the constraint resource is correctly added to the sbatch command.""" + # Create a job with a constraint resource + job = mock_job(constraint="haswell") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + + # Run the job + mock_executor.run_job(job) + + # Get the sbatch command from the call + call_args = mock_popen.call_args[0][0] + + # Assert the constraint is correctly included + assert "-C 'haswell'" in call_args + + def test_qos_resource(self, mock_job, mock_executor): + """Test that the qos resource is correctly added to the sbatch command.""" + # Create a job with a qos resource + job = mock_job(qos="normal") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + + # Run the job + mock_executor.run_job(job) + + # Get the sbatch command from the call + call_args = mock_popen.call_args[0][0] + + # Assert the qos is correctly included + assert "--qos 'normal'" in call_args + + def test_both_constraint_and_qos(self, mock_job, mock_executor): + """Test that both constraint and qos resources can be used together.""" + # Create a job with both constraint and qos resources + job = mock_job(constraint="haswell", qos="high") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + + # Run the job + mock_executor.run_job(job) + + # Get the sbatch command from the call + call_args = mock_popen.call_args[0][0] + + # Assert both resources are correctly included + assert "-C 'haswell'" in call_args + assert "--qos 'high'" in call_args + + def test_no_resources(self, mock_job, mock_executor): + """Test that no constraint or qos flags are added when resources are not specified.""" + # Create a job without constraint or qos resources + job = mock_job() + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + + # Run 
the job + mock_executor.run_job(job) + + # Get the sbatch command from the call + call_args = mock_popen.call_args[0][0] + + # Assert neither resource is included + assert "-C " not in call_args + assert "--qos " not in call_args + + def test_empty_constraint(self, mock_job, mock_executor): + """Test that an empty constraint is still included in the command.""" + # Create a job with an empty constraint + job = mock_job(constraint="") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + + # Run the job + mock_executor.run_job(job) + + # Get the sbatch command from the call + call_args = mock_popen.call_args[0][0] + + # Assert the constraint is included (even if empty) + assert "-C ''" in call_args + + def test_empty_qos(self, mock_job, mock_executor): + """Test that an empty qos is still included in the command.""" + # Create a job with an empty qos + job = mock_job(qos="") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + + # Run the job + mock_executor.run_job(job) + + # Get the sbatch command from the call + call_args = mock_popen.call_args[0][0] + + # Assert the qos is included (even if empty) + assert "--qos ''" in call_args From 75227925fc59b4cbe52e5e44e1d68c03ec44eb7d Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Sat, 15 Mar 2025 15:55:52 +0100 Subject: [PATCH 04/28] fix: linting of tests.py --- tests/tests.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/tests/tests.py b/tests/tests.py index f9aca644..0db14bc3 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -148,7 +148,10 @@ def test_wildcard_slash_replacement(self): class TestSLURMResources: - """Test cases for the constraint and qos resources in the Executor.run_job method.""" + """ + Test cases for the constraint and qos resources + in the Executor.run_job method. + """ @pytest.fixture def mock_job(self): @@ -200,7 +203,10 @@ def mock_executor(self): return executor def test_constraint_resource(self, mock_job, mock_executor): - """Test that the constraint resource is correctly added to the sbatch command.""" + """ + Test that the constraint resource is correctly + added to the sbatch command. + """ # Create a job with a constraint resource job = mock_job(constraint="haswell") @@ -267,7 +273,10 @@ def test_both_constraint_and_qos(self, mock_job, mock_executor): assert "--qos 'high'" in call_args def test_no_resources(self, mock_job, mock_executor): - """Test that no constraint or qos flags are added when resources are not specified.""" + """ + Test that no constraint or qos flags are added + when resources are not specified. 
+ """ # Create a job without constraint or qos resources job = mock_job() From 0c74d04f0547ff77e07b3a8cfdcd686507b47bdf Mon Sep 17 00:00:00 2001 From: meesters Date: Mon, 17 Mar 2025 12:25:25 +0100 Subject: [PATCH 05/28] fix: trying workflow fix with pytest upgrade --- .github/workflows/ci.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 502ee289..1faff1b7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,6 +20,9 @@ jobs: - name: Install poetry run: pip install poetry + - name: Always upgrade PyTest + run: pip install --upgrade pytest + - name: Determine dependencies run: poetry lock From 4ec357f8a6d8d8ea088346481f476d2c716c6807 Mon Sep 17 00:00:00 2001 From: meesters Date: Mon, 17 Mar 2025 14:41:23 +0100 Subject: [PATCH 06/28] fix: revoking last commit - did not help --- .github/workflows/ci.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1faff1b7..502ee289 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,9 +20,6 @@ jobs: - name: Install poetry run: pip install poetry - - name: Always upgrade PyTest - run: pip install --upgrade pytest - - name: Determine dependencies run: poetry lock From 9f2a8a6958d21461355bcb18e27ff3bbf2c4ce95 Mon Sep 17 00:00:00 2001 From: meesters Date: Mon, 17 Mar 2025 14:49:43 +0100 Subject: [PATCH 07/28] fix: attempt by reordering tests --- tests/tests.py | 70 +++++++++++++++++++++++++------------------------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/tests/tests.py b/tests/tests.py index 0db14bc3..d01f0923 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -112,41 +112,6 @@ def test_both_gres_and_gpu_set(self, mock_job): set_gres_string(job) -class TestWildcardsWithSlashes(snakemake.common.tests.TestWorkflowsLocalStorageBase): - """ - Test handling of wildcards with slashes to ensure log directories are - correctly constructed. - """ - - __test__ = True - - def get_executor(self) -> str: - return "slurm" - - def get_executor_settings(self) -> Optional[ExecutorSettingsBase]: - return ExecutorSettings( - logdir="test_logdir", init_seconds_before_status_checks=1 - ) - - def test_wildcard_slash_replacement(self): - """ - Test that slashes in wildcards are correctly replaced with - underscores in log directory paths. - """ - - # Just test the wildcard sanitization logic directly - wildcards = ["/leading_slash", "middle/slash", "trailing/"] - - # This is the actual logic from the Executor.run_job method - wildcard_str = "_".join(wildcards).replace("/", "_") if wildcards else "" - - # Assert that slashes are correctly replaced with underscores - assert wildcard_str == "_leading_slash_middle_slash_trailing_" - - # Verify no slashes remain in the wildcard string - assert "/" not in wildcard_str - - class TestSLURMResources: """ Test cases for the constraint and qos resources @@ -341,3 +306,38 @@ def test_empty_qos(self, mock_job, mock_executor): # Assert the qos is included (even if empty) assert "--qos ''" in call_args + + +class TestWildcardsWithSlashes(snakemake.common.tests.TestWorkflowsLocalStorageBase): + """ + Test handling of wildcards with slashes to ensure log directories are + correctly constructed. 
+ """ + + __test__ = True + + def get_executor(self) -> str: + return "slurm" + + def get_executor_settings(self) -> Optional[ExecutorSettingsBase]: + return ExecutorSettings( + logdir="test_logdir", init_seconds_before_status_checks=1 + ) + + def test_wildcard_slash_replacement(self): + """ + Test that slashes in wildcards are correctly replaced with + underscores in log directory paths. + """ + + # Just test the wildcard sanitization logic directly + wildcards = ["/leading_slash", "middle/slash", "trailing/"] + + # This is the actual logic from the Executor.run_job method + wildcard_str = "_".join(wildcards).replace("/", "_") if wildcards else "" + + # Assert that slashes are correctly replaced with underscores + assert wildcard_str == "_leading_slash_middle_slash_trailing_" + + # Verify no slashes remain in the wildcard string + assert "/" not in wildcard_str From 1aca8750be9cc09c833850d75d270c98967fe1ad Mon Sep 17 00:00:00 2001 From: meesters Date: Mon, 17 Mar 2025 14:56:04 +0100 Subject: [PATCH 08/28] fix: syntax fix --- .github/workflows/announce-release.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/announce-release.yml b/.github/workflows/announce-release.yml index 15238702..50f34b82 100644 --- a/.github/workflows/announce-release.yml +++ b/.github/workflows/announce-release.yml @@ -12,7 +12,7 @@ permissions: jobs: post_to_mastodon: - if: ${{ contains(github.event.head_commit.message, 'chore(main): release') }} + if: "${{ contains(github.event.head_commit.message, 'chore(main): release') }}" runs-on: ubuntu-latest steps: - name: Post to Mastodon From 404f38b82acf9c69b9d438335080c0e03003a530 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Tue, 18 Mar 2025 09:41:06 +0100 Subject: [PATCH 09/28] fix: trying fix fixture --- tests/tests.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/tests.py b/tests/tests.py index d01f0923..0fe8ddea 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -2,6 +2,7 @@ import snakemake.common.tests from snakemake_interface_executor_plugins.settings import ExecutorSettingsBase from unittest.mock import MagicMock, patch +import sys import pytest from snakemake_executor_plugin_slurm import ExecutorSettings @@ -103,6 +104,14 @@ def test_gpu_model_without_gpu(self, mock_job): ): set_gres_string(job) + @pytest.fixture(autouse=True) + def patch_sys_streams(self): + """Patch sys.stdout and sys.stderr to prevent file descriptor issues.""" + with patch("sys.stdout", new_callable=lambda: sys.stdout), patch( + "sys.stderr", new_callable=lambda: sys.stderr + ): + yield + def test_both_gres_and_gpu_set(self, mock_job): """Test error case when both GRES and GPU are specified.""" job = mock_job(gres="gpu:1", gpu="2") From 9abdaddc3da12befc6deaea9dc745c22634ea79a Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Tue, 18 Mar 2025 10:38:51 +0100 Subject: [PATCH 10/28] fix: trying fix fixture - now with io streams - another try --- tests/tests.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/tests.py b/tests/tests.py index 0fe8ddea..19b10f3a 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -104,11 +104,13 @@ def test_gpu_model_without_gpu(self, mock_job): ): set_gres_string(job) - @pytest.fixture(autouse=True) + @pytest.fixture(scope="class", autouse=True) def patch_sys_streams(self): """Patch sys.stdout and sys.stderr to prevent file descriptor issues.""" - with patch("sys.stdout", new_callable=lambda: sys.stdout), patch( - "sys.stderr", new_callable=lambda: 
sys.stderr + import io + + with patch("sys.stdout", new_callable=lambda: io.StringIO()), patch( + "sys.stderr", new_callable=lambda: io.StringIO() ): yield From 076eb7b76786dfa409f32b368e2068975fe4f20c Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Tue, 18 Mar 2025 13:09:53 +0100 Subject: [PATCH 11/28] fix: revoked last two commits --- tests/tests.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/tests/tests.py b/tests/tests.py index 19b10f3a..168bc425 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -104,16 +104,6 @@ def test_gpu_model_without_gpu(self, mock_job): ): set_gres_string(job) - @pytest.fixture(scope="class", autouse=True) - def patch_sys_streams(self): - """Patch sys.stdout and sys.stderr to prevent file descriptor issues.""" - import io - - with patch("sys.stdout", new_callable=lambda: io.StringIO()), patch( - "sys.stderr", new_callable=lambda: io.StringIO() - ): - yield - def test_both_gres_and_gpu_set(self, mock_job): """Test error case when both GRES and GPU are specified.""" job = mock_job(gres="gpu:1", gpu="2") From 22d70de920826226fc3a6673c9810f1f8bbdc958 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Tue, 18 Mar 2025 13:14:08 +0100 Subject: [PATCH 12/28] fix: unused import --- tests/tests.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/tests.py b/tests/tests.py index 168bc425..d01f0923 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -2,7 +2,6 @@ import snakemake.common.tests from snakemake_interface_executor_plugins.settings import ExecutorSettingsBase from unittest.mock import MagicMock, patch -import sys import pytest from snakemake_executor_plugin_slurm import ExecutorSettings From 34dff5dce00cb0c1f2e858e54752cabfc6677e76 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Tue, 18 Mar 2025 15:16:19 +0100 Subject: [PATCH 13/28] fix: trying updated pytest --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index ff9b8905..e4767327 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ throttler = "^1.2.2" black = "^23.7.0" flake8 = "^6.1.0" coverage = "^7.3.1" -pytest = "^7.4.2" +pytest = "^8.3.5" snakemake = "^8.20.0" [tool.coverage.run] From b5e951afda684be3f9e6a61bd224dc8ef66cc2c8 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Tue, 18 Mar 2025 16:18:40 +0100 Subject: [PATCH 14/28] fix: forcing different stream --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 502ee289..5a9947b8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -98,7 +98,7 @@ jobs: poetry install - name: Run pytest - run: poetry run coverage run -m pytest tests/tests.py -sv + run: poetry run coverage run --tb=short --disable-warnings -m pytest tests/tests.py -sv - name: Run Coverage run: poetry run coverage report -m From 7257642feebc1ff9ab2928ae239daae112ce7f74 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Tue, 18 Mar 2025 17:10:24 +0100 Subject: [PATCH 15/28] fix: syntax --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5a9947b8..d60eea60 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -98,7 +98,7 @@ jobs: poetry install - name: Run pytest - run: poetry run coverage run --tb=short --disable-warnings -m pytest tests/tests.py -sv + run: poetry run coverage run -m pytest tests/tests.py -sv --tb=short 
--disable-warnings - name: Run Coverage run: poetry run coverage report -m From 34990d97f68afa3a6baf7a9e843e12de9820c1e6 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Tue, 18 Mar 2025 17:46:39 +0100 Subject: [PATCH 16/28] trial: outcommenting offending function --- tests/tests.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/tests.py b/tests/tests.py index d01f0923..fa489f43 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -103,13 +103,13 @@ def test_gpu_model_without_gpu(self, mock_job): ): set_gres_string(job) - def test_both_gres_and_gpu_set(self, mock_job): - """Test error case when both GRES and GPU are specified.""" - job = mock_job(gres="gpu:1", gpu="2") - with pytest.raises( - WorkflowError, match="GRES and GPU are set. Please only set one of them." - ): - set_gres_string(job) +# def test_both_gres_and_gpu_set(self, mock_job): +# """Test error case when both GRES and GPU are specified.""" +# job = mock_job(gres="gpu:1", gpu="2") +# with pytest.raises( +# WorkflowError, match="GRES and GPU are set. Please only set one of them." +# ): +# set_gres_string(job) class TestSLURMResources: From 226f3c6d43a22f1ebb939fdf6ce1d9de9c05798c Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Tue, 18 Mar 2025 19:45:45 +0100 Subject: [PATCH 17/28] fix: trying different mock setup --- tests/tests.py | 65 ++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 47 insertions(+), 18 deletions(-) diff --git a/tests/tests.py b/tests/tests.py index fa489f43..a71878ef 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -27,7 +27,6 @@ def get_executor_settings(self) -> Optional[ExecutorSettingsBase]: class TestGresString: """Test cases for the set_gres_string function.""" - @pytest.fixture def mock_job(self): """Create a mock job with configurable resources.""" @@ -43,59 +42,89 @@ def _create_job(**resources): mock_job = MagicMock() mock_job.resources = mock_resources + mock_job.name = "test_job" + mock_job.wildcards = {} + mock_job.is_group.return_value = False + mock_job.jobid = 1 return mock_job return _create_job - def test_no_gres_or_gpu(self, mock_job): + + @pytest.fixture + def mock_executor(self): + """Create a mock executor for testing the run_job method.""" + from snakemake_executor_plugin_slurm import Executor + + # Create a mock workflow + mock_workflow = MagicMock() + mock_settings = MagicMock() + mock_settings.requeue = False + mock_settings.no_account = True + mock_settings.logdir = None + mock_workflow.executor_settings = mock_settings + mock_workflow.workdir_init = "/test/workdir" + + # Create an executor with the mock workflow + executor = Executor(mock_workflow, None) + + # Mock some executor methods to avoid external calls + executor.get_account_arg = MagicMock(return_value="") + executor.get_partition_arg = MagicMock(return_value="") + executor.report_job_submission = MagicMock() + + # Return the mocked executor + return executor + + def test_no_gres_or_gpu(self, mock_job, mock_executor): """Test with no GPU or GRES resources specified.""" job = mock_job() assert set_gres_string(job) == "" - def test_valid_gres_simple(self, mock_job): + def test_valid_gres_simple(self, mock_job, mock_executor): """Test with valid GRES format (simple).""" job = mock_job(gres="gpu:1") assert set_gres_string(job) == " --gres=gpu:1" - def test_valid_gres_with_model(self, mock_job): + def test_valid_gres_with_model(self, mock_job, mock_executor): """Test with valid GRES format including GPU model.""" job = mock_job(gres="gpu:tesla:2") 
assert set_gres_string(job) == " --gres=gpu:tesla:2" - def test_invalid_gres_format(self, mock_job): + def test_invalid_gres_format(self, mock_job, mock_executor): """Test with invalid GRES format.""" job = mock_job(gres="gpu") with pytest.raises(WorkflowError, match="Invalid GRES format"): set_gres_string(job) - def test_invalid_gres_format_missing_count(self, mock_job): + def test_invalid_gres_format_missing_count(self, mock_job, mock_executor): """Test with invalid GRES format (missing count).""" job = mock_job(gres="gpu:tesla:") with pytest.raises(WorkflowError, match="Invalid GRES format"): set_gres_string(job) - def test_valid_gpu_number(self, mock_job): + def test_valid_gpu_number(self, mock_job, mock_executor): """Test with valid GPU number.""" job = mock_job(gpu="2") assert set_gres_string(job) == " --gpus=2" - def test_valid_gpu_with_name(self, mock_job): + def test_valid_gpu_with_name(self, mock_job, mock_executor): """Test with valid GPU name and number.""" job = mock_job(gpu="tesla:2") assert set_gres_string(job) == " --gpus=tesla:2" - def test_gpu_with_model(self, mock_job): + def test_gpu_with_model(self, mock_job, mock_executor): """Test GPU with model specification.""" job = mock_job(gpu="2", gpu_model="tesla") assert set_gres_string(job) == " --gpus=tesla:2" - def test_invalid_gpu_model_format(self, mock_job): + def test_invalid_gpu_model_format(self, mock_job, mock_executor): """Test with invalid GPU model format.""" job = mock_job(gpu="2", gpu_model="invalid:model") with pytest.raises(WorkflowError, match="Invalid GPU model format"): set_gres_string(job) - def test_gpu_model_without_gpu(self, mock_job): + def test_gpu_model_without_gpu(self, mock_job, mock_executor): """Test GPU model without GPU number.""" job = mock_job(gpu_model="tesla") with pytest.raises( @@ -103,13 +132,13 @@ def test_gpu_model_without_gpu(self, mock_job): ): set_gres_string(job) -# def test_both_gres_and_gpu_set(self, mock_job): -# """Test error case when both GRES and GPU are specified.""" -# job = mock_job(gres="gpu:1", gpu="2") -# with pytest.raises( -# WorkflowError, match="GRES and GPU are set. Please only set one of them." -# ): -# set_gres_string(job) + def test_both_gres_and_gpu_set(self, mock_job, mock_executor): + """Test error case when both GRES and GPU are specified.""" + job = mock_job(gres="gpu:1", gpu="2") + with pytest.raises( + WorkflowError, match="GRES and GPU are set. Please only set one of them." 
+ ): + set_gres_string(job) class TestSLURMResources: From 24d2e2df1b99113af620bfefa1b4af0fe2a9eb7c Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Tue, 18 Mar 2025 20:16:03 +0100 Subject: [PATCH 18/28] fix: added missing fixture --- tests/tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tests.py b/tests/tests.py index a71878ef..3113f656 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -27,6 +27,7 @@ def get_executor_settings(self) -> Optional[ExecutorSettingsBase]: class TestGresString: """Test cases for the set_gres_string function.""" + @pytest.fixture def mock_job(self): """Create a mock job with configurable resources.""" @@ -50,7 +51,6 @@ def _create_job(**resources): return _create_job - @pytest.fixture def mock_executor(self): """Create a mock executor for testing the run_job method.""" From cb1d1788708859b106203589f5aa332738229c2f Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Tue, 18 Mar 2025 21:00:01 +0100 Subject: [PATCH 19/28] fix: trying process mock --- tests/tests.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/tests.py b/tests/tests.py index 3113f656..18de3874 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -73,6 +73,15 @@ def mock_executor(self): executor.get_partition_arg = MagicMock(return_value="") executor.report_job_submission = MagicMock() + # Ensure subprocess mocks stdout and stderr + with patch("subprocess.Popen") as mock_popen: + process_mock = MagicMock() + process_mock.communicate.return_value = (b"123", b"") # Ensure byte output + process_mock.returncode = 0 + process_mock.stdout = MagicMock() # Ensure stdout is defined + process_mock.stderr = MagicMock() # Ensure stderr is defined + mock_popen.return_value = process_mock + # Return the mocked executor return executor From 08e702b7f572801ebae21f024b38bb65434bea73 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Tue, 18 Mar 2025 21:45:04 +0100 Subject: [PATCH 20/28] fix: everywhere - a mock popen --- tests/tests.py | 124 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 107 insertions(+), 17 deletions(-) diff --git a/tests/tests.py b/tests/tests.py index 18de3874..be5cf0a5 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -73,81 +73,171 @@ def mock_executor(self): executor.get_partition_arg = MagicMock(return_value="") executor.report_job_submission = MagicMock() - # Ensure subprocess mocks stdout and stderr - with patch("subprocess.Popen") as mock_popen: - process_mock = MagicMock() - process_mock.communicate.return_value = (b"123", b"") # Ensure byte output - process_mock.returncode = 0 - process_mock.stdout = MagicMock() # Ensure stdout is defined - process_mock.stderr = MagicMock() # Ensure stderr is defined - mock_popen.return_value = process_mock - # Return the mocked executor return executor def test_no_gres_or_gpu(self, mock_job, mock_executor): """Test with no GPU or GRES resources specified.""" job = mock_job() + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + assert set_gres_string(job) == "" def test_valid_gres_simple(self, mock_job, mock_executor): """Test with valid GRES format (simple).""" job = mock_job(gres="gpu:1") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # 
Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + assert set_gres_string(job) == " --gres=gpu:1" def test_valid_gres_with_model(self, mock_job, mock_executor): """Test with valid GRES format including GPU model.""" job = mock_job(gres="gpu:tesla:2") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + assert set_gres_string(job) == " --gres=gpu:tesla:2" def test_invalid_gres_format(self, mock_job, mock_executor): """Test with invalid GRES format.""" job = mock_job(gres="gpu") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock with pytest.raises(WorkflowError, match="Invalid GRES format"): set_gres_string(job) def test_invalid_gres_format_missing_count(self, mock_job, mock_executor): """Test with invalid GRES format (missing count).""" job = mock_job(gres="gpu:tesla:") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + with pytest.raises(WorkflowError, match="Invalid GRES format"): set_gres_string(job) def test_valid_gpu_number(self, mock_job, mock_executor): """Test with valid GPU number.""" job = mock_job(gpu="2") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + assert set_gres_string(job) == " --gpus=2" def test_valid_gpu_with_name(self, mock_job, mock_executor): """Test with valid GPU name and number.""" job = mock_job(gpu="tesla:2") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + assert set_gres_string(job) == " --gpus=tesla:2" def test_gpu_with_model(self, mock_job, mock_executor): """Test GPU with model specification.""" job = mock_job(gpu="2", gpu_model="tesla") + + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + assert set_gres_string(job) == " --gpus=tesla:2" def test_invalid_gpu_model_format(self, mock_job, mock_executor): """Test with invalid GPU model format.""" job = mock_job(gpu="2", gpu_model="invalid:model") + + # Patch subprocess.Popen to capture 
the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + with pytest.raises(WorkflowError, match="Invalid GPU model format"): set_gres_string(job) def test_gpu_model_without_gpu(self, mock_job, mock_executor): """Test GPU model without GPU number.""" job = mock_job(gpu_model="tesla") - with pytest.raises( - WorkflowError, match="GPU model is set, but no GPU number is given" - ): - set_gres_string(job) + # Patch subprocess.Popen to capture the sbatch command + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to return successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + + # test whether the resource setting raises the correct error + with pytest.raises( + WorkflowError, match="GPU model is set, but no GPU number is given" + ): + set_gres_string(job) def test_both_gres_and_gpu_set(self, mock_job, mock_executor): """Test error case when both GRES and GPU are specified.""" job = mock_job(gres="gpu:1", gpu="2") - with pytest.raises( - WorkflowError, match="GRES and GPU are set. Please only set one of them." - ): - set_gres_string(job) + + # Patch subprocess.Popen to simulate job submission + with patch("subprocess.Popen") as mock_popen: + # Configure the mock to simulate successful submission + process_mock = MagicMock() + process_mock.communicate.return_value = ("123", "") + process_mock.returncode = 0 + mock_popen.return_value = process_mock + + # Ensure the error is raised when both GRES and GPU are set + with pytest.raises( + WorkflowError, match="GRES and GPU are set. 
Please only set one" + ): + set_gres_string(job) class TestSLURMResources: From 06a27c9af7cc584f88f5280c980f83fe84bec676 Mon Sep 17 00:00:00 2001 From: meesters Date: Fri, 28 Mar 2025 14:52:23 +0100 Subject: [PATCH 21/28] refactor: moved submit string creation into own file to facilitate testing and future tests --- snakemake_executor_plugin_slurm/__init__.py | 72 ++---- .../submit_string.py | 68 ++++++ tests/tests.py | 210 ++++++++---------- 3 files changed, 179 insertions(+), 171 deletions(-) create mode 100644 snakemake_executor_plugin_slurm/submit_string.py diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 8f26d28a..13ae04ef 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -26,10 +26,9 @@ JobExecutorInterface, ) from snakemake_interface_common.exceptions import WorkflowError -from snakemake_executor_plugin_slurm_jobstep import get_cpu_setting from .utils import delete_slurm_environment, delete_empty_dirs, set_gres_string - +from .submit_string import get_submit_command @dataclass class ExecutorSettings(ExecutorSettingsBase): @@ -135,9 +134,10 @@ class ExecutorSettings(ExecutorSettingsBase): # Required: # Implementation of your executor class Executor(RemoteExecutor): - def __post_init__(self): + def __post_init__(self, test_mode: bool = False): # run check whether we are running in a SLURM job context self.warn_on_jobcontext() + self.test_mode = test_mode self.run_uuid = str(uuid.uuid4()) self.logger.info(f"SLURM run ID: {self.run_uuid}") self._fallback_account_arg = None @@ -225,31 +225,28 @@ def run_job(self, job: JobExecutorInterface): comment_str = f"rule_{job.name}" else: comment_str = f"rule_{job.name}_wildcards_{wildcard_str}" - call = ( - f"sbatch " - f"--parsable " - f"--job-name {self.run_uuid} " - f"--output '{slurm_logfile}' " - f"--export=ALL " - f"--comment '{comment_str}'" - ) - - if not self.workflow.executor_settings.no_account: - call += self.get_account_arg(job) + # check whether the 'slurm_extra' parameter is used correctly + # prior to putatively setting in the sbatch call + if job.resources.get("slurm_extra"): + self.check_slurm_extra(job) - call += self.get_partition_arg(job) + job_params = { + "run_uuid": self.run_uuid, + "slurm_logfile": slurm_logfile, + "comment_str": comment_str, + "account": self.get_account_arg(job), + "partition": self.get_partition_arg(job), + "workdir": self.workflow.workdir_init, + } + + call = get_submit_command(job, job_params) if self.workflow.executor_settings.requeue: call += " --requeue" call += set_gres_string(job) - if job.resources.get("clusters"): - call += f" --clusters {job.resources.clusters}" - - if job.resources.get("runtime"): - call += f" -t {job.resources.runtime}" - else: + if not job.resources.get("runtime"): self.logger.warning( "No wall time information given. This might or might not " "work on your cluster. " @@ -257,30 +254,12 @@ def run_job(self, job: JobExecutorInterface): "default via --default-resources." 
) - if job.resources.get("constraint"): - call += f" -C '{job.resources.constraint}'" - if job.resources.get("qos"): - call += f" --qos '{job.resources.qos}'" - if job.resources.get("mem_mb_per_cpu"): - call += f" --mem-per-cpu {job.resources.mem_mb_per_cpu}" - elif job.resources.get("mem_mb"): - call += f" --mem {job.resources.mem_mb}" - else: + if not job.resources.get("mem_mb_per_cpu") and not job.resources.get("mem_mb"): self.logger.warning( "No job memory information ('mem_mb' or 'mem_mb_per_cpu') is given " "- submitting without. This might or might not work on your cluster." ) - - if job.resources.get("nodes", False): - call += f" --nodes={job.resources.get('nodes', 1)}" - - # fixes #40 - set ntasks regardless of mpi, because - # SLURM v22.05 will require it for all jobs - gpu_job = job.resources.get("gpu") or "gpu" in job.resources.get("gres", "") - if gpu_job: - call += f" --ntasks-per-gpu={job.resources.get('tasks', 1)}" - else: - call += f" --ntasks={job.resources.get('tasks', 1)}" + # MPI job if job.resources.get("mpi", False): if not job.resources.get("tasks_per_node") and not job.resources.get( @@ -292,19 +271,8 @@ def run_job(self, job: JobExecutorInterface): "Probably not what you want." ) - # we need to set cpus-per-task OR cpus-per-gpu, the function - # will return a string with the corresponding value - call += f" {get_cpu_setting(job, gpu_job)}" - if job.resources.get("slurm_extra"): - self.check_slurm_extra(job) - call += f" {job.resources.slurm_extra}" - exec_job = self.format_job_exec(job) - # ensure that workdir is set correctly - # use short argument as this is the same in all slurm versions - # (see https://github.com/snakemake/snakemake/issues/2014) - call += f" -D {self.workflow.workdir_init}" # and finally the job to execute with all the snakemake parameters call += f' --wrap="{exec_job}"' diff --git a/snakemake_executor_plugin_slurm/submit_string.py b/snakemake_executor_plugin_slurm/submit_string.py new file mode 100644 index 00000000..0cfe963d --- /dev/null +++ b/snakemake_executor_plugin_slurm/submit_string.py @@ -0,0 +1,68 @@ +from snakemake_executor_plugin_slurm_jobstep import get_cpu_setting + +def get_submit_command(job, params): + """ + Return the submit command for the job. 
+ """ + call = ( + f"sbatch " + f"--parsable " + f"--job-name {params['run_uuid']} " + f"--output \"{params['slurm_logfile']}\" " + f"--export=ALL " + f"--comment \"{params['comment_str']}\"" + ) + + # check whether an account is set + if params.get("account"): + call += f" {params['account']} " + # check whether a partition is set + if params.get("partition"): + call += f" {params['partition']}" + + if job.resources.get("clusters"): + call += f" --clusters {job.resources.clusters}" + + if job.resources.get("runtime"): + call += f" -t {job.resources.runtime}" + + if job.resources.get("clusters"): + call += f" --clusters {job.resources.clusters}" + + if job.resources.get("constraint") or isinstance( + job.resources.get("constraint"), str + ): + call += f" -C \'{job.resources.get('constraint')}\'" + + if job.resources.get("qos") or isinstance( + job.resources.get("qos"), str + ): + call += f" --qos=\'{job.resources.get('qos')}\'" + if job.resources.get("mem_mb_per_cpu"): + call += f" --mem-per-cpu {job.resources.mem_mb_per_cpu}" + elif job.resources.get("mem_mb"): + call += f" --mem {job.resources.mem_mb}" + + if job.resources.get("nodes", False): + call += f" --nodes={job.resources.get('nodes', 1)}" + + # fixes #40 - set ntasks regardless of mpi, because + # SLURM v22.05 will require it for all jobs + gpu_job = job.resources.get("gpu") or "gpu" in job.resources.get("gres", "") + if gpu_job: + call += f" --ntasks-per-gpu={job.resources.get('tasks', 1)}" + else: + call += f" --ntasks={job.resources.get('tasks', 1)}" + + # we need to set cpus-per-task OR cpus-per-gpu, the function + # will return a string with the corresponding value + call += f" {get_cpu_setting(job, gpu_job)}" + if job.resources.get("slurm_extra"): + call += f" {job.resources.slurm_extra}" + + # ensure that workdir is set correctly + # use short argument as this is the same in all slurm versions + # (see https://github.com/snakemake/snakemake/issues/2014) + call += f" -D \'{params['workdir']}\'" + + return call diff --git a/tests/tests.py b/tests/tests.py index be5cf0a5..f1fbd62e 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -6,6 +6,7 @@ from snakemake_executor_plugin_slurm import ExecutorSettings from snakemake_executor_plugin_slurm.utils import set_gres_string +from snakemake_executor_plugin_slurm.submit_string import get_submit_command from snakemake_interface_common.exceptions import WorkflowError @@ -51,32 +52,7 @@ def _create_job(**resources): return _create_job - @pytest.fixture - def mock_executor(self): - """Create a mock executor for testing the run_job method.""" - from snakemake_executor_plugin_slurm import Executor - - # Create a mock workflow - mock_workflow = MagicMock() - mock_settings = MagicMock() - mock_settings.requeue = False - mock_settings.no_account = True - mock_settings.logdir = None - mock_workflow.executor_settings = mock_settings - mock_workflow.workdir_init = "/test/workdir" - - # Create an executor with the mock workflow - executor = Executor(mock_workflow, None) - - # Mock some executor methods to avoid external calls - executor.get_account_arg = MagicMock(return_value="") - executor.get_partition_arg = MagicMock(return_value="") - executor.report_job_submission = MagicMock() - - # Return the mocked executor - return executor - - def test_no_gres_or_gpu(self, mock_job, mock_executor): + def test_no_gres_or_gpu(self, mock_job): """Test with no GPU or GRES resources specified.""" job = mock_job() @@ -90,7 +66,7 @@ def test_no_gres_or_gpu(self, mock_job, mock_executor): assert 
set_gres_string(job) == "" - def test_valid_gres_simple(self, mock_job, mock_executor): + def test_valid_gres_simple(self, mock_job): """Test with valid GRES format (simple).""" job = mock_job(gres="gpu:1") @@ -104,7 +80,7 @@ def test_valid_gres_simple(self, mock_job, mock_executor): assert set_gres_string(job) == " --gres=gpu:1" - def test_valid_gres_with_model(self, mock_job, mock_executor): + def test_valid_gres_with_model(self, mock_job): """Test with valid GRES format including GPU model.""" job = mock_job(gres="gpu:tesla:2") @@ -118,7 +94,7 @@ def test_valid_gres_with_model(self, mock_job, mock_executor): assert set_gres_string(job) == " --gres=gpu:tesla:2" - def test_invalid_gres_format(self, mock_job, mock_executor): + def test_invalid_gres_format(self, mock_job): """Test with invalid GRES format.""" job = mock_job(gres="gpu") @@ -132,7 +108,7 @@ def test_invalid_gres_format(self, mock_job, mock_executor): with pytest.raises(WorkflowError, match="Invalid GRES format"): set_gres_string(job) - def test_invalid_gres_format_missing_count(self, mock_job, mock_executor): + def test_invalid_gres_format_missing_count(self, mock_job): """Test with invalid GRES format (missing count).""" job = mock_job(gres="gpu:tesla:") @@ -147,7 +123,7 @@ def test_invalid_gres_format_missing_count(self, mock_job, mock_executor): with pytest.raises(WorkflowError, match="Invalid GRES format"): set_gres_string(job) - def test_valid_gpu_number(self, mock_job, mock_executor): + def test_valid_gpu_number(self, mock_job): """Test with valid GPU number.""" job = mock_job(gpu="2") @@ -161,7 +137,7 @@ def test_valid_gpu_number(self, mock_job, mock_executor): assert set_gres_string(job) == " --gpus=2" - def test_valid_gpu_with_name(self, mock_job, mock_executor): + def test_valid_gpu_with_name(self, mock_job): """Test with valid GPU name and number.""" job = mock_job(gpu="tesla:2") @@ -175,7 +151,7 @@ def test_valid_gpu_with_name(self, mock_job, mock_executor): assert set_gres_string(job) == " --gpus=tesla:2" - def test_gpu_with_model(self, mock_job, mock_executor): + def test_gpu_with_model(self, mock_job): """Test GPU with model specification.""" job = mock_job(gpu="2", gpu_model="tesla") @@ -189,7 +165,7 @@ def test_gpu_with_model(self, mock_job, mock_executor): assert set_gres_string(job) == " --gpus=tesla:2" - def test_invalid_gpu_model_format(self, mock_job, mock_executor): + def test_invalid_gpu_model_format(self, mock_job): """Test with invalid GPU model format.""" job = mock_job(gpu="2", gpu_model="invalid:model") @@ -204,7 +180,7 @@ def test_invalid_gpu_model_format(self, mock_job, mock_executor): with pytest.raises(WorkflowError, match="Invalid GPU model format"): set_gres_string(job) - def test_gpu_model_without_gpu(self, mock_job, mock_executor): + def test_gpu_model_without_gpu(self, mock_job): """Test GPU model without GPU number.""" job = mock_job(gpu_model="tesla") # Patch subprocess.Popen to capture the sbatch command @@ -221,7 +197,7 @@ def test_gpu_model_without_gpu(self, mock_job, mock_executor): ): set_gres_string(job) - def test_both_gres_and_gpu_set(self, mock_job, mock_executor): + def test_both_gres_and_gpu_set(self, mock_job): """Test error case when both GRES and GPU are specified.""" job = mock_job(gres="gpu:1", gpu="2") @@ -239,11 +215,11 @@ def test_both_gres_and_gpu_set(self, mock_job, mock_executor): ): set_gres_string(job) - -class TestSLURMResources: +class TestSLURMResources(TestWorkflows): """ - Test cases for the constraint and qos resources - in the Executor.run_job method. 
+ Test workflows using job resources passed as part of the job configuration. + This test suite uses the `get_submit_command` function to generate the + sbatch command and validates the inclusion of resources. """ @pytest.fixture @@ -270,38 +246,22 @@ def _create_job(**resources): return _create_job - @pytest.fixture - def mock_executor(self): - """Create a mock executor for testing the run_job method.""" - from snakemake_executor_plugin_slurm import Executor - - # Create a mock workflow - mock_workflow = MagicMock() - mock_settings = MagicMock() - mock_settings.requeue = False - mock_settings.no_account = True - mock_settings.logdir = None - mock_workflow.executor_settings = mock_settings - mock_workflow.workdir_init = "/test/workdir" - - # Create an executor with the mock workflow - executor = Executor(mock_workflow, None) - - # Mock some executor methods to avoid external calls - executor.get_account_arg = MagicMock(return_value="") - executor.get_partition_arg = MagicMock(return_value="") - executor.report_job_submission = MagicMock() - - # Return the mocked executor - return executor - - def test_constraint_resource(self, mock_job, mock_executor): + def test_constraint_resource(self, mock_job): """ Test that the constraint resource is correctly added to the sbatch command. """ # Create a job with a constraint resource job = mock_job(constraint="haswell") + params = { + "run_uuid": "test_run", + "slurm_logfile": "test_logfile", + "comment_str": "test_comment", + "account": None, + "partition": None, + "workdir": ".", + "constraint": "haswell", + } # Patch subprocess.Popen to capture the sbatch command with patch("subprocess.Popen") as mock_popen: @@ -310,20 +270,23 @@ def test_constraint_resource(self, mock_job, mock_executor): process_mock.communicate.return_value = ("123", "") process_mock.returncode = 0 mock_popen.return_value = process_mock + + assert " -C 'haswell'" in get_submit_command(job, params) + - # Run the job - mock_executor.run_job(job) - - # Get the sbatch command from the call - call_args = mock_popen.call_args[0][0] - - # Assert the constraint is correctly included - assert "-C 'haswell'" in call_args - - def test_qos_resource(self, mock_job, mock_executor): + def test_qos_resource(self, mock_job): """Test that the qos resource is correctly added to the sbatch command.""" # Create a job with a qos resource job = mock_job(qos="normal") + params = { + "run_uuid": "test_run", + "slurm_logfile": "test_logfile", + "comment_str": "test_comment", + "account": None, + "partition": None, + "workdir": ".", + "qos": "normal", + } # Patch subprocess.Popen to capture the sbatch command with patch("subprocess.Popen") as mock_popen: @@ -333,19 +296,23 @@ def test_qos_resource(self, mock_job, mock_executor): process_mock.returncode = 0 mock_popen.return_value = process_mock - # Run the job - mock_executor.run_job(job) + assert " --qos='normal'" in get_submit_command(job, params) - # Get the sbatch command from the call - call_args = mock_popen.call_args[0][0] - # Assert the qos is correctly included - assert "--qos 'normal'" in call_args - - def test_both_constraint_and_qos(self, mock_job, mock_executor): + def test_both_constraint_and_qos(self, mock_job): """Test that both constraint and qos resources can be used together.""" # Create a job with both constraint and qos resources job = mock_job(constraint="haswell", qos="high") + params = { + "run_uuid": "test_run", + "slurm_logfile": "test_logfile", + "comment_str": "test_comment", + "account": None, + "partition": None, + 
"workdir": ".", + "constraint": "haswell", + "qos": "high", + } # Patch subprocess.Popen to capture the sbatch command with patch("subprocess.Popen") as mock_popen: @@ -355,23 +322,27 @@ def test_both_constraint_and_qos(self, mock_job, mock_executor): process_mock.returncode = 0 mock_popen.return_value = process_mock - # Run the job - mock_executor.run_job(job) - - # Get the sbatch command from the call - call_args = mock_popen.call_args[0][0] - # Assert both resources are correctly included - assert "-C 'haswell'" in call_args - assert "--qos 'high'" in call_args + sbatch_command = get_submit_command(job, params) + assert " --qos='high'" in sbatch_command + assert " -C 'haswell'" in sbatch_command + - def test_no_resources(self, mock_job, mock_executor): + def test_no_resources(self, mock_job): """ Test that no constraint or qos flags are added when resources are not specified. """ # Create a job without constraint or qos resources job = mock_job() + params = { + "run_uuid": "test_run", + "slurm_logfile": "test_logfile", + "comment_str": "test_comment", + "account": None, + "partition": None, + "workdir": ".", + } # Patch subprocess.Popen to capture the sbatch command with patch("subprocess.Popen") as mock_popen: @@ -381,20 +352,24 @@ def test_no_resources(self, mock_job, mock_executor): process_mock.returncode = 0 mock_popen.return_value = process_mock - # Run the job - mock_executor.run_job(job) - - # Get the sbatch command from the call - call_args = mock_popen.call_args[0][0] - # Assert neither resource is included - assert "-C " not in call_args - assert "--qos " not in call_args + sbatch_command = get_submit_command(job, params) + assert "-C " not in sbatch_command + assert "--qos " not in sbatch_command - def test_empty_constraint(self, mock_job, mock_executor): + def test_empty_constraint(self, mock_job): """Test that an empty constraint is still included in the command.""" # Create a job with an empty constraint job = mock_job(constraint="") + params = { + "run_uuid": "test_run", + "slurm_logfile": "test_logfile", + "comment_str": "test_comment", + "account": None, + "partition": None, + "workdir": ".", + "constraint": "", + } # Patch subprocess.Popen to capture the sbatch command with patch("subprocess.Popen") as mock_popen: @@ -404,19 +379,22 @@ def test_empty_constraint(self, mock_job, mock_executor): process_mock.returncode = 0 mock_popen.return_value = process_mock - # Run the job - mock_executor.run_job(job) - - # Get the sbatch command from the call - call_args = mock_popen.call_args[0][0] - # Assert the constraint is included (even if empty) - assert "-C ''" in call_args + assert "-C ''" in get_submit_command(job, params) - def test_empty_qos(self, mock_job, mock_executor): + def test_empty_qos(self, mock_job): """Test that an empty qos is still included in the command.""" # Create a job with an empty qos job = mock_job(qos="") + params = { + "run_uuid": "test_run", + "slurm_logfile": "test_logfile", + "comment_str": "test_comment", + "account": None, + "partition": None, + "workdir": ".", + "qos": "", + } # Patch subprocess.Popen to capture the sbatch command with patch("subprocess.Popen") as mock_popen: @@ -425,15 +403,9 @@ def test_empty_qos(self, mock_job, mock_executor): process_mock.communicate.return_value = ("123", "") process_mock.returncode = 0 mock_popen.return_value = process_mock - - # Run the job - mock_executor.run_job(job) - - # Get the sbatch command from the call - call_args = mock_popen.call_args[0][0] - - # Assert the qos is included (even if empty) - 
assert "--qos ''" in call_args + # Assert the qoes is included (even if empty) + assert "--qos=''" in get_submit_command(job, params) + class TestWildcardsWithSlashes(snakemake.common.tests.TestWorkflowsLocalStorageBase): From b3cc9f86ac06db06655a88993a1b209a9404867e Mon Sep 17 00:00:00 2001 From: meesters Date: Fri, 28 Mar 2025 14:54:21 +0100 Subject: [PATCH 22/28] fix: blacked --- snakemake_executor_plugin_slurm/submit_string.py | 11 +++++------ tests/tests.py | 9 +++------ 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/snakemake_executor_plugin_slurm/submit_string.py b/snakemake_executor_plugin_slurm/submit_string.py index 0cfe963d..54df126a 100644 --- a/snakemake_executor_plugin_slurm/submit_string.py +++ b/snakemake_executor_plugin_slurm/submit_string.py @@ -1,5 +1,6 @@ from snakemake_executor_plugin_slurm_jobstep import get_cpu_setting + def get_submit_command(job, params): """ Return the submit command for the job. @@ -32,12 +33,10 @@ def get_submit_command(job, params): if job.resources.get("constraint") or isinstance( job.resources.get("constraint"), str ): - call += f" -C \'{job.resources.get('constraint')}\'" + call += f" -C '{job.resources.get('constraint')}'" - if job.resources.get("qos") or isinstance( - job.resources.get("qos"), str - ): - call += f" --qos=\'{job.resources.get('qos')}\'" + if job.resources.get("qos") or isinstance(job.resources.get("qos"), str): + call += f" --qos='{job.resources.get('qos')}'" if job.resources.get("mem_mb_per_cpu"): call += f" --mem-per-cpu {job.resources.mem_mb_per_cpu}" elif job.resources.get("mem_mb"): @@ -63,6 +62,6 @@ def get_submit_command(job, params): # ensure that workdir is set correctly # use short argument as this is the same in all slurm versions # (see https://github.com/snakemake/snakemake/issues/2014) - call += f" -D \'{params['workdir']}\'" + call += f" -D '{params['workdir']}'" return call diff --git a/tests/tests.py b/tests/tests.py index f1fbd62e..63e289bc 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -215,6 +215,7 @@ def test_both_gres_and_gpu_set(self, mock_job): ): set_gres_string(job) + class TestSLURMResources(TestWorkflows): """ Test workflows using job resources passed as part of the job configuration. 
@@ -270,9 +271,8 @@ def test_constraint_resource(self, mock_job):
             process_mock.communicate.return_value = ("123", "")
             process_mock.returncode = 0
             mock_popen.return_value = process_mock
-
+
             assert " -C 'haswell'" in get_submit_command(job, params)
-
 
     def test_qos_resource(self, mock_job):
         """Test that the qos resource is correctly added to the sbatch command."""
@@ -298,7 +298,6 @@ def test_qos_resource(self, mock_job):
 
             assert " --qos='normal'" in get_submit_command(job, params)
-
 
     def test_both_constraint_and_qos(self, mock_job):
         """Test that both constraint and qos resources can be used together."""
         # Create a job with both constraint and qos resources
@@ -326,7 +325,6 @@ def test_both_constraint_and_qos(self, mock_job):
             sbatch_command = get_submit_command(job, params)
             assert " --qos='high'" in sbatch_command
             assert " -C 'haswell'" in sbatch_command
-
 
     def test_no_resources(self, mock_job):
         """
@@ -403,9 +401,8 @@ def test_empty_qos(self, mock_job):
             process_mock.communicate.return_value = ("123", "")
             process_mock.returncode = 0
             mock_popen.return_value = process_mock
-            # Assert the qoes is included (even if empty)
+            # Assert the qoes is included (even if empty)
             assert "--qos=''" in get_submit_command(job, params)
-
 
 
 class TestWildcardsWithSlashes(snakemake.common.tests.TestWorkflowsLocalStorageBase):

From ecba9c49331040d5189d9c52f0ce26a7e430f4b9 Mon Sep 17 00:00:00 2001
From: meesters
Date: Fri, 28 Mar 2025 14:54:45 +0100
Subject: [PATCH 23/28] fix: blacked

---
 snakemake_executor_plugin_slurm/__init__.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py
index 13ae04ef..00fca3e0 100644
--- a/snakemake_executor_plugin_slurm/__init__.py
+++ b/snakemake_executor_plugin_slurm/__init__.py
@@ -30,6 +30,7 @@
 from .utils import delete_slurm_environment, delete_empty_dirs, set_gres_string
 from .submit_string import get_submit_command
 
+
 @dataclass
 class ExecutorSettings(ExecutorSettingsBase):
     logdir: Optional[Path] = field(
@@ -237,8 +238,8 @@ def run_job(self, job: JobExecutorInterface):
             "account": self.get_account_arg(job),
             "partition": self.get_partition_arg(job),
             "workdir": self.workflow.workdir_init,
-        }
-
+        }
+
         call = get_submit_command(job, job_params)
 
         if self.workflow.executor_settings.requeue:
@@ -259,7 +260,7 @@ def run_job(self, job: JobExecutorInterface):
                 "No job memory information ('mem_mb' or 'mem_mb_per_cpu') is given "
                 "- submitting without. This might or might not work on your cluster."
             )
-
+
             # MPI job
             if job.resources.get("mpi", False):
                 if not job.resources.get("tasks_per_node") and not job.resources.get(

From da5a7272a68ef5b35d4a91dd910685c51564c1d8 Mon Sep 17 00:00:00 2001
From: meesters
Date: Fri, 28 Mar 2025 15:09:48 +0100
Subject: [PATCH 24/28] fix: removed doubled check for clusters

---
 snakemake_executor_plugin_slurm/submit_string.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/snakemake_executor_plugin_slurm/submit_string.py b/snakemake_executor_plugin_slurm/submit_string.py
index 54df126a..b8c7e6b1 100644
--- a/snakemake_executor_plugin_slurm/submit_string.py
+++ b/snakemake_executor_plugin_slurm/submit_string.py
@@ -27,9 +27,6 @@ def get_submit_command(job, params):
     if job.resources.get("runtime"):
         call += f" -t {job.resources.runtime}"
 
-    if job.resources.get("clusters"):
-        call += f" --clusters {job.resources.clusters}"
-
     if job.resources.get("constraint") or isinstance(
         job.resources.get("constraint"), str
     ):

From 553eaf787e8d915fe1683c67ed57bff04f9959d4 Mon Sep 17 00:00:00 2001
From: Christian Meesters
Date: Fri, 28 Mar 2025 14:11:52 +0000
Subject: [PATCH 25/28] Update snakemake_executor_plugin_slurm/submit_string.py

Co-authored-by: Filipe G. Vieira <1151762+fgvieira@users.noreply.github.com>
---
 snakemake_executor_plugin_slurm/submit_string.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/snakemake_executor_plugin_slurm/submit_string.py b/snakemake_executor_plugin_slurm/submit_string.py
index 54df126a..8cfe653e 100644
--- a/snakemake_executor_plugin_slurm/submit_string.py
+++ b/snakemake_executor_plugin_slurm/submit_string.py
@@ -36,7 +36,7 @@ def get_submit_command(job, params):
         call += f" -C '{job.resources.get('constraint')}'"
 
     if job.resources.get("qos") or isinstance(job.resources.get("qos"), str):
-        call += f" --qos='{job.resources.get('qos')}'"
+        call += f" --qos='{job.resources.qos}'"
     if job.resources.get("mem_mb_per_cpu"):
         call += f" --mem-per-cpu {job.resources.mem_mb_per_cpu}"
     elif job.resources.get("mem_mb"):

From ac1c56231e1fa431c20efeb895bfd7a9e523d4af Mon Sep 17 00:00:00 2001
From: Christian Meesters
Date: Fri, 28 Mar 2025 14:20:48 +0000
Subject: [PATCH 26/28] Update snakemake_executor_plugin_slurm/submit_string.py

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
---
 snakemake_executor_plugin_slurm/submit_string.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/snakemake_executor_plugin_slurm/submit_string.py b/snakemake_executor_plugin_slurm/submit_string.py
index 8cfe653e..95751cb8 100644
--- a/snakemake_executor_plugin_slurm/submit_string.py
+++ b/snakemake_executor_plugin_slurm/submit_string.py
@@ -16,10 +16,10 @@ def get_submit_command(job, params):
 
     # check whether an account is set
     if params.get("account"):
-        call += f" {params['account']} "
+        call += f" --account={params['account']}"
     # check whether a partition is set
     if params.get("partition"):
-        call += f" {params['partition']}"
+        call += f" --partition={params['partition']}"
 
     if job.resources.get("clusters"):
         call += f" --clusters {job.resources.clusters}"

From f19859327e09a817b5582aef5c3465fa10d51d7f Mon Sep 17 00:00:00 2001
From: Christian Meesters
Date: Fri, 28 Mar 2025 14:21:54 +0000
Subject: [PATCH 27/28] Update snakemake_executor_plugin_slurm/submit_string.py

Co-authored-by: Filipe G. Vieira <1151762+fgvieira@users.noreply.github.com>
---
 snakemake_executor_plugin_slurm/submit_string.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/snakemake_executor_plugin_slurm/submit_string.py b/snakemake_executor_plugin_slurm/submit_string.py
index 95751cb8..3a67611b 100644
--- a/snakemake_executor_plugin_slurm/submit_string.py
+++ b/snakemake_executor_plugin_slurm/submit_string.py
@@ -62,6 +62,6 @@ def get_submit_command(job, params):
     # ensure that workdir is set correctly
     # use short argument as this is the same in all slurm versions
     # (see https://github.com/snakemake/snakemake/issues/2014)
-    call += f" -D '{params['workdir']}'"
+    call += f" -D '{params.workdir}'"
 
     return call

From b0a666195a6a5bcede0974b09c6540a600f2652c Mon Sep 17 00:00:00 2001
From: meesters
Date: Fri, 28 Mar 2025 15:39:45 +0100
Subject: [PATCH 28/28] refactor: now, using SimpleNamespace to enable attribute-style access patterns

---
 .../submit_string.py | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/snakemake_executor_plugin_slurm/submit_string.py b/snakemake_executor_plugin_slurm/submit_string.py
index 97f75efa..b7c1e160 100644
--- a/snakemake_executor_plugin_slurm/submit_string.py
+++ b/snakemake_executor_plugin_slurm/submit_string.py
@@ -1,25 +1,29 @@
 from snakemake_executor_plugin_slurm_jobstep import get_cpu_setting
+from types import SimpleNamespace
 
 
 def get_submit_command(job, params):
     """
     Return the submit command for the job.
     """
+    # Convert params dict to a SimpleNamespace for attribute-style access
+    params = SimpleNamespace(**params)
+
     call = (
         f"sbatch "
         f"--parsable "
-        f"--job-name {params['run_uuid']} "
-        f"--output \"{params['slurm_logfile']}\" "
+        f"--job-name {params.run_uuid} "
+        f'--output "{params.slurm_logfile}" '
         f"--export=ALL "
-        f"--comment \"{params['comment_str']}\""
+        f'--comment "{params.comment_str}"'
     )
 
     # check whether an account is set
-    if params.get("account"):
-        call += f" --account={params['account']}"
+    if hasattr(params, "account"):
+        call += f" --account={params.account}"
     # check whether a partition is set
-    if params.get("partition"):
-        call += f" --partition={params['partition']}"
+    if hasattr(params, "partition"):
+        call += f" --partition={params.partition}"
 
     if job.resources.get("clusters"):
         call += f" --clusters {job.resources.clusters}"
@@ -34,6 +38,7 @@ def get_submit_command(job, params):
     if job.resources.get("qos") or isinstance(job.resources.get("qos"), str):
         call += f" --qos='{job.resources.qos}'"
+
     if job.resources.get("mem_mb_per_cpu"):
         call += f" --mem-per-cpu {job.resources.mem_mb_per_cpu}"
     elif job.resources.get("mem_mb"):