Skip to content

Commit 11c5a3d

Browse files
tkountisbryce-andersonidelpivnitskiy
authored
LoadBalancer wire the tracker on the host layers with the request flow (#2816)
Motivation In order for the new load balancer features to work, the health indicators need to be wired as request trackers on the request flow. Modification - Added additional configuration options in the DefaultHttpLoadBalancerFactory to support error classifications. Feature only useful to load-balancers that support it. - When the Context is available and a request-tracker is present in it, the request flow is now enriched with additional logic to track the state and feed it to the request-tracker. Co-authored-by: Bryce Anderson <bryce_anderson@apple.com> Co-authored-by: Idel Pivnitskiy <idel.pivnitskiy@apple.com>
1 parent 954c147 commit 11c5a3d

File tree

6 files changed

+325
-111
lines changed

6 files changed

+325
-111
lines changed

servicetalk-http-netty/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ dependencies {
3333
implementation project(":servicetalk-dns-discovery-netty")
3434
implementation project(":servicetalk-http-utils")
3535
implementation project(":servicetalk-loadbalancer")
36+
implementation project(":servicetalk-loadbalancer-experimental")
3637
implementation project(":servicetalk-logging-slf4j-internal")
3738
implementation project(":servicetalk-tcp-netty-internal")
3839
implementation project(":servicetalk-transport-netty")

servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultHttpLoadBalancerFactory.java

+263-5
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818
import io.servicetalk.client.api.ConnectionFactory;
1919
import io.servicetalk.client.api.LoadBalancer;
2020
import io.servicetalk.client.api.LoadBalancerFactory;
21+
import io.servicetalk.client.api.ReservableRequestConcurrencyController;
2122
import io.servicetalk.client.api.ScoreSupplier;
2223
import io.servicetalk.client.api.ServiceDiscovererEvent;
2324
import io.servicetalk.concurrent.api.Completable;
2425
import io.servicetalk.concurrent.api.Publisher;
2526
import io.servicetalk.concurrent.api.Single;
27+
import io.servicetalk.concurrent.api.TerminalSignalConsumer;
28+
import io.servicetalk.context.api.ContextMap;
2629
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
2730
import io.servicetalk.http.api.FilterableStreamingHttpLoadBalancedConnection;
2831
import io.servicetalk.http.api.HttpConnectionContext;
@@ -31,14 +34,37 @@
3134
import io.servicetalk.http.api.HttpExecutionStrategy;
3235
import io.servicetalk.http.api.HttpLoadBalancerFactory;
3336
import io.servicetalk.http.api.HttpRequestMethod;
37+
import io.servicetalk.http.api.HttpResponseMetaData;
38+
import io.servicetalk.http.api.ReservedBlockingHttpConnection;
39+
import io.servicetalk.http.api.ReservedBlockingStreamingHttpConnection;
40+
import io.servicetalk.http.api.ReservedHttpConnection;
3441
import io.servicetalk.http.api.StreamingHttpRequest;
3542
import io.servicetalk.http.api.StreamingHttpResponse;
3643
import io.servicetalk.http.api.StreamingHttpResponseFactory;
44+
import io.servicetalk.http.utils.BeforeFinallyHttpOperator;
45+
import io.servicetalk.loadbalancer.ErrorClass;
46+
import io.servicetalk.loadbalancer.RequestTracker;
3747
import io.servicetalk.loadbalancer.RoundRobinLoadBalancers;
3848

49+
import org.slf4j.Logger;
50+
import org.slf4j.LoggerFactory;
51+
52+
import java.net.ConnectException;
3953
import java.util.Collection;
54+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
55+
import java.util.function.Function;
56+
import javax.annotation.Nullable;
4057

58+
import static io.servicetalk.http.api.HttpApiConversions.toReservedBlockingConnection;
59+
import static io.servicetalk.http.api.HttpApiConversions.toReservedBlockingStreamingConnection;
60+
import static io.servicetalk.http.api.HttpApiConversions.toReservedConnection;
61+
import static io.servicetalk.http.api.HttpResponseStatus.StatusClass.SERVER_ERROR_5XX;
62+
import static io.servicetalk.http.api.HttpResponseStatus.TOO_MANY_REQUESTS;
63+
import static io.servicetalk.loadbalancer.ErrorClass.LOCAL_ORIGIN_CONNECT_FAILED;
64+
import static io.servicetalk.loadbalancer.ErrorClass.LOCAL_ORIGIN_REQUEST_FAILED;
65+
import static io.servicetalk.loadbalancer.RequestTracker.REQUEST_TRACKER_KEY;
4166
import static java.util.Objects.requireNonNull;
67+
import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;
4268

4369
/**
4470
* Default implementation of {@link HttpLoadBalancerFactory}.
@@ -47,13 +73,20 @@
4773
*/
4874
public final class DefaultHttpLoadBalancerFactory<ResolvedAddress>
4975
implements HttpLoadBalancerFactory<ResolvedAddress> {
76+
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultHttpLoadBalancerFactory.class);
5077
private final LoadBalancerFactory<ResolvedAddress, FilterableStreamingHttpLoadBalancedConnection> rawFactory;
78+
private final Function<Throwable, ErrorClass> errorClassFunction;
79+
private final Function<HttpResponseMetaData, ErrorClass> peerResponseErrorClassifier;
5180
private final HttpExecutionStrategy strategy;
5281

5382
DefaultHttpLoadBalancerFactory(
5483
final LoadBalancerFactory<ResolvedAddress, FilterableStreamingHttpLoadBalancedConnection> rawFactory,
84+
final Function<Throwable, ErrorClass> errorClassFunction,
85+
final Function<HttpResponseMetaData, ErrorClass> peerResponseErrorClassifier,
5586
final HttpExecutionStrategy strategy) {
5687
this.rawFactory = rawFactory;
88+
this.errorClassFunction = errorClassFunction;
89+
this.peerResponseErrorClassifier = peerResponseErrorClassifier;
5790
this.strategy = strategy;
5891
}
5992

@@ -80,6 +113,36 @@ public FilterableStreamingHttpLoadBalancedConnection toLoadBalancedConnection(
80113
return new DefaultFilterableStreamingHttpLoadBalancedConnection(connection);
81114
}
82115

116+
@Override
117+
public FilterableStreamingHttpLoadBalancedConnection toLoadBalancedConnection(
118+
final FilterableStreamingHttpConnection connection,
119+
final ReservableRequestConcurrencyController concurrencyController,
120+
@Nullable final ContextMap context) {
121+
122+
RequestTracker hostHealthIndicator = null;
123+
if (context == null) {
124+
LOGGER.debug("Context is null. In order for " + DefaultHttpLoadBalancerFactory.class.getSimpleName() +
125+
":toLoadBalancedConnection to get access to the " + RequestTracker.class.getSimpleName() +
126+
", health-monitor of this connection, the context must not be null.");
127+
} else {
128+
hostHealthIndicator = context.get(REQUEST_TRACKER_KEY);
129+
if (hostHealthIndicator == null) {
130+
LOGGER.debug(REQUEST_TRACKER_KEY.name() + " is not set in context. " +
131+
"In order for " + DefaultHttpLoadBalancerFactory.class.getSimpleName() +
132+
":toLoadBalancedConnection to get access to the " + RequestTracker.class.getSimpleName() +
133+
", health-monitor of this connection, the context must be properly wired.");
134+
}
135+
}
136+
137+
if (hostHealthIndicator == null) {
138+
return new HttpLoadBalancerFactory.DefaultFilterableStreamingHttpLoadBalancedConnection(connection,
139+
concurrencyController);
140+
}
141+
142+
return new DefaultHttpLoadBalancedConnection(connection, concurrencyController,
143+
errorClassFunction, peerResponseErrorClassifier, hostHealthIndicator);
144+
}
145+
83146
@Override
84147
public HttpExecutionStrategy requiredOffloads() {
85148
return strategy;
@@ -94,6 +157,11 @@ public HttpExecutionStrategy requiredOffloads() {
94157
public static final class Builder<ResolvedAddress> {
95158
private final LoadBalancerFactory<ResolvedAddress, FilterableStreamingHttpLoadBalancedConnection> rawFactory;
96159
private final HttpExecutionStrategy strategy;
160+
private final Function<Throwable, ErrorClass> errorClassifier = t -> t instanceof ConnectException ?
161+
LOCAL_ORIGIN_CONNECT_FAILED : LOCAL_ORIGIN_REQUEST_FAILED;
162+
private final Function<HttpResponseMetaData, ErrorClass> peerResponseErrorClassifier = resp ->
163+
(resp.status().statusClass() == SERVER_ERROR_5XX || TOO_MANY_REQUESTS.equals(resp.status())) ?
164+
ErrorClass.EXT_ORIGIN_REQUEST_FAILED : null;
97165

98166
private Builder(
99167
final LoadBalancerFactory<ResolvedAddress, FilterableStreamingHttpLoadBalancedConnection> rawFactory,
@@ -108,7 +176,8 @@ private Builder(
108176
* @return A {@link DefaultHttpLoadBalancerFactory}.
109177
*/
110178
public DefaultHttpLoadBalancerFactory<ResolvedAddress> build() {
111-
return new DefaultHttpLoadBalancerFactory<>(rawFactory, strategy);
179+
return new DefaultHttpLoadBalancerFactory<>(rawFactory, errorClassifier, peerResponseErrorClassifier,
180+
strategy);
112181
}
113182

114183
/**
@@ -153,10 +222,10 @@ private static final class DefaultFilterableStreamingHttpLoadBalancedConnection
153222
@Override
154223
public int score() {
155224
throw new UnsupportedOperationException(
156-
DefaultFilterableStreamingHttpLoadBalancedConnection.class.getName() +
157-
" doesn't support scoring. " + ScoreSupplier.class.getName() +
158-
" is only available through " + HttpLoadBalancerFactory.class.getSimpleName() +
159-
" implementations that support scoring.");
225+
DefaultFilterableStreamingHttpLoadBalancedConnection.class.getName() +
226+
" doesn't support scoring. " + ScoreSupplier.class.getName() +
227+
" is only available through " + HttpLoadBalancerFactory.class.getSimpleName() +
228+
" implementations that support scoring.");
160229
}
161230

162231
@Override
@@ -214,4 +283,193 @@ public String toString() {
214283
return delegate.toString();
215284
}
216285
}
286+
287+
private static final class DefaultHttpLoadBalancedConnection
288+
implements FilterableStreamingHttpLoadBalancedConnection {
289+
private final FilterableStreamingHttpConnection delegate;
290+
private final ReservableRequestConcurrencyController concurrencyController;
291+
private final Function<Throwable, ErrorClass> errorClassFunction;
292+
private final Function<HttpResponseMetaData, ErrorClass> peerResponseErrorClassifier;
293+
@Nullable
294+
private final RequestTracker tracker;
295+
296+
DefaultHttpLoadBalancedConnection(final FilterableStreamingHttpConnection delegate,
297+
final ReservableRequestConcurrencyController concurrencyController,
298+
final Function<Throwable, ErrorClass> errorClassFunction,
299+
final Function<HttpResponseMetaData, ErrorClass> peerResponseErrorClassifier,
300+
@Nullable final RequestTracker tracker) {
301+
this.delegate = delegate;
302+
this.concurrencyController = concurrencyController;
303+
this.errorClassFunction = errorClassFunction;
304+
this.peerResponseErrorClassifier = peerResponseErrorClassifier;
305+
this.tracker = tracker;
306+
}
307+
308+
@Override
309+
public int score() {
310+
return 1;
311+
}
312+
313+
@Override
314+
public ReservedHttpConnection asConnection() {
315+
return toReservedConnection(this, executionContext().executionStrategy());
316+
}
317+
318+
@Override
319+
public ReservedBlockingStreamingHttpConnection asBlockingStreamingConnection() {
320+
return toReservedBlockingStreamingConnection(this, executionContext().executionStrategy());
321+
}
322+
323+
@Override
324+
public ReservedBlockingHttpConnection asBlockingConnection() {
325+
return toReservedBlockingConnection(this, executionContext().executionStrategy());
326+
}
327+
328+
@Override
329+
public Completable releaseAsync() {
330+
return concurrencyController.releaseAsync();
331+
}
332+
333+
@Override
334+
public Completable closeAsyncGracefully() {
335+
return delegate.closeAsyncGracefully();
336+
}
337+
338+
@Override
339+
public Result tryRequest() {
340+
return concurrencyController.tryRequest();
341+
}
342+
343+
@Override
344+
public boolean tryReserve() {
345+
return concurrencyController.tryReserve();
346+
}
347+
348+
@Override
349+
public void requestFinished() {
350+
concurrencyController.requestFinished();
351+
}
352+
353+
@Override
354+
public HttpConnectionContext connectionContext() {
355+
return delegate.connectionContext();
356+
}
357+
358+
@Override
359+
public <T> Publisher<? extends T> transportEventStream(final HttpEventKey<T> eventKey) {
360+
return delegate.transportEventStream(eventKey);
361+
}
362+
363+
@Override
364+
public Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
365+
if (tracker == null) {
366+
return delegate.request(request).shareContextOnSubscribe();
367+
}
368+
369+
return Single.defer(() -> {
370+
final RequestTracker theTracker = new AtMostOnceDeliveryRequestTracker(tracker);
371+
final long startTime = theTracker.beforeStart();
372+
373+
return delegate.request(request)
374+
.liftSync(new BeforeFinallyHttpOperator(new TerminalSignalConsumer() {
375+
@Override
376+
public void onComplete() {
377+
theTracker.onSuccess(startTime);
378+
}
379+
380+
@Override
381+
public void onError(final Throwable throwable) {
382+
theTracker.onError(startTime, errorClassFunction.apply(throwable));
383+
}
384+
385+
@Override
386+
public void cancel() {
387+
theTracker.onError(startTime, ErrorClass.CANCELLED);
388+
}
389+
}, /*discardEventsAfterCancel*/ true))
390+
391+
// BeforeFinallyHttpOperator conditionally outputs a Single<Meta> with a failed
392+
// Publisher<Data> instead of the real Publisher<Data> in case a cancel signal is observed
393+
// before completion of Meta. It also transforms the original Publisher<Data> to discard
394+
// signals after cancel. So in order for downstream operators to get a consistent view of the
395+
// data path map() needs to be applied last.
396+
.map(response -> {
397+
final ErrorClass eClass = peerResponseErrorClassifier.apply(response);
398+
if (eClass != null) {
399+
// The onError is triggered before the body is actually consumed.
400+
theTracker.onError(startTime, eClass);
401+
}
402+
return response;
403+
})
404+
.shareContextOnSubscribe();
405+
});
406+
}
407+
408+
@Override
409+
public HttpExecutionContext executionContext() {
410+
return delegate.executionContext();
411+
}
412+
413+
@Override
414+
public StreamingHttpResponseFactory httpResponseFactory() {
415+
return delegate.httpResponseFactory();
416+
}
417+
418+
@Override
419+
public Completable onClose() {
420+
return delegate.onClose();
421+
}
422+
423+
@Override
424+
public Completable onClosing() {
425+
return delegate.onClosing();
426+
}
427+
428+
@Override
429+
public Completable closeAsync() {
430+
return delegate.closeAsync();
431+
}
432+
433+
@Override
434+
public StreamingHttpRequest newRequest(final HttpRequestMethod method, final String requestTarget) {
435+
return delegate.newRequest(method, requestTarget);
436+
}
437+
438+
@Override
439+
public String toString() {
440+
return delegate.toString();
441+
}
442+
443+
private static final class AtMostOnceDeliveryRequestTracker implements RequestTracker {
444+
445+
private static final AtomicIntegerFieldUpdater<AtMostOnceDeliveryRequestTracker> doneUpdater =
446+
newUpdater(AtMostOnceDeliveryRequestTracker.class, "done");
447+
448+
private final RequestTracker original;
449+
private volatile int done;
450+
451+
private AtMostOnceDeliveryRequestTracker(final RequestTracker original) {
452+
this.original = original;
453+
}
454+
455+
@Override
456+
public long beforeStart() {
457+
return original.beforeStart();
458+
}
459+
460+
@Override
461+
public void onSuccess(final long beforeStartTimeNs) {
462+
if (doneUpdater.compareAndSet(this, 0, 1)) {
463+
original.onSuccess(beforeStartTimeNs);
464+
}
465+
}
466+
467+
@Override
468+
public void onError(final long beforeStartTimeNs, final ErrorClass errorClass) {
469+
if (doneUpdater.compareAndSet(this, 0, 1)) {
470+
original.onError(beforeStartTimeNs, errorClass);
471+
}
472+
}
473+
}
474+
}
217475
}

0 commit comments

Comments
 (0)