@@ -22,30 +22,35 @@ import (
22
22
"fmt"
23
23
"os"
24
24
25
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
26
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27
+ "k8s.io/apimachinery/pkg/fields"
25
28
"k8s.io/apimachinery/pkg/runtime"
26
- "k8s.io/apimachinery/pkg/types"
27
29
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
30
+ "k8s.io/apimachinery/pkg/watch"
28
31
"k8s.io/client-go/discovery"
29
32
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
30
33
"k8s.io/client-go/rest"
34
+ retrywatch "k8s.io/client-go/tools/watch"
31
35
"k8s.io/klog/v2"
32
- "sigs.k8s.io/controller-runtime/pkg/client"
33
36
34
37
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
35
38
// to ensure that exec-entrypoint and run can make use of them.
36
39
_ "k8s.io/client-go/plugin/pkg/client/auth"
40
+
37
41
ctrl "sigs.k8s.io/controller-runtime"
42
+ "sigs.k8s.io/controller-runtime/pkg/client"
38
43
"sigs.k8s.io/controller-runtime/pkg/healthz"
39
44
"sigs.k8s.io/controller-runtime/pkg/kcp"
40
45
"sigs.k8s.io/controller-runtime/pkg/log/zap"
41
46
42
- datav1alpha1 "github.com/kcp-dev/controller-runtime-example/api/v1alpha1"
47
+ apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
48
+ "github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/util/conditions"
43
49
44
50
// +kubebuilder:scaffold:imports
45
51
52
+ datav1alpha1 "github.com/kcp-dev/controller-runtime-example/api/v1alpha1"
46
53
"github.com/kcp-dev/controller-runtime-example/controllers"
47
-
48
- apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
49
54
)
50
55
51
56
var (
@@ -67,7 +72,7 @@ func main() {
67
72
var enableLeaderElection bool
68
73
var probeAddr string
69
74
var apiExportName string
70
- flag .StringVar (& apiExportName , "api-export-name" , "" , "The name of the APIExport." )
75
+ flag .StringVar (& apiExportName , "api-export-name" , "data.my.domain " , "The name of the APIExport." )
71
76
flag .StringVar (& metricsAddr , "metrics-bind-address" , ":8080" , "The address the metric endpoint binds to." )
72
77
flag .StringVar (& probeAddr , "health-probe-bind-address" , ":8081" , "The address the probe endpoint binds to." )
73
78
flag .BoolVar (& enableLeaderElection , "leader-elect" , false ,
@@ -161,48 +166,59 @@ func main() {
161
166
// +kubebuilder:rbac:groups="apis.kcp.dev",resources=apiexports,verbs=get;list;watch
162
167
163
168
// restConfigForAPIExport returns a *rest.Config properly configured to communicate with the endpoint for the
164
- // APIExport's virtual workspace.
169
+ // APIExport's virtual workspace. It blocks until the controller APIExport VirtualWorkspaceURLsReady condition
170
+ // becomes truthy, which happens when the APIExport is bound for the first time.
165
171
func restConfigForAPIExport (ctx context.Context , cfg * rest.Config , apiExportName string ) (* rest.Config , error ) {
166
- scheme := runtime .NewScheme ()
167
- if err := apisv1alpha1 .AddToScheme (scheme ); err != nil {
168
- return nil , fmt .Errorf ("error adding apis.kcp.dev/v1alpha1 to scheme: %w" , err )
169
- }
170
-
171
- apiExportClient , err := client .New (cfg , client.Options {Scheme : scheme })
172
+ apiExportClient , err := client .NewWithWatch (cfg , client.Options {Scheme : scheme })
172
173
if err != nil {
173
174
return nil , fmt .Errorf ("error creating APIExport client: %w" , err )
174
175
}
175
176
176
- var apiExport apisv1alpha1.APIExport
177
-
178
- if apiExportName != "" {
179
- if err := apiExportClient .Get (ctx , types.NamespacedName {Name : apiExportName }, & apiExport ); err != nil {
180
- return nil , fmt .Errorf ("error getting APIExport %q: %w" , apiExportName , err )
181
- }
182
- } else {
183
- setupLog .Info ("api-export-name is empty - listing" )
184
- exports := & apisv1alpha1.APIExportList {}
185
- if err := apiExportClient .List (ctx , exports ); err != nil {
186
- return nil , fmt .Errorf ("error listing APIExports: %w" , err )
187
- }
188
- if len (exports .Items ) == 0 {
189
- return nil , fmt .Errorf ("no APIExport found" )
190
- }
191
- if len (exports .Items ) > 1 {
192
- return nil , fmt .Errorf ("more than one APIExport found" )
193
- }
194
- apiExport = exports .Items [0 ]
177
+ list := & apisv1alpha1.APIExportList {}
178
+ selector := fields .OneTermEqualSelector ("metadata.name" , apiExportName )
179
+ err = apiExportClient .List (ctx , list , client.MatchingFieldsSelector {Selector : selector })
180
+ if err != nil {
181
+ return nil , fmt .Errorf ("error watching for APIExport: %w" , err )
195
182
}
196
-
197
- if len (apiExport .Status .VirtualWorkspaces ) < 1 {
198
- return nil , fmt .Errorf ("APIExport %q status.virtualWorkspaces is empty" , apiExportName )
183
+ if len (list .Items ) > 0 && isAPIExportReady (& list .Items [0 ]) {
184
+ cfg = rest .CopyConfig (cfg )
185
+ // TODO: sharding support
186
+ cfg .Host = list .Items [0 ].Status .VirtualWorkspaces [0 ].URL
187
+ return cfg , nil
199
188
}
200
189
201
- cfg = rest .CopyConfig (cfg )
202
- // TODO(ncdc): sharding support
203
- cfg .Host = apiExport .Status .VirtualWorkspaces [0 ].URL
190
+ setupLog .Info ("Watching for APIExport to become ready" , "name" , apiExportName )
204
191
205
- return cfg , nil
192
+ rw , err := retrywatch .NewRetryWatcher (list .ResourceVersion , watcher (apiExportClient .Watch ).FilteredBy (selector ))
193
+ if err != nil {
194
+ return nil , fmt .Errorf ("error creating retry watcher for APIExport: %w" , err )
195
+ }
196
+ defer rw .Stop ()
197
+
198
+ for {
199
+ select {
200
+ case <- ctx .Done ():
201
+ return nil , ctx .Err ()
202
+ case e := <- rw .ResultChan ():
203
+ switch e .Type {
204
+ case watch .Error :
205
+ return nil , fmt .Errorf ("error watching for APIExport: %w" , apierrors .FromObject (e .Object ))
206
+
207
+ case watch .Added , watch .Modified :
208
+ apiExport , ok := e .Object .(* apisv1alpha1.APIExport )
209
+ if ! ok {
210
+ return nil , fmt .Errorf ("unexpected event object: %v" , e .Object )
211
+ }
212
+ if ! isAPIExportReady (apiExport ) {
213
+ continue
214
+ }
215
+ cfg = rest .CopyConfig (cfg )
216
+ // TODO: sharding support
217
+ cfg .Host = apiExport .Status .VirtualWorkspaces [0 ].URL
218
+ return cfg , nil
219
+ }
220
+ }
221
+ }
206
222
}
207
223
208
224
func kcpAPIsGroupPresent (restConfig * rest.Config ) bool {
@@ -228,3 +244,29 @@ func kcpAPIsGroupPresent(restConfig *rest.Config) bool {
228
244
}
229
245
return false
230
246
}
247
+
248
+ func isAPIExportReady (apiExport * apisv1alpha1.APIExport ) bool {
249
+ if ! conditions .IsTrue (apiExport , apisv1alpha1 .APIExportVirtualWorkspaceURLsReady ) {
250
+ setupLog .Info ("APIExport virtual workspace URLs are not ready" , "APIExport" , apiExport .Name )
251
+ return false
252
+ }
253
+
254
+ if len (apiExport .Status .VirtualWorkspaces ) == 0 {
255
+ setupLog .Info ("APIExport does not have any virtual workspace URLs" , "APIExport" , apiExport .Name )
256
+ return false
257
+ }
258
+
259
+ return true
260
+ }
261
+
262
+ type watcher func (ctx context.Context , obj client.ObjectList , opts ... client.ListOption ) (watch.Interface , error )
263
+
264
+ func (w watcher ) Watch (options metav1.ListOptions ) (watch.Interface , error ) {
265
+ return w (context .TODO (), & apisv1alpha1.APIExportList {}, & client.ListOptions {Raw : & options })
266
+ }
267
+
268
+ func (w watcher ) FilteredBy (selector fields.Selector ) watcher {
269
+ return func (ctx context.Context , obj client.ObjectList , opts ... client.ListOption ) (watch.Interface , error ) {
270
+ return w (ctx , obj , append (opts , client.MatchingFieldsSelector {Selector : selector })... )
271
+ }
272
+ }
0 commit comments