Skip to content

Commit 45c3eae

Browse files
concurrent-api: make context capture more generic
Motivation: Why is this change being made? Modifications: - List the changes Result: What is the result of this change?
1 parent 47601da commit 45c3eae

File tree

95 files changed

+931
-880
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

95 files changed

+931
-880
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
/*
2+
* Copyright © 2018-2019, 2021 Apple Inc. and the ServiceTalk project authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.servicetalk.concurrent.api;
17+
18+
import io.servicetalk.concurrent.CompletableSource;
19+
import io.servicetalk.concurrent.PublisherSource.Subscriber;
20+
import io.servicetalk.concurrent.SingleSource;
21+
import io.servicetalk.context.api.ContextMap;
22+
23+
import javax.annotation.Nonnull;
24+
import java.util.concurrent.Callable;
25+
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.Executor;
27+
import java.util.concurrent.ExecutorService;
28+
import java.util.concurrent.ScheduledExecutorService;
29+
import java.util.function.BiConsumer;
30+
import java.util.function.BiFunction;
31+
import java.util.function.Consumer;
32+
import java.util.function.Function;
33+
34+
abstract class AbstractAsyncContextProvider implements AsyncContextProvider {
35+
36+
AbstractAsyncContextProvider() {
37+
// singleton
38+
}
39+
40+
@Nonnull
41+
@Override
42+
public final ContextMap context() {
43+
return ContextMapThreadLocal.context();
44+
}
45+
46+
@Override
47+
public final CompletableSource.Subscriber wrapCancellable(final CompletableSource.Subscriber subscriber,
48+
final CapturedContext context) {
49+
if (subscriber instanceof ContextPreservingCompletableSubscriber) {
50+
final ContextPreservingCompletableSubscriber s = (ContextPreservingCompletableSubscriber) subscriber;
51+
if (s.saved == context) {
52+
return subscriber instanceof ContextPreservingCompletableSubscriberAndCancellable ? subscriber :
53+
new ContextPreservingCompletableSubscriberAndCancellable(s.subscriber, context);
54+
}
55+
} else if (subscriber instanceof ContextPreservingCancellableCompletableSubscriber &&
56+
((ContextPreservingCancellableCompletableSubscriber) subscriber).saved == context) {
57+
// no need to check for instanceof ContextPreservingCompletableSubscriberAndCancellable, because
58+
// it extends from ContextPreservingSingleSubscriber.
59+
return subscriber;
60+
}
61+
return new ContextPreservingCancellableCompletableSubscriber(subscriber, context);
62+
}
63+
64+
@Override
65+
public final CompletableSource.Subscriber wrapCompletableSubscriber(final CompletableSource.Subscriber subscriber,
66+
final CapturedContext context) {
67+
if (subscriber instanceof ContextPreservingCancellableCompletableSubscriber) {
68+
final ContextPreservingCancellableCompletableSubscriber s =
69+
(ContextPreservingCancellableCompletableSubscriber) subscriber;
70+
if (s.saved == context) {
71+
// replace current wrapper with wrapper that includes Subscriber and Cancellable
72+
return new ContextPreservingCompletableSubscriberAndCancellable(s.subscriber, context);
73+
}
74+
} else if (subscriber instanceof ContextPreservingCompletableSubscriber &&
75+
((ContextPreservingCompletableSubscriber) subscriber).saved == context) {
76+
// no need to check for instanceof ContextPreservingCompletableSubscriberAndCancellable, because
77+
// it extends from ContextPreservingCompletableSubscriber.
78+
return subscriber;
79+
}
80+
return new ContextPreservingCompletableSubscriber(subscriber, context);
81+
}
82+
83+
@Override
84+
public final CompletableSource.Subscriber wrapCompletableSubscriberAndCancellable(
85+
final CompletableSource.Subscriber subscriber, final CapturedContext context) {
86+
if (subscriber instanceof ContextPreservingCompletableSubscriber) {
87+
final ContextPreservingCompletableSubscriber s = (ContextPreservingCompletableSubscriber) subscriber;
88+
if (s.saved == context) {
89+
return subscriber instanceof ContextPreservingCompletableSubscriberAndCancellable ? subscriber :
90+
new ContextPreservingCompletableSubscriberAndCancellable(s.subscriber, context);
91+
}
92+
} else if (subscriber instanceof ContextPreservingCancellableCompletableSubscriber) {
93+
final ContextPreservingCancellableCompletableSubscriber s =
94+
(ContextPreservingCancellableCompletableSubscriber) subscriber;
95+
if (s.saved == context) {
96+
return new ContextPreservingCompletableSubscriberAndCancellable(s.subscriber, context);
97+
}
98+
}
99+
return new ContextPreservingCompletableSubscriberAndCancellable(subscriber, context);
100+
}
101+
102+
@Override
103+
public final <T> SingleSource.Subscriber<T> wrapCancellable(final SingleSource.Subscriber<T> subscriber,
104+
final CapturedContext context) {
105+
if (subscriber instanceof ContextPreservingSingleSubscriber) {
106+
final ContextPreservingSingleSubscriber<T> s = (ContextPreservingSingleSubscriber<T>) subscriber;
107+
if (s.saved == context) {
108+
return subscriber instanceof ContextPreservingSingleSubscriberAndCancellable ? subscriber :
109+
new ContextPreservingSingleSubscriberAndCancellable<>(s.subscriber, context);
110+
}
111+
} else if (subscriber instanceof ContextPreservingCancellableSingleSubscriber &&
112+
((ContextPreservingCancellableSingleSubscriber<T>) subscriber).saved == context) {
113+
// no need to check for instanceof ContextPreservingSingleSubscriberAndCancellable, because
114+
// it extends from ContextPreservingSingleSubscriber.
115+
return subscriber;
116+
}
117+
return new ContextPreservingCancellableSingleSubscriber<>(subscriber, context);
118+
}
119+
120+
@Override
121+
public final <T> SingleSource.Subscriber<T> wrapSingleSubscriber(final SingleSource.Subscriber<T> subscriber,
122+
final CapturedContext context) {
123+
if (subscriber instanceof ContextPreservingCancellableSingleSubscriber) {
124+
final ContextPreservingCancellableSingleSubscriber<T> s =
125+
(ContextPreservingCancellableSingleSubscriber<T>) subscriber;
126+
if (s.saved == context) {
127+
return new ContextPreservingSingleSubscriberAndCancellable<>(s.subscriber, context);
128+
}
129+
} else if (subscriber instanceof ContextPreservingSingleSubscriber &&
130+
((ContextPreservingSingleSubscriber<T>) subscriber).saved == context) {
131+
// no need to check for instanceof ContextPreservingSingleSubscriberAndCancellable, because
132+
// it extends from ContextPreservingSingleSubscriber.
133+
return subscriber;
134+
}
135+
return new ContextPreservingSingleSubscriber<>(subscriber, context);
136+
}
137+
138+
@Override
139+
public final <T> SingleSource.Subscriber<T> wrapSingleSubscriberAndCancellable(
140+
final SingleSource.Subscriber<T> subscriber, final CapturedContext context) {
141+
if (subscriber instanceof ContextPreservingSingleSubscriber) {
142+
final ContextPreservingSingleSubscriber<T> s = (ContextPreservingSingleSubscriber<T>) subscriber;
143+
if (s.saved == context) {
144+
return subscriber instanceof ContextPreservingSingleSubscriberAndCancellable ? subscriber :
145+
new ContextPreservingSingleSubscriberAndCancellable<>(s.subscriber, context);
146+
}
147+
} else if (subscriber instanceof ContextPreservingCancellableSingleSubscriber) {
148+
final ContextPreservingCancellableSingleSubscriber<T> s =
149+
(ContextPreservingCancellableSingleSubscriber<T>) subscriber;
150+
if (s.saved == context) {
151+
return new ContextPreservingSingleSubscriberAndCancellable<>(s.subscriber, context);
152+
}
153+
}
154+
return new ContextPreservingSingleSubscriberAndCancellable<>(subscriber, context);
155+
}
156+
157+
@Override
158+
public final <T> Subscriber<T> wrapSubscription(final Subscriber<T> subscriber, final CapturedContext context) {
159+
if (subscriber instanceof ContextPreservingSubscriber) {
160+
final ContextPreservingSubscriber<T> s = (ContextPreservingSubscriber<T>) subscriber;
161+
if (s.saved == context) {
162+
return subscriber instanceof ContextPreservingSubscriberAndSubscription ? subscriber :
163+
new ContextPreservingSubscriberAndSubscription<>(s.subscriber, context);
164+
}
165+
} else if (subscriber instanceof ContextPreservingSubscriptionSubscriber &&
166+
((ContextPreservingSubscriptionSubscriber<T>) subscriber).saved == context) {
167+
// no need to check for instanceof ContextPreservingSubscriberAndSubscription, because
168+
// it extends from ContextPreservingSubscriptionSubscriber.
169+
return subscriber;
170+
}
171+
return new ContextPreservingSubscriptionSubscriber<>(subscriber, context);
172+
}
173+
174+
@Override
175+
public final <T> Subscriber<T> wrapPublisherSubscriber(final Subscriber<T> subscriber, final CapturedContext context) {
176+
if (subscriber instanceof ContextPreservingSubscriptionSubscriber) {
177+
final ContextPreservingSubscriptionSubscriber<T> s =
178+
(ContextPreservingSubscriptionSubscriber<T>) subscriber;
179+
if (s.saved == context) {
180+
return new ContextPreservingSubscriberAndSubscription<>(s.subscriber, context);
181+
}
182+
} else if (subscriber instanceof ContextPreservingSubscriber &&
183+
((ContextPreservingSubscriber<T>) subscriber).saved == context) {
184+
// no need to check for instanceof ContextPreservingSubscriberAndSubscription, because
185+
// it extends from ContextPreservingSubscriptionSubscriber.
186+
return subscriber;
187+
}
188+
return new ContextPreservingSubscriber<>(subscriber, context);
189+
}
190+
191+
@Override
192+
public final <T> Subscriber<T> wrapPublisherSubscriberAndSubscription(final Subscriber<T> subscriber,
193+
final CapturedContext context) {
194+
if (subscriber instanceof ContextPreservingSubscriber) {
195+
final ContextPreservingSubscriber<T> s = (ContextPreservingSubscriber<T>) subscriber;
196+
if (s.saved == context) {
197+
return subscriber instanceof ContextPreservingSubscriberAndSubscription ? subscriber :
198+
new ContextPreservingSubscriberAndSubscription<>(s.subscriber, context);
199+
}
200+
} else if (subscriber instanceof ContextPreservingSubscriptionSubscriber) {
201+
final ContextPreservingSubscriptionSubscriber<T> s =
202+
(ContextPreservingSubscriptionSubscriber<T>) subscriber;
203+
if (s.saved == context) {
204+
return new ContextPreservingSubscriberAndSubscription<>(s.subscriber, context);
205+
}
206+
}
207+
return new ContextPreservingSubscriberAndSubscription<>(subscriber, context);
208+
}
209+
210+
@Override
211+
public final Executor wrapJdkExecutor(final Executor executor) {
212+
return ContextPreservingExecutor.of(executor);
213+
}
214+
215+
@Override
216+
public final ExecutorService wrapJdkExecutorService(final ExecutorService executor) {
217+
return ContextPreservingExecutorService.of(executor);
218+
}
219+
220+
@Override
221+
public final io.servicetalk.concurrent.api.Executor wrapExecutor(final io.servicetalk.concurrent.api.Executor executor) {
222+
return ContextPreservingStExecutor.of(executor);
223+
}
224+
225+
@Override
226+
public final ScheduledExecutorService wrapJdkScheduledExecutorService(final ScheduledExecutorService executor) {
227+
return ContextPreservingScheduledExecutorService.of(executor);
228+
}
229+
230+
@Override
231+
public final <T> CompletableFuture<T> wrapCompletableFuture(final CompletableFuture<T> future,
232+
final CapturedContext context) {
233+
return ContextPreservingCompletableFuture.newContextPreservingFuture(future, context);
234+
}
235+
236+
@Override
237+
public final Runnable wrapRunnable(final Runnable runnable, final CapturedContext context) {
238+
return new ContextPreservingRunnable(runnable, context);
239+
}
240+
241+
@Override
242+
public final <V> Callable<V> wrapCallable(final Callable<V> callable, final CapturedContext context) {
243+
return new ContextPreservingCallable<>(callable, context);
244+
}
245+
246+
@Override
247+
public final <T> Consumer<T> wrapConsumer(final Consumer<T> consumer, final CapturedContext context) {
248+
return new ContextPreservingConsumer<>(consumer, context);
249+
}
250+
251+
@Override
252+
public final <T, U> Function<T, U> wrapFunction(final Function<T, U> func, final CapturedContext context) {
253+
return new ContextPreservingFunction<>(func, context);
254+
}
255+
256+
@Override
257+
public final <T, U> BiConsumer<T, U> wrapBiConsumer(final BiConsumer<T, U> consumer, final CapturedContext context) {
258+
return new ContextPreservingBiConsumer<>(consumer, context);
259+
}
260+
261+
@Override
262+
public final <T, U, V> BiFunction<T, U, V> wrapBiFunction(final BiFunction<T, U, V> func, final CapturedContext context) {
263+
return new ContextPreservingBiFunction<>(func, context);
264+
}
265+
}

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AbstractAsynchronousCompletableOperator.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616
package io.servicetalk.concurrent.api;
1717

18-
import io.servicetalk.context.api.ContextMap;
19-
2018
import static java.util.Objects.requireNonNull;
2119

2220
/**
@@ -38,13 +36,13 @@ abstract class AbstractAsynchronousCompletableOperator extends AbstractNoHandleS
3836

3937
@Override
4038
final void handleSubscribe(Subscriber subscriber,
41-
ContextMap contextMap, AsyncContextProvider contextProvider) {
39+
CapturedContext capturedContext, AsyncContextProvider contextProvider) {
4240
// The AsyncContext needs to be preserved when ever we interact with the original Subscriber, so we wrap it here
4341
// with the original contextMap. Otherwise some other context may leak into this subscriber chain from the other
4442
// side of the asynchronous boundary.
4543
final Subscriber operatorSubscriber =
46-
contextProvider.wrapCompletableSubscriberAndCancellable(subscriber, contextMap);
44+
contextProvider.wrapCompletableSubscriberAndCancellable(subscriber, capturedContext);
4745
final Subscriber upstreamSubscriber = apply(operatorSubscriber);
48-
original.delegateSubscribe(upstreamSubscriber, contextMap, contextProvider);
46+
original.delegateSubscribe(upstreamSubscriber, capturedContext, contextProvider);
4947
}
5048
}

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AbstractAsynchronousPublisherOperator.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616
package io.servicetalk.concurrent.api;
1717

18-
import io.servicetalk.context.api.ContextMap;
19-
2018
import static java.util.Objects.requireNonNull;
2119

2220
/**
@@ -41,13 +39,13 @@ abstract class AbstractAsynchronousPublisherOperator<T, R> extends AbstractNoHan
4139

4240
@Override
4341
final void handleSubscribe(Subscriber<? super R> subscriber,
44-
ContextMap contextMap, AsyncContextProvider contextProvider) {
42+
CapturedContext capturedContext, AsyncContextProvider contextProvider) {
4543
// The AsyncContext needs to be preserved when ever we interact with the original Subscriber, so we wrap it here
4644
// with the original contextMap. Otherwise some other context may leak into this subscriber chain from the other
4745
// side of the asynchronous boundary.
4846
final Subscriber<? super R> operatorSubscriber =
49-
contextProvider.wrapPublisherSubscriberAndSubscription(subscriber, contextMap);
47+
contextProvider.wrapPublisherSubscriberAndSubscription(subscriber, capturedContext);
5048
final Subscriber<? super T> upstreamSubscriber = apply(operatorSubscriber);
51-
original.delegateSubscribe(upstreamSubscriber, contextMap, contextProvider);
49+
original.delegateSubscribe(upstreamSubscriber, capturedContext, contextProvider);
5250
}
5351
}

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AbstractAsynchronousSingleOperator.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616
package io.servicetalk.concurrent.api;
1717

18-
import io.servicetalk.context.api.ContextMap;
19-
2018
import static java.util.Objects.requireNonNull;
2119

2220
/**
@@ -41,13 +39,13 @@ abstract class AbstractAsynchronousSingleOperator<T, R> extends AbstractNoHandle
4139

4240
@Override
4341
final void handleSubscribe(Subscriber<? super R> subscriber,
44-
ContextMap contextMap, AsyncContextProvider contextProvider) {
42+
CapturedContext capturedContext, AsyncContextProvider contextProvider) {
4543
// The AsyncContext needs to be preserved when ever we interact with the original Subscriber, so we wrap it here
4644
// with the original contextMap. Otherwise some other context may leak into this subscriber chain from the other
4745
// side of the asynchronous boundary.
4846
final Subscriber<? super R> operatorSubscriber =
49-
contextProvider.wrapSingleSubscriberAndCancellable(subscriber, contextMap);
47+
contextProvider.wrapSingleSubscriberAndCancellable(subscriber, capturedContext);
5048
final Subscriber<? super T> upstreamSubscriber = apply(operatorSubscriber);
51-
original.delegateSubscribe(upstreamSubscriber, contextMap, contextProvider);
49+
original.delegateSubscribe(upstreamSubscriber, capturedContext, contextProvider);
5250
}
5351
}

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AbstractCompletableAndSingleConcatenated.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,20 @@
1818
import io.servicetalk.concurrent.Cancellable;
1919
import io.servicetalk.concurrent.CompletableSource;
2020
import io.servicetalk.concurrent.internal.SequentialCancellable;
21-
import io.servicetalk.context.api.ContextMap;
2221

2322
import javax.annotation.Nullable;
2423

2524
abstract class AbstractCompletableAndSingleConcatenated<T> extends AbstractNoHandleSubscribeSingle<T> {
2625

2726
@Override
2827
protected void handleSubscribe(final Subscriber<? super T> subscriber,
29-
final ContextMap contextMap, final AsyncContextProvider contextProvider) {
30-
final Subscriber<? super T> wrappedSubscriber = contextProvider.wrapSingleSubscriber(subscriber, contextMap);
31-
delegateSubscribeToOriginal(wrappedSubscriber, contextMap, contextProvider);
28+
final CapturedContext capturedContext, final AsyncContextProvider contextProvider) {
29+
final Subscriber<? super T> wrappedSubscriber = contextProvider.wrapSingleSubscriber(subscriber, capturedContext);
30+
delegateSubscribeToOriginal(wrappedSubscriber, capturedContext, contextProvider);
3231
}
3332

3433
abstract void delegateSubscribeToOriginal(Subscriber<? super T> offloadSubscriber,
35-
ContextMap contextMap, AsyncContextProvider contextProvider);
34+
CapturedContext capturedContext, AsyncContextProvider contextProvider);
3635

3736
abstract static class AbstractConcatWithSubscriber<T> implements Subscriber<T>, CompletableSource.Subscriber {
3837

0 commit comments

Comments
 (0)