Skip to content

Commit 0d38660

Browse files
authored
Merge pull request #49 from glessard/devel
tweaks
2 parents 9a6d8e9 + d445787 commit 0d38660

File tree

1 file changed

+47
-48
lines changed

1 file changed

+47
-48
lines changed

Sources/ReactiveStreams/stream-post.swift

+47-48
Original file line numberDiff line numberDiff line change
@@ -9,28 +9,20 @@
99
import Dispatch
1010
import CAtomics
1111

12-
private let hptrOffset = 0
13-
private let tptrOffset = MemoryLayout<AtomicMutableRawPointer>.stride
14-
private let fptrOffset = MemoryLayout<AtomicMutableRawPointer>.stride*2
15-
1612
open class PostBox<Value>: EventStream<Value>
1713
{
1814
private typealias Node = BufferNode<Event<Value>>
1915

20-
private let s = UnsafeMutableRawPointer.allocate(byteCount: MemoryLayout<AtomicMutableRawPointer>.stride*3,
21-
alignment: MemoryLayout<AtomicMutableRawPointer>.alignment)
22-
private var hptr: UnsafeMutablePointer<AtomicMutableRawPointer> {
23-
return (s+hptrOffset).assumingMemoryBound(to: AtomicMutableRawPointer.self)
24-
}
25-
private var head: Node {
26-
get { return Node(storage: CAtomicsLoad(hptr, .relaxed)) }
27-
set { CAtomicsStore(hptr, newValue.storage, .relaxed) }
16+
private let s = UnsafeMutableRawPointer.allocate(byteCount: MemoryLayout<PostBoxState>.size,
17+
alignment: MemoryLayout<PostBoxState>.alignment)
18+
private var head: UnsafeMutablePointer<AtomicMutableRawPointer> {
19+
return (s+headOffset).assumingMemoryBound(to: AtomicMutableRawPointer.self)
2820
}
29-
private var tptr: UnsafeMutablePointer<AtomicMutableRawPointer> {
30-
return (s+tptrOffset).assumingMemoryBound(to: AtomicMutableRawPointer.self)
21+
private var tail: UnsafeMutablePointer<AtomicMutableRawPointer> {
22+
return (s+tailOffset).assumingMemoryBound(to: AtomicMutableRawPointer.self)
3123
}
32-
private var fptr: UnsafeMutablePointer<AtomicOptionalMutableRawPointer> {
33-
return (s+fptrOffset).assumingMemoryBound(to: AtomicOptionalMutableRawPointer.self)
24+
private var last: UnsafeMutablePointer<AtomicOptionalMutableRawPointer> {
25+
return (s+lastOffset).assumingMemoryBound(to: AtomicOptionalMutableRawPointer.self)
3426
}
3527

3628
override init(validated: ValidatedQueue)
@@ -39,47 +31,46 @@ open class PostBox<Value>: EventStream<Value>
3931

4032
// set up an initial dummy node
4133
let node = Node.dummy
42-
(s+hptrOffset).bindMemory(to: AtomicMutableRawPointer.self, capacity: 2)
43-
CAtomicsInitialize(hptr, node.storage)
44-
CAtomicsInitialize(tptr, node.storage)
45-
(s+fptrOffset).bindMemory(to: AtomicOptionalMutableRawPointer.self, capacity: 1)
46-
CAtomicsInitialize(fptr, nil)
34+
(s+headOffset).bindMemory(to: AtomicMutableRawPointer.self, capacity: 2)
35+
CAtomicsInitialize(head, node.storage)
36+
CAtomicsInitialize(tail, node.storage)
37+
(s+lastOffset).bindMemory(to: AtomicOptionalMutableRawPointer.self, capacity: 1)
38+
CAtomicsInitialize(last, nil)
4739
}
4840

4941
deinit {
5042
// empty the queue
51-
let head = self.head
52-
var next = head.next
43+
let head = Node(storage: CAtomicsLoad(self.head, .relaxed))
44+
var next = Node(storage: CAtomicsLoad(head.next, .relaxed))
5345
while let node = next
5446
{
55-
next = node.next
47+
next = Node(storage: CAtomicsLoad(node.next, .relaxed))
5648
node.deinitialize()
5749
node.deallocate()
5850
}
5951

6052
s.deallocate()
6153
}
6254

63-
final public var isEmpty: Bool { return CAtomicsLoad(hptr, .relaxed) == CAtomicsLoad(tptr, .relaxed) }
55+
final public var isEmpty: Bool { return CAtomicsLoad(head, .relaxed) == CAtomicsLoad(tail, .relaxed) }
6456

6557
final public func post(_ event: Event<Value>)
6658
{
67-
guard completed == false, CAtomicsLoad(fptr, .relaxed) == nil else { return }
59+
guard completed == false, CAtomicsLoad(last, .relaxed) == nil else { return }
6860

6961
let node = Node(initializedWith: event)
7062
if event.isError
7163
{
72-
guard CAtomicsCompareAndExchange(fptr, nil, node.storage, .strong, .relaxed) else { return }
64+
guard CAtomicsCompareAndExchange(last, nil, node.storage, .strong, .relaxed) else { return }
7365
}
7466

7567
// events posted "simultaneously" synchronize with each other here
76-
let previousTailPointer = CAtomicsExchange(tptr, node.storage, .acqrel)
77-
let previousTail = Node(storage: previousTailPointer)
68+
let previousTail = Node(storage: CAtomicsExchange(tail, node.storage, .acqrel))
7869

7970
// publish the new node to processing loop here
80-
CAtomicsStore(previousTail.nptr, node.storage, .release)
71+
CAtomicsStore(previousTail.next, node.storage, .release)
8172

82-
if previousTailPointer == CAtomicsLoad(hptr, .relaxed)
73+
if previousTail.storage == CAtomicsLoad(head, .relaxed)
8374
{ // the queue had been empty or blocked
8475
// resume processing enqueued events
8576
queue.async(execute: self.processNext)
@@ -110,17 +101,21 @@ open class PostBox<Value>: EventStream<Value>
110101
}
111102
#endif
112103

104+
let requested = self.requested
105+
if requested <= 0 && CAtomicsLoad(last, .relaxed) == nil { return }
106+
113107
// try to dequeue the next event
114-
let oldHead = head
115-
let next = CAtomicsLoad(oldHead.nptr, .acquire)
108+
let head = Node(storage: CAtomicsLoad(self.head, .acquire))
109+
let next = CAtomicsLoad(head.next, .acquire)
116110

117-
if requested <= 0 && CAtomicsLoad(fptr, .relaxed) != next { return }
111+
if requested <= 0 && CAtomicsLoad(last, .relaxed) != next { return }
118112

119-
if let next = Node(storage: next)
113+
if let next = next
120114
{
121-
let event = next.move()
122-
head = next
123-
oldHead.deallocate()
115+
let node = Node(storage: next)
116+
let event = node.move()
117+
CAtomicsStore(self.head, next, .release)
118+
head.deallocate()
124119

125120
dispatch(event)
126121
queue.async(execute: self.processNext)
@@ -140,6 +135,16 @@ open class PostBox<Value>: EventStream<Value>
140135
}
141136
}
142137

138+
private struct PostBoxState
139+
{
140+
var head: AtomicMutableRawPointer
141+
var tail: AtomicMutableRawPointer
142+
var last: AtomicOptionalMutableRawPointer
143+
}
144+
private let headOffset = MemoryLayout.offset(of: \PostBoxState.head)!
145+
private let tailOffset = MemoryLayout.offset(of: \PostBoxState.tail)!
146+
private let lastOffset = MemoryLayout.offset(of: \PostBoxState.last)!
147+
143148
private let nextOffset = 0
144149
private let dataOffset = (MemoryLayout<AtomicOptionalMutableRawPointer>.stride + 15) & ~15
145150

@@ -160,10 +165,10 @@ private struct BufferNode<Element>: Equatable
160165

161166
private init()
162167
{
163-
let size = dataOffset + MemoryLayout<Element>.stride
168+
let size = dataOffset + MemoryLayout<Element>.size
164169
storage = UnsafeMutableRawPointer.allocate(byteCount: size, alignment: 16)
165170
(storage+nextOffset).bindMemory(to: AtomicOptionalMutableRawPointer.self, capacity: 1)
166-
CAtomicsInitialize(nptr, nil)
171+
CAtomicsInitialize(next, nil)
167172
(storage+dataOffset).bindMemory(to: Element.self, capacity: 1)
168173
}
169174

@@ -180,14 +185,8 @@ private struct BufferNode<Element>: Equatable
180185
storage.deallocate()
181186
}
182187

183-
var nptr: UnsafeMutablePointer<AtomicOptionalMutableRawPointer> {
184-
get {
185-
return (storage+nextOffset).assumingMemoryBound(to: AtomicOptionalMutableRawPointer.self)
186-
}
187-
}
188-
189-
var next: BufferNode? {
190-
get { return BufferNode(storage: CAtomicsLoad(nptr, .acquire)) }
188+
var next: UnsafeMutablePointer<AtomicOptionalMutableRawPointer> {
189+
return (storage+nextOffset).assumingMemoryBound(to: AtomicOptionalMutableRawPointer.self)
191190
}
192191

193192
private var data: UnsafeMutablePointer<Element> {

0 commit comments

Comments
 (0)