Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Version 0.4.0 #4

Merged
merged 4 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Change Log

## 0.4.0

- Update supervisor status at process termination [#3](https://github.com/cardo-org/Visor.jl/issues/3)

## 0.3.0

- Setup supervisors settings with `setsupervisor` and `setroot` functions.
Expand Down
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "Visor"
uuid = "cf786855-3531-4b86-ba6e-3e33dce7dcdb"
authors = ["Attilio Donà"]
version = "0.3.0"
version = "0.4.0"

[deps]
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Expand Down
2 changes: 1 addition & 1 deletion docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ function producer(td)
end
end

# Tasks are started following the list order and are shutted down in reverse order:
# Tasks are started following the list order and are shut down in reverse order:
# producer is started last and terminated first.
tasks = [
process(consumer)
Expand Down
2 changes: 1 addition & 1 deletion examples/producer_consumer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ function producer(self)
end
end

# Tasks are started following the list order and are shutted down in reverse order:
# Tasks are started following the list order and are shut down in reverse order:
# producer is started last and terminated first.
tasks = [
process(consumer)
Expand Down
2 changes: 1 addition & 1 deletion examples/publisher_tcp.jl
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ function producer(self)
end
end

# Tasks are started following the list order and are shutted down in reverse order:
# Tasks are started following the list order and are shut down in reverse order:
# producer is started last and terminated first.
tasks = [
process(publisher; debounce_time=1)
Expand Down
125 changes: 79 additions & 46 deletions src/Visor.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
export call
export cast
export from
export getphase
export ifrestart
export process
export hassupervised
export isprocstarted
export isrequest
Expand All @@ -34,6 +34,7 @@
export procs
export receive
export reply
export setphase
export shutdown
export startup
export supervise
Expand All @@ -52,7 +53,7 @@
request::Any
end

Base.show(io::IO, message::Request) = print(io, "$(message.request)")
Base.show(io::IO, message::Request) = show(io, message.request)

Base.@kwdef struct Shutdown <: Command
reset::Bool = true
Expand All @@ -78,6 +79,7 @@
mutable struct Supervisor <: Supervised
id::String
status::SupervisedStatus
phase::Symbol
processes::OrderedDict{String,Supervised}
intensity::Int
period::Int
Expand All @@ -98,7 +100,16 @@
evhandler=nothing,
)
return new(
id, idle, processes, intensity, period, strategy, terminateif, evhandler, []
id,
idle,
:undef,
processes,
intensity,
period,
strategy,
terminateif,
evhandler,
[],
)
end
function Supervisor(
Expand All @@ -114,6 +125,7 @@
return new(
id,
idle,
:undef,
processes,
intensity,
period,
Expand All @@ -131,6 +143,7 @@
mutable struct Process <: Supervised
id::String
status::SupervisedStatus
phase::Symbol
fn::Function
args::Tuple
namedargs::NamedTuple
Expand Down Expand Up @@ -161,6 +174,7 @@
new(
id,
idle,
:undef,
fn,
args,
namedargs,
Expand All @@ -179,6 +193,10 @@
end
end

setphase(node::Supervised, phase::Symbol) = node.phase = phase

getphase(node::Supervised) = node.phase

function __init__()
@async wait_signal(__ROOT__)
end
Expand All @@ -189,7 +207,7 @@
hold(process::Process) = process.onhold = true
hold(::Supervisor) = nothing

Base.show(io::IO, process::Supervised) = print(io, "$(process.id)")
Base.show(io::IO, process::Supervised) = print(io, process.id)

function dump(node::Supervisor)
children = [
Expand Down Expand Up @@ -303,7 +321,7 @@
process(id, fn;
args=(),
namedargs=(;),
force_interrupt_after::Real=0,
force_interrupt_after::Real=1.0,
stop_waiting_after::Real=Inf,
debounce_time=NaN,
thread=false,
Expand Down Expand Up @@ -414,6 +432,9 @@
id::String
end

"Trigger a supervisor resync"
struct SupervisorResync end

## include("supervisor.jl")
# Returns the list of running nodes supervised by `supervisor` (processes and supervisors direct children).
function running_nodes(supervisor)
Expand Down Expand Up @@ -470,7 +491,7 @@
end

function startchain(proc)
if isdefined(proc, :supervisor) &&
if isdefined(proc, :supervisor) &&
(!isdefined(proc.supervisor, :task) || istaskdone(proc.supervisor.task))
startchain(proc.supervisor)
else
Expand All @@ -493,7 +514,12 @@
if !isnan(process.debounce_time)
sleep(process.debounce_time)
end
if supervisor.strategy === :one_for_one

if process.status === idle
# a shutdown was issued, terminate the restarts
@debug "[$process]: honore the shutdown request"
delete!(supervisor.processes, process.id)
elseif supervisor.strategy === :one_for_one
process.task = start(process)
elseif supervisor.strategy === :one_for_all
# If a child process terminates, all other child processes are terminated,
Expand Down Expand Up @@ -535,7 +561,7 @@

Start the supervised process defined by `proc` as child of `supervisor`.

```jldoctest

Check failure on line 564 in src/Visor.jl

View workflow job for this annotation

GitHub Actions / Documentation

doctest failure in ~/work/Visor.jl/Visor.jl/src/Visor.jl:564-573 ```jldoctest julia> using Visor julia> foo(self) = println("foo process started"); julia> main(self) = startup(self.supervisor, process(foo)); julia> supervise([process(main)]); foo process started ``` Subexpression: supervise([process(main)]); Evaluated output: ERROR: supervise already active Stacktrace: [1] supervise(processes::Vector{Visor.Process}; intensity::Int64, period::Int64, strategy::Symbol, terminateif::Symbol, handler::Nothing, wait::Bool) @ Visor ~/work/Visor.jl/Visor.jl/src/Visor.jl:1249 [2] supervise(processes::Vector{Visor.Process}) @ Visor ~/work/Visor.jl/Visor.jl/src/Visor.jl:1230 [3] top-level scope @ none:1 Expected output: foo process started diff = Warning: Diff output requires color. foo process startedERROR: supervise already active Stacktrace: [1] supervise(processes::Vector{Visor.Process}; intensity::Int64, period::Int64, strategy::Symbol, terminateif::Symbol, handler::Nothing, wait::Bool) @ Visor ~/work/Visor.jl/Visor.jl/src/Visor.jl:1249 [2] supervise(processes::Vector{Visor.Process}) @ Visor ~/work/Visor.jl/Visor.jl/src/Visor.jl:1230 [3] top-level scope @ none:1

Check failure on line 564 in src/Visor.jl

View workflow job for this annotation

GitHub Actions / Documentation

doctest failure in ~/work/Visor.jl/Visor.jl/src/Visor.jl:564-573 ```jldoctest julia> using Visor julia> foo(self) = println("foo process started"); julia> main(self) = startup(self.supervisor, process(foo)); julia> supervise([process(main)]); foo process started ``` Subexpression: supervise([process(main)]); Evaluated output: ERROR: supervise already active Stacktrace: [1] supervise(processes::Vector{Visor.Process}; intensity::Int64, period::Int64, strategy::Symbol, terminateif::Symbol, handler::Nothing, wait::Bool) @ Visor ~/work/Visor.jl/Visor.jl/src/Visor.jl:1249 [2] supervise(processes::Vector{Visor.Process}) @ Visor ~/work/Visor.jl/Visor.jl/src/Visor.jl:1230 [3] top-level scope @ none:1 Expected output: foo process started diff = Warning: Diff output requires color. foo process startedERROR: supervise already active Stacktrace: [1] supervise(processes::Vector{Visor.Process}; intensity::Int64, period::Int64, strategy::Symbol, terminateif::Symbol, handler::Nothing, wait::Bool) @ Visor ~/work/Visor.jl/Visor.jl/src/Visor.jl:1249 [2] supervise(processes::Vector{Visor.Process}) @ Visor ~/work/Visor.jl/Visor.jl/src/Visor.jl:1230 [3] top-level scope @ none:1
julia> using Visor

julia> foo(self) = println("foo process started");
Expand Down Expand Up @@ -637,7 +663,7 @@
@debug "[$p] skipping shutdown: task already done"
else
# shutdown is sequential because in case a node refuses
# to shutdown remaining nodes aren't shutted down.
# to shutdown remaining nodes aren't shut down.
shutdown(p, reset)
end
end
Expand Down Expand Up @@ -708,29 +734,44 @@
return res
end

"""
resync(supervisor)

Restart processes previously stopped by supervisor policies.

Return true if all supervised processes terminated.
"""
function resync(supervisor)
if !isempty(supervisor.restarts)
@debug "[$supervisor] to be restarted: $(format4print(supervisor.restarts))"
# check all required processes are terminated
if all(proc -> proc.status !== running, supervisor.restarts)
@debug "[$supervisor] restarting processes"
restart_processes(supervisor, supervisor.restarts)
end
end

@debug "[$supervisor] procs:[$(format4print(supervisor.processes))], terminateif $(supervisor.terminateif)"
if supervisor.terminateif === :empty && isalldone(supervisor)
return true
end
return false
end

# Supervisor main loop.
function manage(supervisor)
@debug "[$supervisor] start supervisor event loop"
try
for msg in supervisor.inbox
trace(supervisor, msg)
@debug "[$supervisor] recv: $msg"
if isa(msg, Shutdown)
supervisor_shutdown(supervisor, nothing, msg.reset)
break
elseif isa(msg, ProcessReturn)
@debug "[$supervisor]: process [$(msg.process)] normal termination"
normal_return(supervisor, msg.process)
elseif isa(msg, ProcessInterrupted)
@debug "[$supervisor]: process [$(msg.process)] forcibly interrupted"
exitby_forced_shutdown(supervisor, msg.process)
elseif isa(msg, ProcessError)
@debug "[$supervisor]: applying restart policy for [$(msg.process)] ($(Int.(floor.(msg.process.startstamps))))"
msg.process.status = failed
exitby_exception(supervisor, msg.process)
elseif isa(msg, SupervisorResync)
# do nothing here, just a resync() is needed
elseif isa(msg, ProcessFatal)
@async evhandler(msg.process, msg)
@debug "[$supervisor] manage process fatal: delete [$(msg.process)]"
@debug "[$supervisor] manage process fatal: process done [$(msg.process)]"
msg.process.status = done
elseif isa(msg, Supervised)
add_node(supervisor, msg)
Expand All @@ -749,24 +790,13 @@
put!(msg.inbox, ErrorException("unknown message [$(msg.request)]"))
end
catch e
#showerror(stdout, e, catch_backtrace())
put!(msg.inbox, e)
end
else
unknown_message(supervisor, msg)
end

if !isempty(supervisor.restarts)
@debug "[$supervisor] to be restarted: $(format4print(supervisor.restarts))"
# check all required processes are terminated
if all(proc -> proc.status !== running, supervisor.restarts)
@debug "[$supervisor] restarting processes"
restart_processes(supervisor, supervisor.restarts)
end
end

@debug "[$supervisor] procs:[$(format4print(supervisor.processes))], terminateif $(supervisor.terminateif)"
if supervisor.terminateif === :empty && isalldone(supervisor)
if resync(supervisor)
break
end
end
Expand All @@ -778,7 +808,7 @@
while isready(supervisor.inbox)
msg = take!(supervisor.inbox)
if isrequest(msg)
put!(msg.inbox, ErrorException("[$(msg.request)]: supervisor shutted down"))
put!(msg.inbox, ErrorException("[$(msg.request)]: supervisor shut down"))
else
@debug "[$supervisor]: skipped msg: $msg"
end
Expand Down Expand Up @@ -1290,39 +1320,42 @@
function wait_child(supervisor::Supervisor, process::Process)
try
wait(process.task)
put!(supervisor.inbox, ProcessReturn(process))
normal_return(supervisor, process)
trace(supervisor, ProcessReturn(process))
catch e
taskerr = e.task.exception
process.status = failed
if isa(taskerr, ProcessInterrupt)
@debug "[$process] exit on exception: $taskerr"
put!(supervisor.inbox, ProcessInterrupted(process))
# elseif isa(taskerr, MethodError)
# @warn "[$process]: task failed: $taskerr"
# put!(supervisor.inbox, ProcessFatal(process))
@debug "[$supervisor]: process [$process] forcibly interrupted"
exitby_forced_shutdown(supervisor, process)
trace(supervisor, ProcessInterrupted(process))
else
@debug "[$process] exception: $taskerr"
evhandler(process, taskerr)
#showerror(stdout, e, catch_backtrace())
@debug "[$supervisor]: applying restart policy for [$process] ($(Int.(floor.(process.startstamps))))"
exitby_exception(supervisor, process)
trace(supervisor, ProcessInterrupted(process))

if isa(taskerr, Exception)
put!(supervisor.inbox, ProcessError(process, taskerr))
trace(supervisor, ProcessError(process, taskerr))
else
put!(
supervisor.inbox,
ProcessError(process, SystemError("process exception")),
)
trace(supervisor, ProcessError(process, SystemError("process exception")))
end
end
finally
if process.restart === :temporary
@debug "removing temporary process $process"
delete!(supervisor.processes, process.id)
end
put!(supervisor.inbox, SupervisorResync())
end
end

function wait_child(supervisor::Supervisor, process::Supervisor)
wait(process.task)
return put!(supervisor.inbox, ProcessReturn(process))
normal_return(supervisor, process)
put!(supervisor.inbox, SupervisorResync())
return nothing
end

function evhandler(process, event)
Expand Down
Loading