@@ -36,7 +36,10 @@ import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
36
36
import selectLbConfigFromList = experimental . selectLbConfigFromList ;
37
37
import SubchannelInterface = experimental . SubchannelInterface ;
38
38
import BaseSubchannelWrapper = experimental . BaseSubchannelWrapper ;
39
+ import UnavailablePicker = experimental . UnavailablePicker ;
39
40
import { Locality__Output } from "./generated/envoy/config/core/v3/Locality" ;
41
+ import { ClusterConfig , XdsConfig } from "./xds-dependency-manager" ;
42
+ import { CdsUpdate } from "./xds-resource-type/cluster-resource-type" ;
40
43
41
44
const TRACER_NAME = 'xds_cluster_impl' ;
42
45
@@ -53,59 +56,26 @@ export interface DropCategory {
53
56
requests_per_million : number ;
54
57
}
55
58
56
- function validateDropCategory ( obj : any ) : DropCategory {
57
- if ( ! ( 'category' in obj && typeof obj . category === 'string' ) ) {
58
- throw new Error ( 'xds_cluster_impl config drop_categories entry must have a string field category' ) ;
59
- }
60
- if ( ! ( 'requests_per_million' in obj && typeof obj . requests_per_million === 'number' ) ) {
61
- throw new Error ( 'xds_cluster_impl config drop_categories entry must have a number field requests_per_million' ) ;
62
- }
63
- return obj ;
64
- }
65
-
66
59
class XdsClusterImplLoadBalancingConfig implements TypedLoadBalancingConfig {
67
- private maxConcurrentRequests : number ;
68
60
getLoadBalancerName ( ) : string {
69
61
return TYPE_NAME ;
70
62
}
71
63
toJsonObject ( ) : object {
72
64
const jsonObj : { [ key : string ] : any } = {
73
65
cluster : this . cluster ,
74
- drop_categories : this . dropCategories ,
75
66
child_policy : [ this . childPolicy . toJsonObject ( ) ] ,
76
- max_concurrent_requests : this . maxConcurrentRequests ,
77
- eds_service_name : this . edsServiceName ,
78
- lrs_load_reporting_server : this . lrsLoadReportingServer ,
79
67
} ;
80
68
return {
81
69
[ TYPE_NAME ] : jsonObj
82
70
} ;
83
71
}
84
72
85
- constructor ( private cluster : string , private dropCategories : DropCategory [ ] , private childPolicy : TypedLoadBalancingConfig , private edsServiceName : string , private lrsLoadReportingServer ?: XdsServerConfig , maxConcurrentRequests ?: number ) {
86
- this . maxConcurrentRequests = maxConcurrentRequests ?? DEFAULT_MAX_CONCURRENT_REQUESTS ;
87
- }
73
+ constructor ( private cluster : string , private childPolicy : TypedLoadBalancingConfig ) { }
88
74
89
75
getCluster ( ) {
90
76
return this . cluster ;
91
77
}
92
78
93
- getEdsServiceName ( ) {
94
- return this . edsServiceName ;
95
- }
96
-
97
- getLrsLoadReportingServer ( ) {
98
- return this . lrsLoadReportingServer ;
99
- }
100
-
101
- getMaxConcurrentRequests ( ) {
102
- return this . maxConcurrentRequests ;
103
- }
104
-
105
- getDropCategories ( ) {
106
- return this . dropCategories ;
107
- }
108
-
109
79
getChildPolicy ( ) {
110
80
return this . childPolicy ;
111
81
}
@@ -114,27 +84,14 @@ class XdsClusterImplLoadBalancingConfig implements TypedLoadBalancingConfig {
114
84
if ( ! ( 'cluster' in obj && typeof obj . cluster === 'string' ) ) {
115
85
throw new Error ( 'xds_cluster_impl config must have a string field cluster' ) ;
116
86
}
117
- if ( ! ( 'eds_service_name' in obj && typeof obj . eds_service_name === 'string' ) ) {
118
- throw new Error ( 'xds_cluster_impl config must have a string field eds_service_name' ) ;
119
- }
120
- if ( 'max_concurrent_requests' in obj && ! ( obj . max_concurrent_requests === undefined || typeof obj . max_concurrent_requests === 'number' ) ) {
121
- throw new Error ( 'xds_cluster_impl config max_concurrent_requests must be a number if provided' ) ;
122
- }
123
- if ( ! ( 'drop_categories' in obj && Array . isArray ( obj . drop_categories ) ) ) {
124
- throw new Error ( 'xds_cluster_impl config must have an array field drop_categories' ) ;
125
- }
126
87
if ( ! ( 'child_policy' in obj && Array . isArray ( obj . child_policy ) ) ) {
127
88
throw new Error ( 'xds_cluster_impl config must have an array field child_policy' ) ;
128
89
}
129
90
const childConfig = selectLbConfigFromList ( obj . child_policy ) ;
130
91
if ( ! childConfig ) {
131
92
throw new Error ( 'xds_cluster_impl config child_policy parsing failed' ) ;
132
93
}
133
- let lrsServer : XdsServerConfig | undefined = undefined ;
134
- if ( obj . lrs_load_reporting_server ) {
135
- lrsServer = validateXdsServerConfig ( obj . lrs_load_reporting_server )
136
- }
137
- return new XdsClusterImplLoadBalancingConfig ( obj . cluster , obj . drop_categories . map ( validateDropCategory ) , childConfig , obj . eds_service_name , lrsServer , obj . max_concurrent_requests ) ;
94
+ return new XdsClusterImplLoadBalancingConfig ( obj . cluster , childConfig ) ;
138
95
}
139
96
}
140
97
@@ -252,11 +209,12 @@ class XdsClusterImplBalancer implements LoadBalancer {
252
209
private latestConfig : XdsClusterImplLoadBalancingConfig | null = null ;
253
210
private clusterDropStats : XdsClusterDropStats | null = null ;
254
211
private xdsClient : XdsClient | null = null ;
212
+ private latestClusterConfig : ClusterConfig | null = null ;
255
213
256
214
constructor ( private readonly channelControlHelper : ChannelControlHelper , credentials : ChannelCredentials , options : ChannelOptions ) {
257
215
this . childBalancer = new ChildLoadBalancerHandler ( createChildChannelControlHelper ( channelControlHelper , {
258
216
createSubchannel : ( subchannelAddress , subchannelArgs , credentialsOverride ) => {
259
- if ( ! this . xdsClient || ! this . latestConfig || ! this . lastestEndpointList ) {
217
+ if ( ! this . xdsClient || ! this . latestConfig || ! this . lastestEndpointList || ! this . latestClusterConfig ) {
260
218
throw new Error ( 'xds_cluster_impl: invalid state: createSubchannel called with xdsClient or latestConfig not populated' ) ;
261
219
}
262
220
const wrapperChild = channelControlHelper . createSubchannel ( subchannelAddress , subchannelArgs , credentialsOverride ) ;
@@ -270,23 +228,23 @@ class XdsClusterImplBalancer implements LoadBalancer {
270
228
trace ( 'Not reporting load for address ' + subchannelAddressToString ( subchannelAddress ) + ' because it has unknown locality.' ) ;
271
229
return wrapperChild ;
272
230
}
273
- const lrsServer = this . latestConfig . getLrsLoadReportingServer ( ) ;
231
+ const lrsServer = this . latestClusterConfig . cluster . lrsLoadReportingServer ;
274
232
let statsObj : XdsClusterLocalityStats | null = null ;
275
233
if ( lrsServer ) {
276
234
statsObj = this . xdsClient . addClusterLocalityStats (
277
235
lrsServer ,
278
236
this . latestConfig . getCluster ( ) ,
279
- this . latestConfig . getEdsServiceName ( ) ,
237
+ this . latestClusterConfig . cluster . edsServiceName ?? '' ,
280
238
locality
281
239
) ;
282
240
}
283
241
return new LocalitySubchannelWrapper ( wrapperChild , statsObj ) ;
284
242
} ,
285
243
updateState : ( connectivityState , originalPicker ) => {
286
- if ( this . latestConfig === null ) {
244
+ if ( this . latestConfig === null || this . latestClusterConfig === null || this . latestClusterConfig . children . type === 'aggregate' || ! this . latestClusterConfig . children . endpoints ) {
287
245
channelControlHelper . updateState ( connectivityState , originalPicker ) ;
288
246
} else {
289
- const picker = new XdsClusterImplPicker ( originalPicker , getCallCounterMapKey ( this . latestConfig . getCluster ( ) , this . latestConfig . getEdsServiceName ( ) ) , this . latestConfig . getMaxConcurrentRequests ( ) , this . latestConfig . getDropCategories ( ) , this . clusterDropStats ) ;
247
+ const picker = new XdsClusterImplPicker ( originalPicker , getCallCounterMapKey ( this . latestConfig . getCluster ( ) , this . latestClusterConfig . cluster . edsServiceName ) , this . latestClusterConfig . cluster . maxConcurrentRequests ?? DEFAULT_MAX_CONCURRENT_REQUESTS , this . latestClusterConfig . children . endpoints . dropCategories , this . clusterDropStats ) ;
290
248
channelControlHelper . updateState ( connectivityState , picker ) ;
291
249
}
292
250
}
@@ -297,15 +255,38 @@ class XdsClusterImplBalancer implements LoadBalancer {
297
255
trace ( 'Discarding address list update with unrecognized config ' + JSON . stringify ( lbConfig . toJsonObject ( ) , undefined , 2 ) ) ;
298
256
return ;
299
257
}
300
- trace ( 'Received update with config: ' + JSON . stringify ( lbConfig , undefined , 2 ) ) ;
258
+ trace ( 'Received update with config: ' + JSON . stringify ( lbConfig . toJsonObject ( ) , undefined , 2 ) ) ;
259
+ const xdsConfig = attributes . xdsConfig as XdsConfig ;
260
+ const maybeClusterConfig = xdsConfig . clusters . get ( lbConfig . getCluster ( ) ) ;
261
+ if ( ! maybeClusterConfig ) {
262
+ trace ( 'Received update with no config for cluster ' + lbConfig . getCluster ( ) ) ;
263
+ return ;
264
+ }
265
+ if ( ! maybeClusterConfig . success ) {
266
+ this . latestClusterConfig = null ;
267
+ this . childBalancer . destroy ( ) ;
268
+ this . channelControlHelper . updateState ( connectivityState . TRANSIENT_FAILURE , new UnavailablePicker ( maybeClusterConfig . error ) ) ;
269
+ return ;
270
+ }
271
+ const clusterConfig = maybeClusterConfig . value ;
272
+ if ( clusterConfig . children . type === 'aggregate' ) {
273
+ trace ( 'Received update for aggregate cluster ' + lbConfig . getCluster ( ) ) ;
274
+ return ;
275
+ }
276
+ if ( ! clusterConfig . children . endpoints ) {
277
+ this . childBalancer . destroy ( ) ;
278
+ this . channelControlHelper . updateState ( connectivityState . TRANSIENT_FAILURE , new UnavailablePicker ( { details : clusterConfig . children . resolutionNote } ) ) ;
279
+
280
+ }
301
281
this . lastestEndpointList = endpointList ;
302
282
this . latestConfig = lbConfig ;
283
+ this . latestClusterConfig = clusterConfig ;
303
284
this . xdsClient = attributes . xdsClient as XdsClient ;
304
- if ( lbConfig . getLrsLoadReportingServer ( ) ) {
285
+ if ( clusterConfig . cluster . lrsLoadReportingServer ) {
305
286
this . clusterDropStats = this . xdsClient . addClusterDropStats (
306
- lbConfig . getLrsLoadReportingServer ( ) ! ,
287
+ clusterConfig . cluster . lrsLoadReportingServer ,
307
288
lbConfig . getCluster ( ) ,
308
- lbConfig . getEdsServiceName ( ) ?? ''
289
+ clusterConfig . cluster . edsServiceName ?? ''
309
290
) ;
310
291
}
311
292
0 commit comments