use single atomic operation to avoid dropping multiple producer events
Scottmitch committed Oct 2, 2022
1 parent 0a1fbdd commit 24eb4f4
Showing 1 changed file with 164 additions and 83 deletions.
@@ -270,24 +270,24 @@ private static final class SpscBlockingQueue<T> implements BlockingQueue<T> {
/**
* Number of times to call {@link Thread#yield()} before calling {@link LockSupport#park()}.
* {@link LockSupport#park()} can be expensive, and if the producer is generating data it is likely we will see
* it without park/unpark.
* it without parking.
*/
private static final int POLL_YIELD_SPIN_COUNT =
Integer.getInteger("io.servicetalk.concurrent.internal.blockingIterableYieldSpinCount", 1);
/**
* Number of nanoseconds to spin on {@link Thread#yield()} before calling {@link LockSupport#parkNanos(long)}.
* {@link LockSupport#parkNanos(long)} can be expensive, and if the producer is generating data it is likely
* we will see it without park/unpark.
* we will see it without parking.
*/
private static final long POLL_YIELD_SPIN_NS =
Long.getLong("io.servicetalk.concurrent.internal.blockingIterableYieldSpinNs", 1024);
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<SpscBlockingQueue, Thread> consumerThreadUpdater =
AtomicReferenceFieldUpdater.newUpdater(SpscBlockingQueue.class, Thread.class, "consumerThread");
private static final AtomicReferenceFieldUpdater<SpscBlockingQueue, ThreadStamp> threadStampUpdater =
AtomicReferenceFieldUpdater.newUpdater(SpscBlockingQueue.class, ThreadStamp.class, "threadStamp");
private static final Thread PRODUCED_THREAD = new Thread(() -> { });
private final Queue<T> spscQueue;
@Nullable
private volatile Thread consumerThread;
private volatile ThreadStamp threadStamp;

SpscBlockingQueue(Queue<T> spscQueue) {
this.spscQueue = requireNonNull(spscQueue);
@@ -311,13 +311,6 @@ public boolean offer(final T t) {
return false;
}

private void signalConsumer() {
final Thread thread = consumerThreadUpdater.getAndSet(this, PRODUCED_THREAD);
if (thread != null && thread != PRODUCED_THREAD) {
LockSupport.unpark(thread);
}
}
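
The signalConsumer() removed above is the crux of the commit message: a single getAndSet(PRODUCED_THREAD) slot can record at most one outstanding production, so a second offer that lands before the consumer re-checks the queue is indistinguishable from the first. A minimal sketch of that flag-versus-count hazard (hypothetical class and names, boiled down to an AtomicBoolean; not code from this repository):

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Simplified model of a single-slot "produced" signal. Two signal() calls
// before a consume() collapse into one observable event.
final class SingleSlotSignal {
    private final AtomicBoolean produced = new AtomicBoolean();

    void signal(final Thread consumer) {
        // getAndSet can only flip false -> true; a second production while the
        // flag is already true leaves no trace.
        if (!produced.getAndSet(true) && consumer != null) {
            LockSupport.unpark(consumer);
        }
    }

    boolean consume() {
        // Clears at most one pending signal, even if N items were offered.
        return produced.getAndSet(false);
    }
}

The replacement below pairs the sentinel with a count, so every produced event stays observable.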

@Override
public T remove() {
return spscQueue.remove();
@@ -352,99 +345,157 @@ public boolean offer(final T t, final long timeout, final TimeUnit unit) {
throw new UnsupportedOperationException();
}

private void signalConsumer() {
ThreadStamp nextStamp = null;
for (;;) {
final ThreadStamp currStamp = threadStamp;
if (currStamp == null) {
if (nextStamp == null) {
nextStamp = new ThreadStamp(PRODUCED_THREAD, 1);
} else {
nextStamp.count = 1;
}
if (threadStampUpdater.compareAndSet(this, null, nextStamp)) {
break;
}
} else if (currStamp.thread == PRODUCED_THREAD) {
if (nextStamp == null) {
nextStamp = new ThreadStamp(PRODUCED_THREAD, currStamp.count + 1);
} else {
nextStamp.count = currStamp.count + 1;
}

if (threadStampUpdater.compareAndSet(this, currStamp, nextStamp)) {
break;
}
} else {
assert currStamp.count == 0; // only a single consumer allowed
if (threadStampUpdater.compareAndSet(this, currStamp, null)) {
LockSupport.unpark(currStamp.thread);
break;
}
}
}
}
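
The new signalConsumer() above folds the producer-side handshake into one compareAndSet per state transition: the stamp is absent (idle), holds PRODUCED_THREAD with a count of unconsumed signals, or holds the parked consumer thread to unpark. Note that the real code reuses a mutable nextStamp across failed CAS attempts to avoid allocating on every retry. A standalone distillation of the same state machine (hypothetical names; allocates a fresh immutable stamp per attempt for clarity):

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Three states: null (idle), Stamp(PRODUCED, n) with n pending signals,
// or Stamp(consumerThread, 0) for a parked consumer.
final class CountingSignal {
    static final Thread PRODUCED = new Thread(() -> { });

    static final class Stamp {
        final Thread thread;
        final int count;

        Stamp(final Thread thread, final int count) {
            this.thread = thread;
            this.count = count;
        }
    }

    private final AtomicReference<Stamp> stamp = new AtomicReference<>();

    // Producer side: bump the pending count, or wake the parked consumer.
    void signal() {
        for (;;) {
            final Stamp curr = stamp.get();
            if (curr == null || curr.thread == PRODUCED) {
                final int count = curr == null ? 1 : curr.count + 1;
                if (stamp.compareAndSet(curr, new Stamp(PRODUCED, count))) {
                    return;
                }
            } else if (stamp.compareAndSet(curr, null)) {
                // Clear the stamp first, then unpark: the consumer re-checks
                // the queue after waking, so no signal is lost.
                LockSupport.unpark(curr.thread);
                return;
            }
        }
    }
}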

private T pollAndPark() throws InterruptedException {
T item;
int pollCount = 0;
while ((item = spscQueue.poll()) == null) {
// Benchmarks show that park/unpark is expensive when the producer is the EventLoop thread and
// unpark has to wake up a thread that is parked. Yield has been shown to lower this cost
// on the EventLoop thread and increase throughput in these scenarios.
if (pollCount++ > POLL_YIELD_SPIN_COUNT) {
LockSupport.park();
} else {
Thread.yield();
}
checkInterrupted();
}
return item;
}

@Nullable
private T pollAndPark(final long timeout, final TimeUnit unit) throws InterruptedException {
T item;
final long originalNs = unit.toNanos(timeout);
long remainingNs = originalNs;
long beforeTimeNs = System.nanoTime();
while ((item = spscQueue.poll()) == null) {
// Benchmarks show that park/unpark is expensive when the producer is the EventLoop thread and
// unpark has to wake up a thread that is parked. Yield has been shown to lower this cost
// on the EventLoop thread and increase throughput in these scenarios.
if (originalNs - remainingNs > POLL_YIELD_SPIN_NS) {
LockSupport.parkNanos(remainingNs);
} else {
Thread.yield();
}
checkInterrupted();
final long afterTimeNs = System.nanoTime();
final long durationNs = afterTimeNs - beforeTimeNs;
if (durationNs > remainingNs) {
return null;
}
remainingNs -= durationNs;
beforeTimeNs = afterTimeNs;
}
return item;
}
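
Both pollAndPark() overloads above share the same back-off shape: a bounded run of Thread.yield() before paying for LockSupport.park()/parkNanos(), since (per the inline benchmark notes) unparking a parked thread is the expensive path. The timed overload additionally has to carry a deadline budget across iterations. A compact sketch of just that accounting (hypothetical helper, not part of the patch):

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Deadline bookkeeping in the style of pollAndPark(timeout, unit): re-sample
// nanoTime() each iteration and shrink the remaining budget by the elapsed time.
final class Deadline {
    static boolean awaitTrue(final BooleanSupplier condition, final long timeout, final TimeUnit unit) {
        long remainingNs = unit.toNanos(timeout);
        long beforeNs = System.nanoTime();
        while (!condition.getAsBoolean()) {
            Thread.yield(); // stands in for the yield/parkNanos back-off above
            final long afterNs = System.nanoTime();
            final long elapsedNs = afterNs - beforeNs;
            if (elapsedNs > remainingNs) {
                return false; // budget exhausted; mirrors the `return null` above
            }
            remainingNs -= elapsedNs;
            beforeNs = afterNs;
        }
        return true;
    }
}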

@Override
public T take() throws InterruptedException {
final Thread currentThread = Thread.currentThread();
ThreadStamp parkStamp = null;
ThreadStamp releaseStamp = null;
for (;;) {
final Thread thread = consumerThread;
if (thread != null && thread != currentThread && thread != PRODUCED_THREAD) {
throwTooManyConsumers(currentThread);
} else if (thread == currentThread ||
consumerThreadUpdater.compareAndSet(this, thread, currentThread)) {
try {
T item;
int pollCount = 0;
while ((item = spscQueue.poll()) == null) {
// Benchmarks show that park/unpark is expensive when the producer is the EventLoop thread and
// unpark has to wake up a thread that is parked. Yield has been shown to lower this cost
// on the EventLoop thread and increase throughput in these scenarios.
if (pollCount++ > POLL_YIELD_SPIN_COUNT) {
LockSupport.park();
} else {
Thread.yield();
}
checkInterrupted();
final ThreadStamp currStamp = threadStamp;
if (currStamp == null) {
if (parkStamp == null) {
parkStamp = new ThreadStamp(currentThread);
} else {
parkStamp.count = 0;
}
if (threadStampUpdater.compareAndSet(this, null, parkStamp)) {
try {
return pollAndPark();
} finally {
threadStampUpdater.compareAndSet(this, parkStamp, null);
}

}
} else if (currStamp.thread == PRODUCED_THREAD) {
if (releaseStamp == null) {
releaseStamp = new ThreadStamp(PRODUCED_THREAD, currStamp.count - 1);
} else {
releaseStamp.count = currStamp.count - 1;
}
if (threadStampUpdater.compareAndSet(this, currStamp, currStamp.count == 1 ? null : releaseStamp)) {
final T item = spscQueue.poll();
assert item != null;
return item;
} finally {
// If this call changed the consumerThread before the poll call we should restore it after.
// This should be done atomically in case another thread has produced concurrently and swapped
// the value to PRODUCED_THREAD.
if (thread != currentThread) {
consumerThreadUpdater.compareAndSet(this, currentThread, null);
}
}
} else {
throwTooManyConsumers(currStamp.thread, currentThread);
}
}
}

@Override
public T poll(final long timeout, final TimeUnit unit) throws InterruptedException {
final Thread currentThread = Thread.currentThread();
ThreadStamp parkStamp = null;
ThreadStamp releaseStamp = null;
for (;;) {
final Thread thread = consumerThread;
if (thread != null && thread != currentThread && thread != PRODUCED_THREAD) {
throwTooManyConsumers(currentThread);
} else if (thread == currentThread ||
consumerThreadUpdater.compareAndSet(this, thread, currentThread)) {
try {
final long originalNs = unit.toNanos(timeout);
long remainingNs = originalNs;
long beforeTimeNs = System.nanoTime();
T item;
while ((item = spscQueue.poll()) == null) {
// Benchmarks show that park/unpark is expensive when the producer is the EventLoop thread and
// unpark has to wake up a thread that is parked. Yield has been shown to lower this cost
// on the EventLoop thread and increase throughput in these scenarios.
if (originalNs - remainingNs > POLL_YIELD_SPIN_NS) {
LockSupport.parkNanos(remainingNs);
} else {
Thread.yield();
}
checkInterrupted();
final long afterTimeNs = System.nanoTime();
final long durationNs = afterTimeNs - beforeTimeNs;
if (durationNs > remainingNs) {
return null;
}
remainingNs -= durationNs;
beforeTimeNs = afterTimeNs;
final ThreadStamp currStamp = threadStamp;
if (currStamp == null) {
if (parkStamp == null) {
parkStamp = new ThreadStamp(currentThread);
} else {
parkStamp.count = 0;
}
if (threadStampUpdater.compareAndSet(this, null, parkStamp)) {
try {
return pollAndPark(timeout, unit);
} finally {
threadStampUpdater.compareAndSet(this, parkStamp, null);
}

}
} else if (currStamp.thread == PRODUCED_THREAD) {
if (releaseStamp == null) {
releaseStamp = new ThreadStamp(PRODUCED_THREAD, currStamp.count - 1);
} else {
releaseStamp.count = currStamp.count - 1;
}
if (threadStampUpdater.compareAndSet(this, currStamp, currStamp.count == 1 ? null : releaseStamp)) {
final T item = spscQueue.poll();
assert item != null;
return item;
} finally {
// If this call changed the consumerThread before the poll call we should restore it after.
// This should be done atomically in case another thread has produced concurrently and swapped
// the value to PRODUCED_THREAD.
if (thread != currentThread) {
consumerThreadUpdater.compareAndSet(this, currentThread, null);
}
}
} else {
throwTooManyConsumers(currStamp.thread, currentThread);
}
}
}

private static void throwTooManyConsumers(Thread currentThread) {
throw new IllegalStateException("Only single consumer allowed, current consumer: " + currentThread);
}

private static void checkInterrupted() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
}

@Override
public int remainingCapacity() {
return Integer.MAX_VALUE;
@@ -565,5 +616,35 @@ public int hashCode() {
public String toString() {
return spscQueue.toString();
}

private static void throwTooManyConsumers(Thread currentConsumer, Thread currentThread) {
throw new IllegalStateException("Only single consumer allowed. Existing consumer: " + currentConsumer
+ " attempted new consumer: " + currentThread);
}

private static void checkInterrupted() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
}

private static final class ThreadStamp {
final Thread thread;
int count;

ThreadStamp(Thread thread) {
this.thread = thread;
}

ThreadStamp(Thread thread, int count) {
this.thread = thread;
this.count = count;
}

@Override
public String toString() {
return "thread: " + thread + " count: " + count;
}
}
}
}
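
For orientation, the intended usage pattern is a strict single-producer/single-consumer pairing of offer() with take()/poll(). SpscBlockingQueue is internal to ServiceTalk, so this illustrative sketch substitutes a JDK queue to stay runnable:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public final class SpscUsage {
    public static void main(final String[] args) throws InterruptedException {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        final Thread producer = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                queue.offer(i); // in SpscBlockingQueue, offer() also signals the consumer
            }
        });
        producer.start();
        for (int i = 0; i < 3; i++) {
            // Blocks until an item arrives; the patched queue would yield-spin
            // briefly before parking the consumer thread.
            System.out.println("took " + queue.take());
        }
        producer.join();
    }
}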
