Skip to content

Commit 29657c9

Browse files
loadbalancer: Add ConnectTracker and make HealthIndicator extend it (#2818)
Motivation: We can't rely on the request path to pass in connection failures because if we can't get a connection we can never add the observers. That means we need the `Host` implementation to pass it in directly. Modifications: Add two new methods to `HealthIndicator` that provide the current time and and a callback for submitting connection failures. Result: We should be able to track connection failures now and factor those into the `HealthTracker` implementations.
1 parent 11c5a3d commit 29657c9

File tree

12 files changed

+201
-50
lines changed

12 files changed

+201
-50
lines changed

servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultHttpLoadBalancerFactory.java

+11-11
Original file line numberDiff line numberDiff line change
@@ -368,23 +368,23 @@ public Single<StreamingHttpResponse> request(final StreamingHttpRequest request)
368368

369369
return Single.defer(() -> {
370370
final RequestTracker theTracker = new AtMostOnceDeliveryRequestTracker(tracker);
371-
final long startTime = theTracker.beforeStart();
371+
final long startTime = theTracker.beforeRequestStart();
372372

373373
return delegate.request(request)
374374
.liftSync(new BeforeFinallyHttpOperator(new TerminalSignalConsumer() {
375375
@Override
376376
public void onComplete() {
377-
theTracker.onSuccess(startTime);
377+
theTracker.onRequestSuccess(startTime);
378378
}
379379

380380
@Override
381381
public void onError(final Throwable throwable) {
382-
theTracker.onError(startTime, errorClassFunction.apply(throwable));
382+
theTracker.onRequestError(startTime, errorClassFunction.apply(throwable));
383383
}
384384

385385
@Override
386386
public void cancel() {
387-
theTracker.onError(startTime, ErrorClass.CANCELLED);
387+
theTracker.onRequestError(startTime, ErrorClass.CANCELLED);
388388
}
389389
}, /*discardEventsAfterCancel*/ true))
390390

@@ -397,7 +397,7 @@ public void cancel() {
397397
final ErrorClass eClass = peerResponseErrorClassifier.apply(response);
398398
if (eClass != null) {
399399
// The onError is triggered before the body is actually consumed.
400-
theTracker.onError(startTime, eClass);
400+
theTracker.onRequestError(startTime, eClass);
401401
}
402402
return response;
403403
})
@@ -453,21 +453,21 @@ private AtMostOnceDeliveryRequestTracker(final RequestTracker original) {
453453
}
454454

455455
@Override
456-
public long beforeStart() {
457-
return original.beforeStart();
456+
public long beforeRequestStart() {
457+
return original.beforeRequestStart();
458458
}
459459

460460
@Override
461-
public void onSuccess(final long beforeStartTimeNs) {
461+
public void onRequestSuccess(final long beforeStartTimeNs) {
462462
if (doneUpdater.compareAndSet(this, 0, 1)) {
463-
original.onSuccess(beforeStartTimeNs);
463+
original.onRequestSuccess(beforeStartTimeNs);
464464
}
465465
}
466466

467467
@Override
468-
public void onError(final long beforeStartTimeNs, final ErrorClass errorClass) {
468+
public void onRequestError(final long beforeStartTimeNs, final ErrorClass errorClass) {
469469
if (doneUpdater.compareAndSet(this, 0, 1)) {
470-
original.onError(beforeStartTimeNs, errorClass);
470+
original.onRequestError(beforeStartTimeNs, errorClass);
471471
}
472472
}
473473
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.servicetalk.loadbalancer;
17+
18+
/**
19+
* An interface for tracking connection establishment measurements.
20+
* This has an intended usage similar to the {@link RequestTracker} but with a focus on connection establishment
21+
* metrics.
22+
*/
23+
interface ConnectTracker {
24+
25+
/**
26+
* Get the current time in nanoseconds.
27+
* Note: this must not be a stateful API. Eg, it does not necessarily have a correlation with any other method call
28+
* and such shouldn't be used as a method of counting in the same way that {@link RequestTracker} is used.
29+
* @return the current time in nanoseconds.
30+
*/
31+
long beforeConnectStart();
32+
33+
/**
34+
* Callback to notify the parent {@link HealthChecker} that an attempt to connect to this host has succeeded.
35+
* @param beforeConnectStart the time that the connection attempt was initiated.
36+
*/
37+
void onConnectSuccess(long beforeConnectStart);
38+
39+
/**
40+
* Callback to notify the parent {@link HealthChecker} that an attempt to connect to this host has failed.
41+
* @param beforeConnectStart the time that the connection attempt was initiated.
42+
*/
43+
void onConnectError(long beforeConnectStart);
44+
}

servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java

+61-4
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,18 @@
1717

1818
import io.servicetalk.client.api.ConnectionFactory;
1919
import io.servicetalk.client.api.ConnectionLimitReachedException;
20+
import io.servicetalk.client.api.DelegatingConnectionFactory;
2021
import io.servicetalk.client.api.LoadBalancedConnection;
2122
import io.servicetalk.concurrent.api.AsyncContext;
2223
import io.servicetalk.concurrent.api.Completable;
2324
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
2425
import io.servicetalk.concurrent.api.Single;
26+
import io.servicetalk.concurrent.api.TerminalSignalConsumer;
2527
import io.servicetalk.concurrent.internal.DefaultContextMap;
2628
import io.servicetalk.concurrent.internal.DelayedCancellable;
2729
import io.servicetalk.context.api.ContextMap;
2830
import io.servicetalk.loadbalancer.LoadBalancerObserver.HostObserver;
31+
import io.servicetalk.transport.api.TransportObserver;
2932

3033
import org.slf4j.Logger;
3134
import org.slf4j.LoggerFactory;
@@ -112,10 +115,12 @@ private enum State {
112115
this.lbDescription = requireNonNull(lbDescription, "lbDescription");
113116
this.address = requireNonNull(address, "address");
114117
this.linearSearchSpace = linearSearchSpace;
115-
this.connectionFactory = requireNonNull(connectionFactory, "connectionFactory");
118+
this.healthIndicator = healthIndicator;
119+
requireNonNull(connectionFactory, "connectionFactory");
120+
this.connectionFactory = healthIndicator == null ? connectionFactory :
121+
new InstrumentedConnectionFactory<>(connectionFactory, healthIndicator);
116122
this.healthCheckConfig = healthCheckConfig;
117123
this.hostObserver = requireNonNull(hostObserver, "hostObserver");
118-
this.healthIndicator = healthIndicator;
119124
this.closeable = toAsyncCloseable(this::doClose);
120125
hostObserver.onHostCreated(address);
121126
}
@@ -235,7 +240,7 @@ public Single<C> newConnection(
235240
Single<? extends C> establishConnection = connectionFactory.newConnection(address, actualContext, null);
236241
if (healthCheckConfig != null) {
237242
// Schedule health check before returning
238-
establishConnection = establishConnection.beforeOnError(this::markUnhealthy);
243+
establishConnection = establishConnection.beforeOnError(this::onConnectionError);
239244
}
240245
return establishConnection
241246
.flatMap(newCnx -> {
@@ -302,7 +307,7 @@ private void markHealthy(final HealthCheck originalHealthCheckState) {
302307
}
303308
}
304309

305-
private void markUnhealthy(final Throwable cause) {
310+
private void onConnectionError(Throwable cause) {
306311
assert healthCheckConfig != null;
307312
for (;;) {
308313
ConnState previous = connStateUpdater.get(this);
@@ -646,4 +651,56 @@ public String toString() {
646651
'}';
647652
}
648653
}
654+
655+
private static final class InstrumentedConnectionFactory<Addr, C extends LoadBalancedConnection>
656+
extends DelegatingConnectionFactory<Addr, C> {
657+
658+
private final ConnectTracker connectTracker;
659+
660+
InstrumentedConnectionFactory(final ConnectionFactory<Addr, C> delegate, ConnectTracker connectTracker) {
661+
super(delegate);
662+
this.connectTracker = connectTracker;
663+
}
664+
665+
@Override
666+
public Single<C> newConnection(Addr addr, @Nullable ContextMap context, @Nullable TransportObserver observer) {
667+
return Single.defer(() -> {
668+
final long connectStartTime = connectTracker.beforeConnectStart();
669+
return delegate().newConnection(addr, context, observer)
670+
.beforeFinally(new ConnectSignalConsumer<>(connectStartTime, connectTracker))
671+
.shareContextOnSubscribe();
672+
});
673+
}
674+
}
675+
676+
private static class ConnectSignalConsumer<C extends LoadBalancedConnection> implements TerminalSignalConsumer {
677+
678+
private final ConnectTracker connectTracker;
679+
private final long connectStartTime;
680+
681+
ConnectSignalConsumer(final long connectStartTime, final ConnectTracker connectTracker) {
682+
this.connectStartTime = connectStartTime;
683+
this.connectTracker = connectTracker;
684+
}
685+
686+
@Override
687+
public void onComplete() {
688+
connectTracker.onConnectSuccess(connectStartTime);
689+
}
690+
691+
@Override
692+
public void cancel() {
693+
// We assume cancellation is the result of some sort of timeout.
694+
doOnError();
695+
}
696+
697+
@Override
698+
public void onError(Throwable t) {
699+
doOnError();
700+
}
701+
702+
private void doOnError() {
703+
connectTracker.onConnectError(connectStartTime);
704+
}
705+
}
649706
}

servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultRequestTracker.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -81,19 +81,19 @@ abstract class DefaultRequestTracker implements RequestTracker, ScoreSupplier {
8181
protected abstract long currentTimeNanos();
8282

8383
@Override
84-
public final long beforeStart() {
84+
public final long beforeRequestStart() {
8585
pendingUpdater.incrementAndGet(this);
8686
return currentTimeNanos();
8787
}
8888

8989
@Override
90-
public void onSuccess(final long startTimeNanos) {
90+
public void onRequestSuccess(final long startTimeNanos) {
9191
pendingUpdater.decrementAndGet(this);
9292
calculateAndStore((ewma, currentLatency) -> currentLatency, startTimeNanos);
9393
}
9494

9595
@Override
96-
public void onError(final long startTimeNanos, ErrorClass errorClass) {
96+
public void onRequestError(final long startTimeNanos, ErrorClass errorClass) {
9797
pendingUpdater.decrementAndGet(this);
9898
calculateAndStore(errorClass == ErrorClass.CANCELLED ? this:: cancelPenalty : this::errorPenalty,
9999
startTimeNanos);

servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/HealthIndicator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
* health check system can give the host information about it's perceived health and the host can give the
2727
* health check system information about request results.
2828
*/
29-
interface HealthIndicator extends RequestTracker, ScoreSupplier, Cancellable {
29+
interface HealthIndicator extends RequestTracker, ConnectTracker, ScoreSupplier, Cancellable {
3030

3131
/**
3232
* Whether the host is considered healthy by the HealthIndicator.

servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/RequestTracker.java

+12-12
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
* A tracker of latency of an action over time.
2222
* <p>
2323
* The usage of the RequestTracker is intended to follow the simple workflow:
24-
* - At initiation of an action for which a request is must call {@link RequestTracker#beforeStart()} and save the
25-
* timestamp much like would be done when using a stamped lock.
26-
* - Once the request event is complete only one of the {@link RequestTracker#onSuccess(long)} or
27-
* {@link RequestTracker#onError(long, ErrorClass)} methods must be called and called exactly once.
28-
* In other words, every call to {@link RequestTracker#beforeStart()} must be followed by exactly one call to either of
29-
* the completion methods {@link RequestTracker#onSuccess(long)} or
30-
* {@link RequestTracker#onError(long, ErrorClass)}. Failure to do so can cause state corruption in the
24+
* - At initiation of an action for which a request is must call {@link RequestTracker#beforeRequestStart()} and save
25+
* the timestamp much like would be done when using a stamped lock.
26+
* - Once the request event is complete only one of the {@link RequestTracker#onRequestSuccess(long)} or
27+
* {@link RequestTracker#onRequestError(long, ErrorClass)} methods must be called and called exactly once.
28+
* In other words, every call to {@link RequestTracker#beforeRequestStart()} must be followed by exactly one call to
29+
* either of the completion methods {@link RequestTracker#onRequestSuccess(long)} or
30+
* {@link RequestTracker#onRequestError(long, ErrorClass)}. Failure to do so can cause state corruption in the
3131
* {@link RequestTracker} implementations which may track not just latency but also the outstanding requests.
3232
*/
3333
public interface RequestTracker {
@@ -40,20 +40,20 @@ public interface RequestTracker {
4040
*
4141
* @return Current time in nanoseconds.
4242
*/
43-
long beforeStart();
43+
long beforeRequestStart();
4444

4545
/**
4646
* Records a successful completion of the action for which latency is to be tracked.
4747
*
48-
* @param beforeStartTimeNs return value from {@link #beforeStart()}.
48+
* @param beforeStartTimeNs return value from {@link #beforeRequestStart()}.
4949
*/
50-
void onSuccess(long beforeStartTimeNs);
50+
void onRequestSuccess(long beforeStartTimeNs);
5151

5252
/**
5353
* Records a failed completion of the action for which latency is to be tracked.
5454
*
55-
* @param beforeStartTimeNs return value from {@link #beforeStart()}.
55+
* @param beforeStartTimeNs return value from {@link #beforeRequestStart()}.
5656
* @param errorClass the class of error that triggered this method.
5757
*/
58-
void onError(long beforeStartTimeNs, ErrorClass errorClass);
58+
void onRequestError(long beforeStartTimeNs, ErrorClass errorClass);
5959
}

servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/XdsHealthIndicator.java

+27-6
Original file line numberDiff line numberDiff line change
@@ -115,20 +115,41 @@ public final boolean isHealthy() {
115115
}
116116

117117
@Override
118-
public final void onSuccess(final long beforeStartTimeNs) {
119-
super.onSuccess(beforeStartTimeNs);
118+
public final void onRequestSuccess(final long beforeStartTimeNs) {
119+
super.onRequestSuccess(beforeStartTimeNs);
120120
successes.incrementAndGet();
121121
consecutive5xx.set(0);
122122
LOGGER.trace("Observed success for address {}", address);
123123
}
124124

125125
@Override
126-
public final void onError(final long beforeStartTimeNs, ErrorClass errorClass) {
127-
super.onError(beforeStartTimeNs, errorClass);
126+
public final void onRequestError(final long beforeStartTimeNs, ErrorClass errorClass) {
127+
super.onRequestError(beforeStartTimeNs, errorClass);
128128
// For now, don't consider cancellation to be an error or a success.
129-
if (errorClass == ErrorClass.CANCELLED) {
130-
return;
129+
if (errorClass != ErrorClass.CANCELLED) {
130+
doOnError();
131131
}
132+
}
133+
134+
@Override
135+
public long beforeConnectStart() {
136+
return currentTimeNanos();
137+
}
138+
139+
@Override
140+
public void onConnectError(long beforeConnectStart) {
141+
// This assumes that the connect request was intended to be used for a request dispatch which
142+
// will have now failed. This is not strictly true: a connection can be acquired and simply not
143+
// used, but in practice it's a very good assumption.
144+
doOnError();
145+
}
146+
147+
@Override
148+
public void onConnectSuccess(long beforeConnectStart) {
149+
// noop: the request path will now determine if the request was a success or failure.
150+
}
151+
152+
private void doOnError() {
132153
failures.incrementAndGet();
133154
final int consecutiveFailures = consecutive5xx.incrementAndGet();
134155
final OutlierDetectorConfig localConfig = currentConfig();

servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/DefaultHostTest.java

+16
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,14 @@
2929
import java.util.function.Predicate;
3030
import javax.annotation.Nullable;
3131

32+
import static io.servicetalk.concurrent.api.Single.failed;
3233
import static io.servicetalk.concurrent.api.Single.succeeded;
34+
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
3335
import static io.servicetalk.loadbalancer.HealthCheckConfig.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
3436
import static io.servicetalk.loadbalancer.UnhealthyHostConnectionFactory.UNHEALTHY_HOST_EXCEPTION;
3537
import static org.hamcrest.MatcherAssert.assertThat;
3638
import static org.hamcrest.Matchers.is;
39+
import static org.junit.jupiter.api.Assertions.assertEquals;
3740
import static org.junit.jupiter.api.Assertions.assertThrows;
3841
import static org.mockito.Mockito.mock;
3942
import static org.mockito.Mockito.times;
@@ -233,4 +236,17 @@ void forwardsHealthIndicatorScore() {
233236
assertThat(host.score(), is(10));
234237
verify(healthIndicator, times(1)).score();
235238
}
239+
240+
@Test
241+
void connectFailuresAreForwardedToHealthIndicator() {
242+
connectionFactory = new TestConnectionFactory(address -> failed(DELIBERATE_EXCEPTION));
243+
HealthIndicator healthIndicator = mock(HealthIndicator.class);
244+
buildHost(healthIndicator);
245+
verify(mockHostObserver, times(1)).onHostCreated("address");
246+
Throwable underlying = assertThrows(ExecutionException.class, () ->
247+
host.newConnection(cxn -> true, false, null).toFuture().get()).getCause();
248+
assertEquals(DELIBERATE_EXCEPTION, underlying);
249+
verify(healthIndicator, times(1)).beforeConnectStart();
250+
verify(healthIndicator, times(1)).onConnectError(0L);
251+
}
236252
}

0 commit comments

Comments
 (0)