Skip to content

Commit

Permalink
xdsClient metrics structure
Browse files Browse the repository at this point in the history
  • Loading branch information
DNVindhya committed Oct 31, 2024
1 parent 36e29ab commit 7496537
Show file tree
Hide file tree
Showing 19 changed files with 1,805 additions and 116 deletions.
33 changes: 31 additions & 2 deletions api/src/main/java/io/grpc/NameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ public static final class Args {
@Nullable private final ChannelLogger channelLogger;
@Nullable private final Executor executor;
@Nullable private final String overrideAuthority;
@Nullable private final MetricRecorder metricRecorder;

private Args(
Integer defaultPort,
Expand All @@ -297,7 +298,8 @@ private Args(
@Nullable ScheduledExecutorService scheduledExecutorService,
@Nullable ChannelLogger channelLogger,
@Nullable Executor executor,
@Nullable String overrideAuthority) {
@Nullable String overrideAuthority,
@Nullable MetricRecorder metricRecorder) {
this.defaultPort = checkNotNull(defaultPort, "defaultPort not set");
this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set");
this.syncContext = checkNotNull(syncContext, "syncContext not set");
Expand All @@ -306,6 +308,7 @@ private Args(
this.channelLogger = channelLogger;
this.executor = executor;
this.overrideAuthority = overrideAuthority;
this.metricRecorder = metricRecorder;
}

/**
Expand Down Expand Up @@ -403,6 +406,17 @@ public String getOverrideAuthority() {
return overrideAuthority;
}

/**
* Returns the {@link MetricRecorder} that the channel uses to record metrics.
*
* @since 1.67.0
*/
@Nullable
@ExperimentalApi("Insert GitHub issue")
public MetricRecorder getMetricRecorder() {
return metricRecorder;
}


@Override
public String toString() {
Expand All @@ -415,6 +429,7 @@ public String toString() {
.add("channelLogger", channelLogger)
.add("executor", executor)
.add("overrideAuthority", overrideAuthority)
.add("metricRecorder", metricRecorder)
.toString();
}

Expand All @@ -433,6 +448,7 @@ public Builder toBuilder() {
builder.setChannelLogger(channelLogger);
builder.setOffloadExecutor(executor);
builder.setOverrideAuthority(overrideAuthority);
builder.setMetricRecorder(metricRecorder);
return builder;
}

Expand All @@ -459,6 +475,7 @@ public static final class Builder {
private ChannelLogger channelLogger;
private Executor executor;
private String overrideAuthority;
private MetricRecorder metricRecorder;

Builder() {
}
Expand Down Expand Up @@ -545,6 +562,17 @@ public Builder setOverrideAuthority(String authority) {
return this;
}

/**
* See {@link Args#getMetricRecorder()}. This is an optional field.
*
* @since 1.67.0
*/
@ExperimentalApi("Insert github issue")
public Builder setMetricRecorder(MetricRecorder metricRecorder) {
this.metricRecorder = metricRecorder;
return this;
}

/**
* Builds an {@link Args}.
*
Expand All @@ -554,7 +582,8 @@ public Args build() {
return
new Args(
defaultPort, proxyDetector, syncContext, serviceConfigParser,
scheduledExecutorService, channelLogger, executor, overrideAuthority);
scheduledExecutorService, channelLogger, executor, overrideAuthority,
metricRecorder);
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,8 @@ ClientStream newSubstream(
builder.maxHedgedAttempts,
loadBalancerFactory);
this.authorityOverride = builder.authorityOverride;
this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
MetricInstrumentRegistry.getDefaultRegistry());
this.nameResolverArgs =
NameResolver.Args.newBuilder()
.setDefaultPort(builder.getDefaultPort())
Expand All @@ -599,6 +601,7 @@ ClientStream newSubstream(
.setChannelLogger(channelLogger)
.setOffloadExecutor(this.offloadExecutorHolder)
.setOverrideAuthority(this.authorityOverride)
.setMetricRecorder(this.metricRecorder)
.build();
this.nameResolver = getNameResolver(
targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
Expand Down Expand Up @@ -671,8 +674,6 @@ public CallTracer create() {
}
serviceConfigUpdated = true;
}
this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
MetricInstrumentRegistry.getDefaultRegistry());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.xds;

import io.grpc.Internal;
import io.grpc.MetricRecorder;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsInitializationException;
Expand All @@ -36,6 +37,11 @@ public static void setDefaultProviderBootstrapOverride(Map<String, ?> bootstrap)

public static ObjectPool<XdsClient> getOrCreate(String target)
throws XdsInitializationException {
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(target);
return getOrCreate(target, new MetricRecorder() {});
}

public static ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
throws XdsInitializationException {
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(target, metricRecorder);
}
}
24 changes: 21 additions & 3 deletions xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.grpc.MetricRecorder;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
Expand Down Expand Up @@ -56,6 +57,7 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
private final Object lock = new Object();
private final AtomicReference<Map<String, ?>> bootstrapOverride = new AtomicReference<>();
private final Map<String, ObjectPool<XdsClient>> targetToXdsClientMap = new ConcurrentHashMap<>();
private final Map<String, MetricRecorder> targetToMetricRecorderMap = new ConcurrentHashMap<>();

SharedXdsClientPoolProvider() {
this(new GrpcBootstrapperImpl());
Expand All @@ -82,7 +84,14 @@ public ObjectPool<XdsClient> get(String target) {
}

@Override
@Nullable
public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException {
return this.getOrCreate(target, new MetricRecorder() {});
}

@Override
public ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
throws XdsInitializationException {
ObjectPool<XdsClient> ref = targetToXdsClientMap.get(target);
if (ref == null) {
synchronized (lock) {
Expand All @@ -98,8 +107,12 @@ public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitialization
if (bootstrapInfo.servers().isEmpty()) {
throw new XdsInitializationException("No xDS server provided");
}
ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target);
MetricRecorder metricRecorderForTarget = targetToMetricRecorderMap.get(target);
metricRecorder =
metricRecorderForTarget != null ? metricRecorderForTarget : metricRecorder;
ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target, metricRecorder);
targetToXdsClientMap.put(target, ref);
targetToMetricRecorderMap.putIfAbsent(target, metricRecorder);
}
}
}
Expand All @@ -124,6 +137,7 @@ static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
new ExponentialBackoffPolicy.Provider();
private final BootstrapInfo bootstrapInfo;
private final String target; // The target associated with the xDS client.
private final MetricRecorder metricRecorder;
private final Object lock = new Object();
@GuardedBy("lock")
private ScheduledExecutorService scheduler;
Expand All @@ -133,9 +147,11 @@ static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
private int refCount;

@VisibleForTesting
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target) {
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target,
MetricRecorder metricRecorder) {
this.bootstrapInfo = checkNotNull(bootstrapInfo);
this.target = target;
this.metricRecorder = metricRecorder;
}

@Override
Expand All @@ -154,7 +170,9 @@ public XdsClient getObject() {
GrpcUtil.STOPWATCH_SUPPLIER,
TimeProvider.SYSTEM_TIME_PROVIDER,
MessagePrinter.INSTANCE,
new TlsContextManagerImpl(bootstrapInfo));
new TlsContextManagerImpl(bootstrapInfo),
getTarget(),
new XdsClientMetricReporterImpl(metricRecorder));
}
refCount++;
return xdsClient;
Expand Down
99 changes: 99 additions & 0 deletions xds/src/main/java/io/grpc/xds/XdsClientMetricReporter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.xds;

import io.grpc.Internal;
import io.grpc.xds.client.XdsClient;

/**
* Interface for reporting metrics from the xDS client.
* We need this indirection, to de couple Xds from OpenTelemetry
*/
@Internal
public interface XdsClientMetricReporter {

/**
* Reports resource update counts.
*
* @param validResourceCount the number of resources that were successfully updated.
* @param invalidResourceCount the number of resources that failed to update.
* @param target the xDS management server name for the load balancing policy this update is for.
* @param xdsServer the xDS management server address for this update.
* @param resourceType the type of resource (e.g., "LDS", "RDS", "CDS", "EDS").
*/
default void reportResourceUpdates(long validResourceCount, long invalidResourceCount,
String target, String xdsServer, String resourceType) {
}

/**
* Reports xDS server failure counts.
*
* @param serverFailure the number of times the xDS server has failed.
* @param target the xDS management server name for the load balancing policy this failure is for.
* @param xdsServer the xDS management server address for this failure.
*/
default void reportServerFailure(long serverFailure, String target, String xdsServer) {
}

/**
* Sets the {@link XdsClient} instance to the reporter.
*
* @param xdsClient the {@link XdsClient} instance.
*/
default void setXdsClient(XdsClient xdsClient) {
}

/**
* Closes the metric reporter.
*/
default void close() {
}

/**
* Interface for reporting metrics from the xDS client callbacks.
*
*/
interface CallbackMetricReporter {

/**
* Reports resource counts in the cache.
*
* @param resourceCount the number of resources in the cache.
* @param cacheState the state of the cache (e.g., "SYNCED", "DOES_NOT_EXIST").
* @param resourceType the type of resource (e.g., "LDS", "RDS", "CDS", "EDS").
* @param target the xDS management server name for the load balancing policy this count is
* for.
*/
// TODO(@dnvindhya): include the "authority" label once authority is available.
default void reportResourceCounts(long resourceCount, String cacheState, String resourceType,
String target) {
}

/**
* Reports server connection status.
*
* @param isConnected {@code true} if the client is connected to the xDS server, {@code false}
* otherwise.
* @param target the xDS management server name for the load balancing policy this connection
* is for.
* @param xdsServer the xDS management server address for this connection.
* @since 0.1.0
*/
default void reportServerConnections(int isConnected, String target, String xdsServer) {
}
}
}
Loading

0 comments on commit 7496537

Please sign in to comment.