diff --git a/CHANGELOG.md b/CHANGELOG.md index 60397b9..aab7d22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Change Log +## 0.4.0 + +- Update supervisor status at process termination [#3](https://github.com/cardo-org/Visor.jl/issues/3) + ## 0.3.0 - Setup supervisors settings with `setsupervisor` and `setroot` functions. diff --git a/Project.toml b/Project.toml index d5b5dcc..7c2d2dc 100755 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "Visor" uuid = "cf786855-3531-4b86-ba6e-3e33dce7dcdb" authors = ["Attilio DonĂ "] -version = "0.3.0" +version = "0.4.0" [deps] DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" diff --git a/docs/src/index.md b/docs/src/index.md index 2677af1..d1372e0 100755 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -175,7 +175,7 @@ function producer(td) end end -# Tasks are started following the list order and are shutted down in reverse order: +# Tasks are started following the list order and are shut down in reverse order: # producer is started last and terminated first. tasks = [ process(consumer) diff --git a/examples/producer_consumer.jl b/examples/producer_consumer.jl index c87b706..4f0fb8c 100755 --- a/examples/producer_consumer.jl +++ b/examples/producer_consumer.jl @@ -38,7 +38,7 @@ function producer(self) end end -# Tasks are started following the list order and are shutted down in reverse order: +# Tasks are started following the list order and are shut down in reverse order: # producer is started last and terminated first. tasks = [ process(consumer) diff --git a/examples/publisher_tcp.jl b/examples/publisher_tcp.jl index 9f1764c..914dfef 100755 --- a/examples/publisher_tcp.jl +++ b/examples/publisher_tcp.jl @@ -53,7 +53,7 @@ function producer(self) end end -# Tasks are started following the list order and are shutted down in reverse order: +# Tasks are started following the list order and are shut down in reverse order: # producer is started last and terminated first. tasks = [ process(publisher; debounce_time=1) diff --git a/src/Visor.jl b/src/Visor.jl index 57e2799..fb994d5 100755 --- a/src/Visor.jl +++ b/src/Visor.jl @@ -23,8 +23,8 @@ export application export call export cast export from +export getphase export ifrestart -export process export hassupervised export isprocstarted export isrequest @@ -34,6 +34,7 @@ export process export procs export receive export reply +export setphase export shutdown export startup export supervise @@ -52,7 +53,7 @@ struct Request <: Message request::Any end -Base.show(io::IO, message::Request) = print(io, "$(message.request)") +Base.show(io::IO, message::Request) = show(io, message.request) Base.@kwdef struct Shutdown <: Command reset::Bool = true @@ -78,6 +79,7 @@ abstract type Supervised end mutable struct Supervisor <: Supervised id::String status::SupervisedStatus + phase::Symbol processes::OrderedDict{String,Supervised} intensity::Int period::Int @@ -98,7 +100,16 @@ mutable struct Supervisor <: Supervised evhandler=nothing, ) return new( - id, idle, processes, intensity, period, strategy, terminateif, evhandler, [] + id, + idle, + :undef, + processes, + intensity, + period, + strategy, + terminateif, + evhandler, + [], ) end function Supervisor( @@ -114,6 +125,7 @@ mutable struct Supervisor <: Supervised return new( id, idle, + :undef, processes, intensity, period, @@ -131,6 +143,7 @@ nproc(process::Supervisor) = length(process.processes) mutable struct Process <: Supervised id::String status::SupervisedStatus + phase::Symbol fn::Function args::Tuple namedargs::NamedTuple @@ -161,6 +174,7 @@ mutable struct Process <: Supervised new( id, idle, + :undef, fn, args, namedargs, @@ -179,6 +193,10 @@ mutable struct Process <: Supervised end end +setphase(node::Supervised, phase::Symbol) = node.phase = phase + +getphase(node::Supervised) = node.phase + function __init__() @async wait_signal(__ROOT__) end @@ -189,7 +207,7 @@ clear_hold(::Supervisor) = nothing hold(process::Process) = process.onhold = true hold(::Supervisor) = nothing -Base.show(io::IO, process::Supervised) = print(io, "$(process.id)") +Base.show(io::IO, process::Supervised) = print(io, process.id) function dump(node::Supervisor) children = [ @@ -303,7 +321,7 @@ end process(id, fn; args=(), namedargs=(;), - force_interrupt_after::Real=0, + force_interrupt_after::Real=1.0, stop_waiting_after::Real=Inf, debounce_time=NaN, thread=false, @@ -414,6 +432,9 @@ struct ProcessInterrupt <: Exception id::String end +"Trigger a supervisor resync" +struct SupervisorResync end + ## include("supervisor.jl") # Returns the list of running nodes supervised by `supervisor` (processes and supervisors direct children). function running_nodes(supervisor) @@ -470,7 +491,7 @@ function start(sv::Supervisor) end function startchain(proc) - if isdefined(proc, :supervisor) && + if isdefined(proc, :supervisor) && (!isdefined(proc.supervisor, :task) || istaskdone(proc.supervisor.task)) startchain(proc.supervisor) else @@ -493,7 +514,12 @@ function restart_policy(supervisor, process) if !isnan(process.debounce_time) sleep(process.debounce_time) end - if supervisor.strategy === :one_for_one + + if process.status === idle + # a shutdown was issued, terminate the restarts + @debug "[$process]: honore the shutdown request" + delete!(supervisor.processes, process.id) + elseif supervisor.strategy === :one_for_one process.task = start(process) elseif supervisor.strategy === :one_for_all # If a child process terminates, all other child processes are terminated, @@ -637,7 +663,7 @@ function supervisor_shutdown( @debug "[$p] skipping shutdown: task already done" else # shutdown is sequential because in case a node refuses - # to shutdown remaining nodes aren't shutted down. + # to shutdown remaining nodes aren't shut down. shutdown(p, reset) end end @@ -708,29 +734,44 @@ function isalldone(supervisor) return res end +""" + resync(supervisor) + +Restart processes previously stopped by supervisor policies. + +Return true if all supervised processes terminated. +""" +function resync(supervisor) + if !isempty(supervisor.restarts) + @debug "[$supervisor] to be restarted: $(format4print(supervisor.restarts))" + # check all required processes are terminated + if all(proc -> proc.status !== running, supervisor.restarts) + @debug "[$supervisor] restarting processes" + restart_processes(supervisor, supervisor.restarts) + end + end + + @debug "[$supervisor] procs:[$(format4print(supervisor.processes))], terminateif $(supervisor.terminateif)" + if supervisor.terminateif === :empty && isalldone(supervisor) + return true + end + return false +end + # Supervisor main loop. function manage(supervisor) @debug "[$supervisor] start supervisor event loop" try for msg in supervisor.inbox - trace(supervisor, msg) @debug "[$supervisor] recv: $msg" if isa(msg, Shutdown) supervisor_shutdown(supervisor, nothing, msg.reset) break - elseif isa(msg, ProcessReturn) - @debug "[$supervisor]: process [$(msg.process)] normal termination" - normal_return(supervisor, msg.process) - elseif isa(msg, ProcessInterrupted) - @debug "[$supervisor]: process [$(msg.process)] forcibly interrupted" - exitby_forced_shutdown(supervisor, msg.process) - elseif isa(msg, ProcessError) - @debug "[$supervisor]: applying restart policy for [$(msg.process)] ($(Int.(floor.(msg.process.startstamps))))" - msg.process.status = failed - exitby_exception(supervisor, msg.process) + elseif isa(msg, SupervisorResync) + # do nothing here, just a resync() is needed elseif isa(msg, ProcessFatal) @async evhandler(msg.process, msg) - @debug "[$supervisor] manage process fatal: delete [$(msg.process)]" + @debug "[$supervisor] manage process fatal: process done [$(msg.process)]" msg.process.status = done elseif isa(msg, Supervised) add_node(supervisor, msg) @@ -749,24 +790,13 @@ function manage(supervisor) put!(msg.inbox, ErrorException("unknown message [$(msg.request)]")) end catch e - #showerror(stdout, e, catch_backtrace()) put!(msg.inbox, e) end else unknown_message(supervisor, msg) end - if !isempty(supervisor.restarts) - @debug "[$supervisor] to be restarted: $(format4print(supervisor.restarts))" - # check all required processes are terminated - if all(proc -> proc.status !== running, supervisor.restarts) - @debug "[$supervisor] restarting processes" - restart_processes(supervisor, supervisor.restarts) - end - end - - @debug "[$supervisor] procs:[$(format4print(supervisor.processes))], terminateif $(supervisor.terminateif)" - if supervisor.terminateif === :empty && isalldone(supervisor) + if resync(supervisor) break end end @@ -778,7 +808,7 @@ function manage(supervisor) while isready(supervisor.inbox) msg = take!(supervisor.inbox) if isrequest(msg) - put!(msg.inbox, ErrorException("[$(msg.request)]: supervisor shutted down")) + put!(msg.inbox, ErrorException("[$(msg.request)]: supervisor shut down")) else @debug "[$supervisor]: skipped msg: $msg" end @@ -1290,26 +1320,26 @@ end function wait_child(supervisor::Supervisor, process::Process) try wait(process.task) - put!(supervisor.inbox, ProcessReturn(process)) + normal_return(supervisor, process) + trace(supervisor, ProcessReturn(process)) catch e taskerr = e.task.exception + process.status = failed if isa(taskerr, ProcessInterrupt) - @debug "[$process] exit on exception: $taskerr" - put!(supervisor.inbox, ProcessInterrupted(process)) - # elseif isa(taskerr, MethodError) - # @warn "[$process]: task failed: $taskerr" - # put!(supervisor.inbox, ProcessFatal(process)) + @debug "[$supervisor]: process [$process] forcibly interrupted" + exitby_forced_shutdown(supervisor, process) + trace(supervisor, ProcessInterrupted(process)) else @debug "[$process] exception: $taskerr" evhandler(process, taskerr) - #showerror(stdout, e, catch_backtrace()) + @debug "[$supervisor]: applying restart policy for [$process] ($(Int.(floor.(process.startstamps))))" + exitby_exception(supervisor, process) + trace(supervisor, ProcessInterrupted(process)) + if isa(taskerr, Exception) - put!(supervisor.inbox, ProcessError(process, taskerr)) + trace(supervisor, ProcessError(process, taskerr)) else - put!( - supervisor.inbox, - ProcessError(process, SystemError("process exception")), - ) + trace(supervisor, ProcessError(process, SystemError("process exception"))) end end finally @@ -1317,12 +1347,15 @@ function wait_child(supervisor::Supervisor, process::Process) @debug "removing temporary process $process" delete!(supervisor.processes, process.id) end + put!(supervisor.inbox, SupervisorResync()) end end function wait_child(supervisor::Supervisor, process::Supervisor) wait(process.task) - return put!(supervisor.inbox, ProcessReturn(process)) + normal_return(supervisor, process) + put!(supervisor.inbox, SupervisorResync()) + return nothing end function evhandler(process, event)