Skip to content

Commit d036efc

Browse files
Cleanup
1 parent 3c6caca commit d036efc

File tree

8 files changed

+123
-10
lines changed

8 files changed

+123
-10
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright © 2025 Apple Inc. and the ServiceTalk project authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.servicetalk.concurrent.api;
17+
18+
import io.servicetalk.context.api.ContextMap;
19+
20+
import org.openjdk.jmh.annotations.Benchmark;
21+
import org.openjdk.jmh.annotations.BenchmarkMode;
22+
import org.openjdk.jmh.annotations.Fork;
23+
import org.openjdk.jmh.annotations.Measurement;
24+
import org.openjdk.jmh.annotations.Mode;
25+
import org.openjdk.jmh.annotations.OutputTimeUnit;
26+
import org.openjdk.jmh.annotations.Scope;
27+
import org.openjdk.jmh.annotations.Setup;
28+
import org.openjdk.jmh.annotations.State;
29+
import org.openjdk.jmh.annotations.Warmup;
30+
31+
import java.util.concurrent.TimeUnit;
32+
import java.util.function.Function;
33+
34+
/**
35+
*
36+
*/
37+
@Fork(1)
38+
@State(Scope.Benchmark)
39+
@Warmup(iterations = 5, time = 3)
40+
@Measurement(iterations = 5, time = 3)
41+
@OutputTimeUnit(TimeUnit.NANOSECONDS)
42+
@BenchmarkMode(Mode.AverageTime)
43+
public class AsyncContextProviderBenchmark {
44+
45+
/**
46+
* gc profiling of the DefaultAsyncContextProvider shows that the Scope based detachment can be stack allocated
47+
* at least under some conditions.
48+
*
49+
* Benchmark Mode Cnt Score Error Units
50+
* AsyncContextProviderBenchmark.contextRestoreCost avgt 5 3.932 ± 0.022 ns/op
51+
* AsyncContextProviderBenchmark.contextRestoreCost:gc.alloc.rate avgt 5 ≈ 10⁻⁴ MB/sec
52+
* AsyncContextProviderBenchmark.contextRestoreCost:gc.alloc.rate.norm avgt 5 ≈ 10⁻⁶ B/op
53+
* AsyncContextProviderBenchmark.contextRestoreCost:gc.count avgt 5 ≈ 0 counts
54+
* AsyncContextProviderBenchmark.contextSaveAndRestoreCost avgt 5 1.712 ± 0.005 ns/op
55+
* AsyncContextProviderBenchmark.contextSaveAndRestoreCost:gc.alloc.rate avgt 5 ≈ 10⁻⁴ MB/sec
56+
* AsyncContextProviderBenchmark.contextSaveAndRestoreCost:gc.alloc.rate.norm avgt 5 ≈ 10⁻⁷ B/op
57+
* AsyncContextProviderBenchmark.contextSaveAndRestoreCost:gc.count avgt 5 ≈ 0 counts
58+
*/
59+
60+
private static final ContextMap.Key<String> KEY = ContextMap.Key.newKey("test-key", String.class);
61+
private static final String EXPECTED = "hello, world!";
62+
63+
private static Function<String, String> wrappedFunction;
64+
65+
@Setup
66+
public void setup() {
67+
// This will capture the current context
68+
wrappedFunction = AsyncContext.wrapFunction(ignored -> AsyncContext.context().get(KEY));
69+
AsyncContext.context().put(KEY, EXPECTED);
70+
}
71+
72+
@Benchmark
73+
public String contextRestoreCost() {
74+
return wrappedFunction.apply("ignored");
75+
}
76+
77+
@Benchmark
78+
public String contextSaveAndRestoreCost() {
79+
return AsyncContext.wrapFunction(Function.<String>identity()).apply("ignored");
80+
}
81+
}

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AsyncContext.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,10 @@ public final class AsyncContext {
7272

7373
static {
7474
AsyncContextProvider result = DefaultAsyncContextProvider.INSTANCE;
75-
for (UnaryOperator<AsyncContextProvider> wrapper : asyncProviderWrappers()) {
76-
System.out.println("Wrapping with " + wrapper.getClass().getName());
75+
List<UnaryOperator<AsyncContextProvider>> wrappers = asyncProviderWrappers();
76+
for (UnaryOperator<AsyncContextProvider> wrapper : wrappers) {
7777
result = wrapper.apply(result);
7878
}
79-
System.out.println("Default AsyncContextProvider: " + result.getClass().getName());
8079
DEFAULT_ENABLED_PROVIDER = result;
8180
provider = DEFAULT_ENABLED_PROVIDER;
8281
}

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AsyncContextProvider.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
/**
3737
* Implementation that backs the {@link AsyncContext}.
3838
*/
39-
public interface AsyncContextProvider {
39+
interface AsyncContextProvider {
4040
/**
4141
* Get the current context.
4242
*

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Completable.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -1730,7 +1730,7 @@ public final Future<Void> toFuture() {
17301730
*/
17311731
ContextMap contextForSubscribe(AsyncContextProvider provider) {
17321732
// the default behavior is to copy the map. Some operators may want to use shared map
1733-
return provider.saveContext().copy(); // TODO: should we have a `saveContextCopy()` for perf reasons?
1733+
return provider.saveContext().copy();
17341734
}
17351735

17361736
/**
@@ -2266,12 +2266,14 @@ private void subscribeWithContext(Subscriber subscriber,
22662266
AsyncContextProvider contextProvider, ContextMap contextMap) {
22672267
requireNonNull(subscriber);
22682268
Subscriber wrapped = contextProvider.wrapCancellable(subscriber, contextMap);
2269-
if (contextProvider.context() == contextMap) { // TODO: this shortcut will not work anymore if `saveContext()` returns a result different than `context()`. Maybe a provider.isActive(context) would work?
2269+
if (contextProvider.context() == contextMap) {
22702270
// No need to wrap as we are sharing the AsyncContext
22712271
handleSubscribe(wrapped, contextMap, contextProvider);
22722272
} else {
22732273
// Ensure that AsyncContext used for handleSubscribe() is the contextMap for the subscribe()
2274-
contextProvider.wrapRunnable(() -> handleSubscribe(wrapped, contextMap, contextProvider), contextMap).run();
2274+
try(Scope unused = contextProvider.attachContext(contextMap)) {
2275+
handleSubscribe(wrapped, contextMap, contextProvider);
2276+
}
22752277
}
22762278
}
22772279

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -4867,7 +4867,9 @@ private void subscribeWithContext(Subscriber<? super T> subscriber,
48674867
handleSubscribe(wrapped, contextMap, provider);
48684868
} else {
48694869
// Ensure that AsyncContext used for handleSubscribe() is the contextMap for the subscribe()
4870-
provider.wrapRunnable(() -> handleSubscribe(wrapped, contextMap, provider), contextMap).run();
4870+
try (Scope ignored = provider.attachContext(contextMap)) {
4871+
handleSubscribe(wrapped, contextMap, provider);
4872+
}
48714873
}
48724874
}
48734875

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Scope.java

+28
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,35 @@
1+
/*
2+
* Copyright © 2025 Apple Inc. and the ServiceTalk project authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package io.servicetalk.concurrent.api;
217

18+
import io.servicetalk.context.api.ContextMap;
19+
20+
/**
21+
* An abstraction for detaching a context from the current thread.
22+
*
23+
* This abstraction is intended to allow the modifications performed by
24+
* {@link AsyncContextProvider#attachContext(ContextMap)} to be undone. In practice, this may look like restoring
25+
* a {@link ThreadLocal} to the state it had before the call to {@link AsyncContextProvider#attachContext(ContextMap)}
26+
* call.
27+
*/
328
public interface Scope extends AutoCloseable {
429

30+
/**
31+
* No-op {@link Scope}.
32+
*/
533
Scope NOOP = () -> {};
634

735
@Override

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -2711,7 +2711,9 @@ private void subscribeWithContext(Subscriber<? super T> subscriber,
27112711
handleSubscribe(wrapped, contextMap, contextProvider);
27122712
} else {
27132713
// Ensure that AsyncContext used for handleSubscribe() is the contextMap for the subscribe()
2714-
contextProvider.wrapRunnable(() -> handleSubscribe(wrapped, contextMap, contextProvider), contextMap).run();
2714+
try (Scope ignored = contextProvider.attachContext(contextMap)) {
2715+
handleSubscribe(wrapped, contextMap, contextProvider);
2716+
}
27152717
}
27162718
}
27172719

servicetalk-opentelemetry-http/src/main/java/io/servicetalk/opentelemetry/http/OpenTelemetryHttpRequestFilter.java

-1
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,6 @@ public Single<StreamingHttpResponse> request(final StreamingHttpRequest request)
172172
private Single<StreamingHttpResponse> trackRequest(final StreamingHttpRequester delegate,
173173
final StreamingHttpRequest request) {
174174
final Context parentContext = Context.current();
175-
System.out.println(OpenTelemetryHttpRequestFilter.class.getSimpleName() + ": Current context: " + parentContext + ", thread: " + Thread.currentThread());
176175
final Context context = instrumenter.start(parentContext, request);
177176

178177
final Scope scope = context.makeCurrent();

0 commit comments

Comments
 (0)