Skip to content

Commit 969a0e0

Browse files
authored
Merge branch 'master' into patch-1
2 parents 67f47c9 + 83b6e60 commit 969a0e0

14 files changed

+1090
-1175
lines changed

packages/grpc-js-xds/src/environment.ts

+1
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,4 @@ export const EXPERIMENTAL_CUSTOM_LB_CONFIG = (process.env.GRPC_EXPERIMENTAL_XDS_
2626
export const EXPERIMENTAL_RING_HASH = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH ?? 'true') === 'true';
2727
export const EXPERIMENTAL_PICK_FIRST = (process.env.GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG ?? 'false') === 'true';
2828
export const EXPERIMENTAL_DUALSTACK_ENDPOINTS = (process.env.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS ?? 'true') === 'true';
29+
export const AGGREGATE_CLUSTER_BACKWARDS_COMPAT = (process.env.GRPC_XDS_AGGREGATE_CLUSTER_BACKWARD_COMPAT ?? 'false') === 'true';

packages/grpc-js-xds/src/index.ts

-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import * as resolver_xds from './resolver-xds';
1919
import * as load_balancer_cds from './load-balancer-cds';
20-
import * as xds_cluster_resolver from './load-balancer-xds-cluster-resolver';
2120
import * as xds_cluster_impl from './load-balancer-xds-cluster-impl';
2221
import * as load_balancer_priority from './load-balancer-priority';
2322
import * as load_balancer_weighted_target from './load-balancer-weighted-target';
@@ -40,7 +39,6 @@ export { XdsServerCredentials } from './xds-credentials';
4039
export function register() {
4140
resolver_xds.setup();
4241
load_balancer_cds.setup();
43-
xds_cluster_resolver.setup();
4442
xds_cluster_impl.setup();
4543
load_balancer_priority.setup();
4644
load_balancer_weighted_target.setup();

packages/grpc-js-xds/src/load-balancer-cds.ts

+187-189
Large diffs are not rendered by default.

packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts

+38-57
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
3636
import selectLbConfigFromList = experimental.selectLbConfigFromList;
3737
import SubchannelInterface = experimental.SubchannelInterface;
3838
import BaseSubchannelWrapper = experimental.BaseSubchannelWrapper;
39+
import UnavailablePicker = experimental.UnavailablePicker;
3940
import { Locality__Output } from "./generated/envoy/config/core/v3/Locality";
41+
import { ClusterConfig, XdsConfig } from "./xds-dependency-manager";
42+
import { CdsUpdate } from "./xds-resource-type/cluster-resource-type";
4043

4144
const TRACER_NAME = 'xds_cluster_impl';
4245

@@ -53,59 +56,26 @@ export interface DropCategory {
5356
requests_per_million: number;
5457
}
5558

56-
function validateDropCategory(obj: any): DropCategory {
57-
if (!('category' in obj && typeof obj.category === 'string')) {
58-
throw new Error('xds_cluster_impl config drop_categories entry must have a string field category');
59-
}
60-
if (!('requests_per_million' in obj && typeof obj.requests_per_million === 'number')) {
61-
throw new Error('xds_cluster_impl config drop_categories entry must have a number field requests_per_million');
62-
}
63-
return obj;
64-
}
65-
6659
class XdsClusterImplLoadBalancingConfig implements TypedLoadBalancingConfig {
67-
private maxConcurrentRequests: number;
6860
getLoadBalancerName(): string {
6961
return TYPE_NAME;
7062
}
7163
toJsonObject(): object {
7264
const jsonObj: {[key: string]: any} = {
7365
cluster: this.cluster,
74-
drop_categories: this.dropCategories,
7566
child_policy: [this.childPolicy.toJsonObject()],
76-
max_concurrent_requests: this.maxConcurrentRequests,
77-
eds_service_name: this.edsServiceName,
78-
lrs_load_reporting_server: this.lrsLoadReportingServer,
7967
};
8068
return {
8169
[TYPE_NAME]: jsonObj
8270
};
8371
}
8472

85-
constructor(private cluster: string, private dropCategories: DropCategory[], private childPolicy: TypedLoadBalancingConfig, private edsServiceName: string, private lrsLoadReportingServer?: XdsServerConfig, maxConcurrentRequests?: number) {
86-
this.maxConcurrentRequests = maxConcurrentRequests ?? DEFAULT_MAX_CONCURRENT_REQUESTS;
87-
}
73+
constructor(private cluster: string, private childPolicy: TypedLoadBalancingConfig) {}
8874

8975
getCluster() {
9076
return this.cluster;
9177
}
9278

93-
getEdsServiceName() {
94-
return this.edsServiceName;
95-
}
96-
97-
getLrsLoadReportingServer() {
98-
return this.lrsLoadReportingServer;
99-
}
100-
101-
getMaxConcurrentRequests() {
102-
return this.maxConcurrentRequests;
103-
}
104-
105-
getDropCategories() {
106-
return this.dropCategories;
107-
}
108-
10979
getChildPolicy() {
11080
return this.childPolicy;
11181
}
@@ -114,27 +84,14 @@ class XdsClusterImplLoadBalancingConfig implements TypedLoadBalancingConfig {
11484
if (!('cluster' in obj && typeof obj.cluster === 'string')) {
11585
throw new Error('xds_cluster_impl config must have a string field cluster');
11686
}
117-
if (!('eds_service_name' in obj && typeof obj.eds_service_name === 'string')) {
118-
throw new Error('xds_cluster_impl config must have a string field eds_service_name');
119-
}
120-
if ('max_concurrent_requests' in obj && !(obj.max_concurrent_requests === undefined || typeof obj.max_concurrent_requests === 'number')) {
121-
throw new Error('xds_cluster_impl config max_concurrent_requests must be a number if provided');
122-
}
123-
if (!('drop_categories' in obj && Array.isArray(obj.drop_categories))) {
124-
throw new Error('xds_cluster_impl config must have an array field drop_categories');
125-
}
12687
if (!('child_policy' in obj && Array.isArray(obj.child_policy))) {
12788
throw new Error('xds_cluster_impl config must have an array field child_policy');
12889
}
12990
const childConfig = selectLbConfigFromList(obj.child_policy);
13091
if (!childConfig) {
13192
throw new Error('xds_cluster_impl config child_policy parsing failed');
13293
}
133-
let lrsServer: XdsServerConfig | undefined = undefined;
134-
if (obj.lrs_load_reporting_server) {
135-
lrsServer = validateXdsServerConfig(obj.lrs_load_reporting_server)
136-
}
137-
return new XdsClusterImplLoadBalancingConfig(obj.cluster, obj.drop_categories.map(validateDropCategory), childConfig, obj.eds_service_name, lrsServer, obj.max_concurrent_requests);
94+
return new XdsClusterImplLoadBalancingConfig(obj.cluster, childConfig);
13895
}
13996
}
14097

@@ -252,11 +209,12 @@ class XdsClusterImplBalancer implements LoadBalancer {
252209
private latestConfig: XdsClusterImplLoadBalancingConfig | null = null;
253210
private clusterDropStats: XdsClusterDropStats | null = null;
254211
private xdsClient: XdsClient | null = null;
212+
private latestClusterConfig: ClusterConfig | null = null;
255213

256214
constructor(private readonly channelControlHelper: ChannelControlHelper, credentials: ChannelCredentials, options: ChannelOptions) {
257215
this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, {
258216
createSubchannel: (subchannelAddress, subchannelArgs, credentialsOverride) => {
259-
if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList) {
217+
if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList || !this.latestClusterConfig) {
260218
throw new Error('xds_cluster_impl: invalid state: createSubchannel called with xdsClient or latestConfig not populated');
261219
}
262220
const wrapperChild = channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs, credentialsOverride);
@@ -270,23 +228,23 @@ class XdsClusterImplBalancer implements LoadBalancer {
270228
trace('Not reporting load for address ' + subchannelAddressToString(subchannelAddress) + ' because it has unknown locality.');
271229
return wrapperChild;
272230
}
273-
const lrsServer = this.latestConfig.getLrsLoadReportingServer();
231+
const lrsServer = this.latestClusterConfig.cluster.lrsLoadReportingServer;
274232
let statsObj: XdsClusterLocalityStats | null = null;
275233
if (lrsServer) {
276234
statsObj = this.xdsClient.addClusterLocalityStats(
277235
lrsServer,
278236
this.latestConfig.getCluster(),
279-
this.latestConfig.getEdsServiceName(),
237+
this.latestClusterConfig.cluster.edsServiceName ?? '',
280238
locality
281239
);
282240
}
283241
return new LocalitySubchannelWrapper(wrapperChild, statsObj);
284242
},
285243
updateState: (connectivityState, originalPicker) => {
286-
if (this.latestConfig === null) {
244+
if (this.latestConfig === null || this.latestClusterConfig === null || this.latestClusterConfig.children.type === 'aggregate' || !this.latestClusterConfig.children.endpoints) {
287245
channelControlHelper.updateState(connectivityState, originalPicker);
288246
} else {
289-
const picker = new XdsClusterImplPicker(originalPicker, getCallCounterMapKey(this.latestConfig.getCluster(), this.latestConfig.getEdsServiceName()), this.latestConfig.getMaxConcurrentRequests(), this.latestConfig.getDropCategories(), this.clusterDropStats);
247+
const picker = new XdsClusterImplPicker(originalPicker, getCallCounterMapKey(this.latestConfig.getCluster(), this.latestClusterConfig.cluster.edsServiceName), this.latestClusterConfig.cluster.maxConcurrentRequests ?? DEFAULT_MAX_CONCURRENT_REQUESTS, this.latestClusterConfig.children.endpoints.dropCategories, this.clusterDropStats);
290248
channelControlHelper.updateState(connectivityState, picker);
291249
}
292250
}
@@ -297,15 +255,38 @@ class XdsClusterImplBalancer implements LoadBalancer {
297255
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
298256
return;
299257
}
300-
trace('Received update with config: ' + JSON.stringify(lbConfig, undefined, 2));
258+
trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
259+
const xdsConfig = attributes.xdsConfig as XdsConfig;
260+
const maybeClusterConfig = xdsConfig.clusters.get(lbConfig.getCluster());
261+
if (!maybeClusterConfig) {
262+
trace('Received update with no config for cluster ' + lbConfig.getCluster());
263+
return;
264+
}
265+
if (!maybeClusterConfig.success) {
266+
this.latestClusterConfig = null;
267+
this.childBalancer.destroy();
268+
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker(maybeClusterConfig.error));
269+
return;
270+
}
271+
const clusterConfig = maybeClusterConfig.value;
272+
if (clusterConfig.children.type === 'aggregate') {
273+
trace('Received update for aggregate cluster ' + lbConfig.getCluster());
274+
return;
275+
}
276+
if (!clusterConfig.children.endpoints) {
277+
this.childBalancer.destroy();
278+
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({details: clusterConfig.children.resolutionNote}));
279+
280+
}
301281
this.lastestEndpointList = endpointList;
302282
this.latestConfig = lbConfig;
283+
this.latestClusterConfig = clusterConfig;
303284
this.xdsClient = attributes.xdsClient as XdsClient;
304-
if (lbConfig.getLrsLoadReportingServer()) {
285+
if (clusterConfig.cluster.lrsLoadReportingServer) {
305286
this.clusterDropStats = this.xdsClient.addClusterDropStats(
306-
lbConfig.getLrsLoadReportingServer()!,
287+
clusterConfig.cluster.lrsLoadReportingServer,
307288
lbConfig.getCluster(),
308-
lbConfig.getEdsServiceName() ?? ''
289+
clusterConfig.cluster.edsServiceName ?? ''
309290
);
310291
}
311292

0 commit comments

Comments
 (0)