From 7245492588e44551bf95019cb1ae168abdf6645b Mon Sep 17 00:00:00 2001
From: Dilum Aluthge
Date: Sun, 16 Feb 2025 07:55:06 -0500
Subject: [PATCH] Excision: Remove all non-Condor code

---
 .github/workflows/ci.yml      |  62 ------------
 ci/Dockerfile                 |  21 ----
 ci/docker-compose.yml         |  48 ---------
 ci/my_sbatch.sh               |  14 ---
 ci/run_my_sbatch.sh           |  14 ---
 docs/sge.md                   |  70 -------------
 slurm_test.jl                 |  18 ----
 src/HTCondorClusterManager.jl |  18 ++--
 src/affinity.jl               |  52 ----------
 src/condor.jl                 |  12 +--
 src/elastic.jl                | 156 -----------------------------
 src/qsub.jl                   | 139 --------------------------
 src/scyld.jl                  |  63 ------------
 src/slurm.jl                  | 180 ----------------------------------
 test/elastic.jl               |  25 -----
 test/runtests.jl              |  43 +-------
 test/sge_qsub.jl              |  10 --
 test/slurm.jl                 |  29 ------
 18 files changed, 14 insertions(+), 960 deletions(-)
 delete mode 100644 ci/Dockerfile
 delete mode 100644 ci/docker-compose.yml
 delete mode 100644 ci/my_sbatch.sh
 delete mode 100755 ci/run_my_sbatch.sh
 delete mode 100644 docs/sge.md
 delete mode 100644 slurm_test.jl
 delete mode 100644 src/affinity.jl
 delete mode 100644 src/elastic.jl
 delete mode 100644 src/qsub.jl
 delete mode 100644 src/scyld.jl
 delete mode 100644 src/slurm.jl
 delete mode 100644 test/elastic.jl
 delete mode 100644 test/sge_qsub.jl
 delete mode 100644 test/slurm.jl

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a4b8080..b0434cf 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -66,68 +66,6 @@ jobs:
           # If this PR is NOT from a fork, then DO fail CI if the Codecov upload errors.
           # If this is not a PR, then DO fail CI if the Codecov upload errors.
           fail_ci_if_error: ${{ github.event_name != 'pull_request' || github.repository == github.event.pull_request.head.repo.full_name }}
-  test-slurm:
-    if: false
-    runs-on: ubuntu-latest
-    timeout-minutes: 20
-    strategy:
-      fail-fast: false
-      matrix:
-        version:
-          # Please note: You must specify the full Julia version number (major.minor.patch).
-          # This is because the value here will be directly interpolated into a download URL.
-          # - '1.2.0' # minimum Julia version supported in Project.toml
-          - '1.6.7' # previous LTS
-          - '1.10.7' # current LTS
-          - '1.11.2' # currently the latest stable release
-    steps:
-      - uses: actions/checkout@v4
-        with:
-          persist-credentials: false
-      - name: Print Docker version
-        run: |
-          docker --version
-          docker version
-      # This next bit of code is taken from:
-      # https://github.com/kleinhenz/SlurmClusterManager.jl
-      # Original author: Joseph Kleinhenz
-      # License: MIT
-      - name: Setup Slurm inside Docker
-        run: |
-          docker version
-          docker compose version
-          docker build --build-arg "JULIA_VERSION=${MATRIX_JULIA_VERSION:?}" -t slurm-cluster-julia -f ci/Dockerfile .
-          docker compose -f ci/docker-compose.yml up -d
-          docker ps
-        env:
-          MATRIX_JULIA_VERSION: ${{matrix.version}}
-      - name: Print some information for debugging purposes
-        run: |
-          docker exec -t slurmctld pwd
-          docker exec -t slurmctld ls -la
-          docker exec -t slurmctld ls -la HTCondorClusterManager
-      - name: Instantiate package
-        run: docker exec -t slurmctld julia --project=HTCondorClusterManager -e 'import Pkg; @show Base.active_project(); Pkg.instantiate(); Pkg.status()'
-      - name: Run tests without a Slurm allocation
-        run: docker exec -t slurmctld julia --project=HTCondorClusterManager -e 'import Pkg; Pkg.test(; test_args=["slurm"])'
-      - name: Run tests inside salloc
-        run: docker exec -t slurmctld salloc -t 00:10:00 -n 2 julia --project=HTCondorClusterManager -e 'import Pkg; Pkg.test(; test_args=["slurm"], coverage=true)'
-      - name: Run tests inside sbatch
-        run: docker exec -t slurmctld HTCondorClusterManager/ci/run_my_sbatch.sh
-      - run: find . -type f -name '*.cov'
-      - name: Copy .cov files out of the Docker container
-        run: docker exec slurmctld /bin/bash -c 'cd /home/docker/HTCondorClusterManager && tar -cf - src/*.cov' | tar -xvf -
-      - run: find . -type f -name '*.cov'
-      # - run: find . -type f -name '*.cov' -exec cat {} \;
-      - uses: julia-actions/julia-processcoverage@v1
-      - uses: codecov/codecov-action@v5
-        with:
-          files: lcov.info
-          token: ${{ secrets.CODECOV_TOKEN }}
-          # If this PR is from a fork, then do NOT fail CI if the Codecov upload errors.
-          # If this PR is NOT from a fork, then DO fail CI if the Codecov upload errors.
-          # If this is not a PR, then DO fail CI if the Codecov upload errors.
-          fail_ci_if_error: ${{ github.event_name != 'pull_request' || github.repository == github.event.pull_request.head.repo.full_name }}
   example-pull-gcr:
     runs-on: ubuntu-latest
     timeout-minutes: 20
diff --git a/ci/Dockerfile b/ci/Dockerfile
deleted file mode 100644
index 4f7cc33..0000000
--- a/ci/Dockerfile
+++ /dev/null
@@ -1,21 +0,0 @@
-# This file is taken from:
-# https://github.com/kleinhenz/SlurmClusterManager.jl
-# Original author: Joseph Kleinhenz
-# License: MIT
-
-FROM jkleinh/slurm-cluster@sha256:afd20dafc831b0fa781460dc871232579ccf1b54955e434531394c331ce388e4 as base
-MAINTAINER Joseph Kleinhenz
-
-ARG JULIA_VERSION=1.6.0
-
-RUN mkdir -p /home/docker/.local/opt/julia \
-    && cd /home/docker/.local/opt/julia \
-    && folder="$(echo ${JULIA_VERSION} | cut -d. -f1-2)" \
-    && curl -L https://julialang-s3.julialang.org/bin/linux/x64/${folder}/julia-${JULIA_VERSION}-linux-x86_64.tar.gz | tar xz --strip 1 \
-    && /home/docker/.local/opt/julia/bin/julia --version
-
-ENV PATH="/home/docker/.local/opt/julia/bin:${PATH}"
-
-COPY --chown=docker . ClusterManagers
-
-CMD /bin/bash -l
diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml
deleted file mode 100644
index 86b1df3..0000000
--- a/ci/docker-compose.yml
+++ /dev/null
@@ -1,48 +0,0 @@
-# This file is taken from:
-# https://github.com/kleinhenz/SlurmClusterManager.jl
-# Original author: Joseph Kleinhenz
-# License: MIT
-
-version: "3.3"
-
-services:
-  slurmctld:
-    image: slurm-cluster-julia
-    command: ["slurmctld"]
-    container_name: slurmctld
-    hostname: slurmctld
-    volumes:
-      - slurm_jobdir:/home/docker
-      - var_log_slurm:/var/log/slurm
-    expose:
-      - "6817"
-
-  c1:
-    image: slurm-cluster-julia
-    command: ["slurmd"]
-    hostname: c1
-    container_name: c1
-    volumes:
-      - slurm_jobdir:/home/docker
-      - var_log_slurm:/var/log/slurm
-    expose:
-      - "6818"
-    depends_on:
-      - "slurmctld"
-
-  c2:
-    image: slurm-cluster-julia
-    command: ["slurmd"]
-    hostname: c2
-    container_name: c2
-    volumes:
-      - slurm_jobdir:/home/docker
-      - var_log_slurm:/var/log/slurm
-    expose:
-      - "6818"
-    depends_on:
-      - "slurmctld"
-
-volumes:
-  slurm_jobdir:
-  var_log_slurm:
diff --git a/ci/my_sbatch.sh b/ci/my_sbatch.sh
deleted file mode 100644
index 33d98a8..0000000
--- a/ci/my_sbatch.sh
+++ /dev/null
@@ -1,14 +0,0 @@
-#!/bin/bash
-
-# Slurm options:
-#SBATCH --ntasks=2
-#SBATCH --time=00:10:00
-
-# Important note:
-# There should be no non-comment non-whitespace lines above this line.
-
-set -euf -o pipefail
-
-set -x
-
-julia --project=ClusterManagers -e 'import Pkg; Pkg.test(; test_args=["slurm"])'
diff --git a/ci/run_my_sbatch.sh b/ci/run_my_sbatch.sh
deleted file mode 100755
index 509a18d..0000000
--- a/ci/run_my_sbatch.sh
+++ /dev/null
@@ -1,14 +0,0 @@
-#!/bin/bash
-
-set -euf -o pipefail
-
-set -x
-
-rm -fv "${HOME:?}/my_stdout.txt"
-rm -fv "${HOME:?}/my_stderr.txt"
-
-sbatch --wait --output="${HOME:?}/my_stdout.txt" --error="${HOME:?}/my_stderr.txt" ./ClusterManagers/ci/my_sbatch.sh
-
-sleep 5
-cat "${HOME:?}/my_stdout.txt"
-cat "${HOME:?}/my_stderr.txt"
diff --git a/docs/sge.md b/docs/sge.md
deleted file mode 100644
index 8a74b6b..0000000
--- a/docs/sge.md
+++ /dev/null
@@ -1,70 +0,0 @@
-# Sun Grid Engine (SGE)
-
-> [!WARNING]
-> The SGE functionality is not currently being maintained.
->
-> We are currently seeking a new maintainer for the SGE functionality. If you are an active user of SGE and are interested in being a maintainer, please open a GitHub issue - say that you are interested in being a maintainer for the SGE functionality.
-
-## SGE via `qsub`: Use `ClusterManagers.addprocs_sge` (or `ClusterManagers.SGEManager`)
-
-```julia
-julia> using ClusterManagers
-
-julia> ClusterManagers.addprocs_sge(5; qsub_flags=`-q queue_name`)
-job id is 961, waiting for job to start .
-5-element Array{Any,1}:
-2
-3
-4
-5
-6
-
-julia> @parallel for i=1:5
-           run(`hostname`)
-       end
-
-julia>  From worker 2:  compute-6
-        From worker 4:  compute-6
-        From worker 5:  compute-6
-        From worker 6:  compute-6
-        From worker 3:  compute-6
-```
-
-Some clusters require the user to specify a list of required resources.
-For example, it may be necessary to specify how much memory will be needed by the job - see this [issue](https://github.com/JuliaLang/julia/issues/10390).
-The keyword `qsub_flags` can be used to specify these and other options.
-Additionally the keyword `wd` can be used to specify the working directory (which defaults to `ENV["HOME"]`).
-
-```julia
-julia> using Distributed, ClusterManagers
-
-julia> addprocs_sge(5; qsub_flags=`-q queue_name -l h_vmem=4G,tmem=4G`, wd=mktempdir())
-Job 5672349 in queue.
-Running.
-5-element Array{Int64,1}:
- 2
- 3
- 4
- 5
- 6
-
-julia> pmap(x->run(`hostname`),workers());
-
-julia>  From worker 26:  lum-7-2.local
-        From worker 23:  pace-6-10.local
-        From worker 22:  chong-207-10.local
-        From worker 24:  pace-6-11.local
-        From worker 25:  cheech-207-16.local
-
-julia> rmprocs(workers())
-Task (done)
-```
-
-## SGE via `qrsh`: Use `ClusterManagers.addprocs_qrsh` (or `ClusterManagers.QRSHManager`)
-
-`SGEManager` uses SGE's `qsub` command to launch workers, which communicate the
-TCP/IP host:port info back to the master via the filesystem. On filesystems
-that are tuned to make heavy use of caching to increase throughput, launching
-Julia workers can frequently timeout waiting for the standard output files to appear.
-In this case, it's better to use the `QRSHManager`, which uses SGE's `qrsh`
-command to bypass the filesystem and captures STDOUT directly.
diff --git a/slurm_test.jl b/slurm_test.jl
deleted file mode 100644
index ca8bc6b..0000000
--- a/slurm_test.jl
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/cvmfs/soft.computecanada.ca/easybuild/software/2017/avx512/Compiler/gcc7.3/julia/1.1.0/bin/julia
-#SBATCH --time=00:05:00 # Running time of hours
-#SBATCH --ntasks=4
-#SBATCH --account=def-whitem
-
-
-using Logging, Distributed
-
-include("/home/mkschleg/mkschleg/ClusterManagers.jl/src/ClusterManagers.jl")
-
-ClusterManagers.addprocs_slurm(4; exeflags=["--project=.", "--color=yes"], job_file_loc="test_loc")
-
-@sync begin
-    @async @sync for job_id in collect(1:100) @spawn begin
-        println("Hello World $(job_id)")
-    end
-    end
-end
diff --git a/src/HTCondorClusterManager.jl b/src/HTCondorClusterManager.jl
index 3479583..aab3816 100755
--- a/src/HTCondorClusterManager.jl
+++ b/src/HTCondorClusterManager.jl
@@ -1,23 +1,17 @@
 module HTCondorClusterManager
 
-using Distributed
-using Sockets
-using Pkg
+import Distributed
+import Sockets
+import Pkg
+
+using Distributed: launch, manage, kill, init_worker, connect
 
 export launch, manage, kill, init_worker, connect
 
-import Distributed: launch, manage, kill, init_worker, connect
+
 worker_cookie() = begin Distributed.init_multi(); cluster_cookie() end
 worker_arg() = `--worker=$(worker_cookie())`
 
-
-# PBS doesn't have the same semantics as SGE wrt to file accumulate,
-# a different solution will have to be found
-include("qsub.jl")
-include("scyld.jl")
 include("condor.jl")
-include("slurm.jl")
-include("affinity.jl")
-include("elastic.jl")
 
 end
diff --git a/src/affinity.jl b/src/affinity.jl
deleted file mode 100644
index 0dfa177..0000000
--- a/src/affinity.jl
+++ /dev/null
@@ -1,52 +0,0 @@
-# `LocalAffinityManager` (for pinning local workers to specific cores)
-
-export LocalAffinityManager, AffinityMode, COMPACT, BALANCED
-
-@enum AffinityMode COMPACT BALANCED
-
-mutable struct LocalAffinityManager <: ClusterManager
-    affinities::Array{Int}
-
-    function LocalAffinityManager(;np=Sys.CPU_THREADS, mode::AffinityMode=BALANCED, affinities::Array{Int}=[])
-        @assert(Sys.Sys.KERNEL == :Linux)
-
-        if length(affinities) == 0
-            if mode == COMPACT
-                affinities = [i%Sys.CPU_THREADS for i in 1:np]
-            else
-                # mode == BALANCED
-                if np > 1
-                    affinities = [Int(floor(i)) for i in range(0, stop=Sys.CPU_THREADS - 1e-3, length=np)]
-                else
-                    affinities = [0]
-                end
-            end
-        end
-
-        return new(affinities)
-    end
-end
-
-
-function launch(manager::LocalAffinityManager, params::Dict, launched::Array, c::Condition)
-    dir = params[:dir]
-    exename = params[:exename]
-    exeflags = params[:exeflags]
-
-    for core_id in manager.affinities
-        io = open(detach(
-            setenv(`taskset -c $core_id $(Base.julia_cmd(exename)) $exeflags $(worker_arg())`, dir=dir)), "r")
-        wconfig = WorkerConfig()
-        wconfig.process = io
-        wconfig.io = io.out
-        push!(launched, wconfig)
-    end
-
-    notify(c)
-end
-
-function manage(manager::LocalAffinityManager, id::Integer, config::WorkerConfig, op::Symbol)
-    if op == :interrupt
-        kill(get(config.process), 2)
-    end
-end
diff --git a/src/condor.jl b/src/condor.jl
index fa4a4d4..f3db53b 100644
--- a/src/condor.jl
+++ b/src/condor.jl
@@ -2,7 +2,7 @@
 
 export HTCManager, addprocs_htc
 
-struct HTCManager <: ClusterManager
+struct HTCManager <: Distributed.ClusterManager
     np::Integer
 end
 
@@ -51,7 +51,7 @@ function condor_script(portnum::Integer, np::Integer, params::Dict)
     "$tdir/$jobname.sub"
 end
 
-function launch(manager::HTCManager, params::Dict, instances_arr::Array, c::Condition)
+function Distributed.launch(manager::HTCManager, params::Dict, instances_arr::Array, c::Condition)
     let
         mgr_desc = "HTCondor"
         msg = "The $(mgr_desc) functionality in ClusterManagers.jl is currently not actively maintained. " *
@@ -63,7 +63,7 @@ function launch(manager::HTCManager, params::Dict, instances_arr::Array, c::Cond
     end
     try
         portnum = rand(8000:9000)
-        portnum, server = listenany(ip"0.0.0.0", portnum)
+        portnum, server = listenany(Distributed.ip"0.0.0.0", portnum)
         np = manager.np
 
        script = condor_script(portnum, np, params)
@@ -76,7 +76,7 @@ function launch(manager::HTCManager, params::Dict, instances_arr::Array, c::Cond
 
         for i=1:np
             conn = accept(server)
-            config = WorkerConfig()
+            config = Distributed.WorkerConfig()
 
             config.io = conn
 
@@ -92,12 +92,12 @@ function launch(manager::HTCManager, params::Dict, instances_arr::Array, c::Cond
     end
 end
 
-function kill(manager::HTCManager, id::Int64, config::WorkerConfig)
+function Distributed.kill(manager::HTCManager, id::Int64, config::Distributed.WorkerConfig)
     remotecall(exit,id)
     close(config.io)
 end
 
-function manage(manager::HTCManager, id::Integer, config::WorkerConfig, op::Symbol)
+function Distributed.manage(manager::HTCManager, id::Integer, config::Distributed.WorkerConfig, op::Symbol)
     if op == :finalize
         if !isnothing(config.io)
             close(config.io)
diff --git a/src/elastic.jl b/src/elastic.jl
deleted file mode 100644
index 78ede3f..0000000
--- a/src/elastic.jl
+++ /dev/null
@@ -1,156 +0,0 @@
-# The master process listens on a well-known port
-# Launched workers connect to the master and redirect their STDOUTs to the same
-# Workers can join and leave the cluster on demand.
-
-export ElasticManager, elastic_worker
-
-const HDR_COOKIE_LEN = Distributed.HDR_COOKIE_LEN
-
-struct ElasticManager <: ClusterManager
-    active::Dict{Int, WorkerConfig}        # active workers
-    pending::Channel{TCPSocket}            # to be added workers
-    terminated::Set{Int}                   # terminated worker ids
-    topology::Symbol
-    sockname
-    printing_kwargs
-
-    function ElasticManager(;addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all, printing_kwargs=())
-        Distributed.init_multi()
-        cookie !== nothing && cluster_cookie(cookie)
-
-        # Automatically check for the IP address of the local machine
-        if addr == :auto
-            try
-                addr = Sockets.getipaddr(IPv4)
-            catch
-                error("Failed to automatically get host's IP address. Please specify `addr=` explicitly.")
-            end
-        end
-
-        l_sock = listen(addr, port)
-
-        lman = new(Dict{Int, WorkerConfig}(), Channel{TCPSocket}(typemax(Int)), Set{Int}(), topology, getsockname(l_sock), printing_kwargs)
-
-        @async begin
-            while true
-                let s = accept(l_sock)
-                    @async process_worker_conn(lman, s)
-                end
-            end
-        end
-
-        @async process_pending_connections(lman)
-
-        lman
-    end
-end
-
-ElasticManager(port) = ElasticManager(;port=port)
-ElasticManager(addr, port) = ElasticManager(;addr=addr, port=port)
-ElasticManager(addr, port, cookie) = ElasticManager(;addr=addr, port=port, cookie=cookie)
-
-
-function process_worker_conn(mgr::ElasticManager, s::TCPSocket)
-    # Socket is the worker's STDOUT
-    wc = WorkerConfig()
-    wc.io = s
-
-    # Validate cookie
-    cookie = read(s, HDR_COOKIE_LEN)
-    if length(cookie) < HDR_COOKIE_LEN
-        error("Cookie read failed. Connection closed by peer.")
-    end
-    self_cookie = cluster_cookie()
-    for i in 1:HDR_COOKIE_LEN
-        if UInt8(self_cookie[i]) != cookie[i]
-            println(i, " ", self_cookie[i], " ", cookie[i])
-            error("Invalid cookie sent by remote worker.")
-        end
-    end
-
-    put!(mgr.pending, s)
-end
-
-function process_pending_connections(mgr::ElasticManager)
-    while true
-        wait(mgr.pending)
-        try
-            addprocs(mgr; topology=mgr.topology)
-        catch e
-            showerror(stderr, e)
-            Base.show_backtrace(stderr, Base.catch_backtrace())
-        end
-    end
-end
-
-function launch(mgr::ElasticManager, params::Dict, launched::Array, c::Condition)
-    # The workers have already been started.
-    while isready(mgr.pending)
-        wc=WorkerConfig()
-        wc.io = take!(mgr.pending)
-        push!(launched, wc)
-    end
-
-    notify(c)
-end
-
-function manage(mgr::ElasticManager, id::Integer, config::WorkerConfig, op::Symbol)
-    if op == :register
-        mgr.active[id] = config
-    elseif op == :deregister
-        delete!(mgr.active, id)
-        push!(mgr.terminated, id)
-    end
-end
-
-function Base.show(io::IO, mgr::ElasticManager)
-    iob = IOBuffer()
-
-    println(iob, "ElasticManager:")
-    print(iob, "  Active workers : [ ")
-    for id in sort(collect(keys(mgr.active)))
-        print(iob, id, ",")
-    end
-    seek(iob, position(iob)-1)
-    println(iob, "]")
-
-    println(iob, "  Number of workers to be added  : ", Base.n_avail(mgr.pending))
-
-    print(iob, "  Terminated workers : [ ")
-    for id in sort(collect(mgr.terminated))
-        print(iob, id, ",")
-    end
-    seek(iob, position(iob)-1)
-    println(iob, "]")
-
-    println(iob, "  Worker connect command : ")
-    print(iob, "    ", get_connect_cmd(mgr; mgr.printing_kwargs...))
-
-    print(io, String(take!(iob)))
-end
-
-# Does not return. If executing from a REPL try
-# @async connect_to_cluster(.....)
-# addr, port that a ElasticManager on the master processes is listening on.
-function elastic_worker(cookie, addr="127.0.0.1", port=9009; stdout_to_master=true)
-    c = connect(addr, port)
-    write(c, rpad(cookie, HDR_COOKIE_LEN)[1:HDR_COOKIE_LEN])
-    stdout_to_master && redirect_stdout(c)
-    start_worker(c, cookie)
-end
-
-function get_connect_cmd(em::ElasticManager; absolute_exename=true, same_project=true)
-
-    ip = string(em.sockname[1])
-    port = convert(Int,em.sockname[2])
-    cookie = cluster_cookie()
-    exename = absolute_exename ? joinpath(Sys.BINDIR, Base.julia_exename()) : "julia"
-    project = same_project ? ("--project=$(Pkg.API.Context().env.project_file)",) : ()
-
-    join([
-        exename,
-        project...,
-        "-e 'import HTCondorClusterManager; HTCondorClusterManager.elastic_worker(\"$cookie\",\"$ip\",$port)'"
-    ]," ")
-
-end
diff --git a/src/qsub.jl b/src/qsub.jl
deleted file mode 100644
index 2d61161..0000000
--- a/src/qsub.jl
+++ /dev/null
@@ -1,139 +0,0 @@
-export PBSManager, SGEManager, QRSHManager, addprocs_pbs, addprocs_sge, addprocs_qrsh
-
-struct PBSManager <: ClusterManager
-    np::Integer
-    queue
-    wd
-end
-
-struct SGEManager <: ClusterManager
-    np::Integer
-    queue
-    wd
-end
-
-struct QRSHManager <: ClusterManager
-    np::Integer
-    queue
-    wd
-end
-
-function launch(manager::Union{PBSManager, SGEManager, QRSHManager},
-                params::Dict, instances_arr::Array, c::Condition)
-    let
-        if manager isa PBSManager
-            mgr_desc = "Portable Batch System (PBS)"
-        elseif manager isa SGEManager
-            mgr_desc = "Sun Grid Engine (SGE)"
-        else
-            # manager isa QRSHManager
-            #
-            # `qrsh` is only available for SGE.
-            # It is not available for OpenPBS.
-            mgr_desc = "Sun Grid Engine (SGE)"
-        end
-        msg = "The $(mgr_desc) functionality in ClusterManagers.jl is currently not actively maintained. " *
-              "We are currently looking for a new maintainer. " *
-              "If you are an active user of the $(mgr_desc) functionality and are interested in becoming the maintainer, " *
-              "Please open an issue on the JuliaParallel/ClusterManagers.jl repo: " *
-              "https://github.com/JuliaParallel/ClusterManagers.jl/issues"
-        Base.depwarn(msg, Symbol(typeof(manager)))
-    end
-    try
-        dir = params[:dir]
-        exename = params[:exename]
-        exeflags = params[:exeflags]
-        wd = manager.wd
-        isPBS = isa(manager, PBSManager)
-
-        np = manager.np
-        queue = manager.queue
-
-        jobname = `julia-$(getpid())`
-
-        if isa(manager, QRSHManager)
-            cmd = `cd $dir '&&' $exename $exeflags $(worker_arg())`
-            qrsh_cmd = `qrsh $queue -V -N $jobname -wd $wd -now n "$cmd"`
-
-            stream_proc = [open(qrsh_cmd) for i in 1:np]
-
-            for i in 1:np
-                config = WorkerConfig()
-                config.io, io_proc = stream_proc[i]
-                config.userdata = Dict{Symbol, Any}(:task => i,
-                                                    :process => io_proc)
-                push!(instances_arr, config)
-                notify(c)
-            end
-
-        else # PBS & SGE
-            cmd = `cd $dir '&&' $exename $exeflags $(worker_arg())`
-            qsub_cmd = pipeline(`echo $(Base.shell_escape(cmd))` , (isPBS ?
-                `qsub -N $jobname -wd $wd -j oe -k o -t 1-$np $queue` :
-                `qsub -N $jobname -wd $wd -terse -j y -R y -t 1-$np -V $queue`))
-
-            out = open(qsub_cmd)
-            if !success(out)
-                throw(error()) # qsub already gives a message
-            end
-
-            id = chomp(split(readline(out),'.')[1])
-            if endswith(id, "[]")
-                id = id[1:end-2]
-            end
-
-            filenames(i) = "$wd/julia-$(getpid()).o$id-$i","$wd/julia-$(getpid())-$i.o$id","$wd/julia-$(getpid()).o$id.$i"
-
-            println("Job $id in queue.")
-            for i=1:np
-                # wait for each output stream file to get created
-                fnames = filenames(i)
-                j = 0
-                while (j=findfirst(x->isfile(x),fnames))==nothing
-                    sleep(1.0)
-                end
-                fname = fnames[j]
-
-                # Hack to get Base to get the host:port, the Julia process has already started.
-                cmd = `tail -f $fname`
-
-                config = WorkerConfig()
-
-                config.io = open(detach(cmd))
-
-                config.userdata = Dict{Symbol, Any}(:job=>id, :task=>i, :iofile=>fname)
-                push!(instances_arr, config)
-                notify(c)
-            end
-            println("Running.")
-        end
-
-    catch e
-        println("Error launching workers")
-        println(e)
-    end
-end
-
-function manage(manager::Union{PBSManager, SGEManager, QRSHManager},
-                id::Int64, config::WorkerConfig, op::Symbol)
-end
-
-function kill(manager::Union{PBSManager, SGEManager, QRSHManager}, id::Int64, config::WorkerConfig)
-    remotecall(exit,id)
-    close(config.io)
-
-    if isa(manager, QRSHManager)
-        kill(config.userdata[:process],15)
-        return
-    end
-
-    if isfile(config.userdata[:iofile])
-        rm(config.userdata[:iofile])
-    end
-end
-
-addprocs_pbs(np::Integer; qsub_flags=``, wd=ENV["HOME"], kwargs...) = addprocs(PBSManager(np, qsub_flags, wd); kwargs...)
-
-addprocs_sge(np::Integer; qsub_flags=``, wd=ENV["HOME"], kwargs...) = addprocs(SGEManager(np, qsub_flags, wd); kwargs...)
-
-addprocs_qrsh(np::Integer; qsub_flags=``, wd=ENV["HOME"], kwargs...) = addprocs(QRSHManager(np, qsub_flags, wd); kwargs...)
diff --git a/src/scyld.jl b/src/scyld.jl
deleted file mode 100644
index 4259158..0000000
--- a/src/scyld.jl
+++ /dev/null
@@ -1,63 +0,0 @@
-export addprocs_scyld, ScyldManager
-
-struct ScyldManager <: ClusterManager
-    np::Integer
-end
-
-function launch(manager::ScyldManager, params::Dict, instances_arr::Array, c::Condition)
-    let
-        manager_description = "Scyld ClusterWare"
-        msg = "The $(mgr_desc) functionality in ClusterManagers.jl is currently not actively maintained. " *
-              "We are currently looking for a new maintainer. " *
-              "If you are an active user of the $(mgr_desc) functionality and are interested in becoming the maintainer, " *
-              "Please open an issue on the JuliaParallel/ClusterManagers.jl repo: " *
-              "https://github.com/JuliaParallel/ClusterManagers.jl/issues"
-        Base.depwarn(msg, Symbol(typeof(manager)))
-    end
-    try
-        dir = params[:dir]
-        exename = params[:exename]
-        exeflags = params[:exeflags]
-        np = manager.np
-
-        beomap_cmd = `bpsh -1 beomap --no-local --np $np`
-        out,beomap_proc = open(beomap_cmd)
-        wait(beomap_proc)
-        if !success(beomap_proc)
-            error("node availability inaccessible (could not run beomap)")
-        end
-        nodes = split(chomp(readline(out)),':')
-        for (i,node) in enumerate(nodes)
-            cmd = `cd $dir '&&' $exename $exeflags $(worker_arg())`
-            cmd = detach(`bpsh $node sh -l -c $(Base.shell_escape(cmd))`)
-            config = WorkerConfig()
-
-            config.io,_ = open(cmd)
-            config.io.line_buffered = true
-            config.userdata = Dict{Symbol, Any}()
-            config.userdata[:node] = node
-
-            push!(instances_arr, config)
-            notify(c)
-        end
-    catch e
-        println("Error launching beomap")
-        println(e)
-    end
-end
-
-function manage(manager::ScyldManager, id::Integer, config::WorkerConfig, op::Symbol)
-    if op == :interrupt
-        if !isnull(config.ospid)
-            node = config.userdata[:node]
-            if !success(`bpsh $node kill -2 $(get(config.ospid))`)
-                println("Error sending Ctrl-C to julia worker $id on node $node")
-            end
-        else
-            # This state can happen immediately after an addprocs
-            println("Worker $id cannot be presently interrupted.")
-        end
-    end
-end
-
-addprocs_scyld(np::Integer) = addprocs(ScyldManager(np))
diff --git a/src/slurm.jl b/src/slurm.jl
deleted file mode 100644
index 009448b..0000000
--- a/src/slurm.jl
+++ /dev/null
@@ -1,180 +0,0 @@
-# ClusterManager for Slurm
-
-export SlurmManager, addprocs_slurm
-
-import Logging.@warn
-
-struct SlurmManager <: ClusterManager
-    np::Integer
-    retry_delays
-end
-
-struct SlurmException <: Exception
-    msg
-end
-
-function launch(manager::SlurmManager, params::Dict, instances_arr::Array,
-                c::Condition)
-    let
-        msg = "The Slurm functionality in the `ClusterManagers.jl` package is deprecated " *
-              "(including `ClusterManagers.addprocs_slurm` and `ClusterManagers.SlurmManager`). " *
-              "It will be removed from ClusterManagers.jl in a future release. " *
-              "We recommend migrating to the " *
-              "[https://github.com/JuliaParallel/SlurmClusterManager.jl](https://github.com/JuliaParallel/SlurmClusterManager.jl) " *
-              "package instead."
-        Base.depwarn(msg, :SlurmManager; force = true)
-    end
-    try
-        exehome = params[:dir]
-        exename = params[:exename]
-        exeflags = params[:exeflags]
-
-        stdkeys = keys(Distributed.default_addprocs_params())
-
-        p = filter(x->(!(x[1] in stdkeys) && x[1] != :job_file_loc), params)
-
-        srunargs = []
-        for k in keys(p)
-            if length(string(k)) == 1
-                push!(srunargs, "-$k")
-                val = p[k]
-                if length(val) > 0
-                    push!(srunargs, "$(p[k])")
-                end
-            else
-                k2 = replace(string(k), "_"=>"-")
-                val = p[k]
-                if length(val) > 0
-                    push!(srunargs, "--$(k2)=$(p[k])")
-                else
-                    push!(srunargs, "--$(k2)")
-                end
-            end
-        end
-
-        # Get job file location from parameter dictionary.
-        job_file_loc = joinpath(exehome, get(params, :job_file_loc, "."))
-
-        # Make directory if not already made.
-        if !isdir(job_file_loc)
-            mkdir(job_file_loc)
-        end
-
-        # Check for given output file name
-        jobname = "julia-$(getpid())"
-        has_output_name = ("-o" in srunargs) | ("--output" in srunargs)
-        if has_output_name
-            loc = findfirst(x-> x == "-o" || x == "--output", srunargs)
-            job_output_name = srunargs[loc+1]
-            job_output_template = joinpath(job_file_loc, job_output_name)
-            srunargs[loc+1] = job_output_template
-        else
-            job_output_name = "$(jobname)-$(trunc(Int, Base.time() * 10))"
-            make_job_output_path(task_num) = joinpath(job_file_loc, "$(job_output_name)-$(task_num).out")
-            job_output_template = make_job_output_path("%4t")
-            push!(srunargs, "-o", job_output_template)
-        end
-
-        np = manager.np
-        srun_cmd = `srun -J $jobname -n $np -D $exehome $(srunargs) $exename $exeflags $(worker_arg())`
-
-        @info "Starting SLURM job $jobname: $srun_cmd"
-        srun_proc = open(srun_cmd)
-
-        slurm_spec_regex = r"([\w]+):([\d]+)#(\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3})"
-        could_not_connect_regex = r"could not connect"
-        exiting_regex = r"exiting."
-
-        retry_delays = manager.retry_delays
-
-        t_start = time()
-        t_waited = round(Int, time() - t_start)
-        for i = 0:np - 1
-            slurm_spec_match::Union{RegexMatch,Nothing} = nothing
-            worker_errors = String[]
-            if has_output_name
-                fn = job_output_template
-            else
-                fn = make_job_output_path(lpad(i, 4, "0"))
-            end
-            for retry_delay in push!(collect(retry_delays), 0)
-                t_waited = round(Int, time() - t_start)
-
-                # Wait for output log to be created and populated, then parse
-
-                if isfile(fn)
-                    if filesize(fn) > 0
-                        open(fn) do f
-                            # Due to error and warning messages, the specification
-                            # may not appear on the file's first line
-                            for line in eachline(f)
-                                re_match = match(slurm_spec_regex, line)
-                                if !isnothing(re_match)
-                                    slurm_spec_match = re_match
-                                end
-                                for expr in [could_not_connect_regex, exiting_regex]
-                                    if !isnothing(match(expr, line))
-                                        slurm_spec_match = nothing
-                                        push!(worker_errors, line)
-                                    end
-                                end
-                            end
-                        end
-                    end
-                    if !isempty(worker_errors) || !isnothing(slurm_spec_match)
-                        break   # break if error or specification found
-                    else
-                        @info "Worker $i (after $t_waited s): Output file found, but no connection details yet"
-                    end
-                else
-                    @info "Worker $i (after $t_waited s): No output file \"$fn\" yet"
-                end
-
-                # Sleep for some time to limit resource usage while waiting for the job to start
-                sleep(retry_delay)
-            end
-
-            if !isempty(worker_errors)
-                throw(SlurmException("Worker $i failed after $t_waited s: $(join(worker_errors, " "))"))
-            elseif isnothing(slurm_spec_match)
-                throw(SlurmException("Timeout after $t_waited s while waiting for worker $i to get ready."))
-            end
-
-            config = WorkerConfig()
-            config.port = parse(Int, slurm_spec_match[2])
-            config.host = strip(slurm_spec_match[3])
-            @info "Worker $i ready after $t_waited s on host $(config.host), port $(config.port)"
-            # Keep a reference to the proc, so it's properly closed once
-            # the last worker exits.
-            config.userdata = srun_proc
-            push!(instances_arr, config)
-            notify(c)
-        end
-    catch e
-        @error "Error launching Slurm job"
-        rethrow(e)
-    end
-end
-
-function manage(manager::SlurmManager, id::Integer, config::WorkerConfig,
-                op::Symbol)
-    # This function needs to exist, but so far we don't do anything
-end
-
-SlurmManager(np::Integer) = SlurmManager(np, ExponentialBackOff(n=10, first_delay=1,
-                                                                max_delay=512, factor=2))
-
-"""
-Launch `np` workers on a cluster managed by slurm. `retry_delays` is a vector of
-numbers specifying in seconds how long to repeatedly wait for a worker to start.
-Defaults to an exponential backoff.
-
-# Examples
-
-```
-addprocs_slurm(100; retry_delays=Iterators.repeated(0.1))
-```
-"""
-addprocs_slurm(np::Integer;
-               retry_delays=ExponentialBackOff(n=10, first_delay=1,
-                                               max_delay=512, factor=2),
-               kwargs...) = addprocs(SlurmManager(np, retry_delays); kwargs...)
diff --git a/test/elastic.jl b/test/elastic.jl
deleted file mode 100644
index 183dbcb..0000000
--- a/test/elastic.jl
+++ /dev/null
@@ -1,25 +0,0 @@
-@testset "ElasticManager" begin
-    TIMEOUT = 10.
-
-    em = ElasticManager(addr=:auto, port=0)
-
-    # launch worker
-    run(`sh -c $(HTCondorClusterManager.get_connect_cmd(em))`, wait=false)
-
-    # wait at most TIMEOUT seconds for it to connect
-    @test :ok == timedwait(TIMEOUT) do
-        length(em.active) == 1
-    end
-
-    wait(rmprocs(workers()))
-
-    @testset "show(io, ::ElasticManager)" begin
-        str = sprint(show, em)
-        lines = strip.(split(strip(str), '\n'))
-        @test lines[1] == "ElasticManager:"
-        @test lines[2] == "Active workers : []"
-        @test lines[3] == "Number of workers to be added  : 0"
-        @test lines[4] == "Terminated workers : [ 2]"
-        @test lines[5] == "Worker connect command :"
-    end
-end
diff --git a/test/runtests.jl b/test/runtests.jl
index b0093e9..4d23b2d 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -9,47 +9,8 @@ using Distributed: workers, nworkers
 using Distributed: procs, nprocs
 using Distributed: remotecall_fetch, @spawnat
 using Test: @testset, @test, @test_skip
-# ElasticManager:
-using HTCondorClusterManager: ElasticManager
-# Slurm:
-using HTCondorClusterManager: addprocs_slurm, SlurmManager
-# SGE:
-using HTCondorClusterManager: addprocs_sge, SGEManager
-
-const test_args = lowercase.(strip.(ARGS))
-
-@info "" test_args
-
-slurm_is_installed() = !isnothing(Sys.which("sbatch"))
-qsub_is_installed() = !isnothing(Sys.which("qsub"))
 
 @testset "HTCondorClusterManager.jl" begin
-    include("elastic.jl")
-
-    if slurm_is_installed()
-        @info "Running the Slurm tests..." Sys.which("sbatch")
-        include("slurm.jl")
-    else
-        if "slurm" in test_args
-            @error "ERROR: The Slurm tests were explicitly requested in ARGS, but sbatch was not found, so the Slurm tests cannot be run" Sys.which("sbatch") test_args
-            @test false
-        else
-            @warn "sbatch was not found - Slurm tests will be skipped" Sys.which("sbatch")
-            @test_skip false
-        end
-    end
-
-    if qsub_is_installed()
-        @info "Running the SGE (via qsub) tests..." Sys.which("qsub")
-        include("sge_qsub.jl")
-    else
-        if "sge_qsub" in test_args
-            @error "ERROR: The SGE tests were explicitly requested in ARGS, but qsub was not found, so the SGE tests cannot be run" Sys.which("qsub") test_args
-            @test false
-        else
-            @warn "qsub was not found - SGE tests will be skipped" Sys.which("qsub")
-            @test_skip false
-        end
-    end
-
+    @warn "The HTCondorClusterManager.jl package currently does not have any tests"
+    @test_skip false
 end # @testset
diff --git a/test/sge_qsub.jl b/test/sge_qsub.jl
deleted file mode 100644
index 9fdda5e..0000000
--- a/test/sge_qsub.jl
+++ /dev/null
@@ -1,10 +0,0 @@
-@testset "SGEManager (addprocs_sge via qsub)" begin
-    p = addprocs_sge(1, queue=``)
-    @test nprocs() == 2
-    @test workers() == p
-    @test fetch(@spawnat :any myid()) == p[1]
-    @test remotecall_fetch(+,p[1],1,1) == 2
-    rmprocs(p)
-    @test nprocs() == 1
-    @test workers() == [1]
-end
diff --git a/test/slurm.jl b/test/slurm.jl
deleted file mode 100644
index 6e5928d..0000000
--- a/test/slurm.jl
+++ /dev/null
@@ -1,29 +0,0 @@
-@testset "Slurm" begin
-    mktempdir() do tmpdir
-        cd(tmpdir) do
-            outfile = joinpath(tmpdir, "my_slurm_job.out")
-            p = addprocs_slurm(1; o=outfile)
-            @test nprocs() == 2
-            @test workers() == p
-            @test fetch(@spawnat :any myid()) == p[1]
-            @test remotecall_fetch(+,p[1],1,1) == 2
-            rmprocs(p)
-            @test nprocs() == 1
-            @test workers() == [1]
-
-            # Check that `outfile` exists:
-            @test isfile(outfile)
-            # Check that `outfile` is not empty:
-            outfile_contents = read(outfile, String)
-            @test length(strip(outfile_contents)) > 5
-
-            println(Base.stderr, "# BEGIN: contents of my_slurm_job.out")
-            println(Base.stderr, outfile_contents)
-            println(Base.stderr, "# END: contents of my_slurm_job.out")
-
-            # No need to manually delete the `outfile` file.
-            # The entire `tmpdir` will automatically be removed when the `mktempdir() do ...` block ends.
-            # rm(outfile)
-        end
-    end
-end