Skip to content

Commit 0f0ed1d

Browse files
authored
Add wait(deadline future) implementation. (#535)
* Add waitUntil(deadline) implementation. * Add one more test. * Fix rare race condition and tests for it. * Rename waitUntil() to wait().
1 parent d184a92 commit 0f0ed1d

File tree

2 files changed

+481
-10
lines changed

2 files changed

+481
-10
lines changed

chronos/internal/asyncfutures.nim

Lines changed: 92 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1529,6 +1529,60 @@ proc withTimeout*[T](fut: Future[T], timeout: int): Future[bool] {.
15291529
inline, deprecated: "Use withTimeout(Future[T], Duration)".} =
15301530
withTimeout(fut, timeout.milliseconds())
15311531
1532+
proc waitUntilImpl[F: SomeFuture](fut: F, retFuture: auto,
1533+
deadline: auto): auto =
1534+
var timeouted = false
1535+
1536+
template completeFuture(fut: untyped, timeout: bool): untyped =
1537+
if fut.failed():
1538+
retFuture.fail(fut.error(), warn = false)
1539+
elif fut.cancelled():
1540+
if timeout:
1541+
# Its possible that `future` could be cancelled in some other place. In
1542+
# such case we can't detect if it was our cancellation due to timeout,
1543+
# or some other cancellation.
1544+
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
1545+
else:
1546+
retFuture.cancelAndSchedule()
1547+
else:
1548+
when type(fut).T is void:
1549+
retFuture.complete()
1550+
else:
1551+
retFuture.complete(fut.value)
1552+
1553+
proc continuation(udata: pointer) {.raises: [].} =
1554+
if not(retFuture.finished()):
1555+
if timeouted:
1556+
# When timeout is exceeded and we cancelled future via cancelSoon(),
1557+
# its possible that future at this moment already has value
1558+
# and/or error.
1559+
fut.completeFuture(timeouted)
1560+
return
1561+
if not(fut.finished()):
1562+
timeouted = true
1563+
fut.cancelSoon()
1564+
else:
1565+
fut.completeFuture(false)
1566+
1567+
var cancellation: proc(udata: pointer) {.gcsafe, raises: [].}
1568+
cancellation = proc(udata: pointer) {.gcsafe, raises: [].} =
1569+
deadline.removeCallback(continuation)
1570+
if not(fut.finished()):
1571+
fut.cancelSoon()
1572+
else:
1573+
fut.completeFuture(false)
1574+
1575+
if fut.finished():
1576+
fut.completeFuture(false)
1577+
else:
1578+
if deadline.finished():
1579+
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
1580+
else:
1581+
retFuture.cancelCallback = cancellation
1582+
fut.addCallback(continuation)
1583+
deadline.addCallback(continuation)
1584+
retFuture
1585+
15321586
proc waitImpl[F: SomeFuture](fut: F, retFuture: auto, timeout: Duration): auto =
15331587
var
15341588
moment: Moment
@@ -1606,7 +1660,8 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
16061660
## TODO: In case when ``fut`` got cancelled, what result Future[T]
16071661
## should return, because it can't be cancelled too.
16081662
var
1609-
retFuture = newFuture[T]("chronos.wait()", {FutureFlag.OwnCancelSchedule})
1663+
retFuture = newFuture[T]("chronos.wait(duration)",
1664+
{FutureFlag.OwnCancelSchedule})
16101665
# We set `OwnCancelSchedule` flag, because we going to cancel `retFuture`
16111666
# manually at proper time.
16121667
@@ -1621,6 +1676,28 @@ proc wait*[T](fut: Future[T], timeout = -1): Future[T] {.
16211676
else:
16221677
wait(fut, timeout.milliseconds())
16231678
1679+
proc wait*[T](fut: Future[T], deadline: SomeFuture): Future[T] =
1680+
## Returns a future which will complete once future ``fut`` completes
1681+
## or if ``deadline`` future completes.
1682+
##
1683+
## If `deadline` future completes before future `fut` -
1684+
## `AsyncTimeoutError` exception will be raised.
1685+
##
1686+
## Note: `deadline` future will not be cancelled and/or failed.
1687+
##
1688+
## Note: While `waitUntil(future)` operation is pending, please avoid any
1689+
## attempts to cancel future `fut`. If it happens `waitUntil()` could
1690+
## introduce undefined behavior - it could raise`CancelledError` or
1691+
## `AsyncTimeoutError`.
1692+
##
1693+
## If you need to cancel `future` - cancel `waitUntil(future)` instead.
1694+
var
1695+
retFuture = newFuture[T]("chronos.wait(future)",
1696+
{FutureFlag.OwnCancelSchedule})
1697+
# We set `OwnCancelSchedule` flag, because we going to cancel `retFuture`
1698+
# manually at proper time.
1699+
waitUntilImpl(fut, retFuture, deadline)
1700+
16241701
proc join*(future: FutureBase): Future[void] {.
16251702
async: (raw: true, raises: [CancelledError]).} =
16261703
## Returns a future which will complete once future ``future`` completes.
@@ -1783,8 +1860,21 @@ proc wait*(fut: InternalRaisesFuture, timeout = InfiniteDuration): auto =
17831860
InternalRaisesFutureRaises = E.prepend(CancelledError, AsyncTimeoutError)
17841861

17851862
let
1786-
retFuture = newFuture[T]("chronos.wait()", {OwnCancelSchedule})
1863+
retFuture = newFuture[T]("chronos.wait(duration)", {OwnCancelSchedule})
17871864
# We set `OwnCancelSchedule` flag, because we going to cancel `retFuture`
17881865
# manually at proper time.
17891866
17901867
waitImpl(fut, retFuture, timeout)
1868+
1869+
proc wait*(fut: InternalRaisesFuture, deadline: InternalRaisesFuture): auto =
1870+
type
1871+
T = type(fut).T
1872+
E = type(fut).E
1873+
InternalRaisesFutureRaises = E.prepend(CancelledError, AsyncTimeoutError)
1874+
1875+
let
1876+
retFuture = newFuture[T]("chronos.wait(future)", {OwnCancelSchedule})
1877+
# We set `OwnCancelSchedule` flag, because we going to cancel `retFuture`
1878+
# manually at proper time.
1879+
1880+
waitUntilImpl(fut, retFuture, deadline)

0 commit comments

Comments
 (0)