Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve ElasticManager #203

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 37 additions & 7 deletions src/elastic.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@ struct ElasticManager <: ClusterManager
terminated::Set{Int} # terminated worker ids
topology::Symbol
sockname
manage_callback
printing_kwargs

function ElasticManager(;addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all, printing_kwargs=())
function ElasticManager(;
addr=IPv4("127.0.0.1"), port=9009, cookie=nothing,
topology=:all_to_all, manage_callback=elastic_no_op_callback, printing_kwargs=()
)
Distributed.init_multi()
cookie !== nothing && cluster_cookie(cookie)

Expand All @@ -29,7 +33,7 @@ struct ElasticManager <: ClusterManager

l_sock = listen(addr, port)

lman = new(Dict{Int, WorkerConfig}(), Channel{TCPSocket}(typemax(Int)), Set{Int}(), topology, getsockname(l_sock), printing_kwargs)
lman = new(Dict{Int, WorkerConfig}(), Channel{TCPSocket}(typemax(Int)), Set{Int}(), topology, getsockname(l_sock), manage_callback, printing_kwargs)

@async begin
while true
Expand All @@ -49,8 +53,10 @@ ElasticManager(port) = ElasticManager(;port=port)
ElasticManager(addr, port) = ElasticManager(;addr=addr, port=port)
ElasticManager(addr, port, cookie) = ElasticManager(;addr=addr, port=port, cookie=cookie)

elastic_no_op_callback(::ElasticManager, ::Integer, ::Symbol) = nothing

function process_worker_conn(mgr::ElasticManager, s::TCPSocket)
@debug "ElasticManager got new worker connection"
# Socket is the worker's STDOUT
wc = WorkerConfig()
wc.io = s
Expand Down Expand Up @@ -86,6 +92,7 @@ end
function launch(mgr::ElasticManager, params::Dict, launched::Array, c::Condition)
# The workers have already been started.
while isready(mgr.pending)
@debug "ElasticManager.launch new worker"
wc=WorkerConfig()
wc.io = take!(mgr.pending)
push!(launched, wc)
Expand All @@ -96,8 +103,12 @@ end

function manage(mgr::ElasticManager, id::Integer, config::WorkerConfig, op::Symbol)
if op == :register
@debug "ElasticManager registering process id $id"
mgr.active[id] = config
mgr.manage_callback(mgr, id, op)
elseif op == :deregister
@debug "ElasticManager deregistering process id $id"
mgr.manage_callback(mgr, id, op)
delete!(mgr.active, id)
push!(mgr.terminated, id)
end
Expand Down Expand Up @@ -130,27 +141,46 @@ function Base.show(io::IO, mgr::ElasticManager)
end

# Does not return. If executing from a REPL try
# @async connect_to_cluster(.....)
# @async elastic_worker(.....)
# addr, port that a ElasticManager on the master processes is listening on.
function elastic_worker(cookie, addr="127.0.0.1", port=9009; stdout_to_master=true)
function elastic_worker(
cookie::AbstractString, addr::AbstractString="127.0.0.1", port::Integer = 9009;
stdout_to_master::Bool = true,
Base.@nospecialize(env::AbstractVector = [],)
)
@debug "ElasticManager.elastic_worker(cookie, $addr, $port; stdout_to_master=$stdout_to_master, env=$env)"
for (k, v) in env
ENV[k] = v
end

c = connect(addr, port)
write(c, rpad(cookie, HDR_COOKIE_LEN)[1:HDR_COOKIE_LEN])
stdout_to_master && redirect_stdout(c)
start_worker(c, cookie)
end

function get_connect_cmd(em::ElasticManager; absolute_exename=true, same_project=true)

function get_connect_cmd(
em::ElasticManager;
absolute_exename=true, same_project=true,
@nospecialize(env::AbstractDict{<:AbstractString,<:AbstractString} = Dict{String,String}())
)
env_withdefaults = Dict{String,String}()
haskey(ENV, "JULIA_WORKER_TIMEOUT") && (env_withdefaults["JULIA_WORKER_TIMEOUT"] = ENV["JULIA_WORKER_TIMEOUT"])
env_withdefaults["JULIA_REVISE"] = "off"
merge!(env_withdefaults, env)
env_vec = isempty(env_withdefaults) ? [] : collect(env_withdefaults)

ip = string(em.sockname[1])
port = convert(Int,em.sockname[2])
cookie = cluster_cookie()
exename = absolute_exename ? joinpath(Sys.BINDIR, Base.julia_exename()) : "julia"
project = same_project ? ("--project=$(Pkg.API.Context().env.project_file)",) : ()


join([
exename,
project...,
"-e 'using ClusterManagers; ClusterManagers.elastic_worker(\"$cookie\",\"$ip\",$port)'"
"-e 'using ClusterManagers; ClusterManagers.elastic_worker(\"$cookie\",\"$ip\",$port;env=$(string(env_vec)))'"
]," ")

end
Loading