Skip to content

Commit 2dfacd1

Browse files
authored
gRPC Trailers-Only responses must be a single HEADERS frame (#3152)
### Motivation When a blocking gRPC endpoint with streaming response encounters an exception, it attempts to send `Trailers-Only` response (grpc-status is returned in headers rather than trailers). However, the HTTP/2 layer sends two frames instead of one: `HEADERS with endStream=false` and `DATA with endStream=true`. Official grpc-java implementation doesn't like that, it wants a single `HEADERS` frame with `endStream=true` for `Trailers-Only` responses. ### Changes I added a filter and a context key to signal when we have a case where `Trailers-Only` response is sent. The filter will aggregate the response stream, observe there are no trailers, drop them, and will only return a single `HEADERS` frame with `endStream=true`. This behavior can be tested by `ProtocolCompatibilityTest.unimplementedServiceError`, we unskipped tests that were failing before this fix. ### Results Before Change: ``` OUTBOUND HEADERS: streamId=3 headers=GrpcHttp2OutboundHeaders[:authority: 127.0.0.1:62451, :path: /grpc.netty.Compat/clientStreamingCall, :method: POST, :scheme: http, content-type: application/grpc, te: trailers, user-agent: grpc-java-netty/1.64.1, grpc-accept-encoding: gzip] streamDependency=0 weight=16 exclusive=false padding=0 endStream=false OUTBOUND DATA: streamId=3 padding=0 endStream=false length=7 bytes=00000000020801 OUTBOUND DATA: streamId=3 padding=0 endStream=true length=0 bytes= INBOUND HEADERS: streamId=3 headers=GrpcHttp2ResponseHeaders[:status: 200, server: servicetalk-grpc/, content-type: application/grpc+proto, grpc-status: 12, grpc-message: Method grpc.netty.Compat/clientStreamingCall is unimplemented] padding=0 endStream=false INBOUND DATA: streamId=3 padding=0 endStream=true length=0 bytes= ``` After Change: ``` OUTBOUND HEADERS: streamId=3 headers=GrpcHttp2OutboundHeaders[:authority: 127.0.0.1:62421, :path: /grpc.netty.Compat/clientStreamingCall, :method: POST, :scheme: http, content-type: application/grpc, te: trailers, user-agent: grpc-java-netty/1.64.1, grpc-accept-encoding: gzip] streamDependency=0 weight=16 exclusive=false padding=0 endStream=false OUTBOUND DATA: streamId=3 padding=0 endStream=false length=7 bytes=00000000020801 OUTBOUND DATA: streamId=3 padding=0 endStream=true length=0 bytes= INBOUND HEADERS: streamId=3 headers=GrpcHttp2ResponseHeaders[:status: 200, server: servicetalk-grpc/, content-type: application/grpc+proto, grpc-status: 12, grpc-message: Method grpc.netty.Compat/clientStreamingCall is unimplemented, content-length: 0] padding=0 endStream=true ```
1 parent f570fdb commit 2dfacd1

File tree

5 files changed

+103
-5
lines changed

5 files changed

+103
-5
lines changed

servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java

+2
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import static io.servicetalk.grpc.api.GrpcUtils.setStatus;
9191
import static io.servicetalk.grpc.api.GrpcUtils.setStatusOk;
9292
import static io.servicetalk.grpc.api.GrpcUtils.validateContentType;
93+
import static io.servicetalk.grpc.internal.GrpcContextKeys.TRAILERS_ONLY_RESPONSE;
9394
import static io.servicetalk.http.api.HttpApiConversions.toStreamingHttpService;
9495
import static io.servicetalk.http.api.HttpExecutionStrategies.customStrategyBuilder;
9596
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
@@ -708,6 +709,7 @@ public void handle(final HttpServiceContext ctx, final BlockingStreamingHttpRequ
708709
methodDescriptor.httpPath(), t);
709710
HttpHeaders trailers;
710711
if (grpcResponse == null || (trailers = grpcResponse.trailers()) == null) {
712+
response.context().put(TRAILERS_ONLY_RESPONSE, Boolean.TRUE);
711713
setStatus(response.headers(), t, allocator);
712714
// Use HTTP response to avoid setting "OK" in trailers and allocating a serializer
713715
response.sendMetaData().close();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright © 2019, 2021 Apple Inc. and the ServiceTalk project authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.servicetalk.grpc.internal;
17+
18+
import io.servicetalk.context.api.ContextMap;
19+
20+
import static io.servicetalk.context.api.ContextMap.Key.newKey;
21+
22+
/**
23+
* All {@link ContextMap.Key}(s) defined for gRPC.
24+
*/
25+
public final class GrpcContextKeys {
26+
/**
27+
* For the blocking server this key allows the router to notify an upstream filter that it is safe to consolidate
28+
* tailing empty data frames when set to true.
29+
*
30+
*/
31+
public static final ContextMap.Key<Boolean> TRAILERS_ONLY_RESPONSE =
32+
newKey("TRAILERS_ONLY_RESPONSE", Boolean.class);
33+
34+
private GrpcContextKeys() {
35+
}
36+
}

servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,6 @@ private Single<GrpcServerContext> doListen(final GrpcServiceFactory<?> serviceFa
145145
private ExecutionContextInterceptorHttpServerBuilder preBuild() {
146146
final ExecutionContextInterceptorHttpServerBuilder interceptor =
147147
new ExecutionContextInterceptorHttpServerBuilder(httpServerBuilderSupplier.get());
148-
149148
interceptor.appendNonOffloadingServiceFilter(GrpcExceptionMapperServiceFilter.INSTANCE);
150149

151150
directCallInitializer.initialize(interceptor);
@@ -154,6 +153,8 @@ private ExecutionContextInterceptorHttpServerBuilder preBuild() {
154153
}
155154
initializer.initialize(interceptor);
156155

156+
interceptor.appendServiceFilter(GrpcEnforceTrailersOnlyResponseServiceFilter.INSTANCE);
157+
157158
return interceptor;
158159
}
159160

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright © 2019, 2021 Apple Inc. and the ServiceTalk project authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.servicetalk.grpc.netty;
17+
18+
import io.servicetalk.concurrent.api.Single;
19+
import io.servicetalk.http.api.HttpExecutionStrategies;
20+
import io.servicetalk.http.api.HttpExecutionStrategy;
21+
import io.servicetalk.http.api.HttpResponse;
22+
import io.servicetalk.http.api.HttpServiceContext;
23+
import io.servicetalk.http.api.StreamingHttpRequest;
24+
import io.servicetalk.http.api.StreamingHttpResponse;
25+
import io.servicetalk.http.api.StreamingHttpResponseFactory;
26+
import io.servicetalk.http.api.StreamingHttpService;
27+
import io.servicetalk.http.api.StreamingHttpServiceFilter;
28+
import io.servicetalk.http.api.StreamingHttpServiceFilterFactory;
29+
30+
import static io.servicetalk.grpc.internal.GrpcContextKeys.TRAILERS_ONLY_RESPONSE;
31+
32+
final class GrpcEnforceTrailersOnlyResponseServiceFilter implements StreamingHttpServiceFilterFactory {
33+
static final GrpcEnforceTrailersOnlyResponseServiceFilter INSTANCE =
34+
new GrpcEnforceTrailersOnlyResponseServiceFilter();
35+
36+
private GrpcEnforceTrailersOnlyResponseServiceFilter() {
37+
}
38+
39+
@Override
40+
public StreamingHttpServiceFilter create(StreamingHttpService service) {
41+
return new StreamingHttpServiceFilter(service) {
42+
@Override
43+
public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
44+
final StreamingHttpRequest request,
45+
final StreamingHttpResponseFactory responseFactory) {
46+
return delegate().handle(ctx, request, responseFactory).flatMap(response -> {
47+
Single<StreamingHttpResponse> mappedResponse;
48+
if (Boolean.TRUE.equals(response.context().get(TRAILERS_ONLY_RESPONSE))) {
49+
mappedResponse = response.toResponse().map(HttpResponse::toStreamingResponse);
50+
} else {
51+
mappedResponse = Single.succeeded(response);
52+
}
53+
return mappedResponse.shareContextOnSubscribe();
54+
});
55+
}
56+
};
57+
}
58+
59+
@Override
60+
public HttpExecutionStrategy requiredOffloads() {
61+
return HttpExecutionStrategies.offloadNone();
62+
}
63+
}

servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java

-4
Original file line numberDiff line numberDiff line change
@@ -268,10 +268,6 @@ private static Collection<Arguments> clientServerParams() {
268268
for (boolean isClientServiceTalk : TRUE_FALSE) {
269269
for (boolean isServerServiceTalk : TRUE_FALSE) {
270270
for (boolean isServerBlocking : TRUE_FALSE) {
271-
if (!isClientServiceTalk && isServerServiceTalk && isServerBlocking) {
272-
// TODO there appears to be a potential bug in this combination. Separate bug filed.
273-
continue;
274-
}
275271
if (isServerServiceTalk || !isServerBlocking) {
276272
args.add(Arguments.of(isClientServiceTalk, isServerServiceTalk, isServerBlocking));
277273
}

0 commit comments

Comments
 (0)