Skip to content

Commit 0625b3a

Browse files
kpamnanyvtjnashnickrobinson251
authored
Add Experimental.wait_with_timeout (#57148)
Remove the `timeout` parameter to `wait(::Condition)` added in #56974 and add this logic to `Experimental.wait_with_timeout`. --------- Co-authored-by: Jameson Nash <vtjnash@gmail.com> Co-authored-by: Nick Robinson <npr251@gmail.com>
1 parent 6cd750d commit 0625b3a

File tree

3 files changed

+115
-91
lines changed

3 files changed

+115
-91
lines changed

base/condition.jl

Lines changed: 3 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -125,104 +125,20 @@ proceeding.
125125
"""
126126
function wait end
127127

128-
# wait with timeout
129-
#
130-
# The behavior of wait changes if a timeout is specified. There are
131-
# three concurrent entities that can interact:
132-
# 1. Task W: the task that calls wait w/timeout.
133-
# 2. Task T: the task created to handle a timeout.
134-
# 3. Task N: the task that notifies the Condition being waited on.
135-
#
136-
# Typical flow:
137-
# - W enters the Condition's wait queue.
138-
# - W creates T and stops running (calls wait()).
139-
# - T, when scheduled, waits on a Timer.
140-
# - Two common outcomes:
141-
# - N notifies the Condition.
142-
# - W starts running, closes the Timer, sets waiter_left and returns
143-
# the notify'ed value.
144-
# - The closed Timer throws an EOFError to T which simply ends.
145-
# - The Timer expires.
146-
# - T starts running and locks the Condition.
147-
# - T confirms that waiter_left is unset and that W is still in the
148-
# Condition's wait queue; it then removes W from the wait queue,
149-
# sets dosched to true and unlocks the Condition.
150-
# - If dosched is true, T schedules W with the special :timed_out
151-
# value.
152-
# - T ends.
153-
# - W runs and returns :timed_out.
154-
#
155-
# Some possible interleavings:
156-
# - N notifies the Condition but the Timer expires and T starts running
157-
# before W:
158-
# - W closing the expired Timer is benign.
159-
# - T will find that W is no longer in the Condition's wait queue
160-
# (which is protected by a lock) and will not schedule W.
161-
# - N notifies the Condition; W runs and calls wait on the Condition
162-
# again before the Timer expires:
163-
# - W sets waiter_left before leaving. When T runs, it will find that
164-
# waiter_left is set and will not schedule W.
165-
#
166-
# The lock on the Condition's wait queue and waiter_left together
167-
# ensure proper synchronization and behavior of the tasks involved.
168-
169128
"""
170-
wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0)
129+
wait(c::GenericCondition; first::Bool=false)
171130
172131
Wait for [`notify`](@ref) on `c` and return the `val` parameter passed to `notify`.
173132
174133
If the keyword `first` is set to `true`, the waiter will be put _first_
175134
in line to wake up on `notify`. Otherwise, `wait` has first-in-first-out (FIFO) behavior.
176-
177-
If `timeout` is specified, cancel the `wait` when it expires and return
178-
`:timed_out`. The minimum value for `timeout` is 0.001 seconds, i.e. 1
179-
millisecond.
180135
"""
181-
function wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0)
182-
timeout == 0.0 || timeout 1e-3 || throw(ArgumentError("timeout must be ≥ 1 millisecond"))
183-
136+
function wait(c::GenericCondition; first::Bool=false)
184137
ct = current_task()
185138
_wait2(c, ct, first)
186139
token = unlockall(c.lock)
187-
188-
timer::Union{Timer, Nothing} = nothing
189-
waiter_left::Union{Threads.Atomic{Bool}, Nothing} = nothing
190-
if timeout > 0.0
191-
timer = Timer(timeout)
192-
waiter_left = Threads.Atomic{Bool}(false)
193-
# start a task to wait on the timer
194-
t = Task() do
195-
try
196-
wait(timer)
197-
catch e
198-
# if the timer was closed, the waiting task has been scheduled; do nothing
199-
e isa EOFError && return
200-
end
201-
dosched = false
202-
lock(c.lock)
203-
# Confirm that the waiting task is still in the wait queue and remove it. If
204-
# the task is not in the wait queue, it must have been notified already so we
205-
# don't do anything here.
206-
if !waiter_left[] && ct.queue == c.waitq
207-
dosched = true
208-
Base.list_deletefirst!(c.waitq, ct)
209-
end
210-
unlock(c.lock)
211-
# send the waiting task a timeout
212-
dosched && schedule(ct, :timed_out)
213-
end
214-
t.sticky = false
215-
Threads._spawn_set_thrpool(t, :interactive)
216-
schedule(t)
217-
end
218-
219140
try
220-
res = wait()
221-
if timer !== nothing
222-
close(timer)
223-
waiter_left[] = true
224-
end
225-
return res
141+
return wait()
226142
catch
227143
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
228144
rethrow()

base/experimental.jl

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
module Experimental
1111

1212
using Base: Threads, sync_varname, is_function_def, @propagate_inbounds
13+
using Base: GenericCondition
1314
using Base.Meta
1415

1516
"""
@@ -577,4 +578,112 @@ function task_wall_time_ns(t::Task=current_task())
577578
return end_at - start_at
578579
end
579580

581+
# wait_with_timeout
582+
#
583+
# A version of `wait(c::Condition)` that additionally allows the
584+
# specification of a timeout. This is experimental as it will likely
585+
# be dropped when a cancellation framework is added.
586+
#
587+
# The parallel behavior of wait_with_timeout is specified here. There
588+
# are three concurrent entities that can interact:
589+
# 1. Task W: the task that calls wait_with_timeout.
590+
# 2. Task T: the task created to handle a timeout.
591+
# 3. Task N: the task that notifies the Condition being waited on.
592+
#
593+
# Typical flow:
594+
# - W enters the Condition's wait queue.
595+
# - W creates T and stops running (calls wait()).
596+
# - T, when scheduled, waits on a Timer.
597+
# - Two common outcomes:
598+
# - N notifies the Condition.
599+
# - W starts running, closes the Timer, sets waiter_left and returns
600+
# the notify'ed value.
601+
# - The closed Timer throws an EOFError to T which simply ends.
602+
# - The Timer expires.
603+
# - T starts running and locks the Condition.
604+
# - T confirms that waiter_left is unset and that W is still in the
605+
# Condition's wait queue; it then removes W from the wait queue,
606+
# sets dosched to true and unlocks the Condition.
607+
# - If dosched is true, T schedules W with the special :timed_out
608+
# value.
609+
# - T ends.
610+
# - W runs and returns :timed_out.
611+
#
612+
# Some possible interleavings:
613+
# - N notifies the Condition but the Timer expires and T starts running
614+
# before W:
615+
# - W closing the expired Timer is benign.
616+
# - T will find that W is no longer in the Condition's wait queue
617+
# (which is protected by a lock) and will not schedule W.
618+
# - N notifies the Condition; W runs and calls wait on the Condition
619+
# again before the Timer expires:
620+
# - W sets waiter_left before leaving. When T runs, it will find that
621+
# waiter_left is set and will not schedule W.
622+
#
623+
# The lock on the Condition's wait queue and waiter_left together
624+
# ensure proper synchronization and behavior of the tasks involved.
625+
626+
"""
627+
wait_with_timeout(c::GenericCondition; first::Bool=false, timeout::Real=0.0)
628+
629+
Wait for [`notify`](@ref) on `c` and return the `val` parameter passed to `notify`.
630+
631+
If the keyword `first` is set to `true`, the waiter will be put _first_
632+
in line to wake up on `notify`. Otherwise, `wait` has first-in-first-out (FIFO) behavior.
633+
634+
If `timeout` is specified, cancel the `wait` when it expires and return
635+
`:timed_out`. The minimum value for `timeout` is 0.001 seconds, i.e. 1
636+
millisecond.
637+
"""
638+
function wait_with_timeout(c::GenericCondition; first::Bool=false, timeout::Real=0.0)
639+
ct = current_task()
640+
Base._wait2(c, ct, first)
641+
token = Base.unlockall(c.lock)
642+
643+
timer::Union{Timer, Nothing} = nothing
644+
waiter_left::Union{Threads.Atomic{Bool}, Nothing} = nothing
645+
if timeout > 0.0
646+
timer = Timer(timeout)
647+
waiter_left = Threads.Atomic{Bool}(false)
648+
# start a task to wait on the timer
649+
t = Task() do
650+
try
651+
wait(timer)
652+
catch e
653+
# if the timer was closed, the waiting task has been scheduled; do nothing
654+
e isa EOFError && return
655+
end
656+
dosched = false
657+
lock(c.lock)
658+
# Confirm that the waiting task is still in the wait queue and remove it. If
659+
# the task is not in the wait queue, it must have been notified already so we
660+
# don't do anything here.
661+
if !waiter_left[] && ct.queue == c.waitq
662+
dosched = true
663+
Base.list_deletefirst!(c.waitq, ct)
664+
end
665+
unlock(c.lock)
666+
# send the waiting task a timeout
667+
dosched && schedule(ct, :timed_out)
668+
end
669+
t.sticky = false
670+
Threads._spawn_set_thrpool(t, :interactive)
671+
schedule(t)
672+
end
673+
674+
try
675+
res = wait()
676+
if timer !== nothing
677+
close(timer)
678+
waiter_left[] = true
679+
end
680+
return res
681+
catch
682+
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
683+
rethrow()
684+
finally
685+
Base.relockall(c.lock, token)
686+
end
687+
end
688+
580689
end # module

test/channels.jl

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,15 @@ end
4040
@test fetch(t) == "finished"
4141
end
4242

43-
@testset "timed wait on Condition" begin
43+
@testset "wait_with_timeout on Condition" begin
4444
a = Threads.Condition()
45-
@test_throws ArgumentError @lock a wait(a; timeout=0.0005)
46-
@test @lock a wait(a; timeout=0.1)==:timed_out
45+
@test @lock a Experimental.wait_with_timeout(a; timeout=0.1)==:timed_out
4746
lock(a)
4847
@spawn begin
4948
@lock a notify(a)
5049
end
5150
@test try
52-
wait(a; timeout=2)
51+
Experimental.wait_with_timeout(a; timeout=2)
5352
true
5453
finally
5554
unlock(a)

0 commit comments

Comments
 (0)