Skip to content

Commit 8715076

Browse files
devinrsmithrcaudy
andauthored
Improve S3, parquet, and SeekableChannelsProvider (#5137)
Adds in port of PooledObjectReference from DHE Fixes #5096 --------- Co-authored-by: Ryan Caudy <rcaudy@gmail.com>
1 parent 02dc828 commit 8715076

File tree

40 files changed

+1676
-840
lines changed

40 files changed

+1676
-840
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
package io.deephaven.base.reference;
2+
3+
import org.jetbrains.annotations.NotNull;
4+
import org.jetbrains.annotations.Nullable;
5+
6+
import java.util.Objects;
7+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
8+
9+
/**
10+
* {@link SimpleReference} implementation with built-in reference-counting and pooling support.
11+
*/
12+
public abstract class PooledObjectReference<REFERENT_TYPE> implements SimpleReference<REFERENT_TYPE> {
13+
14+
/**
15+
* Field updater for {@code state}.
16+
*/
17+
@SuppressWarnings("rawtypes")
18+
private static final AtomicIntegerFieldUpdater<PooledObjectReference> STATE_UPDATER =
19+
AtomicIntegerFieldUpdater.newUpdater(PooledObjectReference.class, "state");
20+
21+
/**
22+
* An available reference with zero outstanding permits will have {@code state == 1}.
23+
*/
24+
private static final int AVAILABLE_ZERO_PERMITS = 1;
25+
26+
/**
27+
* A cleared reference with zero outstanding permits will have {@code state == 0}.
28+
*/
29+
private static final int CLEARED_ZERO_PERMITS = 0;
30+
31+
/**
32+
* The bit we use to denote availability.
33+
*/
34+
private static final int STATE_AVAILABLE_BIT = 1;
35+
36+
/**
37+
* The quantity to add to state when incrementing the number of outstanding permits.
38+
*/
39+
private static final int STATE_PERMIT_ACQUIRE_QUANTITY = 2;
40+
41+
/**
42+
* The quantity to add to state when decrementing the number of outstanding permits.
43+
*/
44+
private static final int STATE_PERMIT_RELEASE_QUANTITY = -STATE_PERMIT_ACQUIRE_QUANTITY;
45+
46+
private static boolean stateAllowsAcquire(final int currentState) {
47+
return currentState > CLEARED_ZERO_PERMITS;
48+
}
49+
50+
private static boolean stateIsAvailable(final int currentState) {
51+
return (currentState & STATE_AVAILABLE_BIT) != 0;
52+
}
53+
54+
private static boolean stateIsCleared(final int currentState) {
55+
return (currentState & STATE_AVAILABLE_BIT) == 0;
56+
}
57+
58+
private static int calculateNewStateForClear(final int currentState) {
59+
return currentState ^ STATE_AVAILABLE_BIT;
60+
}
61+
62+
private static int calculateNewStateForAcquire(final int currentState) {
63+
return currentState + STATE_PERMIT_ACQUIRE_QUANTITY;
64+
}
65+
66+
/**
67+
* Try to atomically update {@code state}.
68+
*
69+
* @param currentState The expected value
70+
* @param newState The desired result value
71+
* @return Whether {@code state} was successfully updated
72+
*/
73+
private boolean tryUpdateState(final int currentState, final int newState) {
74+
return STATE_UPDATER.compareAndSet(this, currentState, newState);
75+
}
76+
77+
/**
78+
* Atomically decrement the number of outstanding permits and get the new value of {@code state}.
79+
*
80+
* @return The new value of {@code state}
81+
*/
82+
private int decrementOutstandingPermits() {
83+
return STATE_UPDATER.addAndGet(this, STATE_PERMIT_RELEASE_QUANTITY);
84+
}
85+
86+
/**
87+
* The actual referent. Set to null after {@code state == CLEARED_ZERO_PERMITS} by the responsible thread, which
88+
* returns it to the pool.
89+
*/
90+
private volatile REFERENT_TYPE referent;
91+
92+
/**
93+
* The state of this reference. The lowest bit is used to denote whether this reference is available (1) or cleared
94+
* (0). The higher bits represent an integer count of the number of outstanding permits.
95+
*/
96+
private volatile int state = AVAILABLE_ZERO_PERMITS;
97+
98+
/**
99+
* Construct a new PooledObjectReference to the supplied referent.
100+
*
101+
* @param referent The referent of this reference
102+
*/
103+
protected PooledObjectReference(@NotNull final REFERENT_TYPE referent) {
104+
this.referent = Objects.requireNonNull(referent, "referent");
105+
}
106+
107+
/**
108+
* Get the referent. It is an error to call this method if the caller does not have any outstanding permits. Callers
109+
* are encouraged to use this in the try block of a try-finally pattern:
110+
*
111+
* <pre>
112+
* if (!ref.acquire()) {
113+
* return;
114+
* }
115+
* try {
116+
* doSomethingWith(ref.get());
117+
* } finally {
118+
* ref.release();
119+
* }
120+
* </pre>
121+
*
122+
* @return The referent if this reference has not been cleared, null otherwise (which implies an error by the
123+
* caller)
124+
*/
125+
@Override
126+
public final REFERENT_TYPE get() {
127+
return referent;
128+
}
129+
130+
/**
131+
* Acquire an active use permit. Callers should pair this with a corresponding {@link #release()}, ideally with a
132+
* try-finally pattern:
133+
*
134+
* <pre>
135+
* if (!ref.acquire()) {
136+
* return;
137+
* }
138+
* try {
139+
* doSomethingWith(ref.get());
140+
* } finally {
141+
* ref.release();
142+
* }
143+
* </pre>
144+
*
145+
* @return Whether a permit was acquired
146+
*/
147+
public final boolean acquire() {
148+
int currentState;
149+
while (stateAllowsAcquire(currentState = state)) {
150+
final int newState = calculateNewStateForAcquire(currentState);
151+
if (tryUpdateState(currentState, newState)) {
152+
return true;
153+
}
154+
}
155+
return false;
156+
}
157+
158+
/**
159+
* Acquire an active use permit if this has not been {@link #clear() cleared}. This is useful in situations where
160+
* callers want to fail-fast and don't need to guarantee reentrancy. Callers should pair this with a corresponding
161+
* {@link #release()}, ideally with a try-finally pattern:
162+
*
163+
* <pre>
164+
* if (!ref.acquireIfAvailable()) {
165+
* return;
166+
* }
167+
* try {
168+
* doSomethingWith(ref.get());
169+
* } finally {
170+
* ref.release();
171+
* }
172+
* </pre>
173+
*
174+
* @return Whether a permit was acquired
175+
*/
176+
public final boolean acquireIfAvailable() {
177+
int currentState;
178+
while (stateAllowsAcquire(currentState = state) && stateIsAvailable(currentState)) {
179+
final int newState = calculateNewStateForAcquire(currentState);
180+
if (tryUpdateState(currentState, newState)) {
181+
return true;
182+
}
183+
}
184+
return false;
185+
}
186+
187+
/**
188+
* Acquire an active use permit and return the referent, if possible. Callers should pair this with a corresponding
189+
* {@link #release()}, ideally with a try-finally pattern:
190+
*
191+
* <pre>
192+
* final Object obj;
193+
* if ((obj = ref.acquireAndGet()) == null) {
194+
* return;
195+
* }
196+
* try {
197+
* doSomethingWith(obj);
198+
* } finally {
199+
* ref.release();
200+
* }
201+
* </pre>
202+
*
203+
* @return The referent, or null if no permit could be acquired
204+
*/
205+
@Nullable
206+
public final REFERENT_TYPE acquireAndGet() {
207+
return acquire() ? referent : null;
208+
}
209+
210+
/**
211+
* Release a single active use permit. It is a serious error to release more permits than acquired. Callers are
212+
* encouraged to use this in the finally block of a try-finally pattern:
213+
*
214+
* <pre>
215+
* if (!ref.acquire()) {
216+
* return;
217+
* }
218+
* try {
219+
* doSomethingWith(ref.get());
220+
* } finally {
221+
* ref.release();
222+
* }
223+
* </pre>
224+
*/
225+
public final void release() {
226+
final int newState = decrementOutstandingPermits();
227+
if (newState < 0) {
228+
throw new IllegalStateException(this + " released more than acquired");
229+
}
230+
maybeReturnReferentToPool(newState);
231+
}
232+
233+
/**
234+
* Clear this reference (and return its referent to the pool) when it no longer has any outstanding permits, which
235+
* may mean immediately if the number of outstanding permits is already zero. All invocations after the first will
236+
* have no effect.
237+
*/
238+
@Override
239+
public final void clear() {
240+
int currentState;
241+
while (stateIsAvailable(currentState = state)) {
242+
final int newState = calculateNewStateForClear(currentState);
243+
if (tryUpdateState(currentState, newState)) {
244+
maybeReturnReferentToPool(newState);
245+
return;
246+
}
247+
}
248+
}
249+
250+
/**
251+
* Return the referent to the pool.
252+
*
253+
* @param referent The referent to return
254+
*/
255+
protected abstract void returnReferentToPool(@NotNull REFERENT_TYPE referent);
256+
257+
private void maybeReturnReferentToPool(int newState) {
258+
if (stateAllowsAcquire(newState)) {
259+
return;
260+
}
261+
final REFERENT_TYPE localReferent = referent;
262+
referent = null;
263+
returnReferentToPool(localReferent);
264+
}
265+
}

0 commit comments

Comments
 (0)