diff --git a/CHANGELOG.md b/CHANGELOG.md index d7a6dbcfea9..4a7609253fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ * Fix #3243: pipes provided to exec command are no longer closed on connection close, so that client can fully read the buffer after the command finishes. * Fix #3272: prevent index npe after informer sees an empty list * Fix #3275: filter related dsl methods withLabel, withField, etc. should not modify the current context. If you need similar behavior to the previous use `Filterable.withNewFilter`. +* Fix #3271: waitUntilReady and waitUntilCondition should handle resource too old #### Improvements * Fix #3078: adding javadocs to further clarify patch, edit, replace, etc. and note the possibility of items being modified. @@ -59,6 +60,8 @@ ##### DSL Changes: - #3127 `StatusUpdatable.updateStatus` deprecated, please use patchStatus, editStatus, or replaceStatus - #3239 deprecated methods on SharedInformerFactory directly dealing with the OperationContext, withName, and withNamespace - the Informable interface should be used instead. +- #3271 `Waitable.waitUntilReady` and `Waitable.waitUntilCondition` with throw a KubernetesClientTimeoutException instead of an IllegalArgumentException on timeout. The methods will also no longer throw an interrupted exception. + `Waitable.withWaitRetryBackoff` and the associated constants are now deprecated. ##### Util Changes: - #3197 `Utils.waitUntilReady` now accepts a Future, rather than a BlockingQueue diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/KubernetesClientTimeoutException.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/KubernetesClientTimeoutException.java index 807f7e56ccc..ad1eb9c63f9 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/KubernetesClientTimeoutException.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/KubernetesClientTimeoutException.java @@ -30,6 +30,11 @@ public class KubernetesClientTimeoutException extends KubernetesClientException private static final String KNOWS_RESOURCES_FORMAT = "Timed out waiting for [%d] milliseconds for multiple resources. %s"; private final List resourcesNotReady; + + public KubernetesClientTimeoutException(String kind, String name, String namespace, long amount, TimeUnit timeUnit) { + super(String.format(RESOURCE_FORMAT, timeUnit.toMillis(amount), kind, name, namespace)); + this.resourcesNotReady = Collections.emptyList(); + } public KubernetesClientTimeoutException(HasMetadata resource, long amount, TimeUnit timeUnit) { super(String.format(RESOURCE_FORMAT, timeUnit.toMillis(amount), resource.getKind(), resource.getMetadata().getName(), resource.getMetadata().getNamespace())); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/WatcherException.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/WatcherException.java index c5c61e9398d..2803fc9393e 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/WatcherException.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/WatcherException.java @@ -39,7 +39,4 @@ public boolean isHttpGone() { || (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE); } - public boolean isShouldRetry() { - return getCause() == null || !isHttpGone(); - } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/Waitable.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/Waitable.java index fad51c76187..ef331895ee8 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/Waitable.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/Waitable.java @@ -20,12 +20,20 @@ public interface Waitable { + /** + * @deprecated no longer used + */ + @Deprecated long DEFAULT_INITIAL_BACKOFF_MILLIS = 5L; + /** + * @deprecated no longer used + */ + @Deprecated double DEFAULT_BACKOFF_MULTIPLIER = 2d; - T waitUntilReady(long amount, TimeUnit timeUnit) throws InterruptedException; + T waitUntilReady(long amount, TimeUnit timeUnit); - T waitUntilCondition(Predicate

condition, long amount, TimeUnit timeUnit) throws InterruptedException; + T waitUntilCondition(Predicate

condition, long amount, TimeUnit timeUnit); /** * Configure the backoff strategy to use when waiting for conditions, in case the watcher encounters a retryable error. @@ -33,6 +41,8 @@ public interface Waitable { * @param backoffUnit the TimeUnit for the initial backoff value * @param backoffMultiplier what to multiply the backoff by on each subsequent error * @return the waitable + * @deprecated no longer used */ + @Deprecated Waitable withWaitRetryBackoff(long initialBackoff, TimeUnit backoffUnit, double backoffMultiplier); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java index 14df3d4ec17..e660a0e087c 100755 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java @@ -16,7 +16,6 @@ package io.fabric8.kubernetes.client.dsl.base; import io.fabric8.kubernetes.api.model.ObjectReference; -import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.WritableOperation; import io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper; import org.slf4j.Logger; @@ -37,6 +36,7 @@ import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.ConfigBuilder; import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.KubernetesClientTimeoutException; import io.fabric8.kubernetes.client.OperationInfo; import io.fabric8.kubernetes.client.ResourceNotFoundException; import io.fabric8.kubernetes.client.Watch; @@ -71,11 +71,11 @@ import java.net.MalformedURLException; import java.net.URL; import java.util.Arrays; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -87,8 +87,6 @@ import okhttp3.HttpUrl; import okhttp3.Request; -import static java.util.concurrent.TimeUnit.NANOSECONDS; - public class BaseOperation, R extends Resource> extends OperationSupport implements @@ -107,8 +105,6 @@ public class BaseOperation newInstance(OperationContext context) { @@ -234,7 +228,7 @@ public RootPaths getRootPaths() { public T edit(UnaryOperator function) { throw new KubernetesClientException(READ_ONLY_EDIT_EXCEPTION_MESSAGE); } - + @Override public T editStatus(UnaryOperator function) { throw new KubernetesClientException(READ_ONLY_EDIT_EXCEPTION_MESSAGE); @@ -389,15 +383,7 @@ public final T createOrReplace(T... items) { CreateOrReplaceHelper createOrReplaceHelper = new CreateOrReplaceHelper<>( this::create, this::replace, - m -> { - try { - return waitUntilCondition(Objects::nonNull, 1, TimeUnit.SECONDS); - } catch (InterruptedException interruptedException) { - LOG.warn("Interrupted while waiting for the resource to be created or replaced. Gracefully assuming the resource has not been created and doesn't exist. ({})", interruptedException.getMessage()); - Thread.currentThread().interrupt(); - } - return null; - }, + m -> waitUntilCondition(Objects::nonNull, 1, TimeUnit.SECONDS), m -> fromServer().get() ); @@ -423,7 +409,7 @@ public FilterWatchListDeletable withoutLabels(Map labels) public FilterWatchListDeletable withLabelIn(String key, String... values) { return withNewFilter().withLabelIn(key, values).endFilter(); } - + @Override public FilterWatchListDeletable withLabelNotIn(String key, String... values) { return withNewFilter().withLabelNotIn(key, values).endFilter(); @@ -446,7 +432,7 @@ public FilterWatchListDeletable withFields(Map fields) { @Override public FilterWatchListDeletable withField(String key, String value) { - return withNewFilter().withField(key, value).endFilter(); + return withNewFilter().withField(key, value).endFilter(); } @Override @@ -461,7 +447,7 @@ public FilterWatchListDeletable withInvolvedObject(ObjectReference objectR public FilterNested> withNewFilter() { return new FilterNestedImpl<>(this); } - + @Override public FilterWatchListDeletable withoutFields(Map fields) { return withNewFilter().withoutFields(fields).endFilter(); @@ -474,7 +460,7 @@ public FilterWatchListDeletable withoutField(String key, String value) { public String getLabelQueryParam() { StringBuilder sb = new StringBuilder(); - + Map labels = context.getLabels(); if (labels != null && !labels.isEmpty()) { for (Iterator> iter = labels.entrySet().iterator(); iter.hasNext(); ) { @@ -656,7 +642,7 @@ public T updateStatus(T item) { } } - + @Override public T patchStatus(T item) { throw new KubernetesClientException(READ_ONLY_UPDATE_EXCEPTION_MESSAGE); @@ -760,7 +746,7 @@ public Watch watch(ListOptions options, final Watcher watcher) { public T replace(T item) { throw new KubernetesClientException(READ_ONLY_UPDATE_EXCEPTION_MESSAGE); } - + @Override public T replaceStatus(T item) { throw new KubernetesClientException(READ_ONLY_UPDATE_EXCEPTION_MESSAGE); @@ -770,7 +756,7 @@ public T replaceStatus(T item) { public T patch(PatchContext patchContext, String patch) { throw new KubernetesClientException(READ_ONLY_UPDATE_EXCEPTION_MESSAGE); } - + @Override public T patch(PatchContext patchContext, T item) { throw new KubernetesClientException(READ_ONLY_UPDATE_EXCEPTION_MESSAGE); @@ -985,66 +971,61 @@ public final Boolean isReady() { } @Override - public T waitUntilReady(long amount, TimeUnit timeUnit) throws InterruptedException { + public T waitUntilReady(long amount, TimeUnit timeUnit) { return waitUntilCondition(resource -> Objects.nonNull(resource) && getReadiness().isReady(resource), amount, timeUnit); } @Override - public T waitUntilCondition(Predicate condition, long amount, TimeUnit timeUnit) - throws InterruptedException { - return waitUntilConditionWithRetries(condition, timeUnit.toNanos(amount), watchRetryInitialBackoffMillis); - } - - private T waitUntilConditionWithRetries(Predicate condition, long timeoutNanos, long backoffMillis) - throws InterruptedException { - ListOptions options = null; - - if (resourceVersion != null) { - options = createListOptions(resourceVersion); - } + public T waitUntilCondition(Predicate condition, long amount, TimeUnit timeUnit) { + CompletableFuture future = new CompletableFuture<>(); + // tests the condition, trapping any exceptions + Consumer tester = obj -> { + try { + if (condition.test(obj)) { + future.complete(obj); + } + } catch (Exception e) { + future.completeExceptionally(e); + } + }; + // start an informer that supplies the tester with events and empty list handling + try (SharedIndexInformer informer = this.createInformer(0, l -> { + if (l.getItems().isEmpty()) { + tester.accept(null); + } + }, new ResourceEventHandler() { - long currentBackOff = backoffMillis; - long remainingNanosToWait = timeoutNanos; - while (remainingNanosToWait > 0) { + @Override + public void onAdd(T obj) { + tester.accept(obj); + } - T item = fromServer().get(); - if (condition.test(item)) { - return item; - } else if (options == null) { - options = createListOptions(getResourceVersion(item)); + @Override + public void onUpdate(T oldObj, T newObj) { + tester.accept(newObj); } - final WaitForConditionWatcher watcher = new WaitForConditionWatcher<>(condition); - final long startTime = System.nanoTime(); - try (Watch ignored = watch(options, watcher)) { - return watcher.getFuture().get(remainingNanosToWait, NANOSECONDS); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - if (cause instanceof WatcherException && ((WatcherException) cause).isShouldRetry()) { - LOG.debug("retryable watch exception encountered, retrying after {} millis", currentBackOff, cause); - Thread.sleep(currentBackOff); - currentBackOff *= watchRetryBackoffMultiplier; - remainingNanosToWait -= (System.nanoTime() - startTime); - } else { - throw KubernetesClientException.launderThrowable(cause); - } - } catch (TimeoutException e) { - break; + @Override + public void onDelete(T obj, boolean deletedFinalStateUnknown) { + tester.accept(null); } + })) { + // prevent unnecessary watches + future.whenComplete((r,t) -> informer.stop()); + informer.run(); + return future.get(amount, timeUnit); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw KubernetesClientException.launderThrowable(e.getCause()); + } catch (ExecutionException e) { + throw KubernetesClientException.launderThrowable(e.getCause()); + } catch (TimeoutException e) { + T i = getItem(); + if (i != null) { + throw new KubernetesClientTimeoutException(i, amount, timeUnit); + } + throw new KubernetesClientTimeoutException(getKind(), getName(), getNamespace(), amount, timeUnit); } - - LOG.debug("ran out of time waiting for watcher, wait condition not met"); - throw new IllegalArgumentException(type.getSimpleName() + " with name:[" + name + "] in namespace:[" + namespace + "] matching condition not found!"); - } - - private static String getResourceVersion(HasMetadata item) { - return (item == null) ? null : item.getMetadata().getResourceVersion(); - } - - private static ListOptions createListOptions(String resourceVersion) { - return new ListOptionsBuilder() - .withResourceVersion(resourceVersion) - .build(); } public void setType(Class type) { @@ -1063,40 +1044,53 @@ public void setNamespace(String namespace) { public WritableOperation dryRun(boolean isDryRun) { return newInstance(context.withDryRun(isDryRun)); } - + @Override public Informable withIndexers(Map>> indexers) { BaseOperation result = newInstance(context); result.indexers = indexers; return result; } - + @Override public SharedIndexInformer inform(ResourceEventHandler handler, long resync) { - // convert the name into something listable - FilterWatchListDeletable baseOperation = - getName() == null ? this : withFields(Collections.singletonMap("metadata.name", getName())); + DefaultSharedIndexInformer result = createInformer(resync, null, handler); + // synchronous start list/watch must succeed in the calling thread + // initial add events will be processed in the calling thread as well + result.run(); + return result; + } + + private DefaultSharedIndexInformer createInformer(long resync, Consumer onList, ResourceEventHandler handler) { + T i = getItem(); + String name = (Utils.isNotNullOrEmpty(getName()) || i != null) ? checkName(i) : null; // use the local context / namespace DefaultSharedIndexInformer informer = new DefaultSharedIndexInformer<>(getType(), new ListerWatcher() { @Override public L list(ListOptions params, String namespace, OperationContext context) { - return baseOperation.list(params); + // convert the name into something listable + if (name != null) { + params.setFieldSelector("metadata.name="+name); + } + L result = BaseOperation.this.list(params); + if (onList != null) { + onList.accept(result); + } + return result; } - + @Override public Watch watch(ListOptions params, String namespace, OperationContext context, Watcher watcher) { - return baseOperation.watch(params, watcher); + return BaseOperation.this.watch(params, watcher); } }, resync, context, Runnable::run); // just run the event notification in the websocket thread - if (handler != null) { - informer.addEventHandler(handler); - } if (indexers != null) { informer.addIndexers(indexers); } - // synchronous start list/watch must succeed in the calling thread - informer.run(); + if (handler != null) { + informer.addEventHandler(handler); + } return informer; } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcher.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcher.java deleted file mode 100644 index 675a88ca52a..00000000000 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcher.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.fabric8.kubernetes.client.dsl.base; - -import java.util.concurrent.CompletableFuture; -import java.util.function.Predicate; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.client.Watcher; -import io.fabric8.kubernetes.client.WatcherException; - -public class WaitForConditionWatcher implements Watcher { - - private final Predicate condition; - private final CompletableFuture future; - - public WaitForConditionWatcher(Predicate condition) { - this.condition = condition; - this.future = new CompletableFuture<>(); - } - - public CompletableFuture getFuture() { - return future; - } - - @Override - public void eventReceived(Action action, T resource) { - switch (action) { - case ADDED: - case MODIFIED: - if (condition.test(resource)) { - future.complete(resource); - } - break; - case DELETED: - if (condition.test(null)) { - future.complete(null); - } else { - future.completeExceptionally(new WatcherException("Unexpected deletion of watched resource, will never satisfy condition")); - } - break; - case ERROR: - future.completeExceptionally(new WatcherException("Action.ERROR received")); - break; - } - } - - @Override - public void onClose(WatcherException cause) { - future.completeExceptionally(cause); - } - - @Override - public void onClose() { - future.completeExceptionally(new WatcherException("Watcher closed")); - } -} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableImpl.java index 554cf1c1cd5..273a6c31ba4 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableImpl.java @@ -250,10 +250,15 @@ public final Boolean isReady() { } @Override - public HasMetadata waitUntilReady(long amount, TimeUnit timeUnit) throws InterruptedException { + public HasMetadata waitUntilReady(long amount, TimeUnit timeUnit) { HasMetadata meta = acceptVisitors(asHasMetadata(get()), visitors); ResourceHandler h = handlerOf(meta); - return h.waitUntilReady(client, config, meta.getMetadata().getNamespace(), meta, amount, timeUnit); + try { + return h.waitUntilReady(client, config, meta.getMetadata().getNamespace(), meta, amount, timeUnit); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw KubernetesClientException.launderThrowable(e); + } } @Override @@ -263,10 +268,15 @@ public VisitFromServerWritable dryRun(boolean isDryRun) { @Override public HasMetadata waitUntilCondition(Predicate condition, long amount, - TimeUnit timeUnit) throws InterruptedException { + TimeUnit timeUnit) { HasMetadata meta = acceptVisitors(asHasMetadata(get()), visitors); ResourceHandler h = handlerOf(meta); - return h.waitUntilCondition(client, config, meta.getMetadata().getNamespace(), meta, condition, amount, timeUnit); + try { + return h.waitUntilCondition(client, config, meta.getMetadata().getNamespace(), meta, condition, amount, timeUnit); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw KubernetesClientException.launderThrowable(e); + } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java index cd56179a1cf..d4a563bfb02 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java @@ -48,14 +48,12 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.function.Predicate; import static io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.createOrReplaceItem; @@ -85,116 +83,64 @@ public class NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImp private final Boolean cascading; @Override - public List waitUntilReady(final long amount, final TimeUnit timeUnit) throws InterruptedException { - List items = acceptVisitors(asHasMetadata(item, true), visitors); - if (items.isEmpty()) { - return Collections.emptyList(); - } - - final List result = new ArrayList<>(); - final List itemsWithConditionNotMatched = new ArrayList<>(items); - final int size = items.size(); - final ExecutorService executor = Executors.newFixedThreadPool(size); - - try { - final CountDownLatch latch = new CountDownLatch(size); - for (final HasMetadata meta : items) { - final ResourceHandler h = handlerOf(meta); - if (!executor.isShutdown()) { - executor.submit(() -> { - try { - result.add(h.waitUntilReady(client, config, meta.getMetadata().getNamespace(), meta, amount, timeUnit)); - } catch (InterruptedException | IllegalArgumentException interruptedException) { - // We may get here if waiting is interrupted or resource doesn't support concept of readiness. - // We don't want to wait for items that will never become ready - // Skip that resource then. - LOGGER.info("{} {} does not support readiness. skipping..", meta.getKind(), meta.getMetadata().getName()); - latch.countDown(); - } catch (IllegalStateException t) { - logAsNotReady(t, meta); - } finally { - // Resource got ready and was returned properly - latch.countDown(); - } - }); - } - } - latch.await(amount, timeUnit); - if (latch.getCount() == 0) { - return result; - } else { - throw new KubernetesClientTimeoutException(itemsWithConditionNotMatched, amount, timeUnit); - } - } finally { - executor.shutdown(); - } + public List waitUntilReady(final long amount, final TimeUnit timeUnit) { + return waitUntilCondition(resource -> Objects.nonNull(resource) && getReadiness().isReady(resource), amount, timeUnit); } @Override public List waitUntilCondition(Predicate condition, long amount, - TimeUnit timeUnit) throws InterruptedException { + TimeUnit timeUnit) { List items = acceptVisitors(asHasMetadata(item, true), visitors); if (items.isEmpty()) { return Collections.emptyList(); } - - final List> futures = new ArrayList<>(items.size()); - for (final HasMetadata meta : items) { - final ResourceHandler h = handlerOf(meta); - futures.add(CompletableFuture.supplyAsync(() -> { - try { - return h.waitUntilCondition(client, config, meta.getMetadata().getNamespace(), meta, condition, amount, timeUnit); - } catch (Exception e) { - //consider all errors as not ready. - logAsNotReady(e, meta); - return null; - } - })); - } - + // this strategy is very costly in terms of threads - by not exposing the underlying futures + // we have to create a thread for each item that mostly waits + final ExecutorService executor = Executors.newFixedThreadPool(items.size(), Utils.daemonThreadFactory(this)); try { - // All of the individual futures have the same timeout period, - // but they may not all necessarily start executing together. - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(amount, timeUnit); - } catch (TimeoutException | ExecutionException e) { - // We don't allow individual futures to complete with an exception, - // which means we should never catch ExecutionException here. - LOGGER.debug("Global timeout reached", e); - } - - final List results = new ArrayList<>(); - final List itemsWithConditionNotMatched = new ArrayList<>(); - - // Iterate over the items because we don't know what kind of List it is. - // But the futures use an ArrayList, so accessing by index is efficient. - int i = 0; - for (final HasMetadata meta : items) { - try { - CompletableFuture future = futures.get(i); - HasMetadata result = future.getNow(null); - if (result != null) { - results.add(result); - } else { - // Cancel this future, just in case it never had - // an opportunity to execute in the first place. - future.cancel(true); + final List> futures = new ArrayList<>(items.size()); + for (final HasMetadata meta : items) { + final ResourceHandler h = handlerOf(meta); + futures.add(CompletableFuture.supplyAsync(() -> { + try { + return h.waitUntilCondition(client, config, meta.getMetadata().getNamespace(), meta, condition, amount, timeUnit); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }, executor)); + } + + final List results = new ArrayList<>(); + final List itemsWithConditionNotMatched = new ArrayList<>(); + + // Iterate over the items because we don't know what kind of List it is. + // But the futures use an ArrayList, so accessing by index is efficient. + int i = 0; + for (final HasMetadata meta : items) { + try { + CompletableFuture future = futures.get(i); + // just get each result as the timeout is enforced below + results.add(future.get()); + } catch (ExecutionException e) { itemsWithConditionNotMatched.add(meta); + logAsNotReady(e.getCause(), meta); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw KubernetesClientException.launderThrowable(e); } - } catch (CompletionException e) { - // We should never reach here, because individual futures - // aren't allowed to complete with an exception. - itemsWithConditionNotMatched.add(meta); - logAsNotReady(e.getCause(), meta); + ++i; } - ++i; - } - - if (!itemsWithConditionNotMatched.isEmpty()) { - throw new KubernetesClientTimeoutException(itemsWithConditionNotMatched, amount, timeUnit); + + if (!itemsWithConditionNotMatched.isEmpty()) { + throw new KubernetesClientTimeoutException(itemsWithConditionNotMatched, amount, timeUnit); + } + + return results; + } finally { + executor.shutdownNow(); } - - return results; } private static void logAsNotReady(Throwable t, HasMetadata meta) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawRequestBuilder.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawRequestBuilder.java deleted file mode 100644 index ba9b92fc7d2..00000000000 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawRequestBuilder.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.fabric8.kubernetes.client.dsl.internal; - -import okhttp3.HttpUrl; -import okhttp3.Request; - -class RawRequestBuilder implements AbstractWatchManager.RequestBuilder { - private final HttpUrl.Builder watchUrlBuilder; - - public RawRequestBuilder(HttpUrl.Builder watchUrlBuilder) { - this.watchUrlBuilder = watchUrlBuilder; - } - - @Override - public Request build(String resourceVersion) { - if (resourceVersion != null) { - watchUrlBuilder.removeAllQueryParameters("resourceVersion"); - watchUrlBuilder.addQueryParameter("resourceVersion", resourceVersion); - } - HttpUrl watchUrl = watchUrlBuilder.build(); - String origin = watchUrl.url().getProtocol() + "://" + watchUrl.url().getHost(); - if (watchUrl.url().getPort() != -1) { - origin += ":" + watchUrl.url().getPort(); - } - - return new Request.Builder() - .get() - .url(watchUrl) - .addHeader("Origin", origin) - .build(); - } -} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/ServiceOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/ServiceOperationsImpl.java index b5250d873fd..b2842ca6dae 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/ServiceOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/ServiceOperationsImpl.java @@ -58,7 +58,7 @@ public ServiceOperationsImpl newInstance(OperationContext context) { } @Override - public Service waitUntilReady(long amount, TimeUnit timeUnit) throws InterruptedException { + public Service waitUntilReady(long amount, TimeUnit timeUnit) { long started = System.nanoTime(); super.waitUntilReady(amount, timeUnit); long alreadySpent = System.nanoTime() - started; diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java index d1f0bf850c5..aa87aab6757 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java @@ -22,7 +22,7 @@ * * This has been ported from official go client: https://github.com/kubernetes/client-go/blob/master/tools/cache/shared_informer.go */ -public interface SharedInformer { +public interface SharedInformer extends AutoCloseable { /** * Add event handler @@ -54,6 +54,11 @@ public interface SharedInformer { * Stops the shared informer. The informer cannot be started again. */ void stop(); + + @Override + default void close() { + stop(); + } /** * Return true if the informer has ever synced diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index 1b23c00a31d..abd1fc4ed34 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -140,7 +140,7 @@ public void run() { return; } - log.info("informer#Controller: ready to run resync and reflector for {} with resync {}", apiTypeClass, resyncCheckPeriodMillis); + log.debug("informer: ready to run resync and reflector for {} with resync {}", apiTypeClass, resyncCheckPeriodMillis); scheduleResync(processor::shouldResync); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/PodOperationUtil.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/PodOperationUtil.java index e835e1d926c..c6456b862de 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/PodOperationUtil.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/PodOperationUtil.java @@ -81,9 +81,6 @@ public static void waitUntilReadyBeforeFetchingLogs(PodResource podOperatio if (pod != null && pod.getStatus() != null && pod.getStatus().getPhase().equals("Pending")) { podOperation.waitUntilReady(logWaitTimeout, TimeUnit.SECONDS); } - } catch (InterruptedException interruptedException) { - Thread.currentThread().interrupt(); - LOG.debug("Interrupted while waiting for Pod to become Ready: {}", interruptedException.getMessage()); } catch (Exception otherException) { LOG.debug("Error while waiting for Pod to become Ready: {}", otherException.getMessage()); } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcherTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcherTest.java deleted file mode 100644 index f3dfb36e16f..00000000000 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcherTest.java +++ /dev/null @@ -1,219 +0,0 @@ -/** - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.fabric8.kubernetes.client.dsl.base; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; - -import java.net.HttpURLConnection; -import java.util.Objects; -import java.util.concurrent.ExecutionException; -import java.util.function.Predicate; - -import io.fabric8.kubernetes.client.WatcherException; -import org.junit.jupiter.api.Test; - -import io.fabric8.kubernetes.api.model.ConfigMap; -import io.fabric8.kubernetes.api.model.ConfigMapBuilder; -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.Watcher.Action; - -class WaitForConditionWatcherTest { - - private final ConfigMap configMap = new ConfigMapBuilder() - .withNewMetadata() - .withName("test") - .withNamespace("test") - .endMetadata() - .addToData("foo", "bar") - .build(); - - @Test - void itCompletesOnMatchAdded() throws Exception { - TrackingPredicate condition = condition(ss -> true); - WaitForConditionWatcher watcher = new WaitForConditionWatcher<>(condition); - watcher.eventReceived(Action.ADDED, configMap); - assertTrue(watcher.getFuture().isDone()); - assertEquals(watcher.getFuture().get(), configMap); - assertTrue(condition.isCalledWith(configMap)); - } - - @Test - void itCompletesOnMatchModified() throws Exception { - TrackingPredicate condition = condition(ss -> true); - WaitForConditionWatcher watcher = new WaitForConditionWatcher<>(condition); - watcher.eventReceived(Action.MODIFIED, configMap); - assertTrue(watcher.getFuture().isDone()); - assertEquals(watcher.getFuture().get(), configMap); - assertTrue(condition.isCalledWith(configMap)); - } - - @Test - void itCompletesOnMatchDeleted() throws Exception { - TrackingPredicate condition = condition(Objects::isNull); - WaitForConditionWatcher watcher = new WaitForConditionWatcher<>(condition); - watcher.eventReceived(Action.DELETED, configMap); - assertTrue(watcher.getFuture().isDone()); - assertNull(watcher.getFuture().get()); - assertTrue(condition.isCalledWith(null)); - } - - @Test - void itDoesNotCompleteOnNoMatchAdded() { - TrackingPredicate condition = condition(ss -> false); - WaitForConditionWatcher watcher = new WaitForConditionWatcher<>(condition); - watcher.eventReceived(Action.ADDED, configMap); - assertFalse(watcher.getFuture().isDone()); - assertTrue(condition.isCalledWith(configMap)); - } - - @Test - void itDoesNotCompleteOnNoMatchModified() { - TrackingPredicate condition = condition(ss -> false); - WaitForConditionWatcher watcher = new WaitForConditionWatcher<>(condition); - watcher.eventReceived(Action.MODIFIED, configMap); - assertFalse(watcher.getFuture().isDone()); - assertTrue(condition.isCalledWith(configMap)); - } - - @Test - void itCompletesExceptionallyOnUnexpectedDeletion() throws Exception { - TrackingPredicate condition = condition(Objects::nonNull); - WaitForConditionWatcher watcher = new WaitForConditionWatcher<>(condition); - watcher.eventReceived(Action.DELETED, configMap); - assertTrue(watcher.getFuture().isDone()); - try { - watcher.getFuture().get(); - fail("should have thrown exception"); - } catch (ExecutionException e) { - assertEquals(WatcherException.class, e.getCause().getClass()); - assertEquals("Unexpected deletion of watched resource, will never satisfy condition", e.getCause().getMessage()); - } - assertTrue(condition.isCalledWith(null)); - } - - @Test - void itCompletesExceptionallyOnError() throws Exception { - TrackingPredicate condition = condition(ss -> true); - WaitForConditionWatcher watcher = new WaitForConditionWatcher<>(condition); - watcher.eventReceived(Action.ERROR, configMap); - assertTrue(watcher.getFuture().isDone()); - try { - watcher.getFuture().get(); - fail("should have thrown exception"); - } catch (ExecutionException e) { - assertEquals(WatcherException.class, e.getCause().getClass()); - assertEquals("Action.ERROR received", e.getCause().getMessage()); - } - assertFalse(condition.isCalled()); - } - - @Test - void itCompletesExceptionallyWithRetryOnCloseNonGone() throws Exception { - TrackingPredicate condition = condition(ss -> true); - WaitForConditionWatcher watcher = new WaitForConditionWatcher<>(condition); - watcher.onClose(new WatcherException("Watcher closed", new KubernetesClientException("test", 500, null))); - assertTrue(watcher.getFuture().isDone()); - try { - watcher.getFuture().get(); - fail("should have thrown exception"); - } catch (ExecutionException e) { - assertEquals(WatcherException.class, e.getCause().getClass()); - assertEquals("Watcher closed", e.getCause().getMessage()); - assertTrue(((WatcherException) e.getCause()).isShouldRetry()); - } - assertFalse(condition.isCalled()); - } - - @Test - void itCompletesExceptionallyWithNoRetryOnCloseGone() throws Exception { - TrackingPredicate condition = condition(ss -> true); - WaitForConditionWatcher watcher = new WaitForConditionWatcher<>(condition); - watcher.onClose(new WatcherException("Watcher closed", new KubernetesClientException("test", HttpURLConnection.HTTP_GONE, null))); - assertTrue(watcher.getFuture().isDone()); - try { - watcher.getFuture().get(); - fail("should have thrown exception"); - } catch (ExecutionException e) { - assertEquals(WatcherException.class, e.getCause().getClass()); - assertEquals("Watcher closed", e.getCause().getMessage()); - assertFalse(((WatcherException) e.getCause()).isShouldRetry()); - } - assertFalse(condition.isCalled()); - } - - @Test - void itCompletesExceptionallyWithRetryOnGracefulClose() throws Exception { - TrackingPredicate condition = condition(ss -> true); - WaitForConditionWatcher watcher = new WaitForConditionWatcher<>(condition); - watcher.onClose(); - assertTrue(watcher.getFuture().isDone()); - try { - watcher.getFuture().get(); - fail("should have thrown exception"); - } catch (ExecutionException e) { - assertEquals(WatcherException.class, e.getCause().getClass()); - assertEquals("Watcher closed", e.getCause().getMessage()); - assertTrue(((WatcherException) e.getCause()).isShouldRetry()); - } - assertFalse(condition.isCalled()); - } - private TrackingPredicate condition(Predicate condition) { - return new TrackingPredicate(condition); - } - - private static class TrackingPredicate implements Predicate { - - private final Predicate delegate; - - private boolean called; - private ConfigMap calledWith; - - public TrackingPredicate(Predicate delegate) { - this.delegate = delegate; - this.called = false; - this.calledWith = null; - } - - @Override - public boolean test(ConfigMap configMap) { - called = true; - calledWith = configMap; - return delegate.test(configMap); - } - - public boolean isCalled() { - return called; - } - - public boolean isCalledWith(ConfigMap configMap) { - return called && Objects.equals(configMap, calledWith); - } - } - - private ConfigMap configMap() { - return new ConfigMapBuilder() - .withNewMetadata() - .withName("test") - .withNamespace("test") - .endMetadata() - .addToData("foo", "bar") - .build(); - } -} diff --git a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/JobExample.java b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/JobExample.java index c0babe4496e..967a07899cf 100644 --- a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/JobExample.java +++ b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/JobExample.java @@ -18,7 +18,10 @@ import io.fabric8.kubernetes.api.model.PodList; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; -import io.fabric8.kubernetes.client.*; +import io.fabric8.kubernetes.client.ConfigBuilder; +import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,9 +77,6 @@ public static void main(String[] args) { } catch (KubernetesClientException e) { logger.error("Unable to create job", e); - } catch (InterruptedException interruptedException) { - logger.warn("Interrupted while waiting for the job to be ready: {}", interruptedException.getMessage()); - Thread.currentThread().interrupt(); } } } diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/InformTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/InformTest.java index 7456c43f1f5..57217d91731 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/InformTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/InformTest.java @@ -169,13 +169,13 @@ void testGenericWithKnownType() throws InterruptedException { .withResourceVersion("1").endMetadata().build(); server.expect() - .withPath("/api/v1/namespaces/test/pods?watch=false&fieldSelector=metadata.name%3Dpod1") + .withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&watch=false") .andReturn(HttpURLConnection.HTTP_OK, new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().withItems(pod1).build()) .once(); server.expect() - .withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1%2Cmetadata.name%3Dpod1&resourceVersion=1&watch=true") + .withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true") .andUpgradeToWebSocket() .open() .waitFor(EVENT_WAIT_PERIOD_MS) diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java index 4e59db573c9..f9c9fa7dc7a 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java @@ -414,14 +414,7 @@ void testWait() throws InterruptedException { .endStatus() .build(); - server.expect().get().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(200, notReady).once(); - server.expect().get().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(200, ready).once(); - - server.expect().get().withPath("/api/v1/namespaces/test/pods").andReturn(200, new PodListBuilder() - .withNewMetadata() - .withResourceVersion("1") - .endMetadata() - .withItems(notReady).build()).once(); + server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&watch=false").andReturn(200, notReady).once(); server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true").andUpgradeToWebSocket() .open() diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceListTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceListTest.java index c97c0ef8f0d..76845069877 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceListTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceListTest.java @@ -185,8 +185,8 @@ void testSuccessfulWaitUntilCondition() throws InterruptedException { .anyMatch(c -> "True".equals(c.getStatus())); // The pods are never ready if you request them directly. - server.expect().get().withPath("/api/v1/namespaces/ns1/pods/pod1").andReturn(HTTP_OK, noReady1).once(); - server.expect().get().withPath("/api/v1/namespaces/ns1/pods/pod2").andReturn(HTTP_OK, noReady2).once(); + ResourceTest.list(server, noReady1); + ResourceTest.list(server, noReady2); server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true").andUpgradeToWebSocket() .open() @@ -226,8 +226,8 @@ void testPartialSuccessfulWaitUntilCondition() { .anyMatch(c -> "True".equals(c.getStatus())); // The pods are never ready if you request them directly. - server.expect().get().withPath("/api/v1/namespaces/ns1/pods/pod1").andReturn(HTTP_OK, noReady1).once(); - server.expect().get().withPath("/api/v1/namespaces/ns1/pods/pod2").andReturn(HTTP_OK, noReady2).once(); + ResourceTest.list(server, noReady1); + ResourceTest.list(server, noReady2); Status gone = new StatusBuilder() .withCode(HTTP_GONE) diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java index 2bc029a5ff4..6a6acf1862a 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java @@ -21,6 +21,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.PodListBuilder; import io.fabric8.kubernetes.api.model.Status; import io.fabric8.kubernetes.api.model.StatusBuilder; import io.fabric8.kubernetes.api.model.WatchEvent; @@ -38,7 +39,6 @@ import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; import io.fabric8.kubernetes.client.utils.Serialization; import okhttp3.mockwebserver.RecordedRequest; -import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.Assert; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DisplayName; @@ -47,13 +47,11 @@ import java.net.HttpURLConnection; import java.util.Objects; import java.util.concurrent.CountDownLatch; -import java.util.function.Predicate; import static java.net.HttpURLConnection.HTTP_GONE; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; import static org.assertj.core.api.Assertions.tuple; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -85,7 +83,7 @@ void testCreateOrReplaceWhenCreateFails() { // Given Pod pod1 = new PodBuilder().withNewMetadata().withName("pod1").withNamespace("test").and().build(); server.expect().post().withPath("/api/v1/namespaces/test/pods").andReturn(HttpURLConnection.HTTP_BAD_REQUEST, pod1).once(); - + NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicable podOperation = client.resource(pod1); // When @@ -225,7 +223,7 @@ void testWaitUntilReady() throws InterruptedException { Pod noReady = createReadyFrom(pod1, "False"); Pod ready = createReadyFrom(pod1, "True"); - server.expect().get().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(200, noReady).once(); + list(noReady); server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true").andUpgradeToWebSocket() .open() @@ -233,11 +231,24 @@ void testWaitUntilReady() throws InterruptedException { .done() .always(); - + Pod p = client.resource(noReady).waitUntilReady(5, SECONDS); Assert.assertTrue(Readiness.isPodReady(p)); } + private void list(Pod pod) { + list(server, pod); + } + + static void list(KubernetesMockServer server, Pod pod) { + server.expect() + .get() + .withPath("/api/v1/namespaces/"+pod.getMetadata().getNamespace()+"/pods?fieldSelector=metadata.name%3D"+pod.getMetadata().getName()+"&watch=false") + .andReturn(200, + new PodListBuilder().withItems(pod).withNewMetadata().withResourceVersion("1").endMetadata().build()) + .once(); + } + @Test void testWaitUntilExistsThenReady() throws InterruptedException { Pod pod1 = new PodBuilder().withNewMetadata() @@ -248,19 +259,16 @@ void testWaitUntilExistsThenReady() throws InterruptedException { Pod noReady = createReadyFrom(pod1, "False"); Pod ready = createReadyFrom(pod1, "True"); - // so that "waitUntilExists" actually has some waiting to do - server.expect().get().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(404, "").times(2); - // once so that "waitUntilExists" successfully ends // and again so that "periodicWatchUntilReady" successfully begins - server.expect().get().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(200, noReady).times(2); + server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&watch=false").andReturn(200, noReady).times(2); - server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&watch=true").andUpgradeToWebSocket() + server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true").andUpgradeToWebSocket() .open() .waitFor(100).andEmit(new WatchEvent(ready, "MODIFIED")) .done() .always(); - + Pod p = client.pods().withName("pod1").waitUntilReady(5, SECONDS); Assert.assertTrue(Readiness.isPodReady(p)); } @@ -300,7 +308,7 @@ void testWaitUntilCondition() throws InterruptedException { .build(); // at first the pod is non-ready - server.expect().get().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(200, noReady).times(2); + list(noReady); server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true").andUpgradeToWebSocket() .open() @@ -310,7 +318,7 @@ void testWaitUntilCondition() throws InterruptedException { .done() .always(); - + Pod p = client.pods().withName("pod1").waitUntilCondition( r -> r.getStatus().getConditions() .stream() @@ -324,63 +332,7 @@ void testWaitUntilCondition() throws InterruptedException { } @Test - void testWaitUntilConditionWhenResourceVersionTooOld() throws InterruptedException { - Pod pod1 = new PodBuilder().withNewMetadata() - .withName("pod1") - .withResourceVersion("1") - .withNamespace("test").and().build(); - - Pod noReady = createReadyFrom(pod1, "False"); - Pod ready = createReadyFrom(pod1, "True"); - - // The pod is never ready if you request it directly. - server.expect().get().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(200, noReady).always(); - - Status gone = new StatusBuilder() - .withCode(HTTP_GONE) - .build(); - - // Watches with the pod's own resource version fail with 410 "GONE". - server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true") - .andUpgradeToWebSocket() - .open() - .waitFor(500).andEmit(new WatchEvent(gone, "ERROR")) - .done() - .once(); - - // Watches with a later resource version will return the "ready" pod. - server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=2&watch=true") - .andUpgradeToWebSocket() - .open() - .waitFor(500).andEmit(new WatchEvent(ready, "MODIFIED")) - .done() - .once(); - - Predicate isReady = p -> p.getStatus().getConditions().stream() - .anyMatch(c -> "True".equals(c.getStatus())); - - final PodResource ops = client.pods().withName("pod1"); - KubernetesClientException ex = assertThrows(KubernetesClientException.class, () -> - ops.waitUntilCondition(isReady, 4, SECONDS) - ); - assertThat(ex) - .hasCauseExactlyInstanceOf(WatcherException.class) - .extracting(Throwable::getCause) - .asInstanceOf(InstanceOfAssertFactories.type(WatcherException.class)) - .extracting(WatcherException::isHttpGone) - .isEqualTo(true); - - Pod pod = client.pods() - .withName("pod1") - .withResourceVersion("2") - .waitUntilCondition(isReady, 4, SECONDS); - assertThat(pod.getMetadata().getName()).isEqualTo("pod1"); - assertThat(pod.getMetadata().getResourceVersion()).isEqualTo("1"); - assertTrue(isReady.test(pod)); - } - - @Test - void testRetryOnErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException { + void tesErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException { Pod pod1 = new PodBuilder().withNewMetadata() .withName("pod1") .withResourceVersion("1") @@ -394,17 +346,15 @@ void testRetryOnErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedExc .build(); // once not ready, to begin watch - server.expect().get().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(200, noReady).once(); - // once ready, after watch fails - server.expect().get().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(200, ready).once(); + list(noReady); server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true").andUpgradeToWebSocket() .open() .waitFor(500).andEmit(new WatchEvent(status, "ERROR")) + .waitFor(500).andEmit(new WatchEvent(ready, "MODIFIED")) .done() .once(); - Pod p = client.resource(noReady).waitUntilReady(5, SECONDS); Assert.assertTrue(Readiness.isPodReady(p)); } @@ -424,7 +374,7 @@ void testRetryOnErrorEventDuringWait() throws InterruptedException { .build(); // once not ready, to begin watch - server.expect().get().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(200, noReady).times(2); + list(noReady); server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true").andUpgradeToWebSocket() .open() @@ -438,7 +388,7 @@ void testRetryOnErrorEventDuringWait() throws InterruptedException { .done() .once(); - + Pod p = client.resource(noReady).waitUntilReady(5, SECONDS); Assert.assertTrue(Readiness.isPodReady(p)); } @@ -454,32 +404,28 @@ void testSkipWatchIfAlreadyMatchingCondition() throws InterruptedException { Pod ready = createReadyFrom(pod1, "True"); // once not ready, to begin watch - server.expect().get().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(200, ready).once(); + list(ready); - Pod p = client.resource(noReady).waitUntilReady(5, SECONDS); Assert.assertTrue(Readiness.isPodReady(p)); } @Test - void testDontRetryWatchOnHttpGone() throws InterruptedException { - Pod noReady = new PodBuilder().withNewMetadata() - .withName("pod1") - .withResourceVersion("1") - .withNamespace("test").and().withNewStatus() - .addNewCondition() - .withType("Ready") - .withStatus("False") - .endCondition() - .endStatus() - .build(); + void testRetryWatchOnHttpGone() throws InterruptedException { + Pod pod1 = new PodBuilder().withNewMetadata() + .withName("pod1") + .withResourceVersion("1") + .withNamespace("test").and().build(); + + Pod noReady = createReadyFrom(pod1, "False"); + Pod ready = createReadyFrom(pod1, "True"); Status status = new StatusBuilder() .withCode(HTTP_GONE) .build(); // once not ready, to begin watch - server.expect().get().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(200, noReady).once(); + list(noReady); server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true").andUpgradeToWebSocket() .open() @@ -487,13 +433,9 @@ void testDontRetryWatchOnHttpGone() throws InterruptedException { .done() .once(); - - try { - client.resource(noReady).waitUntilReady(5, SECONDS); - fail("should have thrown KubernetesClientException"); - } catch (KubernetesClientException e) { - assertTrue(e.getCause() instanceof WatcherException); - } + list(ready); + + client.resource(noReady).waitUntilReady(5, SECONDS); } @Test @@ -509,7 +451,7 @@ void testWaitOnConditionDeleted() throws InterruptedException { .endStatus() .build(); - server.expect().get().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(200, ready).once(); + list(ready); server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true").andUpgradeToWebSocket() .open() @@ -517,7 +459,7 @@ void testWaitOnConditionDeleted() throws InterruptedException { .done() .once(); - + Pod p = client.pods().withName("pod1").waitUntilCondition(Objects::isNull,8, SECONDS); assertNull(p); } @@ -532,9 +474,7 @@ void testCreateAndWaitUntilReady() throws InterruptedException { Pod noReady = createReadyFrom(pod1, "False"); Pod ready = createReadyFrom(pod1, "True"); - // once so that "waitUntilExists" successfully ends - // and again so that "periodicWatchUntilReady" successfully begins - server.expect().get().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(200, noReady).times(2); + list(noReady); server.expect().post().withPath("/api/v1/namespaces/test/pods").andReturn(201, noReady).once(); server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true").andUpgradeToWebSocket() @@ -543,7 +483,7 @@ void testCreateAndWaitUntilReady() throws InterruptedException { .done() .always(); - + Pod p = client.resource(noReady).createOrReplaceAnd().waitUntilReady(10, SECONDS); Assert.assertTrue(Readiness.isPodReady(p)); } @@ -559,7 +499,7 @@ void testFromServerGet() { server.expect().get().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(200, pod).once(); - + HasMetadata response = client.resource(pod).fromServer().get(); assertEquals(pod, response); } @@ -580,8 +520,8 @@ void testFromServerWaitUntilConditionAlwaysGetsResourceFromServer() throws Excep .addToLabels("CONDITION", "MET") .endMetadata() .build(); - server.expect().get().withPath("/api/v1/namespaces/test/pods/pod").andReturn(200, conditionNotMetPod).once(); - server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod&watch=true") + list(conditionNotMetPod); + server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod&resourceVersion=1&watch=true") .andUpgradeToWebSocket().open() .immediately().andEmit(new WatchEvent(conditionNotMetPod, "MODIFIED")) .waitFor(10).andEmit(new WatchEvent(conditionMetPod, "MODIFIED")) @@ -596,6 +536,19 @@ void testFromServerWaitUntilConditionAlwaysGetsResourceFromServer() throws Excep assertEquals(2, server.getRequestCount()); } + @Test + void testWaitNullDoesntExist() throws InterruptedException { + server.expect() + .get() + .withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&watch=false") + .andReturn(200, + new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build()) + .once(); + + Pod p = client.pods().withName("pod1").waitUntilCondition(Objects::isNull, 1, SECONDS); + assertNull(p); + } + private static Pod createReadyFrom(Pod pod, String status) { return new PodBuilder(pod) .withNewStatus() diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ServiceTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ServiceTest.java index 2e91e920bb7..f9051b68ba7 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ServiceTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ServiceTest.java @@ -18,10 +18,13 @@ import io.fabric8.kubernetes.api.model.EndpointAddressBuilder; import io.fabric8.kubernetes.api.model.EndpointPortBuilder; import io.fabric8.kubernetes.api.model.EndpointSubsetBuilder; +import io.fabric8.kubernetes.api.model.Endpoints; import io.fabric8.kubernetes.api.model.EndpointsBuilder; +import io.fabric8.kubernetes.api.model.EndpointsListBuilder; import io.fabric8.kubernetes.api.model.IntOrString; import io.fabric8.kubernetes.api.model.Service; import io.fabric8.kubernetes.api.model.ServiceBuilder; +import io.fabric8.kubernetes.api.model.ServiceListBuilder; import io.fabric8.kubernetes.api.model.extensions.IngressListBuilder; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; @@ -184,18 +187,22 @@ void testGetUrlFromClusterIPService() { void testWaitUntilReady() throws InterruptedException { // Given Service svc1 = new ServiceBuilder().withNewMetadata().withName("svc1").endMetadata().build(); - server.expect().get().withPath("/api/v1/namespaces/ns1/endpoints/svc1") - .andReturn(HttpURLConnection.HTTP_OK, new EndpointsBuilder() - .withNewMetadata().withName("svc1").endMetadata() - .addToSubsets(new EndpointSubsetBuilder() - .addToAddresses(new EndpointAddressBuilder().withIp("192.168.64.13").build()) - .addToPorts(new EndpointPortBuilder().withPort(8443).build()) - .build()) + Endpoints endpoint = new EndpointsBuilder() + .withNewMetadata().withName("svc1").endMetadata() + .addToSubsets(new EndpointSubsetBuilder() + .addToAddresses(new EndpointAddressBuilder().withIp("192.168.64.13").build()) + .addToPorts(new EndpointPortBuilder().withPort(8443).build()) .build()) + .build(); + server.expect().get().withPath("/api/v1/namespaces/ns1/endpoints?fieldSelector=metadata.name%3Dsvc1&watch=false") + .andReturn(HttpURLConnection.HTTP_OK, new EndpointsListBuilder().withItems(endpoint).withNewMetadata().withResourceVersion("1").endMetadata().build()) + .once(); + server.expect().get().withPath("/api/v1/namespaces/ns1/services?fieldSelector=metadata.name%3Dsvc1&watch=false") + .andReturn(HttpURLConnection.HTTP_OK, new ServiceListBuilder().withItems(svc1).withNewMetadata().withResourceVersion("1").endMetadata().build()) .once(); server.expect().get().withPath("/api/v1/namespaces/ns1/services/svc1") .andReturn(HttpURLConnection.HTTP_OK, svc1) - .times(2); + .once(); // When Service service = client.services() diff --git a/kubernetes-tests/src/test/java/io/fabric8/openshift/client/server/mock/DeploymentConfigTest.java b/kubernetes-tests/src/test/java/io/fabric8/openshift/client/server/mock/DeploymentConfigTest.java index 4748f75d187..8a7a50801c0 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/openshift/client/server/mock/DeploymentConfigTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/openshift/client/server/mock/DeploymentConfigTest.java @@ -20,6 +20,7 @@ import io.fabric8.kubernetes.api.model.APIGroupListBuilder; import io.fabric8.kubernetes.api.model.ContainerFluent; import io.fabric8.kubernetes.api.model.DeletionPropagation; +import io.fabric8.kubernetes.api.model.ListMeta; import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.PodListBuilder; import io.fabric8.kubernetes.client.dsl.LogWatch; @@ -30,7 +31,6 @@ import io.fabric8.openshift.api.model.DeploymentConfigListBuilder; import io.fabric8.openshift.api.model.DeploymentConfigSpecFluent; import io.fabric8.openshift.client.OpenShiftClient; -import okhttp3.mockwebserver.RecordedRequest; import org.junit.jupiter.api.Test; import java.io.ByteArrayOutputStream; @@ -299,8 +299,8 @@ void testWaitUntilReady() throws InterruptedException { .endCondition() .withReplicas(1).withAvailableReplicas(1) .endStatus().build(); - server.expect().get().withPath("/apis/apps.openshift.io/v1/namespaces/ns1/deploymentconfigs/dc1") - .andReturn(HttpURLConnection.HTTP_OK, deploymentConfig) + server.expect().get().withPath("/apis/apps.openshift.io/v1/namespaces/ns1/deploymentconfigs?fieldSelector=metadata.name%3Ddc1&watch=false") + .andReturn(HttpURLConnection.HTTP_OK, new DeploymentConfigListBuilder().withItems(deploymentConfig).withMetadata(new ListMeta()).build()) .always(); // When