Skip to content

Commit

Permalink
Passed the new status to and added unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
DNVindhya committed Oct 4, 2024
1 parent 74a23d0 commit 7301c5c
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 18 deletions.
18 changes: 12 additions & 6 deletions xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@
*/
final class ControlPlaneClient {

public static final String CLOSED_BY_SERVER = "Closed by server";
public static final String CLOSED_BY_SERVER_AFTER_RECEIVING_A_RESPONSE =
"Closed by server after receiving a response";
private final SynchronizationContext syncContext;
private final InternalLogId logId;
private final XdsLogger logger;
Expand Down Expand Up @@ -400,10 +401,14 @@ private void handleRpcStreamClosed(Status status) {
// close streams for various reasons during normal operation, such as load balancing or
// underlying connection hitting its max connection age limit (see gRFC A9).
if (!status.isOk()) {
newStatus = Status.OK.withDescription(CLOSED_BY_SERVER);
logger.log( XdsLogLevel.INFO, "ADS stream closed with error {0}: {1}. However, a "
newStatus = Status.UNAVAILABLE.withDescription(
CLOSED_BY_SERVER_AFTER_RECEIVING_A_RESPONSE);
logger.log( XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a "
+ "response was received, so this will not be treated as an error. Cause: {2}.",
status.getCode(), status.getDescription(), status.getCause());
} else {
logger.log(XdsLogLevel.DEBUG,
"ADS stream closed by server after responses received status {0}: {1}. Cause: {2}");
}
} else {
// If the ADS stream is closed without ever having received a response from the server, then
Expand All @@ -414,13 +419,14 @@ private void handleRpcStreamClosed(Status status) {
}
}

if (!newStatus.isOk()) {
if (!newStatus.isOk() && !(newStatus.getDescription() != null
&& newStatus.getDescription().contains(CLOSED_BY_SERVER_AFTER_RECEIVING_A_RESPONSE))) {
logger.log(
XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
status.getCode(), status.getDescription(), status.getCause());
newStatus.getCode(), newStatus.getDescription(), newStatus.getCause());
}
closed = true;
xdsResponseHandler.handleStreamClosed(status);
xdsResponseHandler.handleStreamClosed(newStatus);
cleanUp();

logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos);
Expand Down
8 changes: 6 additions & 2 deletions xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
import static io.grpc.xds.client.ControlPlaneClient.CLOSED_BY_SERVER_AFTER_RECEIVING_A_RESPONSE;
import static io.grpc.xds.client.XdsResourceType.ParsedResource;
import static io.grpc.xds.client.XdsResourceType.ValidatedResourceUpdate;

Expand Down Expand Up @@ -141,8 +142,11 @@ public void handleResourceResponse(
@Override
public void handleStreamClosed(Status error) {
syncContext.throwIfNotInThisSynchronizationContext();
if (!error.isOk()) {
cleanUpResourceTimers();
cleanUpResourceTimers();
// Resource subscribers are not notified when the server closes the stream after a
// response is received.
if (!error.isOk() && !(error.getDescription() != null
&& error.getDescription().contains(CLOSED_BY_SERVER_AFTER_RECEIVING_A_RESPONSE))) {
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
resourceSubscribers.values()) {
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
Expand Down
40 changes: 30 additions & 10 deletions xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -3331,6 +3331,28 @@ public void useIndependentRpcContext() {
}
}

@Test
public void streamClosedWithNoResponse() {
xdsClient.watchXdsResource(XdsListenerResource.getInstance(),LDS_RESOURCE, ldsResourceWatcher);
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),RDS_RESOURCE,
rdsResourceWatcher);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
// Management server closes the RPC stream before sending any response.
call.sendCompleted();
verify(ldsResourceWatcher, Mockito.timeout(1000).times(1))
.onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE,
"ADS stream failed due to connectivity error");
verify(rdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE,
"ADS stream failed due to connectivity error");
ScheduledTask retryTask =
Iterables.getOnlyElement(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER));
assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(10L);

verifyNoMoreInteractions(ldsResourceWatcher, rdsResourceWatcher);
}

@Test
public void streamClosedAndRetryWithBackoff() {
InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
Expand Down Expand Up @@ -3408,10 +3430,10 @@ public void streamClosedAndRetryWithBackoff() {
call.sendError(Status.DEADLINE_EXCEEDED.asException());
verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, "");
verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, "");
verify(cdsResourceWatcher, times(2)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);
verify(edsResourceWatcher, times(2)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);

// Reset backoff sequence and retry after backoff.
inOrder.verify(backoffPolicyProvider).get();
Expand All @@ -3430,9 +3452,9 @@ public void streamClosedAndRetryWithBackoff() {
call.sendError(Status.UNAVAILABLE.asException());
verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(cdsResourceWatcher, times(4)).onError(errorCaptor.capture());
verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(edsResourceWatcher, times(4)).onError(errorCaptor.capture());
verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");

// Retry after backoff.
Expand Down Expand Up @@ -3516,10 +3538,8 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe
assertThat(edsResourceTimeout.isCancelled()).isTrue();
verify(ldsResourceWatcher, never()).onError(errorCaptor.capture());
verify(rdsResourceWatcher, never()).onError(errorCaptor.capture());
verify(cdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(edsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(cdsResourceWatcher, never()).onError(errorCaptor.capture());
verify(edsResourceWatcher, never()).onError(errorCaptor.capture());

fakeClock.forwardNanos(10L);
assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(0);
Expand Down

0 comments on commit 7301c5c

Please sign in to comment.