diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e92c03b..820100e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,5 +1,6 @@ name: CI on: + merge_group: # GitHub Merge Queue pull_request: push: branches: @@ -15,7 +16,7 @@ jobs: finalize: timeout-minutes: 10 needs: - - unit-tests + - test # Important: the next line MUST be `if: always()`. # Do not change that line. # That line is necessary to make sure that this job runs even if tests fail. @@ -23,13 +24,13 @@ jobs: runs-on: ubuntu-latest steps: - run: | - echo unit-tests: ${{ needs.unit-tests.result }} + echo test: ${{ needs.test.result }} - run: exit 1 # The last line must NOT end with || # All other lines MUST end with || if: | - (needs.unit-tests.result != 'success') - unit-tests: + (needs.test.result != 'success') + test: runs-on: ubuntu-latest timeout-minutes: 20 strategy: @@ -59,65 +60,3 @@ jobs: # If this PR is NOT from a fork, then DO fail CI if the Codecov upload errors. # If this is not a PR, then DO fail CI if the Codecov upload errors. fail_ci_if_error: ${{ github.event_name != 'pull_request' || github.repository == github.event.pull_request.head.repo.full_name }} - test-slurm: - if: false - runs-on: ubuntu-latest - timeout-minutes: 20 - strategy: - fail-fast: false - matrix: - version: - # Please note: You must specify the full Julia version number (major.minor.patch). - # This is because the value here will be directly interpolated into a download URL. - # - '1.2.0' # minimum Julia version supported in Project.toml - - '1.6.7' # previous LTS - - '1.10.7' # current LTS - - '1.11.2' # currently the latest stable release - steps: - - uses: actions/checkout@v4 - with: - persist-credentials: false - - name: Print Docker version - run: | - docker --version - docker version - # This next bit of code is taken from: - # https://github.com/kleinhenz/SlurmClusterManager.jl - # Original author: Joseph Kleinhenz - # License: MIT - - name: Setup Slurm inside Docker - run: | - docker version - docker compose version - docker build --build-arg "JULIA_VERSION=${MATRIX_JULIA_VERSION:?}" -t slurm-cluster-julia -f ci/Dockerfile . - docker compose -f ci/docker-compose.yml up -d - docker ps - env: - MATRIX_JULIA_VERSION: ${{matrix.version}} - - name: Print some information for debugging purposes - run: | - docker exec -t slurmctld pwd - docker exec -t slurmctld ls -la - docker exec -t slurmctld ls -la ElasticClusterManager - - name: Instantiate package - run: docker exec -t slurmctld julia --project=ElasticClusterManager -e 'import Pkg; @show Base.active_project(); Pkg.instantiate(); Pkg.status()' - - name: Run tests without a Slurm allocation - run: docker exec -t slurmctld julia --project=ElasticClusterManager -e 'import Pkg; Pkg.test(; test_args=["slurm"])' - - name: Run tests inside salloc - run: docker exec -t slurmctld salloc -t 00:10:00 -n 2 julia --project=ElasticClusterManager -e 'import Pkg; Pkg.test(; test_args=["slurm"], coverage=true)' - - name: Run tests inside sbatch - run: docker exec -t slurmctld ElasticClusterManager/ci/run_my_sbatch.sh - - run: find . -type f -name '*.cov' - - name: Copy .cov files out of the Docker container - run: docker exec slurmctld /bin/bash -c 'cd /home/docker/ElasticClusterManager && tar -cf - src/*.cov' | tar -xvf - - - run: find . -type f -name '*.cov' - # - run: find . 
-type f -name '*.cov' -exec cat {} \; - - uses: julia-actions/julia-processcoverage@v1 - - uses: codecov/codecov-action@v5 - with: - files: lcov.info - token: ${{ secrets.CODECOV_TOKEN }} - # If this PR is from a fork, then do NOT fail CI if the Codecov upload errors. - # If this PR is NOT from a fork, then DO fail CI if the Codecov upload errors. - # If this is not a PR, then DO fail CI if the Codecov upload errors. - fail_ci_if_error: ${{ github.event_name != 'pull_request' || github.repository == github.event.pull_request.head.repo.full_name }} diff --git a/Project.toml b/Project.toml index a16fdd4..c33f202 100644 --- a/Project.toml +++ b/Project.toml @@ -12,11 +12,13 @@ Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" Distributed = "< 0.0.1, 1" Logging = "< 0.0.1, 1" Pkg = "< 0.0.1, 1" +Random = "< 0.0.1, 1" Sockets = "< 0.0.1, 1" julia = "1.2" [extras] +Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["Test"] +test = ["Test", "Random"] diff --git a/README.md b/README.md index 78c5b42..ca85e68 100755 --- a/README.md +++ b/README.md @@ -1,76 +1,18 @@ -# ClusterManagers.jl +# ElasticClusterManager.jl -The `ClusterManagers.jl` package implements code for different job queue systems commonly used on compute clusters. +The ElasticClusterManager.jl package implements the `ElasticManager`. -> [!WARNING] -> This package is not currently being actively maintained or tested. -> -> We are in the process of splitting this package up into multiple smaller packages, with a separate package for each job queue systems. -> -> We are seeking maintainers for these new packages. If you are an active user of any of the job queue systems listed below and are interested in being a maintainer, please open a GitHub issue - say that you are interested in being a maintainer, and specify which job queue system you use. +This code originally lived in the [`ClusterManagers.jl`](https://github.com/JuliaParallel/ClusterManagers.jl) package. -## Available job queue systems - -### In this package The following managers are implemented in this package (the `ClusterManagers.jl` package): -| Job queue system | Command to add processors | +| Manager | Command to add processors | | ---------------- | ------------------------- | -| Local manager with CPU affinity setting | `addprocs(LocalAffinityManager(;np=CPU_CORES, mode::AffinityMode=BALANCED, affinities=[]); kwargs...)` | - -### Implemented in external packages - -| Job queue system | External package | Command to add processors | -| ---------------- | ---------------- | ------------------------- | -| Slurm | [SlurmClusterManager.jl](https://github.com/JuliaParallel/SlurmClusterManager.jl) | `addprocs(SlurmManager(); kwargs...)` | -| Load Sharing Facility (LSF) | [LSFClusterManager.jl](https://github.com/JuliaParallel/LSFClusterManager.jl) | `addprocs_lsf(np::Integer; bsub_flags=``, ssh_cmd=``)` or `addprocs(LSFManager(np, bsub_flags, ssh_cmd, retry_delays, throttle))` | -| Kubernetes (K8s) | [K8sClusterManagers.jl](https://github.com/beacon-biosignals/K8sClusterManagers.jl) | `addprocs(K8sClusterManager(np; kwargs...))` | -| Azure scale-sets | [AzManagers.jl](https://github.com/ChevronETC/AzManagers.jl) | `addprocs(vmtemplate, n; kwargs...)` | - -### Not currently being actively maintained - -> [!WARNING] -> The following managers are not currently being actively maintained or tested. -> -> We are seeking maintainers for the following managers. 
If you are an active user of any of the following job queue systems listed and are interested in being a maintainer, please open a GitHub issue - say that you are interested in being a maintainer, and specify which job queue system you use. -> - -| Job queue system | Command to add processors | -| ---------------- | ------------------------- | -| Sun Grid Engine (SGE) via `qsub` | `addprocs_sge(np::Integer; qsub_flags=``)` or `addprocs(SGEManager(np, qsub_flags))` | -| Sun Grid Engine (SGE) via `qrsh` | `addprocs_qrsh(np::Integer; qsub_flags=``)` or `addprocs(QRSHManager(np, qsub_flags))` | -| PBS (Portable Batch System) | `addprocs_pbs(np::Integer; qsub_flags=``)` or `addprocs(PBSManager(np, qsub_flags))` | -| Scyld | `addprocs_scyld(np::Integer)` or `addprocs(ScyldManager(np))` | -| HTCondor | `addprocs_htc(np::Integer)` or `addprocs(HTCManager(np))` | - -### Custom managers - -You can also write your own custom cluster manager; see the instructions in the [Julia manual](https://docs.julialang.org/en/v1/manual/distributed-computing/#ClusterManagers). +| ElasticManager | `addprocs(ElasticManager(...)` | -## Notes on specific managers -### Slurm: please see [SlurmClusterManager.jl](https://github.com/JuliaParallel/SlurmClusterManager.jl) - -For Slurm, please see the [SlurmClusterManager.jl](https://github.com/JuliaParallel/SlurmClusterManager.jl) package. - -### Using `LocalAffinityManager` (for pinning local workers to specific cores) - -- Linux only feature. -- Requires the Linux `taskset` command to be installed. -- Usage : `addprocs(LocalAffinityManager(;np=CPU_CORES, mode::AffinityMode=BALANCED, affinities=[]); kwargs...)`. - -where - -- `np` is the number of workers to be started. -- `affinities`, if specified, is a list of CPU IDs. As many workers as entries in `affinities` are launched. Each worker is pinned -to the specified CPU ID. -- `mode` (used only when `affinities` is not specified, can be either `COMPACT` or `BALANCED`) - `COMPACT` results in the requested number -of workers pinned to cores in increasing order, For example, worker1 => CPU0, worker2 => CPU1 and so on. `BALANCED` tries to spread -the workers. Useful when we have multiple CPU sockets, with each socket having multiple cores. A `BALANCED` mode results in workers -spread across CPU sockets. Default is `BALANCED`. - -### Using `ElasticManager` (dynamically adding workers to a cluster) +## Using `ElasticManager` (dynamically adding workers to a cluster) The `ElasticManager` is useful in scenarios where we want to dynamically add workers to a cluster. It achieves this by listening on a known port on the master. The launched workers connect to this @@ -100,7 +42,3 @@ ElasticManager: By default, the printed command uses the absolute path to the current Julia executable and activates the same project as the current session. You can change either of these defaults by passing `printing_kwargs=(absolute_exename=false, same_project=false))` to the first form of the `ElasticManager` constructor. Once workers are connected, you can print the `em` object again to see them added to the list of active workers. 
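A minimal master-side sketch of that workflow, based on the `ElasticManager` constructor and `get_connect_cmd` shown elsewhere in this diff (the address and port are illustrative):

```julia
# Master process: listen for incoming workers on a known address/port.
using Distributed, Sockets
using ElasticClusterManager

em = ElasticManager(; addr=IPv4("127.0.0.1"), port=9009)

# Printing `em` shows the shell command a worker machine should run to join;
# `printing_kwargs` (described above) controls how that command is rendered.
println(em)

# Not exported: builds the same worker connect command programmatically.
connect_cmd = ElasticClusterManager.get_connect_cmd(em)
```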
- -### Sun Grid Engine (SGE) - -See [`docs/sge.md`](docs/sge.md) diff --git a/ci/Dockerfile b/ci/Dockerfile deleted file mode 100644 index 4f7cc33..0000000 --- a/ci/Dockerfile +++ /dev/null @@ -1,21 +0,0 @@ -# This file is taken from: -# https://github.com/kleinhenz/SlurmClusterManager.jl -# Original author: Joseph Kleinhenz -# License: MIT - -FROM jkleinh/slurm-cluster@sha256:afd20dafc831b0fa781460dc871232579ccf1b54955e434531394c331ce388e4 as base -MAINTAINER Joseph Kleinhenz - -ARG JULIA_VERSION=1.6.0 - -RUN mkdir -p /home/docker/.local/opt/julia \ - && cd /home/docker/.local/opt/julia \ - && folder="$(echo ${JULIA_VERSION} | cut -d. -f1-2)" \ - && curl -L https://julialang-s3.julialang.org/bin/linux/x64/${folder}/julia-${JULIA_VERSION}-linux-x86_64.tar.gz | tar xz --strip 1 \ - && /home/docker/.local/opt/julia/bin/julia --version - -ENV PATH="/home/docker/.local/opt/julia/bin:${PATH}" - -COPY --chown=docker . ClusterManagers - -CMD /bin/bash -l diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml deleted file mode 100644 index 86b1df3..0000000 --- a/ci/docker-compose.yml +++ /dev/null @@ -1,48 +0,0 @@ -# This file is taken from: -# https://github.com/kleinhenz/SlurmClusterManager.jl -# Original author: Joseph Kleinhenz -# License: MIT - -version: "3.3" - -services: - slurmctld: - image: slurm-cluster-julia - command: ["slurmctld"] - container_name: slurmctld - hostname: slurmctld - volumes: - - slurm_jobdir:/home/docker - - var_log_slurm:/var/log/slurm - expose: - - "6817" - - c1: - image: slurm-cluster-julia - command: ["slurmd"] - hostname: c1 - container_name: c1 - volumes: - - slurm_jobdir:/home/docker - - var_log_slurm:/var/log/slurm - expose: - - "6818" - depends_on: - - "slurmctld" - - c2: - image: slurm-cluster-julia - command: ["slurmd"] - hostname: c2 - container_name: c2 - volumes: - - slurm_jobdir:/home/docker - - var_log_slurm:/var/log/slurm - expose: - - "6818" - depends_on: - - "slurmctld" - -volumes: - slurm_jobdir: - var_log_slurm: diff --git a/ci/my_sbatch.sh b/ci/my_sbatch.sh deleted file mode 100644 index 33d98a8..0000000 --- a/ci/my_sbatch.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/bash - -# Slurm options: -#SBATCH --ntasks=2 -#SBATCH --time=00:10:00 - -# Important note: -# There should be no non-comment non-whitespace lines above this line. - -set -euf -o pipefail - -set -x - -julia --project=ClusterManagers -e 'import Pkg; Pkg.test(; test_args=["slurm"])' diff --git a/ci/run_my_sbatch.sh b/ci/run_my_sbatch.sh deleted file mode 100755 index 509a18d..0000000 --- a/ci/run_my_sbatch.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/bash - -set -euf -o pipefail - -set -x - -rm -fv "${HOME:?}/my_stdout.txt" -rm -fv "${HOME:?}/my_stderr.txt" - -sbatch --wait --output="${HOME:?}/my_stdout.txt" --error="${HOME:?}/my_stderr.txt" ./ClusterManagers/ci/my_sbatch.sh - -sleep 5 -cat "${HOME:?}/my_stdout.txt" -cat "${HOME:?}/my_stderr.txt" diff --git a/docs/sge.md b/docs/sge.md deleted file mode 100644 index 8a74b6b..0000000 --- a/docs/sge.md +++ /dev/null @@ -1,70 +0,0 @@ -# Sun Grid Engine (SGE) - -> [!WARNING] -> The SGE functionality is not currently being maintained. -> -> We are currently seeking a new maintainer for the SGE functionality. If you are an active user of SGE and are interested in being a maintainer, please open a GitHub issue - say that you are interested in being a maintainer for the SGE functionality. 
- -## SGE via `qsub`: Use `ClusterManagers.addprocs_sge` (or `ClusterManagers.SGEManager`) - -```julia -julia> using ClusterManagers - -julia> ClusterManagers.addprocs_sge(5; qsub_flags=`-q queue_name`) -job id is 961, waiting for job to start . -5-element Array{Any,1}: -2 -3 -4 -5 -6 - -julia> @parallel for i=1:5 - run(`hostname`) - end - -julia> From worker 2: compute-6 - From worker 4: compute-6 - From worker 5: compute-6 - From worker 6: compute-6 - From worker 3: compute-6 -``` - -Some clusters require the user to specify a list of required resources. -For example, it may be necessary to specify how much memory will be needed by the job - see this [issue](https://github.com/JuliaLang/julia/issues/10390). -The keyword `qsub_flags` can be used to specify these and other options. -Additionally the keyword `wd` can be used to specify the working directory (which defaults to `ENV["HOME"]`). - -```julia -julia> using Distributed, ClusterManagers - -julia> addprocs_sge(5; qsub_flags=`-q queue_name -l h_vmem=4G,tmem=4G`, wd=mktempdir()) -Job 5672349 in queue. -Running. -5-element Array{Int64,1}: - 2 - 3 - 4 - 5 - 6 - -julia> pmap(x->run(`hostname`),workers()); - -julia> From worker 26: lum-7-2.local - From worker 23: pace-6-10.local - From worker 22: chong-207-10.local - From worker 24: pace-6-11.local - From worker 25: cheech-207-16.local - -julia> rmprocs(workers()) -Task (done) -``` - -## SGE via `qrsh`: Use `ClusterManagers.addprocs_qrsh` (or `ClusterManagers.QRSHManager`) - -`SGEManager` uses SGE's `qsub` command to launch workers, which communicate the -TCP/IP host:port info back to the master via the filesystem. On filesystems -that are tuned to make heavy use of caching to increase throughput, launching -Julia workers can frequently timeout waiting for the standard output files to appear. -In this case, it's better to use the `QRSHManager`, which uses SGE's `qrsh` -command to bypass the filesystem and captures STDOUT directly. 
diff --git a/slurm_test.jl b/slurm_test.jl deleted file mode 100644 index ca8bc6b..0000000 --- a/slurm_test.jl +++ /dev/null @@ -1,18 +0,0 @@ -#!/cvmfs/soft.computecanada.ca/easybuild/software/2017/avx512/Compiler/gcc7.3/julia/1.1.0/bin/julia -#SBATCH --time=00:05:00 # Running time of hours -#SBATCH --ntasks=4 -#SBATCH --account=def-whitem - - -using Logging, Distributed - -include("/home/mkschleg/mkschleg/ClusterManagers.jl/src/ClusterManagers.jl") - -ClusterManagers.addprocs_slurm(4; exeflags=["--project=.", "--color=yes"], job_file_loc="test_loc") - -@sync begin - @async @sync for job_id in collect(1:100) @spawn begin - println("Hello World $(job_id)") - end - end -end diff --git a/src/ElasticClusterManager.jl b/src/ElasticClusterManager.jl index 05ae9b8..5a3ca09 100755 --- a/src/ElasticClusterManager.jl +++ b/src/ElasticClusterManager.jl @@ -2,22 +2,21 @@ module ElasticClusterManager using Distributed using Sockets -using Pkg +import Pkg export launch, manage, kill, init_worker, connect + +export ElasticManager, elastic_worker + import Distributed: launch, manage, kill, init_worker, connect -worker_cookie() = begin Distributed.init_multi(); cluster_cookie() end -worker_arg() = `--worker=$(worker_cookie())` +function worker_cookie() + Distributed.init_multi() + return Distributed.cluster_cookie() +end +worker_arg() = `--worker=$(worker_cookie())` -# PBS doesn't have the same semantics as SGE wrt to file accumulate, -# a different solution will have to be found -include("qsub.jl") -include("scyld.jl") -include("condor.jl") -include("slurm.jl") -include("affinity.jl") include("elastic.jl") end diff --git a/src/affinity.jl b/src/affinity.jl deleted file mode 100644 index 0dfa177..0000000 --- a/src/affinity.jl +++ /dev/null @@ -1,52 +0,0 @@ -# `LocalAffinityManager` (for pinning local workers to specific cores) - -export LocalAffinityManager, AffinityMode, COMPACT, BALANCED - -@enum AffinityMode COMPACT BALANCED - -mutable struct LocalAffinityManager <: ClusterManager - affinities::Array{Int} - - function LocalAffinityManager(;np=Sys.CPU_THREADS, mode::AffinityMode=BALANCED, affinities::Array{Int}=[]) - @assert(Sys.Sys.KERNEL == :Linux) - - if length(affinities) == 0 - if mode == COMPACT - affinities = [i%Sys.CPU_THREADS for i in 1:np] - else - # mode == BALANCED - if np > 1 - affinities = [Int(floor(i)) for i in range(0, stop=Sys.CPU_THREADS - 1e-3, length=np)] - else - affinities = [0] - end - end - end - - return new(affinities) - end -end - - -function launch(manager::LocalAffinityManager, params::Dict, launched::Array, c::Condition) - dir = params[:dir] - exename = params[:exename] - exeflags = params[:exeflags] - - for core_id in manager.affinities - io = open(detach( - setenv(`taskset -c $core_id $(Base.julia_cmd(exename)) $exeflags $(worker_arg())`, dir=dir)), "r") - wconfig = WorkerConfig() - wconfig.process = io - wconfig.io = io.out - push!(launched, wconfig) - end - - notify(c) -end - -function manage(manager::LocalAffinityManager, id::Integer, config::WorkerConfig, op::Symbol) - if op == :interrupt - kill(get(config.process), 2) - end -end diff --git a/src/condor.jl b/src/condor.jl deleted file mode 100644 index 74b7b61..0000000 --- a/src/condor.jl +++ /dev/null @@ -1,115 +0,0 @@ -# ClusterManager for HTCondor - -export HTCManager, addprocs_htc - -struct HTCManager <: ClusterManager - np::Integer -end - -function condor_script(portnum::Integer, np::Integer, params::Dict) - dir = params[:dir] - exename = params[:exename] - exeflags = params[:exeflags] - extrajdl = get(params, 
:extrajdl, []) - extraenv = get(params, :extraenv, []) - extrainputs = get(params, :extrainputs, []) - telnetexe = get(params, :telnetexe, "/usr/bin/telnet") - home = ENV["HOME"] - hostname = ENV["HOSTNAME"] - jobname = "julia-$(getpid())" - tdir = "$home/.julia-htc" - run(`mkdir -p $tdir`) - - scriptf = open("$tdir/$jobname.sh", "w") - println(scriptf, "#!/bin/sh") - for line in extraenv - println(scriptf, line) - end - println(scriptf, "cd $(Base.shell_escape(dir))") - println(scriptf, "$(Base.shell_escape(exename)) $(Base.shell_escape(exeflags)) -e 'using Distributed; start_worker($(repr(worker_cookie())))' | $telnetexe $(Base.shell_escape(hostname)) $portnum") - close(scriptf) - - input_files = ["$tdir/$jobname.sh"] - append!(input_files, extrainputs) - subf = open("$tdir/$jobname.sub", "w") - println(subf, "executable = /bin/bash") - println(subf, "arguments = ./$jobname.sh") - println(subf, "universe = vanilla") - println(subf, "should_transfer_files = yes") - println(subf, "transfer_input_files = $(join(input_files, ','))") - println(subf, "Notification = Error") - for line in extrajdl - println(subf, line) - end - for i = 1:np - println(subf, "output = $tdir/$jobname-$i.o") - println(subf, "error= $tdir/$jobname-$i.e") - println(subf, "queue") - end - close(subf) - - "$tdir/$jobname.sub" -end - -function launch(manager::HTCManager, params::Dict, instances_arr::Array, c::Condition) - let - mgr_desc = "HTCondor" - msg = "The $(mgr_desc) functionality in ClusterManagers.jl is currently not actively maintained. " * - "We are currently looking for a new maintainer. " * - "If you are an active user of the $(mgr_desc) functionality and are interested in becoming the maintainer, " * - "Please open an issue on the JuliaParallel/ClusterManagers.jl repo: " * - "https://github.com/JuliaParallel/ClusterManagers.jl/issues" - Base.depwarn(msg, Symbol(typeof(manager))) - end - try - portnum = rand(8000:9000) - portnum, server = listenany(ip"0.0.0.0", portnum) - np = manager.np - - script = condor_script(portnum, np, params) - cmd = `condor_submit $script` - if !success(cmd) - println("batch queue not available (could not run condor_submit)") - return - end - print("Waiting for $np workers: ") - - for i=1:np - conn = accept(server) - config = WorkerConfig() - - config.io = conn - - push!(instances_arr, config) - notify(c) - print("$i ") - end - println(".") - - catch e - println("Error launching condor") - println(e) - end -end - -function kill(manager::HTCManager, id::Int64, config::WorkerConfig) - remotecall(exit,id) - close(config.io) -end - -function manage(manager::HTCManager, id::Integer, config::WorkerConfig, op::Symbol) - if op == :finalize - if !isnothing(config.io) - close(config.io) - end -# elseif op == :interrupt -# job = config[:job] -# task = config[:task] -# # this does not currently work -# if !success(`qsig -s 2 -t $task $job`) -# println("Error sending a Ctrl-C to julia worker $id (job: $job, task: $task)") -# end - end -end - -addprocs_htc(np::Integer) = addprocs(HTCManager(np)) diff --git a/src/elastic.jl b/src/elastic.jl index 49a0907..c419627 100644 --- a/src/elastic.jl +++ b/src/elastic.jl @@ -2,8 +2,6 @@ # Launched workers connect to the master and redirect their STDOUTs to the same # Workers can join and leave the cluster on demand. 
-export ElasticManager, elastic_worker - const HDR_COOKIE_LEN = Distributed.HDR_COOKIE_LEN @static if Base.VERSION >= v"1.7-" @@ -21,22 +19,22 @@ struct ElasticManager <: ClusterManager sockname printing_kwargs - function ElasticManager(;addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all, printing_kwargs=()) + function ElasticManager(;addr=Sockets.IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all, printing_kwargs=()) Distributed.init_multi() - cookie !== nothing && cluster_cookie(cookie) + cookie !== nothing && Distributed.cluster_cookie(cookie) # Automatically check for the IP address of the local machine if addr == :auto try - addr = Sockets.getipaddr(IPv4) + addr = Sockets.getipaddr(Distributed.IPv4) catch error("Failed to automatically get host's IP address. Please specify `addr=` explicitly.") end end - l_sock = listen(addr, port) + l_sock = Distributed.listen(addr, port) - lman = new(Dict{Int, WorkerConfig}(), Channel{TCPSocket}(typemax(Int)), Set{Int}(), topology, getsockname(l_sock), printing_kwargs) + lman = new(Dict{Int, Distributed.WorkerConfig}(), Channel{Sockets.TCPSocket}(typemax(Int)), Set{Int}(), topology, Sockets.getsockname(l_sock), printing_kwargs) t1 = @async begin while true @@ -60,9 +58,9 @@ ElasticManager(addr, port) = ElasticManager(;addr=addr, port=port) ElasticManager(addr, port, cookie) = ElasticManager(;addr=addr, port=port, cookie=cookie) -function process_worker_conn(mgr::ElasticManager, s::TCPSocket) +function process_worker_conn(mgr::ElasticManager, s::Sockets.TCPSocket) # Socket is the worker's STDOUT - wc = WorkerConfig() + wc = Distributed.WorkerConfig() wc.io = s # Validate cookie @@ -70,7 +68,7 @@ function process_worker_conn(mgr::ElasticManager, s::TCPSocket) if length(cookie) < HDR_COOKIE_LEN error("Cookie read failed. Connection closed by peer.") end - self_cookie = cluster_cookie() + self_cookie = Distributed.cluster_cookie() for i in 1:HDR_COOKIE_LEN if UInt8(self_cookie[i]) != cookie[i] println(i, " ", self_cookie[i], " ", cookie[i]) @@ -93,10 +91,10 @@ function process_pending_connections(mgr::ElasticManager) end end -function launch(mgr::ElasticManager, params::Dict, launched::Array, c::Condition) +function Distributed.launch(mgr::ElasticManager, params::Dict, launched::Array, c::Condition) # The workers have already been started. while isready(mgr.pending) - wc=WorkerConfig() + wc=Distributed.WorkerConfig() wc.io = take!(mgr.pending) push!(launched, wc) end @@ -104,7 +102,7 @@ function launch(mgr::ElasticManager, params::Dict, launched::Array, c::Condition notify(c) end -function manage(mgr::ElasticManager, id::Integer, config::WorkerConfig, op::Symbol) +function Distributed.manage(mgr::ElasticManager, id::Integer, config::Distributed.WorkerConfig, op::Symbol) if op == :register mgr.active[id] = config elseif op == :deregister @@ -153,7 +151,7 @@ function get_connect_cmd(em::ElasticManager; absolute_exename=true, same_project ip = string(em.sockname[1]) port = convert(Int,em.sockname[2]) - cookie = cluster_cookie() + cookie = Distributed.cluster_cookie() exename = absolute_exename ? joinpath(Sys.BINDIR, Base.julia_exename()) : "julia" project = same_project ? 
("--project=$(Pkg.API.Context().env.project_file)",) : () diff --git a/src/qsub.jl b/src/qsub.jl deleted file mode 100644 index 2d61161..0000000 --- a/src/qsub.jl +++ /dev/null @@ -1,139 +0,0 @@ -export PBSManager, SGEManager, QRSHManager, addprocs_pbs, addprocs_sge, addprocs_qrsh - -struct PBSManager <: ClusterManager - np::Integer - queue - wd -end - -struct SGEManager <: ClusterManager - np::Integer - queue - wd -end - -struct QRSHManager <: ClusterManager - np::Integer - queue - wd -end - -function launch(manager::Union{PBSManager, SGEManager, QRSHManager}, - params::Dict, instances_arr::Array, c::Condition) - let - if manager isa PBSManager - mgr_desc = "Portable Batch System (PBS)" - elseif manager isa SGEManager - mgr_desc = "Sun Grid Engine (SGE)" - else - # manager isa QRSHManager - # - # `qrsh` is only available for SGE. - # It is not available for OpenPBS. - mgr_desc = "Sun Grid Engine (SGE)" - end - msg = "The $(mgr_desc) functionality in ClusterManagers.jl is currently not actively maintained. " * - "We are currently looking for a new maintainer. " * - "If you are an active user of the $(mgr_desc) functionality and are interested in becoming the maintainer, " * - "Please open an issue on the JuliaParallel/ClusterManagers.jl repo: " * - "https://github.com/JuliaParallel/ClusterManagers.jl/issues" - Base.depwarn(msg, Symbol(typeof(manager))) - end - try - dir = params[:dir] - exename = params[:exename] - exeflags = params[:exeflags] - wd = manager.wd - isPBS = isa(manager, PBSManager) - - np = manager.np - queue = manager.queue - - jobname = `julia-$(getpid())` - - if isa(manager, QRSHManager) - cmd = `cd $dir '&&' $exename $exeflags $(worker_arg())` - qrsh_cmd = `qrsh $queue -V -N $jobname -wd $wd -now n "$cmd"` - - stream_proc = [open(qrsh_cmd) for i in 1:np] - - for i in 1:np - config = WorkerConfig() - config.io, io_proc = stream_proc[i] - config.userdata = Dict{Symbol, Any}(:task => i, - :process => io_proc) - push!(instances_arr, config) - notify(c) - end - - else # PBS & SGE - cmd = `cd $dir '&&' $exename $exeflags $(worker_arg())` - qsub_cmd = pipeline(`echo $(Base.shell_escape(cmd))` , (isPBS ? - `qsub -N $jobname -wd $wd -j oe -k o -t 1-$np $queue` : - `qsub -N $jobname -wd $wd -terse -j y -R y -t 1-$np -V $queue`)) - - out = open(qsub_cmd) - if !success(out) - throw(error()) # qsub already gives a message - end - - id = chomp(split(readline(out),'.')[1]) - if endswith(id, "[]") - id = id[1:end-2] - end - - filenames(i) = "$wd/julia-$(getpid()).o$id-$i","$wd/julia-$(getpid())-$i.o$id","$wd/julia-$(getpid()).o$id.$i" - - println("Job $id in queue.") - for i=1:np - # wait for each output stream file to get created - fnames = filenames(i) - j = 0 - while (j=findfirst(x->isfile(x),fnames))==nothing - sleep(1.0) - end - fname = fnames[j] - - # Hack to get Base to get the host:port, the Julia process has already started. 
- cmd = `tail -f $fname` - - config = WorkerConfig() - - config.io = open(detach(cmd)) - - config.userdata = Dict{Symbol, Any}(:job=>id, :task=>i, :iofile=>fname) - push!(instances_arr, config) - notify(c) - end - println("Running.") - end - - catch e - println("Error launching workers") - println(e) - end -end - -function manage(manager::Union{PBSManager, SGEManager, QRSHManager}, - id::Int64, config::WorkerConfig, op::Symbol) -end - -function kill(manager::Union{PBSManager, SGEManager, QRSHManager}, id::Int64, config::WorkerConfig) - remotecall(exit,id) - close(config.io) - - if isa(manager, QRSHManager) - kill(config.userdata[:process],15) - return - end - - if isfile(config.userdata[:iofile]) - rm(config.userdata[:iofile]) - end -end - -addprocs_pbs(np::Integer; qsub_flags=``, wd=ENV["HOME"], kwargs...) = addprocs(PBSManager(np, qsub_flags, wd); kwargs...) - -addprocs_sge(np::Integer; qsub_flags=``, wd=ENV["HOME"], kwargs...) = addprocs(SGEManager(np, qsub_flags, wd); kwargs...) - -addprocs_qrsh(np::Integer; qsub_flags=``, wd=ENV["HOME"], kwargs...) = addprocs(QRSHManager(np, qsub_flags, wd); kwargs...) diff --git a/src/scyld.jl b/src/scyld.jl deleted file mode 100644 index 4259158..0000000 --- a/src/scyld.jl +++ /dev/null @@ -1,63 +0,0 @@ -export addprocs_scyld, ScyldManager - -struct ScyldManager <: ClusterManager - np::Integer -end - -function launch(manager::ScyldManager, params::Dict, instances_arr::Array, c::Condition) - let - manager_description = "Scyld ClusterWare" - msg = "The $(mgr_desc) functionality in ClusterManagers.jl is currently not actively maintained. " * - "We are currently looking for a new maintainer. " * - "If you are an active user of the $(mgr_desc) functionality and are interested in becoming the maintainer, " * - "Please open an issue on the JuliaParallel/ClusterManagers.jl repo: " * - "https://github.com/JuliaParallel/ClusterManagers.jl/issues" - Base.depwarn(msg, Symbol(typeof(manager))) - end - try - dir = params[:dir] - exename = params[:exename] - exeflags = params[:exeflags] - np = manager.np - - beomap_cmd = `bpsh -1 beomap --no-local --np $np` - out,beomap_proc = open(beomap_cmd) - wait(beomap_proc) - if !success(beomap_proc) - error("node availability inaccessible (could not run beomap)") - end - nodes = split(chomp(readline(out)),':') - for (i,node) in enumerate(nodes) - cmd = `cd $dir '&&' $exename $exeflags $(worker_arg())` - cmd = detach(`bpsh $node sh -l -c $(Base.shell_escape(cmd))`) - config = WorkerConfig() - - config.io,_ = open(cmd) - config.io.line_buffered = true - config.userdata = Dict{Symbol, Any}() - config.userdata[:node] = node - - push!(instances_arr, config) - notify(c) - end - catch e - println("Error launching beomap") - println(e) - end -end - -function manage(manager::ScyldManager, id::Integer, config::WorkerConfig, op::Symbol) - if op == :interrupt - if !isnull(config.ospid) - node = config.userdata[:node] - if !success(`bpsh $node kill -2 $(get(config.ospid))`) - println("Error sending Ctrl-C to julia worker $id on node $node") - end - else - # This state can happen immediately after an addprocs - println("Worker $id cannot be presently interrupted.") - end - end -end - -addprocs_scyld(np::Integer) = addprocs(ScyldManager(np)) diff --git a/src/slurm.jl b/src/slurm.jl deleted file mode 100644 index 009448b..0000000 --- a/src/slurm.jl +++ /dev/null @@ -1,180 +0,0 @@ -# ClusterManager for Slurm - -export SlurmManager, addprocs_slurm - -import Logging.@warn - -struct SlurmManager <: ClusterManager - np::Integer - retry_delays 
-end - -struct SlurmException <: Exception - msg -end - -function launch(manager::SlurmManager, params::Dict, instances_arr::Array, - c::Condition) - let - msg = "The Slurm functionality in the `ClusterManagers.jl` package is deprecated " * - "(including `ClusterManagers.addprocs_slurm` and `ClusterManagers.SlurmManager`). " * - "It will be removed from ClusterManagers.jl in a future release. " * - "We recommend migrating to the " * - "[https://github.com/JuliaParallel/SlurmClusterManager.jl](https://github.com/JuliaParallel/SlurmClusterManager.jl) " * - "package instead." - Base.depwarn(msg, :SlurmManager; force = true) - end - try - exehome = params[:dir] - exename = params[:exename] - exeflags = params[:exeflags] - - stdkeys = keys(Distributed.default_addprocs_params()) - - p = filter(x->(!(x[1] in stdkeys) && x[1] != :job_file_loc), params) - - srunargs = [] - for k in keys(p) - if length(string(k)) == 1 - push!(srunargs, "-$k") - val = p[k] - if length(val) > 0 - push!(srunargs, "$(p[k])") - end - else - k2 = replace(string(k), "_"=>"-") - val = p[k] - if length(val) > 0 - push!(srunargs, "--$(k2)=$(p[k])") - else - push!(srunargs, "--$(k2)") - end - end - end - - # Get job file location from parameter dictionary. - job_file_loc = joinpath(exehome, get(params, :job_file_loc, ".")) - - # Make directory if not already made. - if !isdir(job_file_loc) - mkdir(job_file_loc) - end - - # Check for given output file name - jobname = "julia-$(getpid())" - has_output_name = ("-o" in srunargs) | ("--output" in srunargs) - if has_output_name - loc = findfirst(x-> x == "-o" || x == "--output", srunargs) - job_output_name = srunargs[loc+1] - job_output_template = joinpath(job_file_loc, job_output_name) - srunargs[loc+1] = job_output_template - else - job_output_name = "$(jobname)-$(trunc(Int, Base.time() * 10))" - make_job_output_path(task_num) = joinpath(job_file_loc, "$(job_output_name)-$(task_num).out") - job_output_template = make_job_output_path("%4t") - push!(srunargs, "-o", job_output_template) - end - - np = manager.np - srun_cmd = `srun -J $jobname -n $np -D $exehome $(srunargs) $exename $exeflags $(worker_arg())` - - @info "Starting SLURM job $jobname: $srun_cmd" - srun_proc = open(srun_cmd) - - slurm_spec_regex = r"([\w]+):([\d]+)#(\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3})" - could_not_connect_regex = r"could not connect" - exiting_regex = r"exiting." 
- retry_delays = manager.retry_delays - - t_start = time() - t_waited = round(Int, time() - t_start) - for i = 0:np - 1 - slurm_spec_match::Union{RegexMatch,Nothing} = nothing - worker_errors = String[] - if has_output_name - fn = job_output_template - else - fn = make_job_output_path(lpad(i, 4, "0")) - end - for retry_delay in push!(collect(retry_delays), 0) - t_waited = round(Int, time() - t_start) - - # Wait for output log to be created and populated, then parse - - if isfile(fn) - if filesize(fn) > 0 - open(fn) do f - # Due to error and warning messages, the specification - # may not appear on the file's first line - for line in eachline(f) - re_match = match(slurm_spec_regex, line) - if !isnothing(re_match) - slurm_spec_match = re_match - end - for expr in [could_not_connect_regex, exiting_regex] - if !isnothing(match(expr, line)) - slurm_spec_match = nothing - push!(worker_errors, line) - end - end - end - end - end - if !isempty(worker_errors) || !isnothing(slurm_spec_match) - break # break if error or specification found - else - @info "Worker $i (after $t_waited s): Output file found, but no connection details yet" - end - else - @info "Worker $i (after $t_waited s): No output file \"$fn\" yet" - end - - # Sleep for some time to limit resource usage while waiting for the job to start - sleep(retry_delay) - end - - if !isempty(worker_errors) - throw(SlurmException("Worker $i failed after $t_waited s: $(join(worker_errors, " "))")) - elseif isnothing(slurm_spec_match) - throw(SlurmException("Timeout after $t_waited s while waiting for worker $i to get ready.")) - end - - config = WorkerConfig() - config.port = parse(Int, slurm_spec_match[2]) - config.host = strip(slurm_spec_match[3]) - @info "Worker $i ready after $t_waited s on host $(config.host), port $(config.port)" - # Keep a reference to the proc, so it's properly closed once - # the last worker exits. - config.userdata = srun_proc - push!(instances_arr, config) - notify(c) - end - catch e - @error "Error launching Slurm job" - rethrow(e) - end -end - -function manage(manager::SlurmManager, id::Integer, config::WorkerConfig, - op::Symbol) - # This function needs to exist, but so far we don't do anything -end - -SlurmManager(np::Integer) = SlurmManager(np, ExponentialBackOff(n=10, first_delay=1, - max_delay=512, factor=2)) - -""" -Launch `np` workers on a cluster managed by slurm. `retry_delays` is a vector of -numbers specifying in seconds how long to repeatedly wait for a worker to start. -Defaults to an exponential backoff. - -# Examples - -``` -addprocs_slurm(100; retry_delays=Iterators.repeated(0.1)) -``` -""" -addprocs_slurm(np::Integer; - retry_delays=ExponentialBackOff(n=10, first_delay=1, - max_delay=512, factor=2), - kwargs...) = addprocs(SlurmManager(np, retry_delays); kwargs...) diff --git a/test/cookie.jl b/test/cookie.jl new file mode 100644 index 0000000..ade716d --- /dev/null +++ b/test/cookie.jl @@ -0,0 +1,3 @@ +@test ElasticClusterManager.worker_arg() == `--worker=$(ElasticClusterManager.worker_cookie())` + +@test ElasticClusterManager.worker_cookie() == Distributed.cluster_cookie() diff --git a/test/elastic.jl b/test/elastic.jl index 54fc19a..26194c0 100644 --- a/test/elastic.jl +++ b/test/elastic.jl @@ -1,10 +1,13 @@ @testset "ElasticManager" begin TIMEOUT = 10. 
- em = ElasticManager(addr=:auto, port=0) + em = ElasticManager(; addr=:auto, port=0) + @test em isa ElasticManager # launch worker - run(`sh -c $(ElasticClusterManager.get_connect_cmd(em))`, wait=false) + old_cmd = ElasticClusterManager.get_connect_cmd(em) + new_cmd = `$(old_cmd) --coverage=user` + run(`sh -c $(new_cmd)`, wait=false) # wait at most TIMEOUT seconds for it to connect @test :ok == timedwait(TIMEOUT) do @@ -22,4 +25,12 @@ @test lines[4] == "Terminated workers : [ 2]" @test lines[5] == "Worker connect command :" end + + @testset "Other constructors for ElasticManager()" begin + @test ElasticManager(9001) isa ElasticManager + @test ElasticManager(ip"127.0.0.1", 9002) isa ElasticManager + @test Distributed.HDR_COOKIE_LEN isa Real + @test Distributed.HDR_COOKIE_LEN >= 16 + @test ElasticManager(ip"127.0.0.1", 9003, Random.randstring(Distributed.HDR_COOKIE_LEN)) isa ElasticManager + end end diff --git a/test/runtests.jl b/test/runtests.jl index 093e049..c8c24f3 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -2,54 +2,20 @@ import ElasticClusterManager import Test import Distributed +import Random # Bring some names into scope, just for convenience: using Distributed: addprocs, rmprocs using Distributed: workers, nworkers using Distributed: procs, nprocs using Distributed: remotecall_fetch, @spawnat +using Distributed: @ip_str using Test: @testset, @test, @test_skip + # ElasticManager: using ElasticClusterManager: ElasticManager -# Slurm: -using ElasticClusterManager: addprocs_slurm, SlurmManager -# SGE: -using ElasticClusterManager: addprocs_sge, SGEManager - -const test_args = lowercase.(strip.(ARGS)) - -@info "" test_args - -slurm_is_installed() = !isnothing(Sys.which("sbatch")) -qsub_is_installed() = !isnothing(Sys.which("qsub")) @testset "ElasticClusterManager.jl" begin include("elastic.jl") - - if slurm_is_installed() - @info "Running the Slurm tests..." Sys.which("sbatch") - include("slurm.jl") - else - if "slurm" in test_args - @error "ERROR: The Slurm tests were explicitly requested in ARGS, but sbatch was not found, so the Slurm tests cannot be run" Sys.which("sbatch") test_args - @test false - else - @warn "sbatch was not found - Slurm tests will be skipped" Sys.which("sbatch") - @test_skip false - end - end - - if qsub_is_installed() - @info "Running the SGE (via qsub) tests..." 
Sys.which("qsub") - include("sge_qsub.jl") - else - if "sge_qsub" in test_args - @error "ERROR: The SGE tests were explicitly requested in ARGS, but qsub was not found, so the SGE tests cannot be run" Sys.which("qsub") test_args - @test false - else - @warn "qsub was not found - SGE tests will be skipped" Sys.which("qsub") - @test_skip false - end - end - + include("cookie.jl") end # @testset diff --git a/test/sge_qsub.jl b/test/sge_qsub.jl deleted file mode 100644 index 9fdda5e..0000000 --- a/test/sge_qsub.jl +++ /dev/null @@ -1,10 +0,0 @@ -@testset "SGEManager (addprocs_sge via qsub)" begin - p = addprocs_sge(1, queue=``) - @test nprocs() == 2 - @test workers() == p - @test fetch(@spawnat :any myid()) == p[1] - @test remotecall_fetch(+,p[1],1,1) == 2 - rmprocs(p) - @test nprocs() == 1 - @test workers() == [1] -end diff --git a/test/slurm.jl b/test/slurm.jl deleted file mode 100644 index 6e5928d..0000000 --- a/test/slurm.jl +++ /dev/null @@ -1,29 +0,0 @@ -@testset "Slurm" begin - mktempdir() do tmpdir - cd(tmpdir) do - outfile = joinpath(tmpdir, "my_slurm_job.out") - p = addprocs_slurm(1; o=outfile) - @test nprocs() == 2 - @test workers() == p - @test fetch(@spawnat :any myid()) == p[1] - @test remotecall_fetch(+,p[1],1,1) == 2 - rmprocs(p) - @test nprocs() == 1 - @test workers() == [1] - - # Check that `outfile` exists: - @test isfile(outfile) - # Check that `outfile` is not empty: - outfile_contents = read(outfile, String) - @test length(strip(outfile_contents)) > 5 - - println(Base.stderr, "# BEGIN: contents of my_slurm_job.out") - println(Base.stderr, outfile_contents) - println(Base.stderr, "# END: contents of my_slurm_job.out") - - # No need to manually delete the `outfile` file. - # The entire `tmpdir` will automatically be removed when the `mktempdir() do ...` block ends. - # rm(outfile) - end - end -end