Skip to content

Commit 922993b

Browse files
authored
NettyCompression gzip streaming decompression over multiple buffers fix (#2380)
Motivation: NettyCompression's gzip streaming decompression fails to decompress payloads when split over multiple buffers and may result in: ``` DecompressionException: Input is not in the GZIP format ``` Modifications: - NettyCompressionStreamingSerializer skips bytes unconditionally after they are feed to the Netty decoders. However in some scenarios not all bytes maybe consumed, and if we skip the bytes the aggregation maintained by Netty's ByteToMessageDecoder will be discarded. Instead of skipping unconditionally we should only advance the reader index if the ByteBuf and Buffer don't already match.
1 parent 16a4f8a commit 922993b

File tree

3 files changed

+140
-11
lines changed

3 files changed

+140
-11
lines changed

servicetalk-encoding-netty/src/main/java/io/servicetalk/encoding/netty/NettyCompressionSerializer.java

+17-6
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,7 @@ public void serialize(final Buffer toSerialize, final BufferAllocator allocator,
5656
final MessageToByteEncoder<ByteBuf> encoder = encoderSupplier.get();
5757
final EmbeddedChannel channel = newEmbeddedChannel(encoder, allocator);
5858
try {
59-
channel.writeOutbound(extractByteBufOrCreate(toSerialize));
60-
toSerialize.skipBytes(toSerialize.readableBytes());
61-
59+
writeAndUpdateIndex(channel, toSerialize, false);
6260
// May produce footer
6361
preparePendingData(channel);
6462
drainChannelQueueToSingleBuffer(channel.outboundMessages(), nettyDst);
@@ -84,9 +82,7 @@ public Buffer deserialize(final Buffer serializedData, final BufferAllocator all
8482
final ByteToMessageDecoder decoder = decoderSupplier.get();
8583
final EmbeddedChannel channel = newEmbeddedChannel(decoder, allocator);
8684
try {
87-
channel.writeInbound(toByteBuf(serializedData));
88-
serializedData.skipBytes(serializedData.readableBytes());
89-
85+
writeAndUpdateIndex(channel, serializedData, true);
9086
drainChannelQueueToSingleBuffer(channel.inboundMessages(), nettyDst);
9187
// no need to advance writerIndex -> NettyBuffer's writerIndex reflects the underlying ByteBuf value.
9288
cleanup(channel);
@@ -97,6 +93,21 @@ public Buffer deserialize(final Buffer serializedData, final BufferAllocator all
9793
}
9894
}
9995

96+
static void writeAndUpdateIndex(EmbeddedChannel channel, Buffer toSerialize, boolean inbound) {
97+
ByteBuf byteBuf = extractByteBufOrCreate(toSerialize);
98+
final int beforeReadableBytes = byteBuf.readableBytes();
99+
if (inbound) {
100+
channel.writeInbound(byteBuf);
101+
} else {
102+
channel.writeOutbound(byteBuf);
103+
}
104+
// extractByteBufOrCreate may have to copy if it isn't able to unwrap NettyBuffer and in this case we have to
105+
// manually advance the Buffer indexes to reflect what was consumed.
106+
if (byteBuf.readableBytes() != toSerialize.readableBytes()) {
107+
toSerialize.skipBytes(byteBuf.readableBytes() - beforeReadableBytes);
108+
}
109+
}
110+
100111
@Nullable
101112
static void drainChannelQueueToSingleBuffer(final Queue<Object> queue, final ByteBuf nettyDst) {
102113
ByteBuf buf;

servicetalk-encoding-netty/src/main/java/io/servicetalk/encoding/netty/NettyCompressionStreamingSerializer.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,13 @@
3636
import javax.annotation.Nullable;
3737

3838
import static io.servicetalk.buffer.api.ReadOnlyBufferAllocators.DEFAULT_RO_ALLOCATOR;
39-
import static io.servicetalk.buffer.netty.BufferUtils.extractByteBufOrCreate;
4039
import static io.servicetalk.buffer.netty.BufferUtils.getByteBufAllocator;
4140
import static io.servicetalk.buffer.netty.BufferUtils.newBufferFrom;
4241
import static io.servicetalk.concurrent.api.Single.succeeded;
4342
import static io.servicetalk.encoding.netty.NettyCompressionSerializer.cleanup;
4443
import static io.servicetalk.encoding.netty.NettyCompressionSerializer.preparePendingData;
4544
import static io.servicetalk.encoding.netty.NettyCompressionSerializer.safeCleanup;
45+
import static io.servicetalk.encoding.netty.NettyCompressionSerializer.writeAndUpdateIndex;
4646
import static java.util.Objects.requireNonNull;
4747

4848
final class NettyCompressionStreamingSerializer implements StreamingSerializerDeserializer<Buffer> {
@@ -80,8 +80,7 @@ public void onNext(@Nullable final Buffer next) {
8080
}
8181

8282
try { // onNext will produce AT-MOST N items (as received)
83-
channel.writeInbound(extractByteBufOrCreate(next));
84-
next.skipBytes(next.readableBytes());
83+
writeAndUpdateIndex(channel, next, true);
8584
Buffer buffer = drainChannelQueueToSingleBuffer(channel.inboundMessages(), allocator);
8685
if (buffer != null && buffer.readableBytes() > 0) {
8786
subscriber.onNext(buffer);
@@ -148,8 +147,7 @@ public void onNext(@Nullable Buffer next) {
148147
subscriber.onNext(buffer);
149148
}
150149
} else {
151-
channel.writeOutbound(extractByteBufOrCreate(next));
152-
next.skipBytes(next.readableBytes());
150+
writeAndUpdateIndex(channel, next, false);
153151
Buffer buffer = drainChannelQueueToSingleBuffer(channel.outboundMessages(), allocator);
154152
if (buffer != null && buffer.readableBytes() > 0) {
155153
subscriber.onNext(buffer);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright © 2022 Apple Inc. and the ServiceTalk project authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.servicetalk.encoding.netty;
17+
18+
import io.servicetalk.buffer.api.Buffer;
19+
import io.servicetalk.buffer.api.BufferAllocator;
20+
import io.servicetalk.buffer.api.CompositeBuffer;
21+
import io.servicetalk.concurrent.api.Publisher;
22+
import io.servicetalk.serializer.api.SerializerDeserializer;
23+
import io.servicetalk.serializer.api.StreamingSerializerDeserializer;
24+
25+
import org.junit.jupiter.params.ParameterizedTest;
26+
import org.junit.jupiter.params.provider.EnumSource;
27+
28+
import java.util.concurrent.ThreadLocalRandom;
29+
30+
import static io.servicetalk.buffer.api.ReadOnlyBufferAllocators.DEFAULT_RO_ALLOCATOR;
31+
import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR;
32+
import static io.servicetalk.concurrent.api.Publisher.from;
33+
import static io.servicetalk.encoding.netty.NettyCompression.deflateDefault;
34+
import static io.servicetalk.encoding.netty.NettyCompression.deflateDefaultStreaming;
35+
import static io.servicetalk.encoding.netty.NettyCompression.gzipDefault;
36+
import static io.servicetalk.encoding.netty.NettyCompression.gzipDefaultStreaming;
37+
import static java.nio.charset.StandardCharsets.UTF_8;
38+
import static org.hamcrest.MatcherAssert.assertThat;
39+
import static org.hamcrest.Matchers.equalTo;
40+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
41+
42+
class NettyCompressionStreamingSerializerTest {
43+
enum StreamingType {
44+
GZIP(gzipDefaultStreaming(), DEFAULT_ALLOCATOR),
45+
DEFLATE(deflateDefaultStreaming(), DEFAULT_ALLOCATOR),
46+
GZIP_RO(gzipDefaultStreaming(), DEFAULT_RO_ALLOCATOR),
47+
DEFLATE_RO(deflateDefaultStreaming(), DEFAULT_RO_ALLOCATOR);
48+
49+
final StreamingSerializerDeserializer<Buffer> serializer;
50+
final BufferAllocator allocator;
51+
52+
StreamingType(StreamingSerializerDeserializer<Buffer> serializer, BufferAllocator allocator) {
53+
this.serializer = serializer;
54+
this.allocator = allocator;
55+
}
56+
}
57+
58+
enum AggType {
59+
GZIP(gzipDefault(), DEFAULT_ALLOCATOR),
60+
DEFLATE(deflateDefault(), DEFAULT_ALLOCATOR),
61+
GZIP_RO(gzipDefault(), DEFAULT_RO_ALLOCATOR),
62+
DEFLATE_RO(deflateDefault(), DEFAULT_RO_ALLOCATOR);
63+
64+
final SerializerDeserializer<Buffer> serializer;
65+
final BufferAllocator allocator;
66+
67+
AggType(SerializerDeserializer<Buffer> serializer, BufferAllocator allocator) {
68+
this.serializer = serializer;
69+
this.allocator = allocator;
70+
}
71+
}
72+
73+
@ParameterizedTest(name = "{displayName} [{index}] type = {0}")
74+
@EnumSource(StreamingType.class)
75+
void streamingOverMultipleFrames(StreamingType type) throws Exception {
76+
BufferAllocator allocator = type.allocator;
77+
StreamingSerializerDeserializer<Buffer> serializer = type.serializer;
78+
String rawString = "hello";
79+
byte[] rawBytes = new byte[1024];
80+
ThreadLocalRandom.current().nextBytes(rawBytes);
81+
Buffer[] clearText = new Buffer[] {allocator.fromAscii(rawString), allocator.wrap(rawBytes)};
82+
Publisher<Buffer> serializedPub = serializer.serialize(from(clearText), DEFAULT_ALLOCATOR);
83+
84+
CompositeBuffer serializedBuf = serializedPub
85+
.collect(DEFAULT_ALLOCATOR::newCompositeBuffer, CompositeBuffer::addBuffer).toFuture().get();
86+
87+
assertThat(serializedBuf.readableBytes(), greaterThanOrEqualTo(4));
88+
CompositeBuffer result = serializer.deserialize(from(
89+
// Copy so we feed in data from the allocator of choice.
90+
allocator.wrap(readAndCopy(serializedBuf, 4)),
91+
allocator.wrap(readAndCopy(serializedBuf, serializedBuf.readableBytes()))
92+
), DEFAULT_ALLOCATOR)
93+
.collect(DEFAULT_ALLOCATOR::newCompositeBuffer, CompositeBuffer::addBuffer).toFuture().get();
94+
95+
assertThat(result.readableBytes(), equalTo(rawString.length() + rawBytes.length));
96+
assertThat(result.readBytes(rawString.length()).toString(UTF_8), equalTo(rawString));
97+
assertThat(result, equalTo(allocator.wrap(rawBytes)));
98+
}
99+
100+
@ParameterizedTest(name = "{displayName} [{index}] type = {0}")
101+
@EnumSource(AggType.class)
102+
void aggregatedRoundTrip(AggType type) {
103+
BufferAllocator allocator = type.allocator;
104+
SerializerDeserializer<Buffer> serializer = type.serializer;
105+
byte[] rawBytes = new byte[1024];
106+
ThreadLocalRandom.current().nextBytes(rawBytes);
107+
Buffer serializedBuf = serializer.serialize(allocator.wrap(rawBytes), DEFAULT_ALLOCATOR);
108+
Buffer result = serializer.deserialize(
109+
allocator.wrap(readAndCopy(serializedBuf, serializedBuf.readableBytes())),
110+
DEFAULT_ALLOCATOR);
111+
112+
assertThat(readAndCopy(result, result.readableBytes()), equalTo(rawBytes));
113+
}
114+
115+
private static byte[] readAndCopy(Buffer buffer, int byteCount) {
116+
byte[] bytes = new byte[byteCount];
117+
buffer.readBytes(bytes);
118+
return bytes;
119+
}
120+
}

0 commit comments

Comments
 (0)