Skip to content

fix #3271: refining the wait logic #3274

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* Fix #3243: pipes provided to exec command are no longer closed on connection close, so that client can fully read the buffer after the command finishes.
* Fix #3272: prevent index npe after informer sees an empty list
* Fix #3275: filter related dsl methods withLabel, withField, etc. should not modify the current context. If you need similar behavior to the previous use `Filterable.withNewFilter`.
* Fix #3271: waitUntilReady and waitUntilCondition should handle resource too old

#### Improvements
* Fix #3078: adding javadocs to further clarify patch, edit, replace, etc. and note the possibility of items being modified.
Expand Down Expand Up @@ -59,6 +60,8 @@
##### DSL Changes:
- #3127 `StatusUpdatable.updateStatus` deprecated, please use patchStatus, editStatus, or replaceStatus
- #3239 deprecated methods on SharedInformerFactory directly dealing with the OperationContext, withName, and withNamespace - the Informable interface should be used instead.
- #3271 `Waitable.waitUntilReady` and `Waitable.waitUntilCondition` with throw a KubernetesClientTimeoutException instead of an IllegalArgumentException on timeout. The methods will also no longer throw an interrupted exception.
`Waitable.withWaitRetryBackoff` and the associated constants are now deprecated.

##### Util Changes:
- #3197 `Utils.waitUntilReady` now accepts a Future, rather than a BlockingQueue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public class KubernetesClientTimeoutException extends KubernetesClientException
private static final String KNOWS_RESOURCES_FORMAT = "Timed out waiting for [%d] milliseconds for multiple resources. %s";

private final List<HasMetadata> resourcesNotReady;

public KubernetesClientTimeoutException(String kind, String name, String namespace, long amount, TimeUnit timeUnit) {
super(String.format(RESOURCE_FORMAT, timeUnit.toMillis(amount), kind, name, namespace));
this.resourcesNotReady = Collections.emptyList();
}

public KubernetesClientTimeoutException(HasMetadata resource, long amount, TimeUnit timeUnit) {
super(String.format(RESOURCE_FORMAT, timeUnit.toMillis(amount), resource.getKind(), resource.getMetadata().getName(), resource.getMetadata().getNamespace()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,4 @@ public boolean isHttpGone() {
|| (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE);
}

public boolean isShouldRetry() {
return getCause() == null || !isHttpGone();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,29 @@

public interface Waitable<T, P> {

/**
* @deprecated no longer used
*/
@Deprecated
long DEFAULT_INITIAL_BACKOFF_MILLIS = 5L;
/**
* @deprecated no longer used
*/
@Deprecated
double DEFAULT_BACKOFF_MULTIPLIER = 2d;

T waitUntilReady(long amount, TimeUnit timeUnit) throws InterruptedException;
T waitUntilReady(long amount, TimeUnit timeUnit);

T waitUntilCondition(Predicate<P> condition, long amount, TimeUnit timeUnit) throws InterruptedException;
T waitUntilCondition(Predicate<P> condition, long amount, TimeUnit timeUnit);

/**
* Configure the backoff strategy to use when waiting for conditions, in case the watcher encounters a retryable error.
* @param initialBackoff the value for the initial backoff on first error
* @param backoffUnit the TimeUnit for the initial backoff value
* @param backoffMultiplier what to multiply the backoff by on each subsequent error
* @return the waitable
* @deprecated no longer used
*/
@Deprecated
Waitable<T, P> withWaitRetryBackoff(long initialBackoff, TimeUnit backoffUnit, double backoffMultiplier);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.fabric8.kubernetes.client.dsl.base;

import io.fabric8.kubernetes.api.model.ObjectReference;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.WritableOperation;
import io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper;
import org.slf4j.Logger;
Expand All @@ -37,6 +36,7 @@
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
import io.fabric8.kubernetes.client.OperationInfo;
import io.fabric8.kubernetes.client.ResourceNotFoundException;
import io.fabric8.kubernetes.client.Watch;
Expand Down Expand Up @@ -71,11 +71,11 @@
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -87,8 +87,6 @@
import okhttp3.HttpUrl;
import okhttp3.Request;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

public class BaseOperation<T extends HasMetadata, L extends KubernetesResourceList<T>, R extends Resource<T>>
extends OperationSupport
implements
Expand All @@ -107,8 +105,6 @@ public class BaseOperation<T extends HasMetadata, L extends KubernetesResourceLi
private final boolean reloadingFromServer;
private final long gracePeriodSeconds;
private final DeletionPropagation propagationPolicy;
private final long watchRetryInitialBackoffMillis;
private final double watchRetryBackoffMultiplier;

protected String apiVersion;

Expand All @@ -124,8 +120,6 @@ protected BaseOperation(OperationContext ctx) {
this.resourceVersion = ctx.getResourceVersion();
this.gracePeriodSeconds = ctx.getGracePeriodSeconds();
this.propagationPolicy = ctx.getPropagationPolicy();
this.watchRetryInitialBackoffMillis = ctx.getWatchRetryInitialBackoffMillis();
this.watchRetryBackoffMultiplier = ctx.getWatchRetryBackoffMultiplier();
}

public BaseOperation<T, L, R> newInstance(OperationContext context) {
Expand Down Expand Up @@ -234,7 +228,7 @@ public RootPaths getRootPaths() {
public T edit(UnaryOperator<T> function) {
throw new KubernetesClientException(READ_ONLY_EDIT_EXCEPTION_MESSAGE);
}

@Override
public T editStatus(UnaryOperator<T> function) {
throw new KubernetesClientException(READ_ONLY_EDIT_EXCEPTION_MESSAGE);
Expand Down Expand Up @@ -389,15 +383,7 @@ public final T createOrReplace(T... items) {
CreateOrReplaceHelper<T> createOrReplaceHelper = new CreateOrReplaceHelper<>(
this::create,
this::replace,
m -> {
try {
return waitUntilCondition(Objects::nonNull, 1, TimeUnit.SECONDS);
} catch (InterruptedException interruptedException) {
LOG.warn("Interrupted while waiting for the resource to be created or replaced. Gracefully assuming the resource has not been created and doesn't exist. ({})", interruptedException.getMessage());
Thread.currentThread().interrupt();
}
return null;
},
m -> waitUntilCondition(Objects::nonNull, 1, TimeUnit.SECONDS),
m -> fromServer().get()
);

Expand All @@ -423,7 +409,7 @@ public FilterWatchListDeletable<T, L> withoutLabels(Map<String, String> labels)
public FilterWatchListDeletable<T, L> withLabelIn(String key, String... values) {
return withNewFilter().withLabelIn(key, values).endFilter();
}

@Override
public FilterWatchListDeletable<T, L> withLabelNotIn(String key, String... values) {
return withNewFilter().withLabelNotIn(key, values).endFilter();
Expand All @@ -446,7 +432,7 @@ public FilterWatchListDeletable<T, L> withFields(Map<String, String> fields) {

@Override
public FilterWatchListDeletable<T, L> withField(String key, String value) {
return withNewFilter().withField(key, value).endFilter();
return withNewFilter().withField(key, value).endFilter();
}

@Override
Expand All @@ -461,7 +447,7 @@ public FilterWatchListDeletable<T, L> withInvolvedObject(ObjectReference objectR
public FilterNested<FilterWatchListDeletable<T, L>> withNewFilter() {
return new FilterNestedImpl<>(this);
}

@Override
public FilterWatchListDeletable<T, L> withoutFields(Map<String, String> fields) {
return withNewFilter().withoutFields(fields).endFilter();
Expand All @@ -474,7 +460,7 @@ public FilterWatchListDeletable<T, L> withoutField(String key, String value) {

public String getLabelQueryParam() {
StringBuilder sb = new StringBuilder();

Map<String, String> labels = context.getLabels();
if (labels != null && !labels.isEmpty()) {
for (Iterator<Map.Entry<String, String>> iter = labels.entrySet().iterator(); iter.hasNext(); ) {
Expand Down Expand Up @@ -656,7 +642,7 @@ public T updateStatus(T item) {
}

}

@Override
public T patchStatus(T item) {
throw new KubernetesClientException(READ_ONLY_UPDATE_EXCEPTION_MESSAGE);
Expand Down Expand Up @@ -760,7 +746,7 @@ public Watch watch(ListOptions options, final Watcher<T> watcher) {
public T replace(T item) {
throw new KubernetesClientException(READ_ONLY_UPDATE_EXCEPTION_MESSAGE);
}

@Override
public T replaceStatus(T item) {
throw new KubernetesClientException(READ_ONLY_UPDATE_EXCEPTION_MESSAGE);
Expand All @@ -770,7 +756,7 @@ public T replaceStatus(T item) {
public T patch(PatchContext patchContext, String patch) {
throw new KubernetesClientException(READ_ONLY_UPDATE_EXCEPTION_MESSAGE);
}

@Override
public T patch(PatchContext patchContext, T item) {
throw new KubernetesClientException(READ_ONLY_UPDATE_EXCEPTION_MESSAGE);
Expand Down Expand Up @@ -985,66 +971,61 @@ public final Boolean isReady() {
}

@Override
public T waitUntilReady(long amount, TimeUnit timeUnit) throws InterruptedException {
public T waitUntilReady(long amount, TimeUnit timeUnit) {
return waitUntilCondition(resource -> Objects.nonNull(resource) && getReadiness().isReady(resource), amount, timeUnit);
}

@Override
public T waitUntilCondition(Predicate<T> condition, long amount, TimeUnit timeUnit)
throws InterruptedException {
return waitUntilConditionWithRetries(condition, timeUnit.toNanos(amount), watchRetryInitialBackoffMillis);
}

private T waitUntilConditionWithRetries(Predicate<T> condition, long timeoutNanos, long backoffMillis)
throws InterruptedException {
ListOptions options = null;

if (resourceVersion != null) {
options = createListOptions(resourceVersion);
}
public T waitUntilCondition(Predicate<T> condition, long amount, TimeUnit timeUnit) {
CompletableFuture<T> future = new CompletableFuture<>();
// tests the condition, trapping any exceptions
Consumer<T> tester = obj -> {
try {
if (condition.test(obj)) {
future.complete(obj);
}
} catch (Exception e) {
future.completeExceptionally(e);
}
};
// start an informer that supplies the tester with events and empty list handling
try (SharedIndexInformer<T> informer = this.createInformer(0, l -> {
if (l.getItems().isEmpty()) {
tester.accept(null);
}
}, new ResourceEventHandler<T>() {

long currentBackOff = backoffMillis;
long remainingNanosToWait = timeoutNanos;
while (remainingNanosToWait > 0) {
@Override
public void onAdd(T obj) {
tester.accept(obj);
}

T item = fromServer().get();
if (condition.test(item)) {
return item;
} else if (options == null) {
options = createListOptions(getResourceVersion(item));
@Override
public void onUpdate(T oldObj, T newObj) {
tester.accept(newObj);
}

final WaitForConditionWatcher<T> watcher = new WaitForConditionWatcher<>(condition);
final long startTime = System.nanoTime();
try (Watch ignored = watch(options, watcher)) {
return watcher.getFuture().get(remainingNanosToWait, NANOSECONDS);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof WatcherException && ((WatcherException) cause).isShouldRetry()) {
LOG.debug("retryable watch exception encountered, retrying after {} millis", currentBackOff, cause);
Thread.sleep(currentBackOff);
currentBackOff *= watchRetryBackoffMultiplier;
remainingNanosToWait -= (System.nanoTime() - startTime);
} else {
throw KubernetesClientException.launderThrowable(cause);
}
} catch (TimeoutException e) {
break;
@Override
public void onDelete(T obj, boolean deletedFinalStateUnknown) {
tester.accept(null);
}
})) {
// prevent unnecessary watches
future.whenComplete((r,t) -> informer.stop());
informer.run();
return future.get(amount, timeUnit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw KubernetesClientException.launderThrowable(e.getCause());
} catch (ExecutionException e) {
throw KubernetesClientException.launderThrowable(e.getCause());
} catch (TimeoutException e) {
T i = getItem();
if (i != null) {
throw new KubernetesClientTimeoutException(i, amount, timeUnit);
}
throw new KubernetesClientTimeoutException(getKind(), getName(), getNamespace(), amount, timeUnit);
}

LOG.debug("ran out of time waiting for watcher, wait condition not met");
throw new IllegalArgumentException(type.getSimpleName() + " with name:[" + name + "] in namespace:[" + namespace + "] matching condition not found!");
}

private static String getResourceVersion(HasMetadata item) {
return (item == null) ? null : item.getMetadata().getResourceVersion();
}

private static ListOptions createListOptions(String resourceVersion) {
return new ListOptionsBuilder()
.withResourceVersion(resourceVersion)
.build();
}

public void setType(Class<T> type) {
Expand All @@ -1063,40 +1044,53 @@ public void setNamespace(String namespace) {
public WritableOperation<T> dryRun(boolean isDryRun) {
return newInstance(context.withDryRun(isDryRun));
}

@Override
public Informable<T> withIndexers(Map<String, Function<T, List<String>>> indexers) {
BaseOperation<T, L, R> result = newInstance(context);
result.indexers = indexers;
return result;
}

@Override
public SharedIndexInformer<T> inform(ResourceEventHandler<T> handler, long resync) {
// convert the name into something listable
FilterWatchListDeletable<T, L> baseOperation =
getName() == null ? this : withFields(Collections.singletonMap("metadata.name", getName()));
DefaultSharedIndexInformer<T, L> result = createInformer(resync, null, handler);
// synchronous start list/watch must succeed in the calling thread
// initial add events will be processed in the calling thread as well
result.run();
return result;
}

private DefaultSharedIndexInformer<T, L> createInformer(long resync, Consumer<L> onList, ResourceEventHandler<T> handler) {
T i = getItem();
String name = (Utils.isNotNullOrEmpty(getName()) || i != null) ? checkName(i) : null;

// use the local context / namespace
DefaultSharedIndexInformer<T, L> informer = new DefaultSharedIndexInformer<>(getType(), new ListerWatcher<T, L>() {
@Override
public L list(ListOptions params, String namespace, OperationContext context) {
return baseOperation.list(params);
// convert the name into something listable
if (name != null) {
params.setFieldSelector("metadata.name="+name);
}
L result = BaseOperation.this.list(params);
if (onList != null) {
onList.accept(result);
}
return result;
}

@Override
public Watch watch(ListOptions params, String namespace, OperationContext context, Watcher<T> watcher) {
return baseOperation.watch(params, watcher);
return BaseOperation.this.watch(params, watcher);
}
}, resync, context, Runnable::run); // just run the event notification in the websocket thread
if (handler != null) {
informer.addEventHandler(handler);
}
if (indexers != null) {
informer.addIndexers(indexers);
}
// synchronous start list/watch must succeed in the calling thread
informer.run();
if (handler != null) {
informer.addEventHandler(handler);
}
return informer;
}
}
Expand Down
Loading