59
59
import io .fabric8 .kubernetes .client .informers .impl .DefaultSharedIndexInformer ;
60
60
import io .fabric8 .kubernetes .client .internal .readiness .Readiness ;
61
61
import io .fabric8 .kubernetes .client .utils .HttpClientUtils ;
62
+ import io .fabric8 .kubernetes .client .utils .KubernetesResourceUtil ;
62
63
import io .fabric8 .kubernetes .client .utils .URLUtils ;
63
64
import io .fabric8 .kubernetes .client .utils .Utils ;
64
65
import io .fabric8 .kubernetes .client .utils .WatcherToggle ;
@@ -107,8 +108,6 @@ public class BaseOperation<T extends HasMetadata, L extends KubernetesResourceLi
107
108
private final boolean reloadingFromServer ;
108
109
private final long gracePeriodSeconds ;
109
110
private final DeletionPropagation propagationPolicy ;
110
- private final long watchRetryInitialBackoffMillis ;
111
- private final double watchRetryBackoffMultiplier ;
112
111
113
112
protected String apiVersion ;
114
113
@@ -124,8 +123,6 @@ protected BaseOperation(OperationContext ctx) {
124
123
this .resourceVersion = ctx .getResourceVersion ();
125
124
this .gracePeriodSeconds = ctx .getGracePeriodSeconds ();
126
125
this .propagationPolicy = ctx .getPropagationPolicy ();
127
- this .watchRetryInitialBackoffMillis = ctx .getWatchRetryInitialBackoffMillis ();
128
- this .watchRetryBackoffMultiplier = ctx .getWatchRetryBackoffMultiplier ();
129
126
}
130
127
131
128
public BaseOperation <T , L , R > newInstance (OperationContext context ) {
@@ -234,7 +231,7 @@ public RootPaths getRootPaths() {
234
231
public T edit (UnaryOperator <T > function ) {
235
232
throw new KubernetesClientException (READ_ONLY_EDIT_EXCEPTION_MESSAGE );
236
233
}
237
-
234
+
238
235
@ Override
239
236
public T editStatus (UnaryOperator <T > function ) {
240
237
throw new KubernetesClientException (READ_ONLY_EDIT_EXCEPTION_MESSAGE );
@@ -423,7 +420,7 @@ public FilterWatchListDeletable<T, L> withoutLabels(Map<String, String> labels)
423
420
public FilterWatchListDeletable <T , L > withLabelIn (String key , String ... values ) {
424
421
return withNewFilter ().withLabelIn (key , values ).endFilter ();
425
422
}
426
-
423
+
427
424
@ Override
428
425
public FilterWatchListDeletable <T , L > withLabelNotIn (String key , String ... values ) {
429
426
return withNewFilter ().withLabelNotIn (key , values ).endFilter ();
@@ -446,7 +443,7 @@ public FilterWatchListDeletable<T, L> withFields(Map<String, String> fields) {
446
443
447
444
@ Override
448
445
public FilterWatchListDeletable <T , L > withField (String key , String value ) {
449
- return withNewFilter ().withField (key , value ).endFilter ();
446
+ return withNewFilter ().withField (key , value ).endFilter ();
450
447
}
451
448
452
449
@ Override
@@ -461,7 +458,7 @@ public FilterWatchListDeletable<T, L> withInvolvedObject(ObjectReference objectR
461
458
public FilterNested <FilterWatchListDeletable <T , L >> withNewFilter () {
462
459
return new FilterNestedImpl <>(this );
463
460
}
464
-
461
+
465
462
@ Override
466
463
public FilterWatchListDeletable <T , L > withoutFields (Map <String , String > fields ) {
467
464
return withNewFilter ().withoutFields (fields ).endFilter ();
@@ -474,7 +471,7 @@ public FilterWatchListDeletable<T, L> withoutField(String key, String value) {
474
471
475
472
public String getLabelQueryParam () {
476
473
StringBuilder sb = new StringBuilder ();
477
-
474
+
478
475
Map <String , String > labels = context .getLabels ();
479
476
if (labels != null && !labels .isEmpty ()) {
480
477
for (Iterator <Map .Entry <String , String >> iter = labels .entrySet ().iterator (); iter .hasNext (); ) {
@@ -656,7 +653,7 @@ public T updateStatus(T item) {
656
653
}
657
654
658
655
}
659
-
656
+
660
657
@ Override
661
658
public T patchStatus (T item ) {
662
659
throw new KubernetesClientException (READ_ONLY_UPDATE_EXCEPTION_MESSAGE );
@@ -760,7 +757,7 @@ public Watch watch(ListOptions options, final Watcher<T> watcher) {
760
757
public T replace (T item ) {
761
758
throw new KubernetesClientException (READ_ONLY_UPDATE_EXCEPTION_MESSAGE );
762
759
}
763
-
760
+
764
761
@ Override
765
762
public T replaceStatus (T item ) {
766
763
throw new KubernetesClientException (READ_ONLY_UPDATE_EXCEPTION_MESSAGE );
@@ -770,7 +767,7 @@ public T replaceStatus(T item) {
770
767
public T patch (PatchContext patchContext , String patch ) {
771
768
throw new KubernetesClientException (READ_ONLY_UPDATE_EXCEPTION_MESSAGE );
772
769
}
773
-
770
+
774
771
@ Override
775
772
public T patch (PatchContext patchContext , T item ) {
776
773
throw new KubernetesClientException (READ_ONLY_UPDATE_EXCEPTION_MESSAGE );
@@ -992,42 +989,27 @@ public T waitUntilReady(long amount, TimeUnit timeUnit) throws InterruptedExcept
992
989
@ Override
993
990
public T waitUntilCondition (Predicate <T > condition , long amount , TimeUnit timeUnit )
994
991
throws InterruptedException {
995
- return waitUntilConditionWithRetries (condition , timeUnit .toNanos (amount ), watchRetryInitialBackoffMillis );
996
- }
997
-
998
- private T waitUntilConditionWithRetries (Predicate <T > condition , long timeoutNanos , long backoffMillis )
999
- throws InterruptedException {
1000
- ListOptions options = null ;
1001
-
1002
- if (resourceVersion != null ) {
1003
- options = createListOptions (resourceVersion );
1004
- }
1005
992
1006
- long currentBackOff = backoffMillis ;
1007
- long remainingNanosToWait = timeoutNanos ;
993
+ long remainingNanosToWait = timeUnit .toNanos (amount );
1008
994
while (remainingNanosToWait > 0 ) {
1009
995
1010
996
T item = fromServer ().get ();
1011
997
if (condition .test (item )) {
1012
998
return item ;
1013
- } else if (options == null ) {
1014
- options = createListOptions (getResourceVersion (item ));
1015
999
}
1016
1000
1017
1001
final WaitForConditionWatcher <T > watcher = new WaitForConditionWatcher <>(condition );
1018
1002
final long startTime = System .nanoTime ();
1019
- try (Watch ignored = watch (options , watcher )) {
1003
+ try (Watch ignored = watch (KubernetesResourceUtil . getResourceVersion ( item ) , watcher )) {
1020
1004
return watcher .getFuture ().get (remainingNanosToWait , NANOSECONDS );
1021
1005
} catch (ExecutionException e ) {
1022
1006
Throwable cause = e .getCause ();
1023
- if (cause instanceof WatcherException && ((WatcherException ) cause ).isShouldRetry ()) {
1024
- LOG .debug ("retryable watch exception encountered, retrying after {} millis" , currentBackOff , cause );
1025
- Thread .sleep (currentBackOff );
1026
- currentBackOff *= watchRetryBackoffMultiplier ;
1007
+ if (cause instanceof WatcherException && ((WatcherException )cause ).isHttpGone ()) {
1008
+ LOG .debug ("Restarting the watch due to http gone" );
1027
1009
remainingNanosToWait -= (System .nanoTime () - startTime );
1028
- } else {
1029
- throw KubernetesClientException .launderThrowable (cause );
1010
+ continue ;
1030
1011
}
1012
+ throw KubernetesClientException .launderThrowable (cause );
1031
1013
} catch (TimeoutException e ) {
1032
1014
break ;
1033
1015
}
@@ -1037,16 +1019,6 @@ private T waitUntilConditionWithRetries(Predicate<T> condition, long timeoutNano
1037
1019
throw new IllegalArgumentException (type .getSimpleName () + " with name:[" + name + "] in namespace:[" + namespace + "] matching condition not found!" );
1038
1020
}
1039
1021
1040
- private static String getResourceVersion (HasMetadata item ) {
1041
- return (item == null ) ? null : item .getMetadata ().getResourceVersion ();
1042
- }
1043
-
1044
- private static ListOptions createListOptions (String resourceVersion ) {
1045
- return new ListOptionsBuilder ()
1046
- .withResourceVersion (resourceVersion )
1047
- .build ();
1048
- }
1049
-
1050
1022
public void setType (Class <T > type ) {
1051
1023
this .type = type ;
1052
1024
}
@@ -1063,14 +1035,14 @@ public void setNamespace(String namespace) {
1063
1035
public WritableOperation <T > dryRun (boolean isDryRun ) {
1064
1036
return newInstance (context .withDryRun (isDryRun ));
1065
1037
}
1066
-
1038
+
1067
1039
@ Override
1068
1040
public Informable <T > withIndexers (Map <String , Function <T , List <String >>> indexers ) {
1069
1041
BaseOperation <T , L , R > result = newInstance (context );
1070
1042
result .indexers = indexers ;
1071
1043
return result ;
1072
1044
}
1073
-
1045
+
1074
1046
@ Override
1075
1047
public SharedIndexInformer <T > inform (ResourceEventHandler <T > handler , long resync ) {
1076
1048
// convert the name into something listable
@@ -1083,7 +1055,7 @@ public SharedIndexInformer<T> inform(ResourceEventHandler<T> handler, long resyn
1083
1055
public L list (ListOptions params , String namespace , OperationContext context ) {
1084
1056
return baseOperation .list (params );
1085
1057
}
1086
-
1058
+
1087
1059
@ Override
1088
1060
public Watch watch (ListOptions params , String namespace , OperationContext context , Watcher <T > watcher ) {
1089
1061
return baseOperation .watch (params , watcher );
0 commit comments