|
16 | 16 | package io.fabric8.kubernetes.client.dsl.base;
|
17 | 17 |
|
18 | 18 | import io.fabric8.kubernetes.api.model.ObjectReference;
|
19 |
| -import io.fabric8.kubernetes.client.WatcherException; |
20 | 19 | import io.fabric8.kubernetes.client.dsl.WritableOperation;
|
21 | 20 | import io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper;
|
22 | 21 | import org.slf4j.Logger;
|
|
38 | 37 | import io.fabric8.kubernetes.client.Config;
|
39 | 38 | import io.fabric8.kubernetes.client.ConfigBuilder;
|
40 | 39 | import io.fabric8.kubernetes.client.KubernetesClientException;
|
| 40 | +import io.fabric8.kubernetes.client.KubernetesClientTimeoutException; |
41 | 41 | import io.fabric8.kubernetes.client.OperationInfo;
|
42 | 42 | import io.fabric8.kubernetes.client.ResourceNotFoundException;
|
43 | 43 | import io.fabric8.kubernetes.client.Watch;
|
|
59 | 59 | import io.fabric8.kubernetes.client.informers.impl.DefaultSharedIndexInformer;
|
60 | 60 | import io.fabric8.kubernetes.client.internal.readiness.Readiness;
|
61 | 61 | import io.fabric8.kubernetes.client.utils.HttpClientUtils;
|
62 |
| -import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil; |
63 | 62 | import io.fabric8.kubernetes.client.utils.URLUtils;
|
64 | 63 | import io.fabric8.kubernetes.client.utils.Utils;
|
65 | 64 | import io.fabric8.kubernetes.client.utils.WatcherToggle;
|
|
73 | 72 | import java.net.MalformedURLException;
|
74 | 73 | import java.net.URL;
|
75 | 74 | import java.util.Arrays;
|
76 |
| -import java.util.Collections; |
77 | 75 | import java.util.Iterator;
|
78 | 76 | import java.util.List;
|
79 | 77 | import java.util.Map;
|
80 | 78 | import java.util.Objects;
|
| 79 | +import java.util.concurrent.CompletableFuture; |
81 | 80 | import java.util.concurrent.ExecutionException;
|
82 | 81 | import java.util.concurrent.TimeUnit;
|
83 | 82 | import java.util.concurrent.TimeoutException;
|
|
89 | 88 | import okhttp3.HttpUrl;
|
90 | 89 | import okhttp3.Request;
|
91 | 90 |
|
92 |
| -import static java.util.concurrent.TimeUnit.NANOSECONDS; |
93 |
| - |
94 | 91 | public class BaseOperation<T extends HasMetadata, L extends KubernetesResourceList<T>, R extends Resource<T>>
|
95 | 92 | extends OperationSupport
|
96 | 93 | implements
|
@@ -1123,34 +1120,49 @@ public T waitUntilReady(long amount, TimeUnit timeUnit) throws InterruptedExcept
|
1123 | 1120 | @Override
|
1124 | 1121 | public T waitUntilCondition(Predicate<T> condition, long amount, TimeUnit timeUnit)
|
1125 | 1122 | throws InterruptedException {
|
| 1123 | + |
| 1124 | + CompletableFuture<T> future = new CompletableFuture<>(); |
| 1125 | + |
| 1126 | + try (SharedIndexInformer<T> informer = this.createInformer(0)) { |
| 1127 | + informer.addEventHandler(new ResourceEventHandler<T>() { |
| 1128 | + |
| 1129 | + void test(T obj) { |
| 1130 | + try { |
| 1131 | + if (condition.test(obj)) { |
| 1132 | + future.complete(obj); |
| 1133 | + informer.stop(); // immediately stop the watch |
| 1134 | + } |
| 1135 | + } catch (Exception e) { |
| 1136 | + future.completeExceptionally(e); |
| 1137 | + } |
| 1138 | + } |
| 1139 | + |
| 1140 | + @Override |
| 1141 | + public void onAdd(T obj) { |
| 1142 | + test(obj); |
| 1143 | + } |
1126 | 1144 |
|
1127 |
| - long remainingNanosToWait = timeUnit.toNanos(amount); |
1128 |
| - while (remainingNanosToWait > 0) { |
1129 |
| - |
1130 |
| - T item = fromServer().get(); |
1131 |
| - if (condition.test(item)) { |
1132 |
| - return item; |
1133 |
| - } |
| 1145 | + @Override |
| 1146 | + public void onUpdate(T oldObj, T newObj) { |
| 1147 | + test(newObj); |
| 1148 | + } |
1134 | 1149 |
|
1135 |
| - final WaitForConditionWatcher<T> watcher = new WaitForConditionWatcher<>(condition); |
1136 |
| - final long startTime = System.nanoTime(); |
1137 |
| - try (Watch ignored = watch(KubernetesResourceUtil.getResourceVersion(item), watcher)) { |
1138 |
| - return watcher.getFuture().get(remainingNanosToWait, NANOSECONDS); |
1139 |
| - } catch (ExecutionException e) { |
1140 |
| - Throwable cause = e.getCause(); |
1141 |
| - if (cause instanceof WatcherException && ((WatcherException)cause).isHttpGone()) { |
1142 |
| - LOG.debug("Restarting the watch due to http gone"); |
1143 |
| - remainingNanosToWait -= (System.nanoTime() - startTime); |
1144 |
| - continue; |
| 1150 | + @Override |
| 1151 | + public void onDelete(T obj, boolean deletedFinalStateUnknown) { |
| 1152 | + test(null); |
1145 | 1153 | }
|
1146 |
| - throw KubernetesClientException.launderThrowable(cause); |
1147 |
| - } catch (TimeoutException e) { |
1148 |
| - break; |
| 1154 | + }); |
| 1155 | + informer.run(); |
| 1156 | + return future.get(amount, timeUnit); |
| 1157 | + } catch (ExecutionException e) { |
| 1158 | + throw KubernetesClientException.launderThrowable(e.getCause()); |
| 1159 | + } catch (TimeoutException e) { |
| 1160 | + T item = getItem(); |
| 1161 | + if (item != null) { |
| 1162 | + throw new KubernetesClientTimeoutException(item, amount, timeUnit); |
1149 | 1163 | }
|
| 1164 | + throw new KubernetesClientTimeoutException(getKind(), getName(), getNamespace(), amount, timeUnit); |
1150 | 1165 | }
|
1151 |
| - |
1152 |
| - LOG.debug("ran out of time waiting for watcher, wait condition not met"); |
1153 |
| - throw new IllegalArgumentException(type.getSimpleName() + " with name:[" + name + "] in namespace:[" + namespace + "] matching condition not found!"); |
1154 | 1166 | }
|
1155 | 1167 |
|
1156 | 1168 | public void setType(Class<T> type) {
|
@@ -1179,30 +1191,38 @@ public Informable<T> withIndexers(Map<String, Function<T, List<String>>> indexer
|
1179 | 1191 |
|
1180 | 1192 | @Override
|
1181 | 1193 | public SharedIndexInformer<T> inform(ResourceEventHandler<T> handler, long resync) {
|
1182 |
| - // convert the name into something listable |
1183 |
| - FilterWatchListDeletable<T, L> baseOperation = |
1184 |
| - getName() == null ? this : withFields(Collections.singletonMap("metadata.name", getName())); |
| 1194 | + DefaultSharedIndexInformer<T, L> informer = createInformer(resync); |
| 1195 | + if (handler != null) { |
| 1196 | + informer.addEventHandler(handler); |
| 1197 | + } |
| 1198 | + // synchronous start list/watch must succeed in the calling thread |
| 1199 | + informer.run(); |
| 1200 | + return informer; |
| 1201 | + } |
1185 | 1202 |
|
| 1203 | + private DefaultSharedIndexInformer<T, L> createInformer(long resync) { |
| 1204 | + T item = getItem(); |
| 1205 | + String name = (Utils.isNotNullOrEmpty(getName()) || item != null) ? checkName(item) : null; |
| 1206 | + |
1186 | 1207 | // use the local context / namespace
|
1187 | 1208 | DefaultSharedIndexInformer<T, L> informer = new DefaultSharedIndexInformer<>(getType(), new ListerWatcher<T, L>() {
|
1188 | 1209 | @Override
|
1189 | 1210 | public L list(ListOptions params, String namespace, OperationContext context) {
|
1190 |
| - return baseOperation.list(params); |
| 1211 | + // convert the name into something listable |
| 1212 | + if (name != null) { |
| 1213 | + params.setFieldSelector("metadata.name="+name); |
| 1214 | + } |
| 1215 | + return BaseOperation.this.list(params); |
1191 | 1216 | }
|
1192 | 1217 |
|
1193 | 1218 | @Override
|
1194 | 1219 | public Watch watch(ListOptions params, String namespace, OperationContext context, Watcher<T> watcher) {
|
1195 |
| - return baseOperation.watch(params, watcher); |
| 1220 | + return BaseOperation.this.watch(params, watcher); |
1196 | 1221 | }
|
1197 | 1222 | }, resync, context, Runnable::run); // just run the event notification in the websocket thread
|
1198 |
| - if (handler != null) { |
1199 |
| - informer.addEventHandler(handler); |
1200 |
| - } |
1201 | 1223 | if (indexers != null) {
|
1202 | 1224 | informer.addIndexers(indexers);
|
1203 | 1225 | }
|
1204 |
| - // synchronous start list/watch must succeed in the calling thread |
1205 |
| - informer.run(); |
1206 | 1226 | return informer;
|
1207 | 1227 | }
|
1208 | 1228 | }
|
|
0 commit comments