Skip to content

Model Checked - MPSC queue + Rework State Machines #128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
May 9, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions thread_collider/greenlet
Submodule greenlet added at aed081
40 changes: 31 additions & 9 deletions weave/cross_thread_com/channels_mpsc_unbounded_batch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,11 @@ proc tryRecv*[T](chan: var ChannelMpscUnboundedBatch[T], dst: var T): bool =
## This can fail spuriously on the last element if producer
## enqueues a new element while the consumer was dequeing it

let first = cast[T](chan.front.next.load(moRelaxed))
let first = cast[T](chan.front.next.load(moAcquire))
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This moRelaxed triggered the race detector every single time:

image

The rest of the changes were not checked

if first.isNil:
# According to the model checker, we can't put "fence(moAcquire)"
# here and use relaxed semantics for "first" as it may read from initialized load
# (due to compiler reordering?)
return false

# fast path
Expand All @@ -145,7 +148,7 @@ proc tryRecv*[T](chan: var ChannelMpscUnboundedBatch[T], dst: var T): bool =
prefetch(first)
discard chan.count.fetchSub(1, moRelaxed)
chan.front.next.store(next, moRelaxed)
fence(moAcquire)
fence(moAcquire) # Sync "first.next.load(moRelaxed)"
dst = first
return true
# End fast-path
Expand All @@ -154,11 +157,13 @@ proc tryRecv*[T](chan: var ChannelMpscUnboundedBatch[T], dst: var T): bool =
var last = chan.back.load(moRelaxed)
if first != last:
# We lose the competition before even trying
fence(moAcquire) # Sync "chan.back.load(moRelaxed)"
return false

chan.front.next.store(nil, moRelaxed)
if compareExchange(chan.back, last, chan.front.addr, moAcquireRelease):
# We won and replaced the last node with the channel front
prefetch(first)
discard chan.count.fetchSub(1, moRelaxed)
dst = first
return true
Expand All @@ -178,10 +183,27 @@ proc tryRecv*[T](chan: var ChannelMpscUnboundedBatch[T], dst: var T): bool =
prefetch(first)
discard chan.count.fetchSub(1, moRelaxed)
chan.front.next.store(next, moRelaxed)
fence(moAcquire)
fence(moAcquire) # sync first.next.load(moRelaxed)
dst = first
return true

# Alternative implementation
#
# # We lost but now we know that there is an extra node coming very soon
# cpuRelax()
# let next = first.next.load(moRelaxed)
# if not next.isNil:
# # Extra nodes after this one, no more competition with producers
# prefetch(first)
# discard chan.count.fetchSub(1, moRelaxed)
# chan.front.next.store(next, moRelaxed)
# fence(moAcquire) # Sync "first.next.load(moRelaxed)"
# dst = first
# return true

# # The last item wasn't linked to the list yet, bail out
# return false

proc tryRecvBatch*[T](chan: var ChannelMpscUnboundedBatch[T], bFirst, bLast: var T): int32 =
## Try receiving all items buffered in the channel
## Returns true if at least some items are dequeued.
Expand All @@ -198,24 +220,24 @@ proc tryRecvBatch*[T](chan: var ChannelMpscUnboundedBatch[T], bFirst, bLast: var

result = 0

var front = cast[T](chan.front.next.load(moRelaxed))
var front = cast[T](chan.front.next.load(moAcquire))
bFirst = front
if front.isNil:
return

# Fast-forward to the end of the channel
var next = cast[T](front.next.load(moRelaxed))
var next = cast[T](front.next.load(moAcquire))
while not next.isNil:
result += 1
bLast = front
front = next
next = cast[T](next.next.load(moRelaxed))
next = cast[T](next.next.load(moAcquire))

# Competing with producers at the back
var last = chan.back.load(moRelaxed)
var last = chan.back.load(moAcquire)
if front != last:
# We lose the competition, bail out
chan.front.next.store(front, moRelaxed)
chan.front.next.store(front, moRelease)
discard chan.count.fetchSub(result, moRelaxed)
postCondition: chan.count.load(moRelaxed) >= 0 # TODO: somehow it can be negative
return
Expand Down Expand Up @@ -250,7 +272,7 @@ proc tryRecvBatch*[T](chan: var ChannelMpscUnboundedBatch[T], bFirst, bLast: var
result += 1
discard chan.count.fetchSub(result, moRelaxed)
chan.front.next.store(next, moRelaxed)
fence(moAcquire)
fence(moAcquire) # sync front.next.load(moRelaxed)
bLast = front
postCondition: chan.count.load(moRelaxed) >= 0

Expand Down