|
16 | 16 | package io.fabric8.kubernetes.client.dsl.base;
|
17 | 17 |
|
18 | 18 | import io.fabric8.kubernetes.api.model.ObjectReference;
|
19 |
| -import io.fabric8.kubernetes.client.WatcherException; |
20 | 19 | import io.fabric8.kubernetes.client.dsl.WritableOperation;
|
21 | 20 | import io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper;
|
22 | 21 | import org.slf4j.Logger;
|
|
38 | 37 | import io.fabric8.kubernetes.client.Config;
|
39 | 38 | import io.fabric8.kubernetes.client.ConfigBuilder;
|
40 | 39 | import io.fabric8.kubernetes.client.KubernetesClientException;
|
| 40 | +import io.fabric8.kubernetes.client.KubernetesClientTimeoutException; |
41 | 41 | import io.fabric8.kubernetes.client.OperationInfo;
|
42 | 42 | import io.fabric8.kubernetes.client.ResourceNotFoundException;
|
43 | 43 | import io.fabric8.kubernetes.client.Watch;
|
|
59 | 59 | import io.fabric8.kubernetes.client.informers.impl.DefaultSharedIndexInformer;
|
60 | 60 | import io.fabric8.kubernetes.client.internal.readiness.Readiness;
|
61 | 61 | import io.fabric8.kubernetes.client.utils.HttpClientUtils;
|
62 |
| -import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil; |
63 | 62 | import io.fabric8.kubernetes.client.utils.URLUtils;
|
64 | 63 | import io.fabric8.kubernetes.client.utils.Utils;
|
65 | 64 | import io.fabric8.kubernetes.client.utils.WatcherToggle;
|
|
73 | 72 | import java.net.MalformedURLException;
|
74 | 73 | import java.net.URL;
|
75 | 74 | import java.util.Arrays;
|
76 |
| -import java.util.Collections; |
77 | 75 | import java.util.Iterator;
|
78 | 76 | import java.util.List;
|
79 | 77 | import java.util.Map;
|
80 | 78 | import java.util.Objects;
|
| 79 | +import java.util.concurrent.CompletableFuture; |
81 | 80 | import java.util.concurrent.ExecutionException;
|
82 | 81 | import java.util.concurrent.TimeUnit;
|
83 | 82 | import java.util.concurrent.TimeoutException;
|
|
89 | 88 | import okhttp3.HttpUrl;
|
90 | 89 | import okhttp3.Request;
|
91 | 90 |
|
92 |
| -import static java.util.concurrent.TimeUnit.NANOSECONDS; |
93 |
| - |
94 | 91 | public class BaseOperation<T extends HasMetadata, L extends KubernetesResourceList<T>, R extends Resource<T>>
|
95 | 92 | extends OperationSupport
|
96 | 93 | implements
|
@@ -1123,34 +1120,53 @@ public T waitUntilReady(long amount, TimeUnit timeUnit) throws InterruptedExcept
|
1123 | 1120 | @Override
|
1124 | 1121 | public T waitUntilCondition(Predicate<T> condition, long amount, TimeUnit timeUnit)
|
1125 | 1122 | throws InterruptedException {
|
| 1123 | + |
| 1124 | + CompletableFuture<T> future = new CompletableFuture<>(); |
| 1125 | + // tests the condition, trapping any exceptions |
| 1126 | + Consumer<T> tester = obj -> { |
| 1127 | + try { |
| 1128 | + if (condition.test(obj)) { |
| 1129 | + future.complete(obj); |
| 1130 | + } |
| 1131 | + } catch (Exception e) { |
| 1132 | + future.completeExceptionally(e); |
| 1133 | + } |
| 1134 | + }; |
| 1135 | + // start an informer that supplies the tester with events and empty list handling |
| 1136 | + try (SharedIndexInformer<T> informer = this.createInformer(0, l -> { |
| 1137 | + if (l.getItems().isEmpty()) { |
| 1138 | + tester.accept(null); |
| 1139 | + } |
| 1140 | + }, new ResourceEventHandler<T>() { |
| 1141 | + |
| 1142 | + @Override |
| 1143 | + public void onAdd(T obj) { |
| 1144 | + tester.accept(obj); |
| 1145 | + } |
1126 | 1146 |
|
1127 |
| - long remainingNanosToWait = timeUnit.toNanos(amount); |
1128 |
| - while (remainingNanosToWait > 0) { |
1129 |
| - |
1130 |
| - T item = fromServer().get(); |
1131 |
| - if (condition.test(item)) { |
1132 |
| - return item; |
| 1147 | + @Override |
| 1148 | + public void onUpdate(T oldObj, T newObj) { |
| 1149 | + tester.accept(newObj); |
1133 | 1150 | }
|
1134 | 1151 |
|
1135 |
| - final WaitForConditionWatcher<T> watcher = new WaitForConditionWatcher<>(condition); |
1136 |
| - final long startTime = System.nanoTime(); |
1137 |
| - try (Watch ignored = watch(KubernetesResourceUtil.getResourceVersion(item), watcher)) { |
1138 |
| - return watcher.getFuture().get(remainingNanosToWait, NANOSECONDS); |
1139 |
| - } catch (ExecutionException e) { |
1140 |
| - Throwable cause = e.getCause(); |
1141 |
| - if (cause instanceof WatcherException && ((WatcherException)cause).isHttpGone()) { |
1142 |
| - LOG.debug("Restarting the watch due to http gone"); |
1143 |
| - remainingNanosToWait -= (System.nanoTime() - startTime); |
1144 |
| - continue; |
1145 |
| - } |
1146 |
| - throw KubernetesClientException.launderThrowable(cause); |
1147 |
| - } catch (TimeoutException e) { |
1148 |
| - break; |
| 1152 | + @Override |
| 1153 | + public void onDelete(T obj, boolean deletedFinalStateUnknown) { |
| 1154 | + tester.accept(null); |
1149 | 1155 | }
|
| 1156 | + })) { |
| 1157 | + // prevent unnecessary watches |
| 1158 | + future.whenComplete((r,t) -> informer.stop()); |
| 1159 | + informer.run(); |
| 1160 | + return future.get(amount, timeUnit); |
| 1161 | + } catch (ExecutionException e) { |
| 1162 | + throw KubernetesClientException.launderThrowable(e.getCause()); |
| 1163 | + } catch (TimeoutException e) { |
| 1164 | + T item = getItem(); |
| 1165 | + if (item != null) { |
| 1166 | + throw new KubernetesClientTimeoutException(item, amount, timeUnit); |
| 1167 | + } |
| 1168 | + throw new KubernetesClientTimeoutException(getKind(), getName(), getNamespace(), amount, timeUnit); |
1150 | 1169 | }
|
1151 |
| - |
1152 |
| - LOG.debug("ran out of time waiting for watcher, wait condition not met"); |
1153 |
| - throw new IllegalArgumentException(type.getSimpleName() + " with name:[" + name + "] in namespace:[" + namespace + "] matching condition not found!"); |
1154 | 1170 | }
|
1155 | 1171 |
|
1156 | 1172 | public void setType(Class<T> type) {
|
@@ -1179,30 +1195,41 @@ public Informable<T> withIndexers(Map<String, Function<T, List<String>>> indexer
|
1179 | 1195 |
|
1180 | 1196 | @Override
|
1181 | 1197 | public SharedIndexInformer<T> inform(ResourceEventHandler<T> handler, long resync) {
|
1182 |
| - // convert the name into something listable |
1183 |
| - FilterWatchListDeletable<T, L> baseOperation = |
1184 |
| - getName() == null ? this : withFields(Collections.singletonMap("metadata.name", getName())); |
| 1198 | + DefaultSharedIndexInformer<T, L> result = createInformer(resync, null, handler); |
| 1199 | + result.run(); |
| 1200 | + return result; |
| 1201 | + } |
1185 | 1202 |
|
| 1203 | + private DefaultSharedIndexInformer<T, L> createInformer(long resync, Consumer<L> onList, ResourceEventHandler<T> handler) { |
| 1204 | + T item = getItem(); |
| 1205 | + String name = (Utils.isNotNullOrEmpty(getName()) || item != null) ? checkName(item) : null; |
| 1206 | + |
1186 | 1207 | // use the local context / namespace
|
1187 | 1208 | DefaultSharedIndexInformer<T, L> informer = new DefaultSharedIndexInformer<>(getType(), new ListerWatcher<T, L>() {
|
1188 | 1209 | @Override
|
1189 | 1210 | public L list(ListOptions params, String namespace, OperationContext context) {
|
1190 |
| - return baseOperation.list(params); |
| 1211 | + // convert the name into something listable |
| 1212 | + if (name != null) { |
| 1213 | + params.setFieldSelector("metadata.name="+name); |
| 1214 | + } |
| 1215 | + L result = BaseOperation.this.list(params); |
| 1216 | + if (onList != null) { |
| 1217 | + onList.accept(result); |
| 1218 | + } |
| 1219 | + return result; |
1191 | 1220 | }
|
1192 | 1221 |
|
1193 | 1222 | @Override
|
1194 | 1223 | public Watch watch(ListOptions params, String namespace, OperationContext context, Watcher<T> watcher) {
|
1195 |
| - return baseOperation.watch(params, watcher); |
| 1224 | + return BaseOperation.this.watch(params, watcher); |
1196 | 1225 | }
|
1197 | 1226 | }, resync, context, Runnable::run); // just run the event notification in the websocket thread
|
1198 |
| - if (handler != null) { |
1199 |
| - informer.addEventHandler(handler); |
1200 |
| - } |
1201 | 1227 | if (indexers != null) {
|
1202 | 1228 | informer.addIndexers(indexers);
|
1203 | 1229 | }
|
1204 |
| - // synchronous start list/watch must succeed in the calling thread |
1205 |
| - informer.run(); |
| 1230 | + if (handler != null) { |
| 1231 | + informer.addEventHandler(handler); |
| 1232 | + } |
1206 | 1233 | return informer;
|
1207 | 1234 | }
|
1208 | 1235 | }
|
|
0 commit comments