Skip to content

Commit 85fd41b

Browse files
authored
Do not cancel subscription on BlockingIterable#hasNext(long, TimeUnit) (#3128)
* Do not cancel subscription on BlockingIterable#hasNext(long, TimeUnit) This makes sure that if this method, or in extend next(long, TimeUnit) is called and times out, the caller is able to retry the operation and also the upstream source will not be cancelled. In the context of a Blocking Streaming server, this means that if a timeout is thrown on the incoming request, the outgoing response can still be modified since the underlying socket will not be immediately closed. It also aligns the semantics with Single#toFuture where a blocking get with a timeout on the future also does not cancel the upstream Single. A (temporary) system property is introduced which allows to fall back to the old behavior should incompatibilities be discovered in the wild. A note for the reader who wonders how to close the subscription now: the close() method always did cancel the subscription and continues to do so.
1 parent b4be7d1 commit 85fd41b

File tree

3 files changed

+61
-24
lines changed

3 files changed

+61
-24
lines changed

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherAsBlockingIterable.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ public BlockingIterator<T> iterator() {
7272
}
7373

7474
private static final class SubscriberAndIterator<T> implements Subscriber<T>, BlockingIterator<T> {
75+
/**
76+
* Allows to re-enable cancelling the subscription on {@link #hasNext(long, TimeUnit)} timeout. This flag
77+
* will be removed after a couple releases and no issues identified with the new behavior.
78+
*/
79+
private static final boolean CANCEL_SUBSCRIPTION_ON_HAS_NEXT_TIMEOUT = Boolean
80+
.getBoolean("io.servicetalk.concurrent.api.cancelSubscriptionOnHasNextTimeout");
7581
private static final Logger LOGGER = LoggerFactory.getLogger(SubscriberAndIterator.class);
7682
private static final Object CANCELLED_SIGNAL = new Object();
7783
private static final TerminalNotification COMPLETE_NOTIFICATION = complete();
@@ -172,7 +178,9 @@ public boolean hasNext(final long timeout, final TimeUnit unit) throws TimeoutEx
172178
next = data.poll(timeout, unit);
173179
if (next == null) {
174180
terminated = true;
175-
subscription.cancel();
181+
if (CANCEL_SUBSCRIPTION_ON_HAS_NEXT_TIMEOUT) {
182+
subscription.cancel();
183+
}
176184
throw new TimeoutException("timed out after: " + timeout + " units: " + unit);
177185
}
178186
requestMoreIfRequired();

servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherAsBlockingIterableTest.java

+14-9
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ void errorEmittedIsThrown() {
7171
DeliberateException de = new DeliberateException();
7272
Iterator<Integer> iterator = Publisher.<Integer>failed(de).toIterable().iterator();
7373
assertThat("Item expected but not found.", iterator.hasNext(), is(true));
74-
assertSame(de, assertThrows(DeliberateException.class, () -> iterator.next()));
74+
assertSame(de, assertThrows(DeliberateException.class, iterator::next));
7575
}
7676

7777
@Test
@@ -80,7 +80,7 @@ void doubleHashNextWithError() {
8080
Iterator<Integer> iterator = Publisher.<Integer>failed(de).toIterable().iterator();
8181
assertThat("Item expected but not found.", iterator.hasNext(), is(true));
8282
assertThat("Second hasNext inconsistent with first.", iterator.hasNext(), is(true));
83-
assertSame(de, assertThrows(DeliberateException.class, () -> iterator.next()));
83+
assertSame(de, assertThrows(DeliberateException.class, iterator::next));
8484
}
8585

8686
@Test
@@ -93,7 +93,7 @@ void hasNextWithEmpty() {
9393
void nextWithEmpty() {
9494
Iterator<Integer> iterator = Publisher.<Integer>empty().toIterable().iterator();
9595
assertThat("Item not expected but found.", iterator.hasNext(), is(false));
96-
assertThrows(NoSuchElementException.class, () -> iterator.next());
96+
assertThrows(NoSuchElementException.class, iterator::next);
9797
}
9898

9999
@Test
@@ -109,7 +109,10 @@ void hasNextWithTimeout() throws Exception {
109109

110110
assertThrows(TimeoutException.class, () -> iterator.hasNext(10, MILLISECONDS));
111111
assertThat("Unexpected item found.", iterator.hasNext(-1, MILLISECONDS), is(false));
112-
assertTrue(subscription.isCancelled());
112+
113+
assertThat(subscription.isCancelled(), is(false));
114+
iterator.close();
115+
assertThat(subscription.isCancelled(), is(true));
113116
}
114117

115118
@Test
@@ -124,9 +127,11 @@ void nextWithTimeout() throws Exception {
124127
assertThat("Unexpected item found.", iterator.next(-1, MILLISECONDS), is(2));
125128

126129
assertThrows(TimeoutException.class, () -> iterator.next(10, MILLISECONDS));
127-
128130
assertThat("Unexpected item found.", iterator.hasNext(-1, MILLISECONDS), is(false));
129-
assertTrue(subscription.isCancelled());
131+
132+
assertThat(subscription.isCancelled(), is(false));
133+
iterator.close();
134+
assertThat(subscription.isCancelled(), is(true));
130135
}
131136

132137
@Test
@@ -173,7 +178,7 @@ void nextWithoutHasNextAndTerminal() {
173178
source.onNext(2);
174179
assertThat("Unexpected item found.", iterator.next(), is(2));
175180
source.onComplete();
176-
assertThrows(NoSuchElementException.class, () -> iterator.next());
181+
assertThrows(NoSuchElementException.class, iterator::next);
177182
}
178183

179184
@Test
@@ -234,7 +239,7 @@ void delayOnNextThenError() {
234239
DeliberateException de = new DeliberateException();
235240
source.onError(de);
236241
assertThat("Item not expected but found.", iterator.hasNext(), is(true));
237-
Exception e = assertThrows(DeliberateException.class, () -> iterator.next());
242+
Exception e = assertThrows(DeliberateException.class, iterator::next);
238243
assertThat(e, is(de));
239244
}
240245

@@ -310,7 +315,7 @@ void queueFullButAccommodatesOnError() {
310315
source.onError(de);
311316
verifyNextIs(iterator, 1);
312317
assertThat("Item expected but not found.", iterator.hasNext(), is(true));
313-
Exception e = assertThrows(DeliberateException.class, () -> iterator.next());
318+
Exception e = assertThrows(DeliberateException.class, iterator::next);
314319
assertThat(e, sameInstance(de));
315320
}
316321

servicetalk-concurrent/src/main/java/io/servicetalk/concurrent/BlockingIterable.java

+38-14
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,19 @@ public interface BlockingIterable<T> extends CloseableIterable<T> {
4040
@Override
4141
BlockingIterator<T> iterator();
4242

43+
@Override
44+
default void forEach(final Consumer<? super T> action) {
45+
try (BlockingIterator<T> iterator = iterator()) {
46+
while (iterator.hasNext()) {
47+
action.accept(iterator.next());
48+
}
49+
} catch (RuntimeException ex) {
50+
throw ex;
51+
} catch (Exception ex) {
52+
throw new RuntimeException(ex);
53+
}
54+
}
55+
4356
/**
4457
* Mimics the behavior of {@link #forEach(Consumer)} but uses the {@code timeoutSupplier} to determine the timeout
4558
* value for interactions with the {@link BlockingIterator}.
@@ -64,9 +77,14 @@ public interface BlockingIterable<T> extends CloseableIterable<T> {
6477
default void forEach(Consumer<? super T> action, LongSupplier timeoutSupplier, TimeUnit unit)
6578
throws TimeoutException {
6679
requireNonNull(action);
67-
BlockingIterator<T> iterator = iterator();
68-
while (iterator.hasNext(timeoutSupplier.getAsLong(), unit)) {
69-
action.accept(iterator.next(timeoutSupplier.getAsLong(), unit));
80+
try (BlockingIterator<T> iterator = iterator()) {
81+
while (iterator.hasNext(timeoutSupplier.getAsLong(), unit)) {
82+
action.accept(iterator.next(timeoutSupplier.getAsLong(), unit));
83+
}
84+
} catch (TimeoutException | RuntimeException ex) {
85+
throw ex;
86+
} catch (Exception ex) {
87+
throw new RuntimeException(ex);
7088
}
7189
}
7290

@@ -96,18 +114,24 @@ default void forEach(Consumer<? super T> action, LongSupplier timeoutSupplier, T
96114
*/
97115
default void forEach(Consumer<? super T> action, long timeout, TimeUnit unit) throws TimeoutException {
98116
requireNonNull(action);
99-
BlockingIterator<T> iterator = iterator();
100-
long remainingTimeoutNanos = unit.toNanos(timeout);
101-
long timeStampANanos = nanoTime();
102-
while (iterator.hasNext(remainingTimeoutNanos, NANOSECONDS)) {
103-
final long timeStampBNanos = nanoTime();
104-
remainingTimeoutNanos -= timeStampBNanos - timeStampANanos;
105-
// We do not check for timeout expiry here and instead let hasNext(), next() determine what a timeout of
106-
// <= 0 means. It may be that those methods decide to throw a TimeoutException or provide a fallback value.
107-
action.accept(iterator.next(remainingTimeoutNanos, NANOSECONDS));
117+
try (BlockingIterator<T> iterator = iterator()) {
118+
long remainingTimeoutNanos = unit.toNanos(timeout);
119+
long timeStampANanos = nanoTime();
120+
while (iterator.hasNext(remainingTimeoutNanos, NANOSECONDS)) {
121+
final long timeStampBNanos = nanoTime();
122+
remainingTimeoutNanos -= timeStampBNanos - timeStampANanos;
123+
// We do not check for timeout expiry here and instead let hasNext(), next() determine what a timeout
124+
// of <= 0 means. It may be that those methods decide to throw a TimeoutException or provide a
125+
// fallback value.
126+
action.accept(iterator.next(remainingTimeoutNanos, NANOSECONDS));
108127

109-
timeStampANanos = nanoTime();
110-
remainingTimeoutNanos -= timeStampANanos - timeStampBNanos;
128+
timeStampANanos = nanoTime();
129+
remainingTimeoutNanos -= timeStampANanos - timeStampBNanos;
130+
}
131+
} catch (TimeoutException | RuntimeException ex) {
132+
throw ex;
133+
} catch (Exception ex) {
134+
throw new RuntimeException(ex);
111135
}
112136
}
113137

0 commit comments

Comments
 (0)