Skip to content

Commit d02f655

Browse files
Prevent concurrent execution of the same mutable request object
Motivation: Our `HttpRequestMetaData` object is mutable, and we expect users to create a new request every time they need to make a new call. Sequential retries are acceptable, but concurrent execution can corrupt internal state. While these expectations are more clear for HTTP users, with gRPC it gets less obvious that they can not subscribe to the same returned `Single<Message>` concurrently. Modifications: - Enhance `FilterableClientToClient` to protect users from concurrent execution of the same request, while still allowing sequential retries. - Verify concurrent execution is not allowed for HTTP and gRPC. Result: Users get `RejectedSubscribeException` if they subscribe to the same Single that shares underlying meta-data object concurrently.
1 parent 8545083 commit d02f655

File tree

5 files changed

+598
-17
lines changed

5 files changed

+598
-17
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
/*
2+
* Copyright © 2025 Apple Inc. and the ServiceTalk project authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.servicetalk.grpc.netty;
17+
18+
import io.servicetalk.concurrent.CompletableSource;
19+
import io.servicetalk.concurrent.api.Executor;
20+
import io.servicetalk.concurrent.api.ExecutorExtension;
21+
import io.servicetalk.concurrent.api.Processors;
22+
import io.servicetalk.concurrent.api.Publisher;
23+
import io.servicetalk.concurrent.api.Single;
24+
import io.servicetalk.concurrent.api.SourceAdapters;
25+
import io.servicetalk.concurrent.internal.RejectedSubscribeException;
26+
import io.servicetalk.grpc.api.DefaultGrpcClientMetadata;
27+
import io.servicetalk.grpc.api.GrpcClientMetadata;
28+
import io.servicetalk.grpc.api.GrpcServiceContext;
29+
import io.servicetalk.grpc.api.GrpcStatusException;
30+
import io.servicetalk.grpc.netty.TesterProto.TestRequest;
31+
import io.servicetalk.grpc.netty.TesterProto.TestResponse;
32+
import io.servicetalk.grpc.netty.TesterProto.Tester.ClientFactory;
33+
import io.servicetalk.grpc.netty.TesterProto.Tester.TesterClient;
34+
import io.servicetalk.grpc.netty.TesterProto.Tester.TesterService;
35+
import io.servicetalk.transport.api.ServerContext;
36+
37+
import org.junit.jupiter.api.AfterEach;
38+
import org.junit.jupiter.api.extension.RegisterExtension;
39+
import org.junit.jupiter.params.ParameterizedTest;
40+
import org.junit.jupiter.params.provider.Arguments;
41+
import org.junit.jupiter.params.provider.MethodSource;
42+
43+
import java.util.ArrayList;
44+
import java.util.Collections;
45+
import java.util.List;
46+
import java.util.concurrent.CountDownLatch;
47+
import java.util.concurrent.ExecutionException;
48+
import java.util.concurrent.Future;
49+
import java.util.concurrent.atomic.AtomicInteger;
50+
import javax.annotation.Nullable;
51+
52+
import static io.servicetalk.concurrent.api.ExecutorExtension.withCachedExecutor;
53+
import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress;
54+
import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort;
55+
import static org.hamcrest.MatcherAssert.assertThat;
56+
import static org.hamcrest.Matchers.instanceOf;
57+
import static org.hamcrest.Matchers.is;
58+
import static org.junit.jupiter.api.Assertions.assertThrows;
59+
60+
class ConcurrentGrpcRequestTest {
61+
62+
private enum AsyncVariant {
63+
TEST,
64+
TEST_REQUEST_STREAM,
65+
TEST_RESPONSE_STREAM,
66+
TEST_BI_DI_STREAM,
67+
BLOCKING_TEST,
68+
BLOCKING_TEST_REQUEST_STREAM,
69+
BLOCKING_TEST_RESPONSE_STREAM,
70+
BLOCKING_TEST_BI_DI_STREAM
71+
}
72+
73+
@RegisterExtension
74+
static final ExecutorExtension<Executor> executorExtension = withCachedExecutor().setClassLevel(true);
75+
76+
private final CountDownLatch receivedFirstRequest = new CountDownLatch(1);
77+
private final AtomicInteger receivedRequests = new AtomicInteger();
78+
private final CompletableSource.Processor responseProcessor = Processors.newCompletableProcessor();
79+
private final ServerContext serverCtx;
80+
81+
ConcurrentGrpcRequestTest() throws Exception {
82+
serverCtx = GrpcServers.forAddress(localAddress(0)).listenAndAwait(new TesterService() {
83+
@Override
84+
public Single<TestResponse> test(GrpcServiceContext ctx, TestRequest request) {
85+
return testSingle(Publisher.from(request));
86+
}
87+
88+
@Override
89+
public Single<TestResponse> testRequestStream(GrpcServiceContext ctx, Publisher<TestRequest> request) {
90+
return testSingle(request);
91+
}
92+
93+
@Override
94+
public Publisher<TestResponse> testResponseStream(GrpcServiceContext ctx, TestRequest request) {
95+
return testSingle(Publisher.from(request)).toPublisher();
96+
}
97+
98+
@Override
99+
public Publisher<TestResponse> testBiDiStream(GrpcServiceContext ctx, Publisher<TestRequest> request) {
100+
return testSingle(request).toPublisher();
101+
}
102+
103+
private Single<TestResponse> testSingle(Publisher<TestRequest> request) {
104+
receivedFirstRequest.countDown();
105+
if (receivedRequests.incrementAndGet() == 1) {
106+
return SourceAdapters.fromSource(responseProcessor)
107+
.concat(request.ignoreElements())
108+
.concat(Single.succeeded(TestResponse.newBuilder().setMessage("first").build()));
109+
}
110+
return Single.succeeded(TestResponse.newBuilder().setMessage("other").build());
111+
}
112+
});
113+
}
114+
115+
@AfterEach
116+
void tearDown() throws Exception {
117+
serverCtx.close();
118+
}
119+
120+
private static List<Arguments> asyncVariants() {
121+
List<Arguments> arguments = new ArrayList<>();
122+
for (AsyncVariant variant : AsyncVariant.values()) {
123+
arguments.add(Arguments.of(true, variant));
124+
// Blocking calls without metadata always create a new underlying request, there is no risk
125+
if (!variant.name().startsWith("BLOCKING")) {
126+
arguments.add(Arguments.of(false, variant));
127+
}
128+
}
129+
return arguments;
130+
}
131+
132+
@ParameterizedTest(name = "{displayName} [{index}] withMetadata={0} variant={1}")
133+
@MethodSource("asyncVariants")
134+
void test(boolean withMetadata, AsyncVariant variant) throws Exception {
135+
GrpcClientMetadata metadata = withMetadata ? new DefaultGrpcClientMetadata() : null;
136+
try (TesterClient client = GrpcClients.forAddress(serverHostAndPort(serverCtx)).build(new ClientFactory())) {
137+
Single<TestResponse> firstSingle = newSingle(variant, client, metadata);
138+
Future<TestResponse> first = firstSingle.toFuture();
139+
receivedFirstRequest.await();
140+
Future<TestResponse> firstConcurrent = firstSingle.toFuture();
141+
Future<TestResponse> secondConcurrent = newSingle(variant, client, metadata).toFuture();
142+
143+
responseProcessor.onComplete();
144+
assertThat(first.get().getMessage(), is("first"));
145+
assertRejected(firstConcurrent);
146+
if (metadata != null) {
147+
assertRejected(secondConcurrent);
148+
} else {
149+
// Requests are independent when metadata is not shared between them
150+
assertThat(secondConcurrent.get().getMessage(), is("other"));
151+
}
152+
153+
Future<TestResponse> firstSequential = firstSingle.toFuture();
154+
assertThat(firstSequential.get().getMessage(), is("other"));
155+
Future<TestResponse> thirdSequential = newSingle(variant, client, metadata).toFuture();
156+
assertThat(thirdSequential.get().getMessage(), is("other"));
157+
}
158+
assertThat(receivedRequests.get(), is(metadata != null ? 3 : 4));
159+
}
160+
161+
private static Single<TestResponse> newSingle(AsyncVariant variant, TesterClient client,
162+
@Nullable GrpcClientMetadata metadata) {
163+
switch (variant) {
164+
case TEST:
165+
return metadata == null ?
166+
client.test(newRequest()) :
167+
client.test(metadata, newRequest());
168+
case TEST_REQUEST_STREAM:
169+
return metadata == null ?
170+
client.testRequestStream(newStreamingRequest()) :
171+
client.testRequestStream(metadata, newStreamingRequest());
172+
case TEST_RESPONSE_STREAM:
173+
return (metadata == null ?
174+
client.testResponseStream(newRequest()) :
175+
client.testResponseStream(metadata, newRequest()))
176+
.firstOrError();
177+
case TEST_BI_DI_STREAM:
178+
return (metadata == null ?
179+
client.testBiDiStream(newStreamingRequest()) :
180+
client.testBiDiStream(metadata, newStreamingRequest()))
181+
.firstOrError();
182+
case BLOCKING_TEST:
183+
return executorExtension.executor().submit(() -> metadata == null ?
184+
client.asBlockingClient().test(newRequest()) :
185+
client.asBlockingClient().test(metadata, newRequest()));
186+
case BLOCKING_TEST_REQUEST_STREAM:
187+
return executorExtension.executor().submit(() -> metadata == null ?
188+
client.asBlockingClient().testRequestStream(newIterableRequest()) :
189+
client.asBlockingClient().testRequestStream(metadata, newIterableRequest()));
190+
case BLOCKING_TEST_RESPONSE_STREAM:
191+
return executorExtension.executor().submit(() -> (metadata == null ?
192+
client.asBlockingClient().testResponseStream(newRequest()) :
193+
client.asBlockingClient().testResponseStream(metadata, newRequest()))
194+
.iterator().next());
195+
case BLOCKING_TEST_BI_DI_STREAM:
196+
return executorExtension.executor().submit(() -> (metadata == null ?
197+
client.asBlockingClient().testBiDiStream(newIterableRequest()) :
198+
client.asBlockingClient().testBiDiStream(metadata, newIterableRequest()))
199+
.iterator().next());
200+
default:
201+
throw new AssertionError("Unexpected variant: " + variant);
202+
}
203+
}
204+
205+
private static TestRequest newRequest() {
206+
return TestRequest.newBuilder().setName("foo").build();
207+
}
208+
209+
private static Publisher<TestRequest> newStreamingRequest() {
210+
return Publisher.from(newRequest());
211+
}
212+
213+
private static Iterable<TestRequest> newIterableRequest() {
214+
return Collections.singletonList(newRequest());
215+
}
216+
217+
private static void assertRejected(Future<?> future) {
218+
ExecutionException ee = assertThrows(ExecutionException.class, future::get);
219+
assertThat(ee.getCause(), is(instanceOf(GrpcStatusException.class)));
220+
GrpcStatusException gse = (GrpcStatusException) ee.getCause();
221+
assertThat(gse.getCause(), is(instanceOf(RejectedSubscribeException.class)));
222+
}
223+
}

servicetalk-http-api/src/main/java/io/servicetalk/http/api/AbstractHttpMetaData.java

+2
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ final ContextMap context0() {
8484
@Override
8585
public final ContextMap context() {
8686
if (context == null) {
87+
// If this implementation ever changes to a concurrent one, remove external synchronization from
88+
// FilterableClientToClient.executeRequest(...) and make it consistent with DefaultGrpcMetadata.
8789
context = new DefaultContextMap();
8890
}
8991
return context;

servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/FilterableClientToClient.java

+72-9
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import io.servicetalk.concurrent.api.Completable;
1919
import io.servicetalk.concurrent.api.Publisher;
2020
import io.servicetalk.concurrent.api.Single;
21+
import io.servicetalk.concurrent.internal.RejectedSubscribeException;
22+
import io.servicetalk.context.api.ContextMap;
2123
import io.servicetalk.http.api.BlockingHttpClient;
2224
import io.servicetalk.http.api.BlockingStreamingHttpClient;
2325
import io.servicetalk.http.api.FilterableStreamingHttpClient;
@@ -34,18 +36,29 @@
3436
import io.servicetalk.http.api.ReservedStreamingHttpConnection;
3537
import io.servicetalk.http.api.StreamingHttpClient;
3638
import io.servicetalk.http.api.StreamingHttpRequest;
39+
import io.servicetalk.http.api.StreamingHttpRequester;
3740
import io.servicetalk.http.api.StreamingHttpResponse;
3841
import io.servicetalk.http.api.StreamingHttpResponseFactory;
3942

43+
import static io.servicetalk.context.api.ContextMap.Key.newKey;
4044
import static io.servicetalk.http.api.HttpApiConversions.toBlockingClient;
4145
import static io.servicetalk.http.api.HttpApiConversions.toBlockingStreamingClient;
4246
import static io.servicetalk.http.api.HttpApiConversions.toClient;
4347
import static io.servicetalk.http.api.HttpApiConversions.toReservedBlockingConnection;
4448
import static io.servicetalk.http.api.HttpApiConversions.toReservedBlockingStreamingConnection;
4549
import static io.servicetalk.http.api.HttpApiConversions.toReservedConnection;
4650
import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;
51+
import static java.lang.Boolean.getBoolean;
4752

4853
final class FilterableClientToClient implements StreamingHttpClient {
54+
55+
// FIXME: 0.43 - remove this temporary system property
56+
private static final boolean SKIP_CONCURRENT_REQUEST_CHECK =
57+
getBoolean("io.servicetalk.http.netty.skipConcurrentRequestCheck");
58+
private static final ContextMap.Key<Object> HTTP_IN_FLIGHT_REQUEST =
59+
newKey("HTTP_IN_FLIGHT_REQUEST", Object.class);
60+
61+
private final Object lock = new Object();
4962
private final FilterableStreamingHttpClient client;
5063
private final HttpExecutionContext executionContext;
5164

@@ -71,17 +84,14 @@ public BlockingHttpClient asBlockingClient() {
7184

7285
@Override
7386
public Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
74-
return Single.defer(() -> {
75-
request.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, executionContext().executionStrategy());
76-
return client.request(request).shareContextOnSubscribe();
77-
});
87+
return executeRequest(client, request, executionContext().executionStrategy(), lock);
7888
}
7989

8090
@Override
8191
public Single<ReservedStreamingHttpConnection> reserveConnection(final HttpRequestMetaData metaData) {
8292
return Single.defer(() -> {
8393
HttpExecutionStrategy clientstrategy = executionContext().executionStrategy();
84-
metaData.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, clientstrategy);
94+
setExecutionStrategy(metaData.context(), clientstrategy);
8595
return client.reserveConnection(metaData).map(rc -> new ReservedStreamingHttpConnection() {
8696
@Override
8797
public ReservedHttpConnection asConnection() {
@@ -108,10 +118,7 @@ public Single<StreamingHttpResponse> request(final StreamingHttpRequest request)
108118
// Use the strategy from the client as the underlying ReservedStreamingHttpConnection may be user
109119
// created and hence could have an incorrect default strategy. Doing this makes sure we never call
110120
// the method without strategy just as we do for the regular connection.
111-
return Single.defer(() -> {
112-
request.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, clientstrategy);
113-
return rc.request(request).shareContextOnSubscribe();
114-
});
121+
return executeRequest(rc, request, clientstrategy, lock);
115122
}
116123

117124
@Override
@@ -196,4 +203,60 @@ public Completable closeAsyncGracefully() {
196203
public StreamingHttpRequest newRequest(final HttpRequestMethod method, final String requestTarget) {
197204
return client.newRequest(method, requestTarget);
198205
}
206+
207+
private static Single<StreamingHttpResponse> executeRequest(final StreamingHttpRequester requester,
208+
final StreamingHttpRequest request,
209+
final HttpExecutionStrategy strategy,
210+
final Object lock) {
211+
return Single.defer(() -> {
212+
final ContextMap context = request.context();
213+
214+
if (SKIP_CONCURRENT_REQUEST_CHECK) {
215+
return executeRequest(requester, request, context, strategy).shareContextOnSubscribe();
216+
}
217+
218+
// Prevent concurrent execution of the same request through the same layer.
219+
// In general, we do not expect users to execute the same mutable request concurrently. Therefore, the cost
220+
// of synchronized block should be negligible for most requests, unless they messed up with reactive streams
221+
// chain and accidentally subscribed to the same request concurrently. This protection helps them avoid
222+
// ambiguous runtime behavior caused by a corrupted mutable request state.
223+
final Object inFlight;
224+
synchronized (context) {
225+
// We do not override lock because other layers may already set their own one.
226+
inFlight = context.putIfAbsent(HTTP_IN_FLIGHT_REQUEST, lock);
227+
}
228+
if (lock.equals(inFlight)) {
229+
return Single.<StreamingHttpResponse>failed(new RejectedSubscribeException(
230+
"Concurrent execution is detected for the same mutable request. Only a single execution is " +
231+
"allowed at any point of time. Otherwise, request data structures can be corrupted. " +
232+
"To avoid this error, create a new request for every execution or wrap every request " +
233+
"creation with Single.defer() operator."))
234+
.shareContextOnSubscribe();
235+
}
236+
237+
Single<StreamingHttpResponse> response = executeRequest(requester, request, context, strategy);
238+
if (inFlight == null) {
239+
// Remove only if we are the one who set the lock.
240+
response = response.beforeFinally(() -> {
241+
synchronized (context) {
242+
context.remove(HTTP_IN_FLIGHT_REQUEST);
243+
}
244+
});
245+
}
246+
return response.shareContextOnSubscribe();
247+
});
248+
}
249+
250+
private static Single<StreamingHttpResponse> executeRequest(final StreamingHttpRequester requester,
251+
final StreamingHttpRequest request,
252+
final ContextMap context,
253+
final HttpExecutionStrategy strategy) {
254+
setExecutionStrategy(context, strategy);
255+
return requester.request(request);
256+
}
257+
258+
private static void setExecutionStrategy(final ContextMap context, final HttpExecutionStrategy strategy) {
259+
// We do not override HttpExecutionStrategy because users may prefer to use their own.
260+
context.putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, strategy);
261+
}
199262
}

0 commit comments

Comments
 (0)