18
18
import io .servicetalk .client .api .ConnectionFactory ;
19
19
import io .servicetalk .client .api .LoadBalancer ;
20
20
import io .servicetalk .client .api .LoadBalancerFactory ;
21
+ import io .servicetalk .client .api .ReservableRequestConcurrencyController ;
21
22
import io .servicetalk .client .api .ScoreSupplier ;
22
23
import io .servicetalk .client .api .ServiceDiscovererEvent ;
23
24
import io .servicetalk .concurrent .api .Completable ;
24
25
import io .servicetalk .concurrent .api .Publisher ;
25
26
import io .servicetalk .concurrent .api .Single ;
27
+ import io .servicetalk .concurrent .api .TerminalSignalConsumer ;
28
+ import io .servicetalk .context .api .ContextMap ;
26
29
import io .servicetalk .http .api .FilterableStreamingHttpConnection ;
27
30
import io .servicetalk .http .api .FilterableStreamingHttpLoadBalancedConnection ;
28
31
import io .servicetalk .http .api .HttpConnectionContext ;
31
34
import io .servicetalk .http .api .HttpExecutionStrategy ;
32
35
import io .servicetalk .http .api .HttpLoadBalancerFactory ;
33
36
import io .servicetalk .http .api .HttpRequestMethod ;
37
+ import io .servicetalk .http .api .HttpResponseMetaData ;
38
+ import io .servicetalk .http .api .ReservedBlockingHttpConnection ;
39
+ import io .servicetalk .http .api .ReservedBlockingStreamingHttpConnection ;
40
+ import io .servicetalk .http .api .ReservedHttpConnection ;
34
41
import io .servicetalk .http .api .StreamingHttpRequest ;
35
42
import io .servicetalk .http .api .StreamingHttpResponse ;
36
43
import io .servicetalk .http .api .StreamingHttpResponseFactory ;
44
+ import io .servicetalk .http .utils .BeforeFinallyHttpOperator ;
45
+ import io .servicetalk .loadbalancer .ErrorClass ;
46
+ import io .servicetalk .loadbalancer .RequestTracker ;
37
47
import io .servicetalk .loadbalancer .RoundRobinLoadBalancers ;
38
48
49
+ import org .slf4j .Logger ;
50
+ import org .slf4j .LoggerFactory ;
51
+
52
+ import java .net .ConnectException ;
39
53
import java .util .Collection ;
54
+ import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
55
+ import java .util .function .Function ;
56
+ import javax .annotation .Nullable ;
40
57
58
+ import static io .servicetalk .http .api .HttpApiConversions .toReservedBlockingConnection ;
59
+ import static io .servicetalk .http .api .HttpApiConversions .toReservedBlockingStreamingConnection ;
60
+ import static io .servicetalk .http .api .HttpApiConversions .toReservedConnection ;
61
+ import static io .servicetalk .http .api .HttpResponseStatus .StatusClass .SERVER_ERROR_5XX ;
62
+ import static io .servicetalk .http .api .HttpResponseStatus .TOO_MANY_REQUESTS ;
63
+ import static io .servicetalk .loadbalancer .ErrorClass .LOCAL_ORIGIN_CONNECT_FAILED ;
64
+ import static io .servicetalk .loadbalancer .ErrorClass .LOCAL_ORIGIN_REQUEST_FAILED ;
65
+ import static io .servicetalk .loadbalancer .RequestTracker .REQUEST_TRACKER_KEY ;
41
66
import static java .util .Objects .requireNonNull ;
67
+ import static java .util .concurrent .atomic .AtomicIntegerFieldUpdater .newUpdater ;
42
68
43
69
/**
44
70
* Default implementation of {@link HttpLoadBalancerFactory}.
47
73
*/
48
74
public final class DefaultHttpLoadBalancerFactory <ResolvedAddress >
49
75
implements HttpLoadBalancerFactory <ResolvedAddress > {
76
+ private static final Logger LOGGER = LoggerFactory .getLogger (DefaultHttpLoadBalancerFactory .class );
50
77
private final LoadBalancerFactory <ResolvedAddress , FilterableStreamingHttpLoadBalancedConnection > rawFactory ;
78
+ private final Function <Throwable , ErrorClass > errorClassFunction ;
79
+ private final Function <HttpResponseMetaData , ErrorClass > peerResponseErrorClassifier ;
51
80
private final HttpExecutionStrategy strategy ;
52
81
53
82
DefaultHttpLoadBalancerFactory (
54
83
final LoadBalancerFactory <ResolvedAddress , FilterableStreamingHttpLoadBalancedConnection > rawFactory ,
84
+ final Function <Throwable , ErrorClass > errorClassFunction ,
85
+ final Function <HttpResponseMetaData , ErrorClass > peerResponseErrorClassifier ,
55
86
final HttpExecutionStrategy strategy ) {
56
87
this .rawFactory = rawFactory ;
88
+ this .errorClassFunction = errorClassFunction ;
89
+ this .peerResponseErrorClassifier = peerResponseErrorClassifier ;
57
90
this .strategy = strategy ;
58
91
}
59
92
@@ -80,6 +113,36 @@ public FilterableStreamingHttpLoadBalancedConnection toLoadBalancedConnection(
80
113
return new DefaultFilterableStreamingHttpLoadBalancedConnection (connection );
81
114
}
82
115
116
+ @ Override
117
+ public FilterableStreamingHttpLoadBalancedConnection toLoadBalancedConnection (
118
+ final FilterableStreamingHttpConnection connection ,
119
+ final ReservableRequestConcurrencyController concurrencyController ,
120
+ @ Nullable final ContextMap context ) {
121
+
122
+ RequestTracker hostHealthIndicator = null ;
123
+ if (context == null ) {
124
+ LOGGER .debug ("Context is null. In order for " + DefaultHttpLoadBalancerFactory .class .getSimpleName () +
125
+ ":toLoadBalancedConnection to get access to the " + RequestTracker .class .getSimpleName () +
126
+ ", health-monitor of this connection, the context must not be null." );
127
+ } else {
128
+ hostHealthIndicator = context .get (REQUEST_TRACKER_KEY );
129
+ if (hostHealthIndicator == null ) {
130
+ LOGGER .debug (REQUEST_TRACKER_KEY .name () + " is not set in context. " +
131
+ "In order for " + DefaultHttpLoadBalancerFactory .class .getSimpleName () +
132
+ ":toLoadBalancedConnection to get access to the " + RequestTracker .class .getSimpleName () +
133
+ ", health-monitor of this connection, the context must be properly wired." );
134
+ }
135
+ }
136
+
137
+ if (hostHealthIndicator == null ) {
138
+ return new HttpLoadBalancerFactory .DefaultFilterableStreamingHttpLoadBalancedConnection (connection ,
139
+ concurrencyController );
140
+ }
141
+
142
+ return new DefaultHttpLoadBalancedConnection (connection , concurrencyController ,
143
+ errorClassFunction , peerResponseErrorClassifier , hostHealthIndicator );
144
+ }
145
+
83
146
@ Override
84
147
public HttpExecutionStrategy requiredOffloads () {
85
148
return strategy ;
@@ -94,6 +157,11 @@ public HttpExecutionStrategy requiredOffloads() {
94
157
public static final class Builder <ResolvedAddress > {
95
158
private final LoadBalancerFactory <ResolvedAddress , FilterableStreamingHttpLoadBalancedConnection > rawFactory ;
96
159
private final HttpExecutionStrategy strategy ;
160
+ private final Function <Throwable , ErrorClass > errorClassifier = t -> t instanceof ConnectException ?
161
+ LOCAL_ORIGIN_CONNECT_FAILED : LOCAL_ORIGIN_REQUEST_FAILED ;
162
+ private final Function <HttpResponseMetaData , ErrorClass > peerResponseErrorClassifier = resp ->
163
+ (resp .status ().statusClass () == SERVER_ERROR_5XX || TOO_MANY_REQUESTS .equals (resp .status ())) ?
164
+ ErrorClass .EXT_ORIGIN_REQUEST_FAILED : null ;
97
165
98
166
private Builder (
99
167
final LoadBalancerFactory <ResolvedAddress , FilterableStreamingHttpLoadBalancedConnection > rawFactory ,
@@ -108,7 +176,8 @@ private Builder(
108
176
* @return A {@link DefaultHttpLoadBalancerFactory}.
109
177
*/
110
178
public DefaultHttpLoadBalancerFactory <ResolvedAddress > build () {
111
- return new DefaultHttpLoadBalancerFactory <>(rawFactory , strategy );
179
+ return new DefaultHttpLoadBalancerFactory <>(rawFactory , errorClassifier , peerResponseErrorClassifier ,
180
+ strategy );
112
181
}
113
182
114
183
/**
@@ -153,10 +222,10 @@ private static final class DefaultFilterableStreamingHttpLoadBalancedConnection
153
222
@ Override
154
223
public int score () {
155
224
throw new UnsupportedOperationException (
156
- DefaultFilterableStreamingHttpLoadBalancedConnection .class .getName () +
157
- " doesn't support scoring. " + ScoreSupplier .class .getName () +
158
- " is only available through " + HttpLoadBalancerFactory .class .getSimpleName () +
159
- " implementations that support scoring." );
225
+ DefaultFilterableStreamingHttpLoadBalancedConnection .class .getName () +
226
+ " doesn't support scoring. " + ScoreSupplier .class .getName () +
227
+ " is only available through " + HttpLoadBalancerFactory .class .getSimpleName () +
228
+ " implementations that support scoring." );
160
229
}
161
230
162
231
@ Override
@@ -214,4 +283,193 @@ public String toString() {
214
283
return delegate .toString ();
215
284
}
216
285
}
286
+
287
+ private static final class DefaultHttpLoadBalancedConnection
288
+ implements FilterableStreamingHttpLoadBalancedConnection {
289
+ private final FilterableStreamingHttpConnection delegate ;
290
+ private final ReservableRequestConcurrencyController concurrencyController ;
291
+ private final Function <Throwable , ErrorClass > errorClassFunction ;
292
+ private final Function <HttpResponseMetaData , ErrorClass > peerResponseErrorClassifier ;
293
+ @ Nullable
294
+ private final RequestTracker tracker ;
295
+
296
+ DefaultHttpLoadBalancedConnection (final FilterableStreamingHttpConnection delegate ,
297
+ final ReservableRequestConcurrencyController concurrencyController ,
298
+ final Function <Throwable , ErrorClass > errorClassFunction ,
299
+ final Function <HttpResponseMetaData , ErrorClass > peerResponseErrorClassifier ,
300
+ @ Nullable final RequestTracker tracker ) {
301
+ this .delegate = delegate ;
302
+ this .concurrencyController = concurrencyController ;
303
+ this .errorClassFunction = errorClassFunction ;
304
+ this .peerResponseErrorClassifier = peerResponseErrorClassifier ;
305
+ this .tracker = tracker ;
306
+ }
307
+
308
+ @ Override
309
+ public int score () {
310
+ return 1 ;
311
+ }
312
+
313
+ @ Override
314
+ public ReservedHttpConnection asConnection () {
315
+ return toReservedConnection (this , executionContext ().executionStrategy ());
316
+ }
317
+
318
+ @ Override
319
+ public ReservedBlockingStreamingHttpConnection asBlockingStreamingConnection () {
320
+ return toReservedBlockingStreamingConnection (this , executionContext ().executionStrategy ());
321
+ }
322
+
323
+ @ Override
324
+ public ReservedBlockingHttpConnection asBlockingConnection () {
325
+ return toReservedBlockingConnection (this , executionContext ().executionStrategy ());
326
+ }
327
+
328
+ @ Override
329
+ public Completable releaseAsync () {
330
+ return concurrencyController .releaseAsync ();
331
+ }
332
+
333
+ @ Override
334
+ public Completable closeAsyncGracefully () {
335
+ return delegate .closeAsyncGracefully ();
336
+ }
337
+
338
+ @ Override
339
+ public Result tryRequest () {
340
+ return concurrencyController .tryRequest ();
341
+ }
342
+
343
+ @ Override
344
+ public boolean tryReserve () {
345
+ return concurrencyController .tryReserve ();
346
+ }
347
+
348
+ @ Override
349
+ public void requestFinished () {
350
+ concurrencyController .requestFinished ();
351
+ }
352
+
353
+ @ Override
354
+ public HttpConnectionContext connectionContext () {
355
+ return delegate .connectionContext ();
356
+ }
357
+
358
+ @ Override
359
+ public <T > Publisher <? extends T > transportEventStream (final HttpEventKey <T > eventKey ) {
360
+ return delegate .transportEventStream (eventKey );
361
+ }
362
+
363
+ @ Override
364
+ public Single <StreamingHttpResponse > request (final StreamingHttpRequest request ) {
365
+ if (tracker == null ) {
366
+ return delegate .request (request ).shareContextOnSubscribe ();
367
+ }
368
+
369
+ return Single .defer (() -> {
370
+ final RequestTracker theTracker = new AtMostOnceDeliveryRequestTracker (tracker );
371
+ final long startTime = theTracker .beforeStart ();
372
+
373
+ return delegate .request (request )
374
+ .liftSync (new BeforeFinallyHttpOperator (new TerminalSignalConsumer () {
375
+ @ Override
376
+ public void onComplete () {
377
+ theTracker .onSuccess (startTime );
378
+ }
379
+
380
+ @ Override
381
+ public void onError (final Throwable throwable ) {
382
+ theTracker .onError (startTime , errorClassFunction .apply (throwable ));
383
+ }
384
+
385
+ @ Override
386
+ public void cancel () {
387
+ theTracker .onError (startTime , ErrorClass .CANCELLED );
388
+ }
389
+ }, /*discardEventsAfterCancel*/ true ))
390
+
391
+ // BeforeFinallyHttpOperator conditionally outputs a Single<Meta> with a failed
392
+ // Publisher<Data> instead of the real Publisher<Data> in case a cancel signal is observed
393
+ // before completion of Meta. It also transforms the original Publisher<Data> to discard
394
+ // signals after cancel. So in order for downstream operators to get a consistent view of the
395
+ // data path map() needs to be applied last.
396
+ .map (response -> {
397
+ final ErrorClass eClass = peerResponseErrorClassifier .apply (response );
398
+ if (eClass != null ) {
399
+ // The onError is triggered before the body is actually consumed.
400
+ theTracker .onError (startTime , eClass );
401
+ }
402
+ return response ;
403
+ })
404
+ .shareContextOnSubscribe ();
405
+ });
406
+ }
407
+
408
+ @ Override
409
+ public HttpExecutionContext executionContext () {
410
+ return delegate .executionContext ();
411
+ }
412
+
413
+ @ Override
414
+ public StreamingHttpResponseFactory httpResponseFactory () {
415
+ return delegate .httpResponseFactory ();
416
+ }
417
+
418
+ @ Override
419
+ public Completable onClose () {
420
+ return delegate .onClose ();
421
+ }
422
+
423
+ @ Override
424
+ public Completable onClosing () {
425
+ return delegate .onClosing ();
426
+ }
427
+
428
+ @ Override
429
+ public Completable closeAsync () {
430
+ return delegate .closeAsync ();
431
+ }
432
+
433
+ @ Override
434
+ public StreamingHttpRequest newRequest (final HttpRequestMethod method , final String requestTarget ) {
435
+ return delegate .newRequest (method , requestTarget );
436
+ }
437
+
438
+ @ Override
439
+ public String toString () {
440
+ return delegate .toString ();
441
+ }
442
+
443
+ private static final class AtMostOnceDeliveryRequestTracker implements RequestTracker {
444
+
445
+ private static final AtomicIntegerFieldUpdater <AtMostOnceDeliveryRequestTracker > doneUpdater =
446
+ newUpdater (AtMostOnceDeliveryRequestTracker .class , "done" );
447
+
448
+ private final RequestTracker original ;
449
+ private volatile int done ;
450
+
451
+ private AtMostOnceDeliveryRequestTracker (final RequestTracker original ) {
452
+ this .original = original ;
453
+ }
454
+
455
+ @ Override
456
+ public long beforeStart () {
457
+ return original .beforeStart ();
458
+ }
459
+
460
+ @ Override
461
+ public void onSuccess (final long beforeStartTimeNs ) {
462
+ if (doneUpdater .compareAndSet (this , 0 , 1 )) {
463
+ original .onSuccess (beforeStartTimeNs );
464
+ }
465
+ }
466
+
467
+ @ Override
468
+ public void onError (final long beforeStartTimeNs , final ErrorClass errorClass ) {
469
+ if (doneUpdater .compareAndSet (this , 0 , 1 )) {
470
+ original .onError (beforeStartTimeNs , errorClass );
471
+ }
472
+ }
473
+ }
474
+ }
217
475
}
0 commit comments