Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge 1.12.x into master #2910

Merged
merged 10 commits into from
Feb 28, 2025
2 changes: 1 addition & 1 deletion packages/grpc-js-xds/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js-xds",
"version": "1.12.0",
"version": "1.12.2",
"description": "Plugin for @grpc/grpc-js. Adds the xds:// URL scheme and associated features.",
"main": "build/src/index.js",
"scripts": {
Expand Down
2 changes: 1 addition & 1 deletion packages/grpc-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.12.5",
"version": "1.12.6",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
Expand Down
95 changes: 57 additions & 38 deletions packages/grpc-js/src/internal-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,13 @@ class ChannelSubchannelWrapper
) => {
channel.throttleKeepalive(keepaliveTime);
};
childSubchannel.addConnectivityStateListener(this.subchannelStateListener);
}

ref(): void {
if (this.refCount === 0) {
this.child.addConnectivityStateListener(this.subchannelStateListener);
this.channel.addWrappedSubchannel(this);
}
this.child.ref();
this.refCount += 1;
}
Expand Down Expand Up @@ -160,6 +163,25 @@ class ShutdownPicker implements Picker {
}

export const SUBCHANNEL_ARGS_EXCLUDE_KEY_PREFIX = 'grpc.internal.no_subchannel';
class ChannelzInfoTracker {
readonly trace = new ChannelzTrace();
readonly callTracker = new ChannelzCallTracker();
readonly childrenTracker = new ChannelzChildrenTracker();
state: ConnectivityState = ConnectivityState.IDLE;
constructor(private target: string) {}

getChannelzInfoCallback(): () => ChannelInfo {
return () => {
return {
target: this.target,
state: this.state,
trace: this.trace,
callTracker: this.callTracker,
children: this.childrenTracker.getChildLists()
};
};
}
}

export class InternalChannel {
private readonly resolvingLoadBalancer: ResolvingLoadBalancer;
Expand All @@ -181,9 +203,10 @@ export class InternalChannel {
* event loop open while there are any pending calls for the channel that
* have not yet been assigned to specific subchannels. In other words,
* the invariant is that callRefTimer is reffed if and only if pickQueue
* is non-empty.
* is non-empty. In addition, the timer is null while the state is IDLE or
* SHUTDOWN and there are no pending calls.
*/
private readonly callRefTimer: NodeJS.Timeout;
private callRefTimer: NodeJS.Timeout | null = null;
private configSelector: ConfigSelector | null = null;
/**
* This is the error from the name resolver if it failed most recently. It
Expand All @@ -205,11 +228,8 @@ export class InternalChannel {

// Channelz info
private readonly channelzEnabled: boolean = true;
private readonly originalTarget: string;
private readonly channelzRef: ChannelRef;
private readonly channelzTrace: ChannelzTrace;
private readonly callTracker = new ChannelzCallTracker();
private readonly childrenTracker = new ChannelzChildrenTracker();
private readonly channelzInfoTracker: ChannelzInfoTracker;

/**
* Randomly generated ID to be passed to the config selector, for use by
Expand Down Expand Up @@ -238,7 +258,7 @@ export class InternalChannel {
throw new TypeError('Channel options must be an object');
}
}
this.originalTarget = target;
this.channelzInfoTracker = new ChannelzInfoTracker(target);
const originalTargetUri = parseUri(target);
if (originalTargetUri === null) {
throw new Error(`Could not parse target name "${target}"`);
Expand All @@ -252,21 +272,17 @@ export class InternalChannel {
);
}

this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME);
this.callRefTimer.unref?.();

if (this.options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
}

this.channelzTrace = new ChannelzTrace();
this.channelzRef = registerChannelzChannel(
target,
() => this.getChannelzInfo(),
this.channelzInfoTracker.getChannelzInfoCallback(),
this.channelzEnabled
);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Channel created');
this.channelzInfoTracker.trace.addTrace('CT_INFO', 'Channel created');
}

if (this.options['grpc.default_authority']) {
Expand Down Expand Up @@ -312,7 +328,7 @@ export class InternalChannel {
);
subchannel.throttleKeepalive(this.keepaliveTime);
if (this.channelzEnabled) {
this.channelzTrace.addTrace(
this.channelzInfoTracker.trace.addTrace(
'CT_INFO',
'Created subchannel or used existing subchannel',
subchannel.getChannelzRef()
Expand All @@ -322,7 +338,6 @@ export class InternalChannel {
subchannel,
this
);
this.wrappedSubchannels.add(wrappedSubchannel);
return wrappedSubchannel;
},
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
Expand All @@ -345,12 +360,12 @@ export class InternalChannel {
},
addChannelzChild: (child: ChannelRef | SubchannelRef) => {
if (this.channelzEnabled) {
this.childrenTracker.refChild(child);
this.channelzInfoTracker.childrenTracker.refChild(child);
}
},
removeChannelzChild: (child: ChannelRef | SubchannelRef) => {
if (this.channelzEnabled) {
this.childrenTracker.unrefChild(child);
this.channelzInfoTracker.childrenTracker.unrefChild(child);
}
},
};
Expand All @@ -372,7 +387,7 @@ export class InternalChannel {
RETRY_THROTTLER_MAP.delete(this.getTarget());
}
if (this.channelzEnabled) {
this.channelzTrace.addTrace(
this.channelzInfoTracker.trace.addTrace(
'CT_INFO',
'Address resolution succeeded'
);
Expand All @@ -395,7 +410,7 @@ export class InternalChannel {
},
status => {
if (this.channelzEnabled) {
this.channelzTrace.addTrace(
this.channelzInfoTracker.trace.addTrace(
'CT_WARNING',
'Address resolution failed with code ' +
status.code +
Expand Down Expand Up @@ -447,16 +462,6 @@ export class InternalChannel {
this.lastActivityTimestamp = new Date();
}

private getChannelzInfo(): ChannelInfo {
return {
target: this.originalTarget,
state: this.connectivityState,
trace: this.channelzTrace,
callTracker: this.callTracker,
children: this.childrenTracker.getChildLists(),
};
}

private trace(text: string, verbosityOverride?: LogVerbosity) {
trace(
verbosityOverride ?? LogVerbosity.DEBUG,
Expand All @@ -466,6 +471,9 @@ export class InternalChannel {
}

private callRefTimerRef() {
if (!this.callRefTimer) {
this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME)
}
// If the hasRef function does not exist, always run the code
if (!this.callRefTimer.hasRef?.()) {
this.trace(
Expand All @@ -479,15 +487,15 @@ export class InternalChannel {
}

private callRefTimerUnref() {
// If the hasRef function does not exist, always run the code
if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) {
// If the timer or the hasRef function does not exist, always run the code
if (!this.callRefTimer?.hasRef || this.callRefTimer.hasRef()) {
this.trace(
'callRefTimer.unref | configSelectionQueue.length=' +
this.configSelectionQueue.length +
' pickQueue.length=' +
this.pickQueue.length
);
this.callRefTimer.unref?.();
this.callRefTimer?.unref?.();
}
}

Expand Down Expand Up @@ -516,12 +524,13 @@ export class InternalChannel {
ConnectivityState[newState]
);
if (this.channelzEnabled) {
this.channelzTrace.addTrace(
this.channelzInfoTracker.trace.addTrace(
'CT_INFO',
'Connectivity state change to ' + ConnectivityState[newState]
);
}
this.connectivityState = newState;
this.channelzInfoTracker.state = newState;
const watchersCopy = this.connectivityStateWatchers.slice();
for (const watcherObject of watchersCopy) {
if (newState !== watcherObject.currentState) {
Expand All @@ -546,6 +555,10 @@ export class InternalChannel {
}
}

addWrappedSubchannel(wrappedSubchannel: ChannelSubchannelWrapper) {
this.wrappedSubchannels.add(wrappedSubchannel);
}

removeWrappedSubchannel(wrappedSubchannel: ChannelSubchannelWrapper) {
this.wrappedSubchannels.delete(wrappedSubchannel);
}
Expand Down Expand Up @@ -598,6 +611,10 @@ export class InternalChannel {
clearTimeout(this.idleTimer);
this.idleTimer = null;
}
if (this.callRefTimer) {
clearInterval(this.callRefTimer);
this.callRefTimer = null;
}
}

private startIdleTimeout(timeoutMs: number) {
Expand Down Expand Up @@ -641,17 +658,17 @@ export class InternalChannel {

private onCallStart() {
if (this.channelzEnabled) {
this.callTracker.addCallStarted();
this.channelzInfoTracker.callTracker.addCallStarted();
}
this.callCount += 1;
}

private onCallEnd(status: StatusObject) {
if (this.channelzEnabled) {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
this.channelzInfoTracker.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
this.channelzInfoTracker.callTracker.addCallFailed();
}
}
this.callCount -= 1;
Expand Down Expand Up @@ -755,7 +772,9 @@ export class InternalChannel {
call.cancelWithStatus(Status.UNAVAILABLE, 'Channel closed before call started');
}
this.pickQueue = [];
clearInterval(this.callRefTimer);
if (this.callRefTimer) {
clearInterval(this.callRefTimer);
}
if (this.idleTimer) {
clearTimeout(this.idleTimer);
}
Expand Down