Skip to content

Commit 74bd030

Browse files
committed
Change the mpsc counting to fix #49
1 parent 46cf323 commit 74bd030

File tree

1 file changed

+25
-11
lines changed

1 file changed

+25
-11
lines changed

weave/cross_thread_com/channels_mpsc_unbounded_batch.nim

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,9 @@ proc trySend*[T](chan: var ChannelMpscUnboundedBatch[T], src: sink T): bool {.in
102102
## Send an item to the back of the channel
103103
## As the channel has unbounded capacity, this should never fail
104104

105-
discard chan.count.fetchAdd(1, moRelaxed)
105+
let oldCount {.used.} = chan.count.fetchAdd(1, moRelease)
106+
postCondition: oldCount >= 0
107+
106108
src.next.store(nil, moRelease)
107109
let oldBack = chan.back.exchange(src, moAcquireRelease)
108110
# Consumer can be blocked here, it doesn't see the (potentially growing) end of the queue
@@ -116,7 +118,9 @@ proc trySendBatch*[T](chan: var ChannelMpscUnboundedBatch[T], first, last: sink
116118
## They should be linked together by their next field
117119
## As the channel has unbounded capacity this should never fail
118120

119-
discard chan.count.fetchAdd(int(count), moRelaxed)
121+
let oldCount {.used.} = chan.count.fetchAdd(1, moRelease)
122+
postCondition: oldCount >= 0
123+
120124
last.next.store(nil, moRelease)
121125
let oldBack = chan.back.exchange(last, moAcquireRelease)
122126
# Consumer can be blocked here, it doesn't see the (potentially growing) end of the queue
@@ -144,10 +148,13 @@ proc tryRecv*[T](chan: var ChannelMpscUnboundedBatch[T], dst: var T): bool =
144148
if not next.isNil:
145149
# Not competing with producers
146150
prefetch(first)
147-
discard chan.count.fetchSub(1, moRelaxed)
151+
148152
chan.front.next.store(next, moRelaxed)
149153
# fence(moAcquire) # Sync "first.next.load(moRelaxed)"
150154
dst = first
155+
156+
let oldCount {.used.} = chan.count.fetchSub(1, moRelaxed)
157+
ascertain: oldCount >= 1 # The producers may overestimate the count
151158
return true
152159
# End fast-path
153160

@@ -162,8 +169,10 @@ proc tryRecv*[T](chan: var ChannelMpscUnboundedBatch[T], dst: var T): bool =
162169
if compareExchange(chan.back, last, chan.front.addr, moAcquireRelease):
163170
# We won and replaced the last node with the channel front
164171
prefetch(first)
165-
discard chan.count.fetchSub(1, moRelaxed)
166172
dst = first
173+
174+
let oldCount {.used.} = chan.count.fetchSub(1, moRelaxed)
175+
ascertain: oldCount >= 1 # The producers may overestimate the count
167176
return true
168177

169178
# We lost but now we know that there is an extra node coming very soon
@@ -179,10 +188,12 @@ proc tryRecv*[T](chan: var ChannelMpscUnboundedBatch[T], dst: var T): bool =
179188
next = first.next.load(moAcquire)
180189

181190
prefetch(first)
182-
discard chan.count.fetchSub(1, moRelaxed)
183191
chan.front.next.store(next, moRelaxed)
184192
# fence(moAcquire) # sync first.next.load(moRelaxed)
185193
dst = first
194+
195+
let oldCount {.used.} = chan.count.fetchSub(1, moRelaxed)
196+
ascertain: oldCount >= 1 # The producers may overestimate the count
186197
return true
187198

188199
# # Alternative implementation
@@ -235,8 +246,9 @@ proc tryRecvBatch*[T](chan: var ChannelMpscUnboundedBatch[T], bFirst, bLast: var
235246
if front != last:
236247
# We lose the competition, bail out
237248
chan.front.next.store(front, moRelease)
238-
discard chan.count.fetchSub(result, moRelaxed)
239-
postCondition: chan.count.load(moRelaxed) >= 0 # TODO: somehow it can be negative
249+
250+
let oldCount {.used.} = chan.count.fetchSub(result, moRelaxed)
251+
postCondition: oldCount >= result # TODO: somehow it can be negative
240252
return
241253

242254
# front == last
@@ -245,9 +257,10 @@ proc tryRecvBatch*[T](chan: var ChannelMpscUnboundedBatch[T], bFirst, bLast: var
245257
# We won and replaced the last node with the channel front
246258
prefetch(front)
247259
result += 1
248-
discard chan.count.fetchSub(result, moRelaxed)
249260
bLast = front
250-
postCondition: chan.count.load(moRelaxed) >= 0
261+
262+
let oldCount {.used.} = chan.count.fetchSub(result, moRelaxed)
263+
postCondition: oldCount >= result # TODO: somehow it can be negative
251264
return
252265

253266
# We lost but now we know that there is an extra node
@@ -267,11 +280,12 @@ proc tryRecvBatch*[T](chan: var ChannelMpscUnboundedBatch[T], bFirst, bLast: var
267280

268281
prefetch(front)
269282
result += 1
270-
discard chan.count.fetchSub(result, moRelaxed)
271283
chan.front.next.store(next, moRelaxed)
272284
# fence(moAcquire) # sync front.next.load(moRelaxed)
273285
bLast = front
274-
postCondition: chan.count.load(moRelaxed) >= 0
286+
287+
let oldCount {.used.} = chan.count.fetchSub(result, moRelaxed)
288+
postCondition: oldCount >= result # TODO: somehow it can be negative
275289

276290
func peek*(chan: var ChannelMpscUnboundedBatch): int32 {.inline.} =
277291
## Estimates the number of items pending in the channel

0 commit comments

Comments
 (0)