Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loadbalancer: Add ConnectTracker and make HealthIndicator extend it #2818

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

/**
* An interface for tracking connection establishment measurements.
* This has an intended usage similar to the {@link RequestTracker} but with a focus on connection establishment
* metrics.
* <p>
* The value returned by {@link #beforeConnectStart()} is the value that callers hand back as the
* {@code beforeConnectStart} argument of {@link #onConnectSuccess(long)} or {@link #onConnectError(long)} once the
* outcome of the connection attempt is known.
*/
interface ConnectTracker {

/**
* Get the current time in nanoseconds.
* Note: this must not be a stateful API. E.g., it does not necessarily have a correlation with any other method
* call and as such shouldn't be used as a method of counting in the same way that {@link RequestTracker} is used.
* @return the current time in nanoseconds.
*/
long beforeConnectStart();

/**
* Callback to notify the parent {@link HealthChecker} that an attempt to connect to this host has succeeded.
* @param beforeConnectStart the time that the connection attempt was initiated, as previously obtained from
* {@link #beforeConnectStart()}.
*/
void onConnectSuccess(long beforeConnectStart);

/**
* Callback to notify the parent {@link HealthChecker} that an attempt to connect to this host has failed.
* @param beforeConnectStart the time that the connection attempt was initiated, as previously obtained from
* {@link #beforeConnectStart()}.
*/
void onConnectError(long beforeConnectStart);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.ConnectionLimitReachedException;
import io.servicetalk.client.api.DelegatingConnectionFactory;
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
Expand All @@ -26,6 +29,7 @@
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.loadbalancer.LoadBalancerObserver.HostObserver;
import io.servicetalk.transport.api.TransportObserver;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -35,6 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Predicate;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -112,10 +117,12 @@ private enum State {
this.lbDescription = requireNonNull(lbDescription, "lbDescription");
this.address = requireNonNull(address, "address");
this.linearSearchSpace = linearSearchSpace;
this.connectionFactory = requireNonNull(connectionFactory, "connectionFactory");
this.healthIndicator = healthIndicator;
requireNonNull(connectionFactory, "connectionFactory");
this.connectionFactory = healthIndicator == null ? connectionFactory :
new InstrumentedConnectionFactory<>(connectionFactory, healthIndicator);
this.healthCheckConfig = healthCheckConfig;
this.hostObserver = requireNonNull(hostObserver, "hostObserver");
this.healthIndicator = healthIndicator;
this.closeable = toAsyncCloseable(this::doClose);
hostObserver.onHostCreated(address);
}
Expand Down Expand Up @@ -231,10 +238,11 @@ public Single<C> newConnection(
Single<? extends C> establishConnection = connectionFactory.newConnection(address, context, null);
if (healthCheckConfig != null) {
// Schedule health check before returning
establishConnection = establishConnection.beforeOnError(this::markUnhealthy);
establishConnection = establishConnection.beforeOnError(this::onConnectionError);
}
return establishConnection
.flatMap(newCnx -> {

if (forceNewConnectionAndReserve && !newCnx.tryReserve()) {
return newCnx.closeAsync().<C>concat(failed(
Exceptions.StacklessConnectionRejectedException.newInstance(
Expand Down Expand Up @@ -296,7 +304,7 @@ private void markHealthy(final HealthCheck originalHealthCheckState) {
}
}

private void markUnhealthy(final Throwable cause) {
private void onConnectionError(Throwable cause) {
assert healthCheckConfig != null;
for (;;) {
ConnState previous = connStateUpdater.get(this);
Expand Down Expand Up @@ -640,4 +648,63 @@ public String toString() {
'}';
}
}

/**
 * A {@link ConnectionFactory} wrapper that reports the outcome of each connection attempt made through the
 * wrapped factory to a {@link ConnectTracker}.
 *
 * @param <Addr> the type of address the factory connects to.
 * @param <C> the type of connection the factory produces.
 */
private static final class InstrumentedConnectionFactory<Addr, C extends LoadBalancedConnection>
        extends DelegatingConnectionFactory<Addr, C> {

    private final ConnectTracker connectTracker;

    InstrumentedConnectionFactory(final ConnectionFactory<Addr, C> delegate, ConnectTracker connectTracker) {
        super(delegate);
        this.connectTracker = connectTracker;
    }

    /**
     * Delegates connection creation and wraps each subscriber so that the tracker is told about the result.
     */
    @Override
    public Single<C> newConnection(Addr addr, @Nullable ContextMap context, @Nullable TransportObserver observer) {
        final Single<C> connectAttempt = super.newConnection(addr, context, observer);
        return connectAttempt.liftSync(downstream -> new ConnectSubscriber(downstream));
    }

    /**
     * Forwards every signal to the wrapped subscriber and notifies the {@link ConnectTracker} about whichever
     * of success, error or cancellation is observed first. Later signals are forwarded but not reported again.
     */
    private class ConnectSubscriber implements SingleSource.Subscriber<C> {

        // Flipped to true by the first of: cancel, onSuccess, onError.
        private final AtomicBoolean reported = new AtomicBoolean();
        private final SingleSource.Subscriber<? super C> downstream;
        private final long startTime;

        ConnectSubscriber(final SingleSource.Subscriber<? super C> delegate) {
            downstream = delegate;
            // The start time is captured when the subscriber is created.
            startTime = connectTracker.beforeConnectStart();
        }

        @Override
        public void onSubscribe(final Cancellable cancellable) {
            final Cancellable trackedCancellable = () -> {
                if (claimReport()) {
                    // We assume that cancellation is the result of taking too long, so it's an error.
                    connectTracker.onConnectError(startTime);
                }
                cancellable.cancel();
            };
            downstream.onSubscribe(trackedCancellable);
        }

        @Override
        public void onSuccess(@Nullable C result) {
            if (claimReport()) {
                connectTracker.onConnectSuccess(startTime);
            }
            downstream.onSuccess(result);
        }

        @Override
        public void onError(Throwable t) {
            if (claimReport()) {
                connectTracker.onConnectError(startTime);
            }
            downstream.onError(t);
        }

        /**
         * @return {@code true} only for the first caller, which is then responsible for notifying the tracker.
         */
        private boolean claimReport() {
            return reported.compareAndSet(false, true);
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ enum ErrorClass {
* Failures related to locally enforced timeouts that prevent session establishment with the peer.
*/
LOCAL_ORIGIN_TIMEOUT(true),
/**
* Failures related to connection establishment.
*/
LOCAL_ORIGIN_CONNECT_FAILED(true),

/**
* Failures related to locally enforced timeouts waiting for responses from the peer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* health check system can give the host information about its perceived health and the host can give the
* health check system information about request results.
*/
interface HealthIndicator extends RequestTracker, ScoreSupplier, Cancellable {
interface HealthIndicator extends RequestTracker, ConnectTracker, ScoreSupplier, Cancellable {

/**
* Whether the host is considered healthy by the HealthIndicator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ abstract class XdsHealthIndicator<ResolvedAddress> extends DefaultRequestTracker
protected abstract void doCancel();

@Override
protected final long currentTimeNanos() {
public final long currentTimeNanos() {
return executor.currentTime(TimeUnit.NANOSECONDS);
}

Expand Down Expand Up @@ -126,9 +126,30 @@ public final void onSuccess(final long beforeStartTimeNs) {
public final void onError(final long beforeStartTimeNs, ErrorClass errorClass) {
super.onError(beforeStartTimeNs, errorClass);
// For now, don't consider cancellation to be an error or a success.
if (errorClass == ErrorClass.CANCELLED) {
return;
if (errorClass != ErrorClass.CANCELLED) {
doOnError();
}
}

/**
* Provides the timestamp to associate with a connection attempt.
* @return the value of {@code currentTimeNanos()}.
*/
@Override
public long beforeConnectStart() {
return currentTimeNanos();
}

/**
* Records a failed connection attempt by delegating to {@code doOnError()}.
* @param beforeConnectStart the time that the connection attempt was initiated; not read by this implementation.
*/
@Override
public void onConnectError(long beforeConnectStart) {
// This assumes that the connect request was intended to be used for a request dispatch which
// will have now failed. This is not strictly true: a connection can be acquired and simply not
// used, but in practice it's a very good assumption.
doOnError();
}

/**
* Does nothing when a connection attempt succeeds.
* @param beforeConnectStart the time that the connection attempt was initiated; not read by this implementation.
*/
@Override
public void onConnectSuccess(long beforeConnectStart) {
// noop: the request path will now determine if the request was a success or failure.
}

private void doOnError() {
failures.incrementAndGet();
final int consecutiveFailures = consecutive5xx.incrementAndGet();
final OutlierDetectorConfig localConfig = currentConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@
import java.util.function.Predicate;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.concurrent.api.Single.succeeded;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static io.servicetalk.loadbalancer.HealthCheckConfig.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
import static io.servicetalk.loadbalancer.UnhealthyHostConnectionFactory.UNHEALTHY_HOST_EXCEPTION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -233,4 +236,17 @@ void forwardsHealthIndicatorScore() {
assertThat(host.score(), is(10));
verify(healthIndicator, times(1)).score();
}

@Test
void connectFailuresAreForwardedToHealthIndicator() {
    // Every connection attempt made through this factory fails with DELIBERATE_EXCEPTION.
    connectionFactory = new TestConnectionFactory(ignoredAddress -> failed(DELIBERATE_EXCEPTION));
    final HealthIndicator indicator = mock(HealthIndicator.class);
    buildHost(indicator);
    verify(mockHostObserver, times(1)).onHostCreated("address");

    // The connect failure must surface to the caller unchanged, wrapped only by the Future.
    final ExecutionException thrown = assertThrows(ExecutionException.class,
            () -> host.newConnection(anyConnection -> true, false, null).toFuture().get());
    final Throwable cause = thrown.getCause();
    assertEquals(DELIBERATE_EXCEPTION, cause);

    // The mocked indicator returns 0 from beforeConnectStart(), so 0L is the start time reported back.
    verify(indicator, times(1)).beforeConnectStart();
    verify(indicator, times(1)).onConnectError(0L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,19 @@ public long beforeStart() {
return 0;
}

@Override
public long beforeConnectStart() {
// Always reports 0 as the connect start time.
return 0;
}

@Override
public void onConnectSuccess(long beforeConnectStart) {
// No-op: this implementation ignores successful connection attempts.
}

@Override
public void onConnectError(long beforeConnectStart) {
// No-op: this implementation ignores failed connection attempts.
}

@Override
public void cancel() {
synchronized (indicatorSet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ void test() {
Assertions.assertEquals(-500, requestTracker.score());

// error penalty
requestTracker.onError(requestTracker.beforeStart(), ErrorClass.LOCAL_ORIGIN_CONNECT_FAILED);
requestTracker.onError(requestTracker.beforeStart(), ErrorClass.EXT_ORIGIN_REQUEST_FAILED);
Assertions.assertEquals(-5000, requestTracker.score());

// cancellation penalty
Expand Down
Loading