1
1
package io .vproxy .base .util .objectpool ;
2
2
3
+ import io .vproxy .base .util .Utils ;
4
+ import io .vproxy .base .util .lock .ReadWriteSpinLock ;
5
+ import io .vproxy .base .util .thread .VProxyThread ;
6
+
3
7
import java .util .concurrent .atomic .AtomicInteger ;
4
- import java .util .concurrent .atomic .AtomicReference ;
5
8
import java .util .concurrent .atomic .AtomicReferenceArray ;
6
9
7
10
/**
8
11
* The pool is split into a few partitions, each partition has a read array and a write array.
9
12
* When adding, elements will be added into the write array.
10
13
* When polling, elements will be polled from the read array.
11
- * If read array is empty and write array is full, and when running polling, the two arrays will be swapped
12
- * (they will not be swapped when adding).
14
+ * If read array is empty and write array is full, and when running polling or adding, the two arrays will be swapped
13
15
* The arrays will not be operated when they are being swapped.
14
16
* When concurrency occurs, the operations will retry for maximum 10 times.
15
17
*
16
18
* @param <E> element type
17
19
*/
18
20
public class ConcurrentObjectPool <E > {
19
21
private final int partitionCount ;
22
+ private final int partitionCountMinusOne ;
20
23
private final Partition <E >[] partitions ;
24
+ private final int maxTraversal ;
21
25
22
26
public ConcurrentObjectPool (int capacityHint ) {
23
- this (capacityHint , 16 , 4 );
27
+ this (capacityHint , 16 , 0 );
24
28
}
25
29
26
- public ConcurrentObjectPool (int capacityHint , int partitionCountHint , int minPartitionCapHint ) {
27
- capacityHint -= 1 ;
28
- capacityHint |= capacityHint >>> 1 ;
29
- capacityHint |= capacityHint >>> 2 ;
30
- capacityHint |= capacityHint >>> 4 ;
31
- capacityHint |= capacityHint >>> 8 ;
32
- capacityHint |= capacityHint >>> 16 ;
33
- capacityHint += 1 ;
30
+ public ConcurrentObjectPool (int capacityHint , int partitionCountHint , int maxTraversalHint ) {
31
+ capacityHint = Utils .minPow2GreaterThan (capacityHint ) / 2 ;
32
+ partitionCountHint = Utils .minPow2GreaterThan (partitionCountHint );
34
33
35
- if (capacityHint / minPartitionCapHint == 0 ) {
34
+ if (capacityHint / partitionCountHint == 0 ) {
36
35
partitionCount = 1 ;
37
36
} else {
38
- partitionCount = Math . min ( capacityHint / minPartitionCapHint , partitionCountHint ) ;
37
+ partitionCount = partitionCountHint ;
39
38
}
39
+ partitionCountMinusOne = partitionCount - 1 ;
40
40
41
41
//noinspection unchecked
42
42
this .partitions = new Partition [partitionCount ];
43
43
for (int i = 0 ; i < partitionCount ; ++i ) {
44
44
partitions [i ] = new Partition <>(capacityHint / partitionCount );
45
45
}
46
+
47
+ if (maxTraversalHint <= 0 || maxTraversalHint >= partitionCount ) {
48
+ maxTraversal = partitionCount ;
49
+ } else {
50
+ maxTraversal = maxTraversalHint ;
51
+ }
52
+ }
53
+
54
+ private int hashForPartition () {
55
+ var tid = VProxyThread .current ().threadId ;
56
+ return (int ) (tid & partitionCountMinusOne );
46
57
}
47
58
48
59
public boolean add (E e ) {
49
- for (int i = 0 ; i < partitionCount ; ++i ) {
50
- if (partitions [i ].add (e )) {
60
+ int m = maxTraversal ;
61
+ int hash = hashForPartition ();
62
+ for (int i = hash ; m > 0 ; ++i , --m ) {
63
+ if (partitions [i & partitionCountMinusOne ].add (e )) {
51
64
return true ;
52
65
}
53
66
}
54
67
return false ;
55
68
}
56
69
57
70
public E poll () {
58
- for (int i = 0 ; i < partitionCount ; ++i ) {
59
- E e = partitions [i ].poll ();
71
+ int m = maxTraversal ;
72
+ int hash = hashForPartition ();
73
+ for (int i = hash ; m > 0 ; ++i , --m ) {
74
+ E e = partitions [i & partitionCountMinusOne ].poll ();
60
75
if (e != null ) {
61
76
return e ;
62
77
}
@@ -73,27 +88,37 @@ public int size() {
73
88
}
74
89
75
90
private static class Partition <E > {
76
- private final AtomicReference <StorageArray <E >> read ;
77
- private volatile StorageArray <E > write ;
78
- private final StorageArray <E > _1 ;
79
- private final StorageArray <E > _2 ;
91
+ private final ReadWriteSpinLock lock = new ReadWriteSpinLock ();
92
+ private volatile ArrayQueue <E > read ;
93
+ private volatile ArrayQueue <E > write ;
94
+ private final ArrayQueue <E > _1 ;
95
+ private final ArrayQueue <E > _2 ;
80
96
81
97
public Partition (int capacity ) {
82
- _1 = new StorageArray <>(capacity );
83
- _2 = new StorageArray <>(capacity );
84
- read = new AtomicReference <>( _1 ) ;
98
+ _1 = new ArrayQueue <>(capacity , lock );
99
+ _2 = new ArrayQueue <>(capacity , lock );
100
+ read = _1 ;
85
101
write = _2 ;
86
102
}
87
103
88
104
public boolean add (E e ) {
89
- StorageArray <E > write = this .write ;
105
+ return add (1 , e );
106
+ }
107
+
108
+ private boolean add (int retry , E e ) {
109
+ if (retry > 10 ) { // max retry for 10 times
110
+ return false ; // too many retries
111
+ }
90
112
91
- // adding is always safe
92
- //noinspection RedundantIfStatement
113
+ var write = this .write ;
93
114
if (write .add (e )) {
94
115
return true ;
95
116
}
96
- // $write is full, storing fails
117
+
118
+ // the $write is full now
119
+ if (swap (read , write , false )) {
120
+ return add (retry + 1 , e );
121
+ }
97
122
return false ;
98
123
}
99
124
@@ -106,103 +131,134 @@ private E poll(int retry) {
106
131
return null ; // too many retries
107
132
}
108
133
109
- StorageArray < E > read = this .read . get () ;
110
- StorageArray < E > write = this .write ;
134
+ var read = this .read ;
135
+ var write = this .write ;
111
136
112
- // polling is always safe
113
- E ret = read .poll ();
137
+ var ret = read .poll ();
114
138
if (ret != null ) {
115
139
return ret ;
116
140
}
117
141
118
142
// no elements in the $read now
119
- // check whether we can swap (whether $write is full)
143
+ if (swap (read , write , true )) {
144
+ return poll (retry + 1 );
145
+ }
146
+ return null ;
147
+ }
120
148
121
- int writeEnd = write .end .get ();
122
- if (writeEnd < write .capacity ) {
123
- return null ; // capacity not reached, do not swap and return nothing
124
- // no retry here because the write array will not change until something written into it
149
+ // return true -> need retry
150
+ // return false -> failed and should not retry
151
+ private boolean swap (ArrayQueue <E > read , ArrayQueue <E > write , boolean isPolling ) {
152
+ // check whether we can swap
153
+ if (read == write ) {
154
+ // is being swapped by another thread
155
+ return true ;
125
156
}
126
- // also we should check whether there are no elements being stored
127
- if (write .storing .get () != 0 ) { // element is being stored into the array
128
- return poll (retry + 1 ); // try again
157
+
158
+ if (isPolling ) { // $read is empty
159
+ int writeEnd = write .end .get ();
160
+ if (writeEnd < write .capacity ) {
161
+ return false ; // capacity not reached, do not swap and return nothing
162
+ // no retry here because the write array will not change until something written into it
163
+ }
164
+ } else { // $write is full
165
+ int readStart = read .start .get ();
166
+ if (readStart < read .end .get ()) {
167
+ return false ; // still have objects to fetch, do not swap
168
+ // no retry here because the read array will not change until something polling from it
169
+ }
129
170
}
130
- // now we can know that writing operations will not happen in this partition
131
171
132
- // we can safely swap the two arrays now
133
- if (!this .read .compareAndSet (read , write )) {
134
- return poll (retry + 1 ); // concurrency detected: another thread is swapping
172
+ lock .writeLock ();
173
+ if (this .read != read ) {
174
+ // already swapped by another thread
175
+ lock .writeUnlock ();
176
+ return true ;
135
177
}
178
+ // we can safely swap the two arrays now
179
+ this .read = write ;
136
180
// the $read is expected to be empty
137
181
assert read .size () == 0 ;
138
182
read .reset (); // reset the cursors, so further operations can store data into this array
139
183
this .write = read ; // swapping is done
140
- return poll (retry + 1 ); // poll again
184
+ lock .writeUnlock ();
185
+
186
+ return true ;
141
187
}
142
188
143
189
public int size () {
144
190
return _1 .size () + _2 .size ();
145
191
}
146
192
}
147
193
148
- private static class StorageArray <E > {
194
+ private static class ArrayQueue <E > {
149
195
private final int capacity ;
196
+ private final ReadWriteSpinLock lock ;
150
197
private final AtomicReferenceArray <E > array ;
151
- private final AtomicInteger start = new AtomicInteger (-1 );
152
- private final AtomicInteger end = new AtomicInteger (-1 );
153
- private final AtomicInteger storing = new AtomicInteger (0 );
198
+ private final AtomicInteger start = new AtomicInteger (0 );
199
+ private final AtomicInteger end = new AtomicInteger (0 );
154
200
155
- private StorageArray (int capacity ) {
201
+ private ArrayQueue (int capacity , ReadWriteSpinLock lock ) {
156
202
this .capacity = capacity ;
203
+ this .lock = lock ;
157
204
this .array = new AtomicReferenceArray <>(capacity );
158
205
}
159
206
160
207
boolean add (E e ) {
161
- storing . incrementAndGet ();
208
+ lock . readLock ();
162
209
163
210
if (end .get () >= capacity ) {
164
- storing . decrementAndGet ();
211
+ lock . readUnlock ();
165
212
return false ; // exceeds capacity
166
213
}
167
- int index = end .incrementAndGet ();
214
+ int index = end .getAndIncrement ();
168
215
if (index < capacity ) {
169
216
// storing should succeed
170
217
array .set (index , e );
171
- storing . decrementAndGet ();
218
+ lock . readUnlock ();
172
219
return true ;
173
220
} else {
174
221
// storing failed
175
- storing . decrementAndGet ();
222
+ lock . readUnlock ();
176
223
return false ;
177
224
}
178
225
}
179
226
180
227
E poll () {
181
- if (start .get () + 1 >= end .get () || start .get () + 1 >= capacity ) {
228
+ lock .readLock ();
229
+
230
+ if (start .get () >= end .get () || start .get () >= capacity ) {
231
+ lock .readUnlock ();
182
232
return null ;
183
233
}
184
- int idx = start .incrementAndGet ();
234
+ int idx = start .getAndIncrement ();
185
235
if (idx >= end .get () || idx >= capacity ) {
236
+ lock .readUnlock ();
186
237
return null ; // concurrent polling
187
238
}
188
- return array .get (idx );
239
+ var e = array .get (idx );
240
+ lock .readUnlock ();
241
+ return e ;
189
242
}
190
243
191
244
int size () {
192
- int start = this .start .get () + 1 ;
245
+ int start = this .start .get ();
193
246
if (start >= capacity ) {
194
247
return 0 ;
195
248
}
196
- int cap = end .get () + 1 ;
249
+ int cap = end .get ();
197
250
if (cap > capacity ) {
198
251
cap = capacity ;
199
252
}
253
+ if (start > cap ) {
254
+ return 0 ;
255
+ }
200
256
return cap - start ;
201
257
}
202
258
203
259
void reset () {
204
- end .set (- 1 );
205
- start .set (- 1 );
260
+ end .set (0 );
261
+ start .set (0 );
206
262
}
207
263
}
208
264
}
0 commit comments