Skip to content

Commit 4ee9064

Browse files
Fix racy test behavior
1 parent e8ec581 commit 4ee9064

File tree

2 files changed

+65
-46
lines changed

2 files changed

+65
-46
lines changed

servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ConcurrentGrpcRequestTest.java

+57-38
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@
3232
import io.servicetalk.grpc.netty.TesterProto.Tester.ClientFactory;
3333
import io.servicetalk.grpc.netty.TesterProto.Tester.TesterClient;
3434
import io.servicetalk.grpc.netty.TesterProto.Tester.TesterService;
35+
import io.servicetalk.http.api.HttpServiceContext;
36+
import io.servicetalk.http.api.StreamingHttpRequest;
37+
import io.servicetalk.http.api.StreamingHttpResponse;
38+
import io.servicetalk.http.api.StreamingHttpResponseFactory;
39+
import io.servicetalk.http.api.StreamingHttpServiceFilter;
3540
import io.servicetalk.transport.api.ServerContext;
3641

3742
import org.junit.jupiter.api.AfterEach;
@@ -79,37 +84,48 @@ private enum AsyncVariant {
7984
private final ServerContext serverCtx;
8085

8186
ConcurrentGrpcRequestTest() throws Exception {
82-
serverCtx = GrpcServers.forAddress(localAddress(0)).listenAndAwait(new TesterService() {
83-
@Override
84-
public Single<TestResponse> test(GrpcServiceContext ctx, TestRequest request) {
85-
return testSingle(Publisher.from(request));
86-
}
87-
88-
@Override
89-
public Single<TestResponse> testRequestStream(GrpcServiceContext ctx, Publisher<TestRequest> request) {
90-
return testSingle(request);
91-
}
92-
93-
@Override
94-
public Publisher<TestResponse> testResponseStream(GrpcServiceContext ctx, TestRequest request) {
95-
return testSingle(Publisher.from(request)).toPublisher();
96-
}
97-
98-
@Override
99-
public Publisher<TestResponse> testBiDiStream(GrpcServiceContext ctx, Publisher<TestRequest> request) {
100-
return testSingle(request).toPublisher();
101-
}
102-
103-
private Single<TestResponse> testSingle(Publisher<TestRequest> request) {
104-
receivedFirstRequest.countDown();
105-
if (receivedRequests.incrementAndGet() == 1) {
106-
return SourceAdapters.fromSource(responseProcessor)
107-
.concat(request.ignoreElements())
108-
.concat(Single.succeeded(TestResponse.newBuilder().setMessage("first").build()));
109-
}
110-
return Single.succeeded(TestResponse.newBuilder().setMessage("other").build());
111-
}
112-
});
87+
serverCtx = GrpcServers.forAddress(localAddress(0))
88+
.initializeHttp(builder -> builder.appendServiceFilter(s -> new StreamingHttpServiceFilter(s) {
89+
@Override
90+
public Single<StreamingHttpResponse> handle(HttpServiceContext ctx,
91+
StreamingHttpRequest request,
92+
StreamingHttpResponseFactory responseFactory) {
93+
receivedFirstRequest.countDown();
94+
Single<StreamingHttpResponse> response = delegate().handle(ctx, request, responseFactory);
95+
if (receivedRequests.incrementAndGet() == 1) {
96+
return response.concat(SourceAdapters.fromSource(responseProcessor));
97+
}
98+
return response;
99+
}
100+
}))
101+
.listenAndAwait(new TesterService() {
102+
103+
@Override
104+
public Single<TestResponse> test(GrpcServiceContext ctx, TestRequest request) {
105+
return newResponse();
106+
}
107+
108+
@Override
109+
public Single<TestResponse> testRequestStream(GrpcServiceContext ctx,
110+
Publisher<TestRequest> request) {
111+
return newResponse();
112+
}
113+
114+
@Override
115+
public Publisher<TestResponse> testResponseStream(GrpcServiceContext ctx, TestRequest request) {
116+
return newResponse().toPublisher();
117+
}
118+
119+
@Override
120+
public Publisher<TestResponse> testBiDiStream(GrpcServiceContext ctx,
121+
Publisher<TestRequest> request) {
122+
return newResponse().toPublisher();
123+
}
124+
125+
private Single<TestResponse> newResponse() {
126+
return Single.succeeded(TestResponse.newBuilder().setMessage("msg").build());
127+
}
128+
});
113129
}
114130

115131
@AfterEach
@@ -140,20 +156,19 @@ void test(boolean withMetadata, AsyncVariant variant) throws Exception {
140156
Future<TestResponse> firstConcurrent = firstSingle.toFuture();
141157
Future<TestResponse> secondConcurrent = newSingle(variant, client, metadata).toFuture();
142158

143-
responseProcessor.onComplete();
144-
assertThat(first.get().getMessage(), is("first"));
145159
assertRejected(firstConcurrent);
146160
if (metadata != null) {
147161
assertRejected(secondConcurrent);
148162
} else {
149163
// Requests are independent when metadata is not shared between them
150-
assertThat(secondConcurrent.get().getMessage(), is("other"));
164+
assertResponse(secondConcurrent);
151165
}
166+
responseProcessor.onComplete();
167+
assertResponse(first);
152168

153-
Future<TestResponse> firstSequential = firstSingle.toFuture();
154-
assertThat(firstSequential.get().getMessage(), is("other"));
155-
Future<TestResponse> thirdSequential = newSingle(variant, client, metadata).toFuture();
156-
assertThat(thirdSequential.get().getMessage(), is("other"));
169+
// Sequential requests should be successful:
170+
assertResponse(firstSingle.toFuture());
171+
assertResponse(newSingle(variant, client, metadata).toFuture());
157172
}
158173
assertThat(receivedRequests.get(), is(metadata != null ? 3 : 4));
159174
}
@@ -220,4 +235,8 @@ private static void assertRejected(Future<?> future) {
220235
GrpcStatusException gse = (GrpcStatusException) ee.getCause();
221236
assertThat(gse.getCause(), is(instanceOf(RejectedSubscribeException.class)));
222237
}
238+
239+
private static void assertResponse(Future<TestResponse> future) throws Exception {
240+
assertThat(future.get().getMessage(), is("msg"));
241+
}
223242
}

servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConcurrentHttpRequestTest.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,10 @@ void asyncStreamingRequester(StreamingHttpRequester requester, boolean multiAddr
122122
Future<StreamingHttpResponse> firstConcurrent = firstSingle.toFuture();
123123
Future<StreamingHttpResponse> secondConcurrent = requester.request(request).toFuture();
124124

125-
responseProcessor.onComplete();
126-
assertResponse(first.get(), HTTP_1_1, OK, 0);
127125
assertRejected(firstConcurrent);
128126
assertRejected(secondConcurrent);
127+
responseProcessor.onComplete();
128+
assertResponse(first.get(), HTTP_1_1, OK, 0);
129129

130130
assertSequential(multiAddressClient, request, firstSingle);
131131
assertSequential(multiAddressClient, request, requester.request(request));
@@ -161,10 +161,10 @@ void asyncAggregatedRequester(HttpRequester requester, boolean multiAddressClien
161161
Future<HttpResponse> firstConcurrent = firstSingle.toFuture();
162162
Future<HttpResponse> secondConcurrent = requester.request(request).toFuture();
163163

164-
responseProcessor.onComplete();
165-
assertAggregatedResponse(first.get(), OK);
166164
assertRejected(firstConcurrent);
167165
assertRejected(secondConcurrent);
166+
responseProcessor.onComplete();
167+
assertAggregatedResponse(first.get(), OK);
168168

169169
assertSequential(multiAddressClient, request, firstSingle);
170170
assertSequential(multiAddressClient, request, requester.request(request));
@@ -203,10 +203,10 @@ void blockingStreamingRequester(BlockingStreamingHttpRequester requester,
203203
Future<StreamingHttpResponse> secondConcurrent =
204204
executorExtension.executor().submit(() -> requester.request(request).toStreamingResponse()).toFuture();
205205

206-
responseProcessor.onComplete();
207-
assertResponse(first.get(), HTTP_1_1, OK, 0);
208206
assertRejected(firstConcurrent);
209207
assertRejected(secondConcurrent);
208+
responseProcessor.onComplete();
209+
assertResponse(first.get(), HTTP_1_1, OK, 0);
210210

211211
assertSequential(multiAddressClient, request, firstSingle);
212212
assertSequential(multiAddressClient, request,
@@ -244,10 +244,10 @@ void blockingAggregatedRequester(BlockingHttpRequester requester, boolean multiA
244244
Future<HttpResponse> secondConcurrent = executorExtension.executor().submit(() -> requester.request(request))
245245
.toFuture();
246246

247-
responseProcessor.onComplete();
248-
assertAggregatedResponse(first.get(), OK);
249247
assertRejected(firstConcurrent);
250248
assertRejected(secondConcurrent);
249+
responseProcessor.onComplete();
250+
assertAggregatedResponse(first.get(), OK);
251251

252252
assertSequential(multiAddressClient, request, firstSingle);
253253
assertSequential(multiAddressClient, request,

0 commit comments

Comments
 (0)