Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js-xds: Implement relevant parts of A74 and A75 #2844

Merged
merged 1 commit into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/grpc-js-xds/src/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ export const EXPERIMENTAL_CUSTOM_LB_CONFIG = (process.env.GRPC_EXPERIMENTAL_XDS_
export const EXPERIMENTAL_RING_HASH = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH ?? 'true') === 'true';
export const EXPERIMENTAL_PICK_FIRST = (process.env.GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG ?? 'false') === 'true';
export const EXPERIMENTAL_DUALSTACK_ENDPOINTS = (process.env.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS ?? 'true') === 'true';
export const AGGREGATE_CLUSTER_BACKWARDS_COMPAT = (process.env.GRPC_XDS_AGGREGATE_CLUSTER_BACKWARD_COMPAT ?? 'false') === 'true';
2 changes: 0 additions & 2 deletions packages/grpc-js-xds/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import * as resolver_xds from './resolver-xds';
import * as load_balancer_cds from './load-balancer-cds';
import * as xds_cluster_resolver from './load-balancer-xds-cluster-resolver';
import * as xds_cluster_impl from './load-balancer-xds-cluster-impl';
import * as load_balancer_priority from './load-balancer-priority';
import * as load_balancer_weighted_target from './load-balancer-weighted-target';
Expand All @@ -40,7 +39,6 @@ export { XdsServerCredentials } from './xds-credentials';
export function register() {
resolver_xds.setup();
load_balancer_cds.setup();
xds_cluster_resolver.setup();
xds_cluster_impl.setup();
load_balancer_priority.setup();
load_balancer_weighted_target.setup();
Expand Down
376 changes: 187 additions & 189 deletions packages/grpc-js-xds/src/load-balancer-cds.ts

Large diffs are not rendered by default.

95 changes: 38 additions & 57 deletions packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import selectLbConfigFromList = experimental.selectLbConfigFromList;
import SubchannelInterface = experimental.SubchannelInterface;
import BaseSubchannelWrapper = experimental.BaseSubchannelWrapper;
import UnavailablePicker = experimental.UnavailablePicker;
import { Locality__Output } from "./generated/envoy/config/core/v3/Locality";
import { ClusterConfig, XdsConfig } from "./xds-dependency-manager";
import { CdsUpdate } from "./xds-resource-type/cluster-resource-type";

const TRACER_NAME = 'xds_cluster_impl';

Expand All @@ -53,59 +56,26 @@ export interface DropCategory {
requests_per_million: number;
}

function validateDropCategory(obj: any): DropCategory {
if (!('category' in obj && typeof obj.category === 'string')) {
throw new Error('xds_cluster_impl config drop_categories entry must have a string field category');
}
if (!('requests_per_million' in obj && typeof obj.requests_per_million === 'number')) {
throw new Error('xds_cluster_impl config drop_categories entry must have a number field requests_per_million');
}
return obj;
}

class XdsClusterImplLoadBalancingConfig implements TypedLoadBalancingConfig {
private maxConcurrentRequests: number;
getLoadBalancerName(): string {
return TYPE_NAME;
}
toJsonObject(): object {
const jsonObj: {[key: string]: any} = {
cluster: this.cluster,
drop_categories: this.dropCategories,
child_policy: [this.childPolicy.toJsonObject()],
max_concurrent_requests: this.maxConcurrentRequests,
eds_service_name: this.edsServiceName,
lrs_load_reporting_server: this.lrsLoadReportingServer,
};
return {
[TYPE_NAME]: jsonObj
};
}

constructor(private cluster: string, private dropCategories: DropCategory[], private childPolicy: TypedLoadBalancingConfig, private edsServiceName: string, private lrsLoadReportingServer?: XdsServerConfig, maxConcurrentRequests?: number) {
this.maxConcurrentRequests = maxConcurrentRequests ?? DEFAULT_MAX_CONCURRENT_REQUESTS;
}
constructor(private cluster: string, private childPolicy: TypedLoadBalancingConfig) {}

getCluster() {
return this.cluster;
}

getEdsServiceName() {
return this.edsServiceName;
}

getLrsLoadReportingServer() {
return this.lrsLoadReportingServer;
}

getMaxConcurrentRequests() {
return this.maxConcurrentRequests;
}

getDropCategories() {
return this.dropCategories;
}

getChildPolicy() {
return this.childPolicy;
}
Expand All @@ -114,27 +84,14 @@ class XdsClusterImplLoadBalancingConfig implements TypedLoadBalancingConfig {
if (!('cluster' in obj && typeof obj.cluster === 'string')) {
throw new Error('xds_cluster_impl config must have a string field cluster');
}
if (!('eds_service_name' in obj && typeof obj.eds_service_name === 'string')) {
throw new Error('xds_cluster_impl config must have a string field eds_service_name');
}
if ('max_concurrent_requests' in obj && !(obj.max_concurrent_requests === undefined || typeof obj.max_concurrent_requests === 'number')) {
throw new Error('xds_cluster_impl config max_concurrent_requests must be a number if provided');
}
if (!('drop_categories' in obj && Array.isArray(obj.drop_categories))) {
throw new Error('xds_cluster_impl config must have an array field drop_categories');
}
if (!('child_policy' in obj && Array.isArray(obj.child_policy))) {
throw new Error('xds_cluster_impl config must have an array field child_policy');
}
const childConfig = selectLbConfigFromList(obj.child_policy);
if (!childConfig) {
throw new Error('xds_cluster_impl config child_policy parsing failed');
}
let lrsServer: XdsServerConfig | undefined = undefined;
if (obj.lrs_load_reporting_server) {
lrsServer = validateXdsServerConfig(obj.lrs_load_reporting_server)
}
return new XdsClusterImplLoadBalancingConfig(obj.cluster, obj.drop_categories.map(validateDropCategory), childConfig, obj.eds_service_name, lrsServer, obj.max_concurrent_requests);
return new XdsClusterImplLoadBalancingConfig(obj.cluster, childConfig);
}
}

Expand Down Expand Up @@ -252,11 +209,12 @@ class XdsClusterImplBalancer implements LoadBalancer {
private latestConfig: XdsClusterImplLoadBalancingConfig | null = null;
private clusterDropStats: XdsClusterDropStats | null = null;
private xdsClient: XdsClient | null = null;
private latestClusterConfig: ClusterConfig | null = null;

constructor(private readonly channelControlHelper: ChannelControlHelper, credentials: ChannelCredentials, options: ChannelOptions) {
this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, {
createSubchannel: (subchannelAddress, subchannelArgs, credentialsOverride) => {
if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList) {
if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList || !this.latestClusterConfig) {
throw new Error('xds_cluster_impl: invalid state: createSubchannel called with xdsClient or latestConfig not populated');
}
const wrapperChild = channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs, credentialsOverride);
Expand All @@ -270,23 +228,23 @@ class XdsClusterImplBalancer implements LoadBalancer {
trace('Not reporting load for address ' + subchannelAddressToString(subchannelAddress) + ' because it has unknown locality.');
return wrapperChild;
}
const lrsServer = this.latestConfig.getLrsLoadReportingServer();
const lrsServer = this.latestClusterConfig.cluster.lrsLoadReportingServer;
let statsObj: XdsClusterLocalityStats | null = null;
if (lrsServer) {
statsObj = this.xdsClient.addClusterLocalityStats(
lrsServer,
this.latestConfig.getCluster(),
this.latestConfig.getEdsServiceName(),
this.latestClusterConfig.cluster.edsServiceName ?? '',
locality
);
}
return new LocalitySubchannelWrapper(wrapperChild, statsObj);
},
updateState: (connectivityState, originalPicker) => {
if (this.latestConfig === null) {
if (this.latestConfig === null || this.latestClusterConfig === null || this.latestClusterConfig.children.type === 'aggregate' || !this.latestClusterConfig.children.endpoints) {
channelControlHelper.updateState(connectivityState, originalPicker);
} else {
const picker = new XdsClusterImplPicker(originalPicker, getCallCounterMapKey(this.latestConfig.getCluster(), this.latestConfig.getEdsServiceName()), this.latestConfig.getMaxConcurrentRequests(), this.latestConfig.getDropCategories(), this.clusterDropStats);
const picker = new XdsClusterImplPicker(originalPicker, getCallCounterMapKey(this.latestConfig.getCluster(), this.latestClusterConfig.cluster.edsServiceName), this.latestClusterConfig.cluster.maxConcurrentRequests ?? DEFAULT_MAX_CONCURRENT_REQUESTS, this.latestClusterConfig.children.endpoints.dropCategories, this.clusterDropStats);
channelControlHelper.updateState(connectivityState, picker);
}
}
Expand All @@ -297,15 +255,38 @@ class XdsClusterImplBalancer implements LoadBalancer {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
}
trace('Received update with config: ' + JSON.stringify(lbConfig, undefined, 2));
trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
const xdsConfig = attributes.xdsConfig as XdsConfig;
const maybeClusterConfig = xdsConfig.clusters.get(lbConfig.getCluster());
if (!maybeClusterConfig) {
trace('Received update with no config for cluster ' + lbConfig.getCluster());
return;
}
if (!maybeClusterConfig.success) {
this.latestClusterConfig = null;
this.childBalancer.destroy();
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker(maybeClusterConfig.error));
return;
}
const clusterConfig = maybeClusterConfig.value;
if (clusterConfig.children.type === 'aggregate') {
trace('Received update for aggregate cluster ' + lbConfig.getCluster());
return;
}
if (!clusterConfig.children.endpoints) {
this.childBalancer.destroy();
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({details: clusterConfig.children.resolutionNote}));

}
this.lastestEndpointList = endpointList;
this.latestConfig = lbConfig;
this.latestClusterConfig = clusterConfig;
this.xdsClient = attributes.xdsClient as XdsClient;
if (lbConfig.getLrsLoadReportingServer()) {
if (clusterConfig.cluster.lrsLoadReportingServer) {
this.clusterDropStats = this.xdsClient.addClusterDropStats(
lbConfig.getLrsLoadReportingServer()!,
clusterConfig.cluster.lrsLoadReportingServer,
lbConfig.getCluster(),
lbConfig.getEdsServiceName() ?? ''
clusterConfig.cluster.edsServiceName ?? ''
);
}

Expand Down
Loading
Loading