Skip to content

Commit 72d4151

Browse files
Wip of leak detection working as intended
1 parent 68055fa commit 72d4151

File tree

5 files changed

+244
-3
lines changed

5 files changed

+244
-3
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package io.servicetalk.grpc;
2+
3+
import com.apple.servicetalkleak.Message;
4+
import com.apple.servicetalkleak.ServiceTalkLeak;
5+
import io.netty.buffer.ByteBufUtil;
6+
import io.servicetalk.concurrent.api.Publisher;
7+
import io.servicetalk.concurrent.api.Single;
8+
import io.servicetalk.grpc.api.GrpcServiceContext;
9+
import io.servicetalk.grpc.api.GrpcStatusCode;
10+
import io.servicetalk.grpc.api.GrpcStatusException;
11+
import io.servicetalk.grpc.netty.GrpcClients;
12+
import io.servicetalk.grpc.netty.GrpcServers;
13+
import io.servicetalk.http.netty.HttpProtocolConfigs;
14+
import io.servicetalk.http.netty.SpliceFlatStreamToMetaSingle;
15+
import io.servicetalk.logging.api.LogLevel;
16+
import io.servicetalk.transport.api.HostAndPort;
17+
import io.servicetalk.transport.api.IoExecutor;
18+
import io.servicetalk.transport.netty.internal.NettyIoExecutors;
19+
import org.junit.jupiter.api.Test;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
import static io.servicetalk.concurrent.api.internal.BlockingUtils.blockingInvocation;
24+
import static org.junit.jupiter.api.Assertions.assertFalse;
25+
26+
public class LeakRepro {
27+
28+
private static final Logger LOGGER = LoggerFactory.getLogger(LeakRepro.class);
29+
30+
static boolean leakDetected = false;
31+
32+
static {
33+
System.setProperty("io.netty.leakDetection.level", "paranoid");
34+
ByteBufUtil.setLeakListener((type, records) -> {
35+
leakDetected = true;
36+
LOGGER.error("ByteBuf leak detected!");
37+
});
38+
}
39+
40+
IoExecutor serverExecutor = NettyIoExecutors.createIoExecutor(1, "server");
41+
IoExecutor clientExecutor = NettyIoExecutors.createIoExecutor(1, "client");
42+
43+
@SuppressWarnings("resource")
44+
@Test
45+
public void testLeak() throws Exception {
46+
GrpcServers.forPort(8888)
47+
.initializeHttp(b -> b
48+
.ioExecutor(serverExecutor)
49+
.executor(serverExecutor))
50+
.listenAndAwait(new ServiceTalkLeak.ServiceTalkLeakService() {
51+
@Override
52+
public Publisher<Message> rpc(GrpcServiceContext ctx, Publisher<Message> request) {
53+
Publisher<Message> response = splice(request)
54+
.flatMapPublisher(pair -> {
55+
LOGGER.info("Initial message: " + pair.head);
56+
return Publisher.failed(new GrpcStatusException(GrpcStatusCode.INVALID_ARGUMENT.status()));
57+
});
58+
return response;
59+
}
60+
});
61+
62+
ServiceTalkLeak.ServiceTalkLeakClient client = GrpcClients.forAddress(HostAndPort.of("127.0.0.1", 8888))
63+
.initializeHttp(b -> b
64+
.protocols(HttpProtocolConfigs.h2().enableFrameLogging("CLIENT", LogLevel.INFO, () -> true).build())
65+
.ioExecutor(clientExecutor)
66+
.executor(clientExecutor))
67+
.build(new ServiceTalkLeak.ClientFactory());
68+
69+
for (int i = 0; i < 10; i++) {
70+
LOGGER.info("Iteration {}", i);
71+
blockingInvocation(
72+
client.rpc(
73+
Publisher.from(
74+
Message.newBuilder().setValue("first message").build(),
75+
Message.newBuilder().setValue("second message (which leaks)").build()))
76+
.ignoreElements()
77+
.onErrorComplete());
78+
79+
System.gc();
80+
System.runFinalization();
81+
}
82+
83+
assertFalse(leakDetected);
84+
}
85+
86+
private static Single<Pair> splice(Publisher<Message> request) {
87+
return request.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Pair::new));
88+
}
89+
90+
private static final class Pair {
91+
final Message head;
92+
final Publisher<Message> stream;
93+
94+
public Pair(Message head, Publisher<Message> stream) {
95+
this.head = head;
96+
this.stream = stream;
97+
}
98+
}
99+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
syntax = "proto3";
2+
3+
option java_multiple_files = true;
4+
option java_package = "com.apple.servicetalkleak";
5+
6+
message Message {
7+
string value = 1;
8+
}
9+
10+
service ServiceTalkLeak {
11+
rpc Rpc(stream Message) returns (stream Message);
12+
}

servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
7474
final StreamingHttpRequest request,
7575
final StreamingHttpResponseFactory responseFactory) {
7676
return delegate()
77-
.handle(ctx, request, responseFactory)
77+
.handle(ctx, request.transformMessageBody(LeakDetection::instrument), responseFactory)
7878
.map(response -> {
7979
// always write the buffer publisher into the request context. When a downstream subscriber
8080
// arrives, mark the message as subscribed explicitly (having a message present and no
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package io.servicetalk.http.netty;
2+
3+
import io.servicetalk.concurrent.Cancellable;
4+
import io.servicetalk.concurrent.PublisherSource.Subscriber;
5+
import io.servicetalk.concurrent.PublisherSource.Subscription;
6+
import io.servicetalk.concurrent.api.Publisher;
7+
import io.servicetalk.concurrent.api.SourceAdapters;
8+
import io.servicetalk.concurrent.internal.CancelImmediatelySubscriber;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import javax.annotation.Nullable;
13+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
14+
15+
final class LeakDetection {
16+
17+
private static final Logger LOGGER = LoggerFactory.getLogger(LeakDetection.class);
18+
private LeakDetection() {
19+
// no instances.
20+
}
21+
22+
static <T> Publisher<T> instrument(Publisher<T> publisher) {
23+
FinishedToken token = new FinishedToken(publisher);
24+
return publisher.liftSync(subscriber -> new InstrumentedSubscriber<>(subscriber, token));
25+
}
26+
private static final class InstrumentedSubscriber<T> implements Subscriber<T> {
27+
28+
private final Subscriber<T> delegate;
29+
private final FinishedToken token;
30+
31+
public InstrumentedSubscriber(Subscriber<T> delegate, FinishedToken token) {
32+
this.delegate = delegate;
33+
this.token = token;
34+
}
35+
36+
@Override
37+
public void onSubscribe(Subscription subscription) {
38+
token.subscribed(subscription);
39+
delegate.onSubscribe(new Subscription() {
40+
@Override
41+
public void request(long n) {
42+
subscription.request(n);
43+
}
44+
45+
@Override
46+
public void cancel() {
47+
token.doComplete();
48+
subscription.cancel();
49+
}
50+
});
51+
}
52+
53+
@Override
54+
public void onNext(@Nullable T t) {
55+
delegate.onNext(t);
56+
}
57+
58+
@Override
59+
public void onError(Throwable t) {
60+
token.doComplete();
61+
delegate.onError(t);
62+
}
63+
64+
@Override
65+
public void onComplete() {
66+
token.doComplete();
67+
delegate.onComplete();
68+
}
69+
70+
71+
}
72+
73+
private static final class FinishedToken {
74+
75+
private static final AtomicReferenceFieldUpdater<FinishedToken, Object> UPDATER =
76+
AtomicReferenceFieldUpdater.newUpdater(FinishedToken.class, Object.class,"state");
77+
private static final String COMPLETE = "complete";
78+
79+
volatile Object state;
80+
81+
public FinishedToken(Publisher<?> parent) {
82+
this.state = parent;
83+
}
84+
85+
void doComplete() {
86+
UPDATER.set(this, COMPLETE);
87+
}
88+
89+
private boolean checkComplete() {
90+
Object previous = UPDATER.getAndSet(this, COMPLETE);
91+
if (previous != COMPLETE) {
92+
// This means something leaked.
93+
if (previous instanceof Publisher) {
94+
// never subscribed to.
95+
SourceAdapters.toSource((Publisher<?>) previous).subscribe(CancelImmediatelySubscriber.INSTANCE);
96+
} else {
97+
assert previous instanceof Cancellable;
98+
Cancellable cancellable = (Cancellable) previous;
99+
cancellable.cancel();
100+
}
101+
return true;
102+
} else {
103+
return false;
104+
}
105+
}
106+
107+
void subscribed(Subscription subscription) {
108+
while (true) {
109+
Object old = UPDATER.get(this);
110+
if (old == COMPLETE || old instanceof Subscription) {
111+
// TODO: What to do here?
112+
LOGGER.debug("Publisher subscribed to multiple times.");
113+
return;
114+
} else if (UPDATER.compareAndSet(this, old, subscription)) {
115+
return;
116+
}
117+
}
118+
}
119+
120+
// TODO: move this to a phantom reference approach.
121+
@Override
122+
protected void finalize() throws Throwable {
123+
super.finalize();
124+
if (checkComplete()) {
125+
LOGGER.warn("LEAK detected.");
126+
}
127+
}
128+
}
129+
}

servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@
5252
* @param <MetaData> type of meta-data in front of the stream of {@link Payload}, eg. {@link HttpResponseMetaData}
5353
* @param <Payload> type of payload inside the {@link Data}, eg. {@link Buffer}
5454
*/
55-
final class SpliceFlatStreamToMetaSingle<Data, MetaData, Payload> implements PublisherToSingleOperator<Object, Data> {
55+
// TODO: revert: this shouldn't be public.
56+
public final class SpliceFlatStreamToMetaSingle<Data, MetaData, Payload> implements PublisherToSingleOperator<Object, Data> {
5657
private static final Logger LOGGER = LoggerFactory.getLogger(SpliceFlatStreamToMetaSingle.class);
5758
private final BiFunction<MetaData, Publisher<Payload>, Data> packer;
5859

@@ -64,7 +65,7 @@ final class SpliceFlatStreamToMetaSingle<Data, MetaData, Payload> implements Pub
6465
* @param packer function to pack the {@link Publisher}&lt;{@link Payload}&gt; and {@link MetaData} into a
6566
* {@link Data}
6667
*/
67-
SpliceFlatStreamToMetaSingle(BiFunction<MetaData, Publisher<Payload>, Data> packer) {
68+
public SpliceFlatStreamToMetaSingle(BiFunction<MetaData, Publisher<Payload>, Data> packer) {
6869
this.packer = requireNonNull(packer);
6970
}
7071

0 commit comments

Comments
 (0)