@@ -83,6 +83,30 @@ const RETRY_CODES: {[key: string]: status} = {
83
83
export const XDS_CONFIG_KEY = `${ experimental . SUBCHANNEL_ARGS_EXCLUDE_KEY_PREFIX } .xds_config` ;
84
84
export const XDS_CLIENT_KEY = 'grpc.internal.xds_client' ;
85
85
86
+ /**
87
+ * Tracks a dynamic subscription to a cluster that is currently or previously
88
+ * referenced in a RouteConfiguration.
89
+ */
90
+ class ClusterRef {
91
+ private refCount = 0 ;
92
+ constructor ( private unsubscribe : ( ) => void ) { }
93
+
94
+ ref ( ) {
95
+ this . refCount += 1 ;
96
+ }
97
+
98
+ unref ( ) {
99
+ this . refCount -= 1 ;
100
+ if ( this . refCount <= 0 ) {
101
+ this . unsubscribe ( ) ;
102
+ }
103
+ }
104
+
105
+ hasRef ( ) {
106
+ return this . refCount > 0 ;
107
+ }
108
+ }
109
+
86
110
class XdsResolver implements Resolver {
87
111
88
112
private listenerResourceName : string | null = null ;
@@ -93,6 +117,7 @@ class XdsResolver implements Resolver {
93
117
94
118
private xdsConfigWatcher : XdsConfigWatcher ;
95
119
private xdsDependencyManager : XdsDependencyManager | null = null ;
120
+ private clusterRefs : Map < string , ClusterRef > = new Map ( ) ;
96
121
97
122
constructor (
98
123
private target : GrpcUri ,
@@ -123,11 +148,20 @@ class XdsResolver implements Resolver {
123
148
}
124
149
}
125
150
151
+ private pruneUnusedClusters ( ) {
152
+ for ( const [ cluster , clusterRef ] of this . clusterRefs ) {
153
+ if ( ! clusterRef . hasRef ( ) ) {
154
+ this . clusterRefs . delete ( cluster ) ;
155
+ }
156
+ }
157
+ }
158
+
126
159
private async handleXdsConfig ( xdsConfig : XdsConfig ) {
127
160
/* We need to load the xxhash API before this function finishes, because
128
161
* it is invoked in the config selector, which can be called immediately
129
162
* after this function returns. */
130
163
await loadXxhashApi ( ) ;
164
+ this . pruneUnusedClusters ( ) ;
131
165
const httpConnectionManager = decodeSingleResource ( HTTP_CONNECTION_MANGER_TYPE_URL , xdsConfig . listener . api_listener ! . api_listener ! . value ) ;
132
166
const configDefaultTimeout = httpConnectionManager . common_http_protocol_options ?. idle_timeout ;
133
167
let defaultTimeout : Duration | undefined = undefined ;
@@ -312,44 +346,60 @@ class XdsResolver implements Resolver {
312
346
const routeMatcher = getPredicateForMatcher ( route . match ! ) ;
313
347
matchList . push ( { matcher : routeMatcher , action : routeAction } ) ;
314
348
}
315
- const configSelector : ConfigSelector = ( methodName , metadata , channelId ) => {
316
- for ( const { matcher, action} of matchList ) {
317
- if ( matcher . apply ( methodName , metadata ) ) {
318
- const clusterResult = action . getCluster ( ) ;
319
- const unrefCluster = this . xdsDependencyManager ! . addClusterSubscription ( clusterResult . name ) ;
320
- const onCommitted = ( ) => {
321
- unrefCluster ( ) ;
322
- }
323
- let hash : string ;
324
- if ( EXPERIMENTAL_RING_HASH ) {
325
- hash = `${ action . getHash ( metadata , channelId ) } ` ;
326
- } else {
327
- hash = '' ;
349
+ for ( const cluster of allConfigClusters ) {
350
+ let clusterRef = this . clusterRefs . get ( cluster ) ;
351
+ if ( ! clusterRef ) {
352
+ clusterRef = new ClusterRef ( this . xdsDependencyManager ! . addClusterSubscription ( cluster ) ) ;
353
+ this . clusterRefs . set ( cluster , clusterRef ) ;
354
+ }
355
+ clusterRef . ref ( ) ;
356
+ }
357
+ const configSelector : ConfigSelector = {
358
+ invoke : ( methodName , metadata , channelId ) => {
359
+ for ( const { matcher, action} of matchList ) {
360
+ if ( matcher . apply ( methodName , metadata ) ) {
361
+ const clusterResult = action . getCluster ( ) ;
362
+ const clusterRef = this . clusterRefs . get ( clusterResult . name ) ! ;
363
+ clusterRef . ref ( ) ;
364
+ const onCommitted = ( ) => {
365
+ clusterRef . unref ( ) ;
366
+ }
367
+ let hash : string ;
368
+ if ( EXPERIMENTAL_RING_HASH ) {
369
+ hash = `${ action . getHash ( metadata , channelId ) } ` ;
370
+ } else {
371
+ hash = '' ;
372
+ }
373
+ return {
374
+ methodConfig : clusterResult . methodConfig ,
375
+ onCommitted : onCommitted ,
376
+ pickInformation : { cluster : clusterResult . name , hash : hash } ,
377
+ status : status . OK ,
378
+ dynamicFilterFactories : clusterResult . dynamicFilterFactories
379
+ } ;
328
380
}
329
- return {
330
- methodConfig : clusterResult . methodConfig ,
331
- onCommitted : onCommitted ,
332
- pickInformation : { cluster : clusterResult . name , hash : hash } ,
333
- status : status . OK ,
334
- dynamicFilterFactories : clusterResult . dynamicFilterFactories
335
- } ;
381
+ }
382
+ return {
383
+ methodConfig : { name : [ ] } ,
384
+ // These fields won't be used here, but they're set because of some TypeScript weirdness
385
+ pickInformation : { cluster : '' , hash : '' } ,
386
+ status : status . UNAVAILABLE ,
387
+ dynamicFilterFactories : [ ]
388
+ } ;
389
+ } ,
390
+ unref : ( ) => {
391
+ for ( const cluster of allConfigClusters ) {
392
+ this . clusterRefs . get ( cluster ) ?. unref ( ) ;
336
393
}
337
394
}
338
- return {
339
- methodConfig : { name : [ ] } ,
340
- // These fields won't be used here, but they're set because of some TypeScript weirdness
341
- pickInformation : { cluster : '' , hash : '' } ,
342
- status : status . UNAVAILABLE ,
343
- dynamicFilterFactories : [ ]
344
- } ;
345
- } ;
395
+ }
346
396
trace ( 'Created ConfigSelector with configuration:' ) ;
347
397
for ( const { matcher, action} of matchList ) {
348
398
trace ( matcher . toString ( ) ) ;
349
399
trace ( '=> ' + action . toString ( ) ) ;
350
400
}
351
401
const clusterConfigMap : { [ key : string ] : { child_policy : LoadBalancingConfig [ ] } } = { } ;
352
- for ( const clusterName of allConfigClusters ) {
402
+ for ( const clusterName of this . clusterRefs . keys ( ) ) {
353
403
clusterConfigMap [ clusterName ] = { child_policy : [ { cds : { cluster : clusterName } } ] } ;
354
404
}
355
405
const lbPolicyConfig = { xds_cluster_manager : { children : clusterConfigMap } } ;
0 commit comments