Skip to content

Commit 61d4b3d

Browse files
Merge remote-tracking branch 'origin/main' into bl_anderson/configurable_connection_selection
2 parents 8842345 + 840aca3 commit 61d4b3d

17 files changed

+144
-145
lines changed

servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java

+10-11
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,15 @@ private enum State {
8484
@Nullable
8585
private final HealthIndicator healthIndicator;
8686
private final ConnectionPoolStrategy<C> connectionPoolStrategy;
87-
private final LoadBalancerObserver.HostObserver<Addr> hostObserver;
87+
private final LoadBalancerObserver.HostObserver hostObserver;
8888
private final ConnectionFactory<Addr, ? extends C> connectionFactory;
8989
private final ListenableAsyncCloseable closeable;
9090
private volatile ConnState connState = new ConnState(emptyList(), State.ACTIVE, 0, null);
9191

9292
DefaultHost(final String lbDescription, final Addr address,
9393
final ConnectionPoolStrategy<C> connectionPoolStrategy,
9494
final ConnectionFactory<Addr, ? extends C> connectionFactory,
95-
final HostObserver<Addr> hostObserver, final @Nullable HealthCheckConfig healthCheckConfig,
95+
final HostObserver hostObserver, final @Nullable HealthCheckConfig healthCheckConfig,
9696
final @Nullable HealthIndicator healthIndicator) {
9797
this.lbDescription = requireNonNull(lbDescription, "lbDescription");
9898
this.address = requireNonNull(address, "address");
@@ -104,7 +104,6 @@ private enum State {
104104
this.healthCheckConfig = healthCheckConfig;
105105
this.hostObserver = requireNonNull(hostObserver, "hostObserver");
106106
this.closeable = toAsyncCloseable(this::doClose);
107-
hostObserver.onHostCreated(address);
108107
}
109108

110109
@Override
@@ -124,7 +123,7 @@ public boolean markActiveIfNotClosed() {
124123
return oldConnState;
125124
});
126125
if (oldState.state == State.EXPIRED) {
127-
hostObserver.onExpiredHostRevived(address, oldState.connections.size());
126+
hostObserver.onExpiredHostRevived(oldState.connections.size());
128127
}
129128
return oldState.state != State.CLOSED;
130129
}
@@ -161,7 +160,7 @@ public boolean markExpired() {
161160
Object nextState = oldState.connections.isEmpty() ? State.CLOSED : State.EXPIRED;
162161
if (connStateUpdater.compareAndSet(this, oldState, oldState.toExpired())) {
163162
cancelIfHealthCheck(oldState);
164-
hostObserver.onHostMarkedExpired(address, oldState.connections.size());
163+
hostObserver.onHostMarkedExpired(oldState.connections.size());
165164
if (nextState == State.CLOSED) {
166165
// Trigger the callback to remove the host from usedHosts array.
167166
this.closeAsync().subscribe();
@@ -261,7 +260,7 @@ private void markHealthy(final HealthCheck originalHealthCheckState) {
261260
}
262261
// Only if the previous state was a healthcheck should we notify the observer.
263262
if (oldState.isUnhealthy()) {
264-
hostObserver.onHostRevived(address);
263+
hostObserver.onHostRevived();
265264
}
266265
}
267266

@@ -292,7 +291,7 @@ private void onConnectionError(Throwable cause) {
292291
"{} time(s) in a row. Error counting threshold reached, marking this host as " +
293292
"UNHEALTHY for the selection algorithm and triggering background health-checking.",
294293
lbDescription, address, healthCheckConfig.failedThreshold, cause);
295-
hostObserver.onHostMarkedUnhealthy(address, cause);
294+
hostObserver.onHostMarkedUnhealthy(cause);
296295
nextState.healthCheck.schedule(cause);
297296
}
298297
break;
@@ -336,7 +335,7 @@ private boolean addConnection(final C connection, final @Nullable HealthCheck cu
336335
cancelIfHealthCheck(previous);
337336
}
338337
// If we transitioned from unhealth to healthy we need to let the observer know.
339-
hostObserver.onHostRevived(address);
338+
hostObserver.onHostRevived();
340339
}
341340
break;
342341
}
@@ -373,7 +372,7 @@ private boolean addConnection(final C connection, final @Nullable HealthCheck cu
373372
// in the next iteration.
374373
&& connStateUpdater.compareAndSet(this, currentConnState, nextState.toClosed())) {
375374
closeAsync().subscribe();
376-
hostObserver.onExpiredHostRemoved(address, nextState.connections.size());
375+
hostObserver.onExpiredHostRemoved(nextState.connections.size());
377376
break;
378377
}
379378
} else {
@@ -426,9 +425,9 @@ private Completable doClose(final boolean graceful) {
426425
LOGGER.debug("{}: closing {} connection(s) {}gracefully to the closed address: {}.",
427426
lbDescription, oldState.connections.size(), graceful ? "" : "un", address);
428427
if (oldState.state == State.ACTIVE) {
429-
hostObserver.onActiveHostRemoved(address, oldState.connections.size());
428+
hostObserver.onActiveHostRemoved(oldState.connections.size());
430429
} else if (oldState.state == State.EXPIRED) {
431-
hostObserver.onExpiredHostRemoved(address, oldState.connections.size());
430+
hostObserver.onExpiredHostRemoved(oldState.connections.size());
432431
}
433432
final List<C> connections = oldState.connections;
434433
return (connections.isEmpty() ? completed() :

servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@
4646
import java.util.Map.Entry;
4747
import java.util.concurrent.ThreadLocalRandom;
4848
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
49+
import java.util.function.Function;
4950
import java.util.function.Predicate;
50-
import java.util.function.Supplier;
5151
import javax.annotation.Nullable;
5252

5353
import static io.servicetalk.client.api.LoadBalancerReadyEvent.LOAD_BALANCER_NOT_READY_EVENT;
@@ -133,7 +133,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
133133
final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory,
134134
final LoadBalancerObserver<ResolvedAddress> loadBalancerObserver,
135135
@Nullable final HealthCheckConfig healthCheckConfig,
136-
@Nullable final Supplier<HealthChecker<ResolvedAddress>> healthCheckerFactory) {
136+
@Nullable final Function<String, HealthChecker<ResolvedAddress>> healthCheckerFactory) {
137137
this.targetResource = requireNonNull(targetResourceName);
138138
this.lbDescription = makeDescription(id, targetResource);
139139
this.hostSelector = requireNonNull(hostSelector, "hostSelector");
@@ -150,7 +150,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
150150
// Maintain a Subscriber so signals are always delivered to replay and new Subscribers get the latest signal.
151151
eventStream.ignoreElements().subscribe();
152152
subscribeToEvents(false);
153-
this.healthChecker = healthCheckerFactory == null ? null : healthCheckerFactory.get();
153+
this.healthChecker = healthCheckerFactory == null ? null : healthCheckerFactory.apply(lbDescription);
154154
}
155155

156156
private void subscribeToEvents(boolean resubscribe) {
@@ -383,10 +383,12 @@ private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<Resolv
383383
}
384384

385385
private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
386+
final LoadBalancerObserver.HostObserver hostObserver = loadBalancerObserver.hostObserver(addr);
386387
// All hosts will share the healthcheck config of the parent RR loadbalancer.
387-
final HealthIndicator indicator = healthChecker == null ? null : healthChecker.newHealthIndicator(addr);
388+
final HealthIndicator indicator = healthChecker == null ?
389+
null : healthChecker.newHealthIndicator(addr, hostObserver);
388390
final Host<ResolvedAddress, C> host = new DefaultHost<>(lbDescription, addr, connectionPoolStrategy,
389-
connectionFactory, loadBalancerObserver.hostObserver(), healthCheckConfig, indicator);
391+
connectionFactory, loadBalancerObserver.hostObserver(addr), healthCheckConfig, indicator);
390392
host.onClose().afterFinally(() ->
391393
sequentialExecutor.execute(() -> {
392394
final List<Host<ResolvedAddress, C>> currentHosts = usedHosts;

servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancerBuilder.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import java.time.Duration;
2727
import java.util.Collection;
2828
import java.util.Collections;
29-
import java.util.function.Supplier;
29+
import java.util.function.Function;
3030
import javax.annotation.Nullable;
3131

3232
import static io.servicetalk.loadbalancer.HealthCheckConfig.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
@@ -135,13 +135,13 @@ public LoadBalancerFactory<ResolvedAddress, C> build() {
135135
}
136136
final LoadBalancerObserver<ResolvedAddress> loadBalancerObserver = this.loadBalancerObserver != null ?
137137
this.loadBalancerObserver : NoopLoadBalancerObserver.instance();
138-
Supplier<HealthChecker<ResolvedAddress>> healthCheckerSupplier;
138+
Function<String, HealthChecker<ResolvedAddress>> healthCheckerSupplier;
139139
if (healthCheckerFactory == null) {
140140
healthCheckerSupplier = null;
141141
} else {
142142
final Executor executor = getExecutor();
143-
healthCheckerSupplier = () -> healthCheckerFactory.newHealthChecker(executor,
144-
loadBalancerObserver.hostObserver());
143+
healthCheckerSupplier = (String lbDescrption) ->
144+
healthCheckerFactory.newHealthChecker(executor, lbDescrption);
145145
}
146146

147147
return new DefaultLoadBalancerFactory<>(id, loadBalancingPolicy, healthCheckConfig,
@@ -155,15 +155,15 @@ private static final class DefaultLoadBalancerFactory<ResolvedAddress, C extends
155155
private final LoadBalancingPolicy<ResolvedAddress, C> loadBalancingPolicy;
156156
private final LoadBalancerObserver<ResolvedAddress> loadBalancerObserver;
157157
@Nullable
158-
private final Supplier<HealthChecker<ResolvedAddress>> healthCheckerFactory;
158+
private final Function<String, HealthChecker<ResolvedAddress>> healthCheckerFactory;
159159
@Nullable
160160
private final HealthCheckConfig healthCheckConfig;
161161
private final ConnectionPoolStrategyFactory<C> connectionPoolStrategyFactory;
162162

163163
DefaultLoadBalancerFactory(final String id, final LoadBalancingPolicy<ResolvedAddress, C> loadBalancingPolicy,
164164
final HealthCheckConfig healthCheckConfig,
165165
final LoadBalancerObserver<ResolvedAddress> loadBalancerObserver,
166-
final Supplier<HealthChecker<ResolvedAddress>> healthCheckerFactory,
166+
final Function<String, HealthChecker<ResolvedAddress>> healthCheckerFactory,
167167
final ConnectionPoolStrategyFactory<C> connectionPoolStrategyFactory) {
168168
this.id = requireNonNull(id, "id");
169169
this.loadBalancingPolicy = requireNonNull(loadBalancingPolicy, "loadBalancingPolicy");

servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultRequestTracker.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2121
import java.util.function.IntBinaryOperator;
2222

23+
import static io.servicetalk.utils.internal.NumberUtils.ensurePositive;
2324
import static java.lang.Integer.MAX_VALUE;
2425
import static java.lang.Integer.MIN_VALUE;
2526
import static java.lang.Math.ceil;
@@ -66,9 +67,7 @@ abstract class DefaultRequestTracker implements RequestTracker, ScoreSupplier {
6667
}
6768

6869
DefaultRequestTracker(final long halfLifeNanos, final long cancelPenalty, final long errorPenalty) {
69-
if (halfLifeNanos <= 0) {
70-
throw new IllegalArgumentException("halfLifeNanos: " + halfLifeNanos + " (expected >0)");
71-
}
70+
ensurePositive(halfLifeNanos, "halfLifeNanos");
7271
this.invTau = Math.pow((halfLifeNanos / log(2)), -1);
7372
this.cancelPenalty = cancelPenalty;
7473
this.errorPenalty = errorPenalty;

servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/HealthChecker.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,5 @@ interface HealthChecker<ResolvedAddress> extends Cancellable {
3030
* @param address the resolved address of the destination.
3131
* @return new {@link HealthIndicator}.
3232
*/
33-
HealthIndicator newHealthIndicator(ResolvedAddress address);
33+
HealthIndicator newHealthIndicator(ResolvedAddress address, LoadBalancerObserver.HostObserver hostObserver);
3434
}

servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/HealthCheckerFactory.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package io.servicetalk.loadbalancer;
1717

1818
import io.servicetalk.concurrent.api.Executor;
19-
import io.servicetalk.loadbalancer.LoadBalancerObserver.HostObserver;
2019

2120
/**
2221
* A factory of {@link HealthChecker} instances. The factory will be used by load balancer
@@ -27,9 +26,8 @@ interface HealthCheckerFactory<ResolvedAddress> {
2726
/**
2827
* Create a new {@link HealthChecker}.
2928
* @param executor the {@link Executor} to use for scheduling tasks and obtaining the current time.
30-
* @param hostObserver a {@link HostObserver} to notify of
31-
* relevant host events.
29+
* @param lbDescription a description of the load balancer for logging purposes.
3230
* @return a new {@link HealthChecker}.
3331
*/
34-
HealthChecker newHealthChecker(Executor executor, HostObserver<ResolvedAddress> hostObserver);
32+
HealthChecker<ResolvedAddress> newHealthChecker(Executor executor, String lbDescription);
3533
}

servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/LoadBalancerObserver.java

+8-21
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ interface LoadBalancerObserver<ResolvedAddress> {
3131
* Get a {@link HostObserver}.
3232
* @return a {@link HostObserver}.
3333
*/
34-
HostObserver<ResolvedAddress> hostObserver();
34+
HostObserver hostObserver(ResolvedAddress resolvedAddress);
3535

3636
/**
3737
* Callback for when connection selection fails due to no hosts being available.
@@ -51,55 +51,42 @@ void onServiceDiscoveryEvent(Collection<? extends ServiceDiscovererEvent<Resolve
5151

5252
/**
5353
* An observer for {@link Host} events.
54-
* @param <ResolvedAddress> the type of the resolved address.
5554
*/
56-
interface HostObserver<ResolvedAddress> {
55+
interface HostObserver {
5756

5857
/**
5958
* Callback for when an active host is marked expired.
60-
* @param address the resolved address.
6159
* @param connectionCount the number of active connections for the host.
6260
*/
63-
void onHostMarkedExpired(ResolvedAddress address, int connectionCount);
61+
void onHostMarkedExpired(int connectionCount);
6462

6563
/**
6664
* Callback for when a host is removed by service discovery.
67-
* @param address the resolved address.
6865
* @param connectionCount the number of open connections when the host was removed.
6966
*/
70-
void onActiveHostRemoved(ResolvedAddress address, int connectionCount);
67+
void onActiveHostRemoved(int connectionCount);
7168

7269
/**
7370
* Callback for when an expired host is returned to an active state.
74-
* @param address the resolved address.
7571
* @param connectionCount the number of active connections when the host was revived.
7672
*/
77-
void onExpiredHostRevived(ResolvedAddress address, int connectionCount);
73+
void onExpiredHostRevived(int connectionCount);
7874

7975
/**
8076
* Callback for when an expired host is removed.
81-
* @param address the resolved address.
8277
* @param connectionCount the number of open connections when the host was removed.
8378
*/
84-
void onExpiredHostRemoved(ResolvedAddress address, int connectionCount);
85-
86-
/**
87-
* Callback for when a host is created.
88-
* @param address the resolved address.
89-
*/
90-
void onHostCreated(ResolvedAddress address);
79+
void onExpiredHostRemoved(int connectionCount);
9180

9281
/**
9382
* Callback for when a {@link Host} transitions from healthy to unhealthy.
94-
* @param address the resolved address.
9583
* @param cause the most recent cause of the transition.
9684
*/
97-
void onHostMarkedUnhealthy(ResolvedAddress address, @Nullable Throwable cause);
85+
void onHostMarkedUnhealthy(@Nullable Throwable cause);
9886

9987
/**
10088
* Callback for when a {@link Host} transitions from unhealthy to healthy.
101-
* @param address the resolved address.
10289
*/
103-
void onHostRevived(ResolvedAddress address);
90+
void onHostRevived();
10491
}
10592
}

servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java

+10-16
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ private NoopLoadBalancerObserver() {
2929
}
3030

3131
@Override
32-
public HostObserver<ResolvedAddress> hostObserver() {
33-
return (HostObserver<ResolvedAddress>) NoopHostObserver.INSTANCE;
32+
public HostObserver hostObserver(ResolvedAddress resolvedAddress) {
33+
return NoopHostObserver.INSTANCE;
3434
}
3535

3636
@Override
@@ -49,46 +49,40 @@ public void onServiceDiscoveryEvent(Collection<? extends ServiceDiscovererEvent<
4949
// noop
5050
}
5151

52-
private static final class NoopHostObserver<ResolvedAddress> implements
53-
LoadBalancerObserver.HostObserver<ResolvedAddress> {
52+
private static final class NoopHostObserver implements LoadBalancerObserver.HostObserver {
5453

55-
private static final HostObserver<Object> INSTANCE = new NoopHostObserver<>();
54+
private static final HostObserver INSTANCE = new NoopHostObserver();
5655

5756
private NoopHostObserver() {
5857
}
5958

6059
@Override
61-
public void onHostMarkedExpired(ResolvedAddress resolvedAddress, int connectionCount) {
60+
public void onHostMarkedExpired(int connectionCount) {
6261
// noop
6362
}
6463

6564
@Override
66-
public void onExpiredHostRemoved(ResolvedAddress resolvedAddress, int connectionCount) {
65+
public void onExpiredHostRemoved(int connectionCount) {
6766
// noop
6867
}
6968

7069
@Override
71-
public void onExpiredHostRevived(ResolvedAddress resolvedAddress, int connectionCount) {
70+
public void onExpiredHostRevived(int connectionCount) {
7271
// noop
7372
}
7473

7574
@Override
76-
public void onActiveHostRemoved(ResolvedAddress resolvedAddress, int connectionCount) {
75+
public void onActiveHostRemoved(int connectionCount) {
7776
// noop
7877
}
7978

8079
@Override
81-
public void onHostCreated(ResolvedAddress resolvedAddress) {
80+
public void onHostMarkedUnhealthy(Throwable cause) {
8281
// noop
8382
}
8483

8584
@Override
86-
public void onHostMarkedUnhealthy(ResolvedAddress address, Throwable cause) {
87-
// noop
88-
}
89-
90-
@Override
91-
public void onHostRevived(ResolvedAddress address) {
85+
public void onHostRevived() {
9286
// noop
9387
}
9488
}

0 commit comments

Comments
 (0)