Skip to content

Commit abc0d75

Browse files
Racy cancel from transport does not always cancel the message body (#2369)
Motivation: When `cancel` from the server's transport races with HTTP message completion, we can receive `cancel` after `Single<HttpResponseMetaData>` completes but before we subscribe to the `messageBody` publisher. In this case, we won't subscribe to the `messageBody` because we use `concat` operator. This does not allow `BeforeFinallyHttpOperator` to observe a terminal event. The race is possible when offloading is enabled. `cancel` comes from an IO thread, HTTP meta-data completes on the offloaded thread. Modifications: - For the server-side control flow, use the recently added `concatPropagateCancel` operator to guarantee propagation of the cancel for the entire flat HTTP stream; Result: `HttpLifecycleObserverTest.testClientCancelsRequestBeforeResponse()` does not race anymore.
1 parent b1a5c53 commit abc0d75

File tree

5 files changed

+34
-16
lines changed

5 files changed

+34
-16
lines changed

servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractStreamingHttpConnection.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -166,12 +166,18 @@ public Single<StreamingHttpResponse> request(final StreamingHttpRequest request)
166166
if (canAddRequestContentLength(request)) {
167167
flatRequest = setRequestContentLength(connectionContext().protocol(), request);
168168
} else {
169-
if (emptyMessageBody(request, request.messageBody())) {
170-
flatRequest = flatEmptyMessage(connectionContext().protocol(), request, request.messageBody());
169+
final Publisher<Object> messageBody = request.messageBody();
170+
// Do not propagate cancel to the messageBody if cancel arrives before meta-data completes. Client-side
171+
// state machine does not depend on termination of the messageBody until after transport subscribes to
172+
// it. It's preferable to avoid subscribe to the messageBody in case of cancellation to allow requests
173+
// with non-replayable messageBody to retry.
174+
if (emptyMessageBody(request, messageBody)) {
175+
flatRequest = flatEmptyMessage(connectionContext().protocol(), request, messageBody,
176+
/* propagateCancel */ false);
171177
} else {
172178
// Defer subscribe to the messageBody until transport requests it to allow clients retry failed
173179
// requests with non-replayable messageBody
174-
flatRequest = Single.<Object>succeeded(request).concat(request.messageBody(), true);
180+
flatRequest = Single.<Object>succeeded(request).concat(messageBody, /* deferSubscribe */ true);
175181
if (shouldAppendTrailers(connectionContext().protocol(), request)) {
176182
flatRequest = flatRequest.scanWith(HeaderUtils::appendTrailersMapper);
177183
}

servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HeaderUtils.java

+16-8
Original file line numberDiff line numberDiff line change
@@ -139,12 +139,13 @@ static Publisher<Object> setRequestContentLength(final HttpProtocolVersion proto
139139
final StreamingHttpRequest request) {
140140
return setContentLength(request, request.messageBody(),
141141
shouldAddZeroContentLength(request.method()) ? HeaderUtils::updateContentLength :
142-
HeaderUtils::updateRequestContentLengthNonZero, protocolVersion);
142+
HeaderUtils::updateRequestContentLengthNonZero, protocolVersion, /* propagateCancel */ false);
143143
}
144144

145145
static Publisher<Object> setResponseContentLength(final HttpProtocolVersion protocolVersion,
146146
final StreamingHttpResponse response) {
147-
return setContentLength(response, response.messageBody(), HeaderUtils::updateContentLength, protocolVersion);
147+
return setContentLength(response, response.messageBody(), HeaderUtils::updateContentLength, protocolVersion,
148+
/* propagateCancel */ true);
148149
}
149150

150151
private static void updateRequestContentLengthNonZero(final int contentLength, final HttpHeaders headers) {
@@ -210,16 +211,22 @@ static boolean emptyMessageBody(final HttpMetaData metadata) {
210211
}
211212

212213
static Publisher<Object> flatEmptyMessage(final HttpProtocolVersion protocolVersion,
213-
final HttpMetaData metadata, final Publisher<Object> messageBody) {
214+
final HttpMetaData metadata,
215+
final Publisher<Object> messageBody,
216+
final boolean propagateCancel) {
214217
assert emptyMessageBody(metadata, messageBody);
215218
// HTTP/2 and above can write meta-data as a single frame with endStream=true flag. To check the version, use
216219
// HttpProtocolVersion from ConnectionInfo because HttpMetaData may have different version.
217220
final Publisher<Object> flatMessage =
218221
protocolVersion.major() > 1 || !shouldAppendTrailers(protocolVersion, metadata) ? from(metadata) :
219222
from(metadata, EmptyHttpHeaders.INSTANCE);
220-
return messageBody == empty() ? flatMessage :
221-
// Subscribe to the messageBody publisher to trigger any applied transformations, but ignore its
222-
// content because the PayloadInfo indicated it's effectively empty and does not contain trailers
223+
if (messageBody == empty()) {
224+
return flatMessage;
225+
}
226+
// Subscribe to the messageBody publisher to trigger any applied transformations, but ignore its content because
227+
// the PayloadInfo indicated it's effectively empty and does not contain trailers.
228+
return propagateCancel ?
229+
flatMessage.concatPropagateCancel(messageBody.ignoreElements()) :
223230
flatMessage.concat(messageBody.ignoreElements());
224231
}
225232

@@ -246,10 +253,11 @@ public boolean equals(Object o) {
246253
private static Publisher<Object> setContentLength(final HttpMetaData metadata,
247254
final Publisher<Object> messageBody,
248255
final BiIntConsumer<HttpHeaders> contentLengthUpdater,
249-
final HttpProtocolVersion protocolVersion) {
256+
final HttpProtocolVersion protocolVersion,
257+
final boolean propagateCancel) {
250258
if (emptyMessageBody(metadata, messageBody)) {
251259
contentLengthUpdater.apply(0, metadata.headers());
252-
return flatEmptyMessage(protocolVersion, metadata, messageBody);
260+
return flatEmptyMessage(protocolVersion, metadata, messageBody, propagateCancel);
253261
}
254262
return messageBody.collect(() -> null, (reduction, item) -> {
255263
if (reduction == null) {

servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -453,11 +453,14 @@ private static Publisher<Object> handleResponse(final HttpProtocolVersion protoc
453453
return setResponseContentLength(protocolVersion, response);
454454
} else {
455455
Publisher<Object> flatResponse;
456-
if (emptyMessageBody(response, response.messageBody())) {
457-
flatResponse = flatEmptyMessage(protocolVersion, response, response.messageBody());
456+
final Publisher<Object> messageBody = response.messageBody();
457+
// Ensure cancel is propagated through the messageBody. Otherwise, if cancel from transport races with
458+
// execution of this method and wins, BeforeFinallyHttpOperator won't trigger and observers won't
459+
// complete the exchange.
460+
if (emptyMessageBody(response, messageBody)) {
461+
flatResponse = flatEmptyMessage(protocolVersion, response, messageBody, /* propagateCancel */ true);
458462
} else {
459-
// Not necessary to defer subscribe to the messageBody because server does not retry responses
460-
flatResponse = Single.<Object>succeeded(response).concat(response.messageBody());
463+
flatResponse = Single.<Object>succeeded(response).concatPropagateCancel(messageBody);
461464
if (shouldAppendTrailers(protocolVersion, response)) {
462465
flatResponse = flatResponse.scanWith(HeaderUtils::appendTrailersMapper);
463466
}

servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpLifecycleObserverTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ public Single<StreamingHttpResponse> handle(HttpServiceContext ctx,
290290
verify(serverExchangeObserver, atMostOnce()).onResponse(any(StreamingHttpResponse.class));
291291
verify(serverResponseObserver, atMostOnce()).onResponseDataRequested(anyLong());
292292
verify(serverResponseObserver, atMostOnce()).onResponseComplete();
293+
verify(serverResponseObserver, atMostOnce()).onResponseCancel();
293294
serverInOrder.verify(serverExchangeObserver).onExchangeFinally();
294295
verifyNoMoreInteractions(serverLifecycleObserver, serverExchangeObserver,
295296
serverRequestObserver, serverResponseObserver);

servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryRequestWithNonRepeatablePayloadTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public Single<StreamingHttpResponse> request(StreamingHttpRequest request) {
9696
try {
9797
assertThat("Unexpected exception type", t,
9898
instanceOf(RetryableException.class));
99-
assertThat("Unexpected exception type",
99+
assertThat("Unexpected exception cause type",
100100
t.getCause(), instanceOf(DeliberateException.class));
101101
assertThat("Unexpected subscribe to payload body",
102102
payloadBody.isSubscribed(), is(false));

0 commit comments

Comments
 (0)