Skip to content

Commit 81eb056

Browse files
informergen,listergen: support a single-cluster-scoped informer factory
In some cases, users will want to run against a specific cluster in kcp, so we need to allow them to create a single-cluster-scoped factory. We can't extract the cluster they are in from the client we get passed without major hackery, so we use the standard shared informer, keyfunc and indices, simply leaving out the cluster entirely. Signed-off-by: Steve Kuznetsov <skuznets@redhat.com>
1 parent 009c42f commit 81eb056

File tree

9 files changed

+405
-16
lines changed

9 files changed

+405
-16
lines changed

hack/update-codegen.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,15 +72,15 @@ ${KUBE_INFORMER_GEN} \
7272
./../bin/code-generator \
7373
"client:standalone=true,outputPackagePath=acme.corp/pkg/kcpexisting/clients,apiPackagePath=acme.corp/pkg/apis,singleClusterClientPackagePath=acme.corp/pkg/generated/clientset/versioned,singleClusterApplyConfigurationsPackagePath=acme.corp/pkg/generated/applyconfigurations,headerFile=./../hack/boilerplate/boilerplate.go.txt" \
7474
"lister:apiPackagePath=acme.corp/pkg/apis,singleClusterListerPackagePath=acme.corp/pkg/generated/listers,headerFile=./../hack/boilerplate/boilerplate.go.txt" \
75-
"informer:standalone=true,outputPackagePath=acme.corp/pkg/kcpexisting/clients,apiPackagePath=acme.corp/pkg/apis,singleClusterListerPackagePath=acme.corp/pkg/generated/listers,singleClusterInformerPackagePath=acme.corp/pkg/generated/informers/externalversions,headerFile=./../hack/boilerplate/boilerplate.go.txt" \
75+
"informer:standalone=true,outputPackagePath=acme.corp/pkg/kcpexisting/clients,apiPackagePath=acme.corp/pkg/apis,singleClusterClientPackagePath=acme.corp/pkg/generated/clientset/versioned,singleClusterListerPackagePath=acme.corp/pkg/generated/listers,singleClusterInformerPackagePath=acme.corp/pkg/generated/informers/externalversions,headerFile=./../hack/boilerplate/boilerplate.go.txt" \
7676
"paths=./pkg/apis/..." \
7777
"output:dir=./pkg/kcpexisting/clients"
7878

7979
# Generate cluster-aware clients, informers and listers assuming no single-cluster listers or informers
8080
./../bin/code-generator \
8181
"client:standalone=true,outputPackagePath=acme.corp/pkg/kcp/clients,apiPackagePath=acme.corp/pkg/apis,singleClusterClientPackagePath=acme.corp/pkg/generated/clientset/versioned,singleClusterApplyConfigurationsPackagePath=acme.corp/pkg/generated/applyconfigurations,headerFile=./../hack/boilerplate/boilerplate.go.txt" \
8282
"lister:apiPackagePath=acme.corp/pkg/apis,headerFile=./../hack/boilerplate/boilerplate.go.txt" \
83-
"informer:standalone=true,outputPackagePath=acme.corp/pkg/kcp/clients,apiPackagePath=acme.corp/pkg/apis,headerFile=./../hack/boilerplate/boilerplate.go.txt" \
83+
"informer:standalone=true,outputPackagePath=acme.corp/pkg/kcp/clients,apiPackagePath=acme.corp/pkg/apis,singleClusterClientPackagePath=acme.corp/pkg/generated/clientset/versioned,headerFile=./../hack/boilerplate/boilerplate.go.txt" \
8484
"paths=./pkg/apis/..." \
8585
"output:dir=./pkg/kcp/clients"
8686

pkg/generators/informergen/informergen.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ type Generator struct {
5858
// e.g. "k8s.io/api"
5959
APIPackagePath string `marker:"apiPackagePath"`
6060

61+
// SingleClusterClientPackagePath is the root directory under which single-cluster-aware clients exist.
62+
// e.g. "k8s.io/client-go/kubernetes"
63+
SingleClusterClientPackagePath string `marker:""`
64+
6165
// SingleClusterInformerPackagePath is the package under which the cluster-unaware listers are exposed.
6266
// e.g. "k8s.io/client-go/informers"
6367
SingleClusterInformerPackagePath string `marker:",optional"`
@@ -136,6 +140,7 @@ func (g Generator) Generate(ctx *genall.GenerationContext) error {
136140
Groups: onlyGroups,
137141
PackagePath: filepath.Join(g.OutputPackagePath, informersDir),
138142
ClientsetPackagePath: filepath.Join(g.OutputPackagePath, clientsetDir),
143+
SingleClusterClientPackagePath: g.SingleClusterClientPackagePath,
139144
SingleClusterInformerPackagePath: g.SingleClusterInformerPackagePath,
140145
}, factoryPath); err != nil {
141146
return err
@@ -165,7 +170,9 @@ func (g Generator) Generate(ctx *genall.GenerationContext) error {
165170
interfacesPath := filepath.Join(informersDir, "internalinterfaces", "factory_interfaces.go")
166171
logger.WithValues("path", factoryPath).Info("generating internal informer interfaces")
167172
if err := util.WriteGeneratedCode(ctx, headerText, &informergen.FactoryInterface{
168-
ClientsetPackagePath: filepath.Join(g.OutputPackagePath, clientsetDir),
173+
ClientsetPackagePath: filepath.Join(g.OutputPackagePath, clientsetDir),
174+
SingleClusterClientPackagePath: g.SingleClusterClientPackagePath,
175+
UseUpstreamInterfaces: g.SingleClusterInformerPackagePath != "",
169176
}, interfacesPath); err != nil {
170177
return err
171178
}
@@ -187,9 +194,10 @@ func (g Generator) Generate(ctx *genall.GenerationContext) error {
187194

188195
logger.WithValues("path", outputFile).Info("generating group interface")
189196
if err := util.WriteGeneratedCode(ctx, headerText, &informergen.GroupInterface{
190-
Group: group,
191-
Versions: onlyVersions,
192-
PackagePath: filepath.Join(g.OutputPackagePath, informersDir),
197+
Group: group,
198+
Versions: onlyVersions,
199+
PackagePath: filepath.Join(g.OutputPackagePath, informersDir),
200+
UseUpstreamInterfaces: g.SingleClusterInformerPackagePath != "",
193201
}, outputFile); err != nil {
194202
return err
195203
}
@@ -202,9 +210,10 @@ func (g Generator) Generate(ctx *genall.GenerationContext) error {
202210

203211
logger.WithValues("path", outputFile).Info("generating version interface")
204212
if err := util.WriteGeneratedCode(ctx, headerText, &informergen.VersionInterface{
205-
Version: types.Version(namer.IC(version.Version.String())),
206-
Kinds: kinds,
207-
PackagePath: filepath.Join(g.OutputPackagePath, informersDir),
213+
Version: types.Version(namer.IC(version.Version.String())),
214+
Kinds: kinds,
215+
PackagePath: filepath.Join(g.OutputPackagePath, informersDir),
216+
UseUpstreamInterfaces: g.SingleClusterInformerPackagePath != "",
208217
}, outputFile); err != nil {
209218
return err
210219
}
@@ -223,6 +232,7 @@ func (g Generator) Generate(ctx *genall.GenerationContext) error {
223232
PackagePath: filepath.Join(g.OutputPackagePath, informersDir),
224233
ClientsetPackagePath: filepath.Join(g.OutputPackagePath, clientsetDir),
225234
ListerPackagePath: filepath.Join(g.OutputPackagePath, listersDir),
235+
SingleClusterClientPackagePath: g.SingleClusterClientPackagePath,
226236
SingleClusterInformerPackagePath: g.SingleClusterInformerPackagePath,
227237
SingleClusterListerPackagePath: g.SingleClusterListerPackagePath,
228238
}, outputFile); err != nil {

pkg/internal/informergen/factory.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ type Factory struct {
3737
// TODO(skuznets) we should be able to figure this out from the output dir, ideally
3838
ClientsetPackagePath string
3939

40+
// SingleClusterClientPackagePath is the root directory under which single-cluster-aware clients exist.
41+
// e.g. "k8s.io/client-go/kubernetes"
42+
SingleClusterClientPackagePath string `marker:""`
43+
4044
// SingleClusterInformerPackagePath is the package under which the cluster-unaware listers are exposed.
4145
// e.g. "k8s.io/client-go/informers"
4246
SingleClusterInformerPackagePath string
@@ -52,6 +56,7 @@ func (f *Factory) WriteContent(w io.Writer) error {
5256
"groups": f.Groups,
5357
"packagePath": f.PackagePath,
5458
"clientsetPackagePath": f.ClientsetPackagePath,
59+
"singleClusterClientPackagePath": f.SingleClusterClientPackagePath,
5560
"singleClusterInformerPackagePath": f.SingleClusterInformerPackagePath,
5661
"useUpstreamInterfaces": f.SingleClusterInformerPackagePath != "",
5762
}
@@ -80,6 +85,9 @@ import (
8085
"k8s.io/client-go/tools/cache"
8186
8287
clientset "{{.clientsetPackagePath}}"
88+
{{if not .useUpstreamInterfaces -}}
89+
scopedclientset "{{.singleClusterClientPackagePath}}"
90+
{{end -}}
8391
{{if .useUpstreamInterfaces -}}
8492
upstreaminformers "{{.singleClusterInformerPackagePath}}"
8593
{{end -}}
@@ -96,6 +104,9 @@ type SharedInformerOption func(*SharedInformerOptions) *SharedInformerOptions
96104
type SharedInformerOptions struct {
97105
customResync map[reflect.Type]time.Duration
98106
tweakListOptions internalinterfaces.TweakListOptionsFunc
107+
{{if not .useUpstreamInterfaces -}}
108+
namespace string
109+
{{end -}}
99110
}
100111
101112
type sharedInformerFactory struct {
@@ -264,4 +275,136 @@ func (f *scopedDynamicSharedInformerFactory) ForResource(resource schema.GroupVe
264275
func (f *scopedDynamicSharedInformerFactory) Start(stopCh <-chan struct{}) {
265276
f.sharedInformerFactory.Start(stopCh)
266277
}
278+
279+
{{if not .useUpstreamInterfaces -}}
280+
// WithNamespace limits the SharedInformerFactory to the specified namespace.
281+
func WithNamespace(namespace string) SharedInformerOption {
282+
return func(opts *SharedInformerOptions) *SharedInformerOptions {
283+
opts.namespace = namespace
284+
return opts
285+
}
286+
}
287+
288+
type sharedScopedInformerFactory struct {
289+
client scopedclientset.Interface
290+
namespace string
291+
tweakListOptions internalinterfaces.TweakListOptionsFunc
292+
lock sync.Mutex
293+
defaultResync time.Duration
294+
customResync map[reflect.Type]time.Duration
295+
296+
informers map[reflect.Type]cache.SharedIndexInformer
297+
// startedInformers is used for tracking which informers have been started.
298+
// This allows Start() to be called multiple times safely.
299+
startedInformers map[reflect.Type]bool
300+
}
301+
302+
// NewSharedScopedInformerFactory constructs a new instance of SharedInformerFactory for some or all namespaces.
303+
func NewSharedScopedInformerFactory(client scopedclientset.Interface, defaultResync time.Duration, namespace string) SharedScopedInformerFactory {
304+
return NewSharedScopedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace))
305+
}
306+
307+
// NewSharedScopedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
308+
func NewSharedScopedInformerFactoryWithOptions(client scopedclientset.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedScopedInformerFactory {
309+
factory := &sharedScopedInformerFactory{
310+
client: client,
311+
defaultResync: defaultResync,
312+
informers: make(map[reflect.Type]cache.SharedIndexInformer),
313+
startedInformers: make(map[reflect.Type]bool),
314+
customResync: make(map[reflect.Type]time.Duration),
315+
}
316+
317+
opts := &SharedInformerOptions{
318+
customResync: make(map[reflect.Type]time.Duration),
319+
}
320+
321+
// Apply all options
322+
for _, opt := range options {
323+
opts = opt(opts)
324+
}
325+
326+
// Forward options to the factory
327+
factory.customResync = opts.customResync
328+
factory.tweakListOptions = opts.tweakListOptions
329+
factory.namespace = opts.namespace
330+
331+
return factory
332+
}
333+
334+
// Start initializes all requested informers.
335+
func (f *sharedScopedInformerFactory) Start(stopCh <-chan struct{}) {
336+
f.lock.Lock()
337+
defer f.lock.Unlock()
338+
339+
for informerType, informer := range f.informers {
340+
if !f.startedInformers[informerType] {
341+
go informer.Run(stopCh)
342+
f.startedInformers[informerType] = true
343+
}
344+
}
345+
}
346+
347+
// WaitForCacheSync waits for all started informers' cache were synced.
348+
func (f *sharedScopedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
349+
informers := func()map[reflect.Type]cache.SharedIndexInformer{
350+
f.lock.Lock()
351+
defer f.lock.Unlock()
352+
353+
informers := map[reflect.Type]cache.SharedIndexInformer{}
354+
for informerType, informer := range f.informers {
355+
if f.startedInformers[informerType] {
356+
informers[informerType] = informer
357+
}
358+
}
359+
return informers
360+
}()
361+
362+
res := map[reflect.Type]bool{}
363+
for informType, informer := range informers {
364+
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
365+
}
366+
return res
367+
}
368+
369+
// InformerFor returns the SharedIndexInformer for obj using an internal
370+
// client.
371+
func (f *sharedScopedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewScopedInformerFunc) cache.SharedIndexInformer {
372+
f.lock.Lock()
373+
defer f.lock.Unlock()
374+
375+
informerType := reflect.TypeOf(obj)
376+
informer, exists := f.informers[informerType]
377+
if exists {
378+
return informer
379+
}
380+
381+
resyncPeriod, exists := f.customResync[informerType]
382+
if !exists {
383+
resyncPeriod = f.defaultResync
384+
}
385+
386+
informer = newFunc(f.client, resyncPeriod)
387+
f.informers[informerType] = informer
388+
389+
return informer
390+
}
391+
392+
// SharedScopedInformerFactory provides shared informers for resources in all known
393+
// API group versions, scoped to one workspace.
394+
type SharedScopedInformerFactory interface {
395+
internalinterfaces.SharedScopedInformerFactory
396+
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
397+
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
398+
399+
{{range .groups}} {{.GoName}}() {{.Group.PackageName}}informers.Interface
400+
{{end -}}
401+
}
402+
403+
404+
{{range .groups}}
405+
func (f *sharedScopedInformerFactory) {{.GoName}}() {{.Group.PackageName}}informers.Interface {
406+
return {{.Group.PackageName}}informers.NewScoped(f, f.namespace, f.tweakListOptions)
407+
}
408+
{{end}}
409+
{{end}}
267410
`

pkg/internal/informergen/factoryinterface.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,13 @@ type FactoryInterface struct {
2525
// ClientsetPackagePath is the package under which the cluster-aware client-set will be exposed.
2626
// TODO(skuznets) we should be able to figure this out from the output dir, ideally
2727
ClientsetPackagePath string
28+
29+
// SingleClusterClientPackagePath is the root directory under which single-cluster-aware clients exist.
30+
// e.g. "k8s.io/client-go/kubernetes"
31+
SingleClusterClientPackagePath string `marker:""`
32+
33+
// UseUpstreamInterfaces determines if we're generating against existing single-cluster informer interfaces or not.
34+
UseUpstreamInterfaces bool
2835
}
2936

3037
func (f *FactoryInterface) WriteContent(w io.Writer) error {
@@ -34,7 +41,9 @@ func (f *FactoryInterface) WriteContent(w io.Writer) error {
3441
}
3542

3643
m := map[string]interface{}{
37-
"clientsetPackagePath": f.ClientsetPackagePath,
44+
"clientsetPackagePath": f.ClientsetPackagePath,
45+
"singleClusterClientPackagePath": f.SingleClusterClientPackagePath,
46+
"useUpstreamInterfaces": f.UseUpstreamInterfaces,
3847
}
3948
return templ.Execute(w, m)
4049
}
@@ -54,8 +63,14 @@ import (
5463
5564
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
5665
runtime "k8s.io/apimachinery/pkg/runtime"
66+
{{if not .useUpstreamInterfaces -}}
67+
"k8s.io/client-go/tools/cache"
68+
{{end}}
5769
5870
clientset "{{.clientsetPackagePath}}"
71+
{{if not .useUpstreamInterfaces -}}
72+
scopedclientset "{{.singleClusterClientPackagePath}}"
73+
{{end}}
5974
)
6075
6176
// NewInformerFunc takes clientset.ClusterInterface and time.Duration to return a ScopeableSharedIndexInformer.
@@ -67,6 +82,17 @@ type SharedInformerFactory interface {
6782
InformerFor(obj runtime.Object, newFunc NewInformerFunc) kcpcache.ScopeableSharedIndexInformer
6883
}
6984
85+
{{if not .useUpstreamInterfaces -}}
86+
// NewScopedInformerFunc takes scopedclientset.Interface and time.Duration to return a SharedIndexInformer.
87+
type NewScopedInformerFunc func(scopedclientset.Interface, time.Duration) cache.SharedIndexInformer
88+
89+
// SharedScopedInformerFactory a small interface to allow for adding an informer without an import cycle
90+
type SharedScopedInformerFactory interface {
91+
Start(stopCh <-chan struct{})
92+
InformerFor(obj runtime.Object, newFunc NewScopedInformerFunc) cache.SharedIndexInformer
93+
}
94+
{{end}}
95+
7096
// TweakListOptionsFunc is a function that transforms a metav1.ListOptions.
7197
type TweakListOptionsFunc func(*metav1.ListOptions)
7298
`

pkg/internal/informergen/generic.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,4 +146,21 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource
146146
147147
return nil, fmt.Errorf("no informer found for %v", resource)
148148
}
149+
150+
{{if not .useUpstreamInterfaces -}}
151+
// ForResource gives generic access to a shared informer of the matching type
152+
// TODO extend this to unknown resources with a client pool
153+
func (f *sharedScopedInformerFactory) ForResource(resource schema.GroupVersionResource) ({{if .useUpstreamInterfaces}}upstreaminformers.{{end}}GenericInformer, error) {
154+
switch resource {
155+
{{range $group := .groups}} // Group={{.Group.NonEmpty}}, Version={{.Version}}
156+
{{range $kind := index (index $.groupVersionKinds .Group) .Version}} case {{$group.PackageAlias}}.SchemeGroupVersion.WithResource("{{$kind.Plural|toLower}}"):
157+
informer := f.{{$group.GroupGoName}}().{{$group.Version}}().{{$kind.Plural}}().Informer()
158+
return &genericInformer{lister: cache.NewGenericLister(informer.GetIndexer(), resource.GroupResource()), informer: informer}, nil
159+
{{end -}}
160+
{{end -}}
161+
}
162+
163+
return nil, fmt.Errorf("no informer found for %v", resource)
164+
}
165+
{{end}}
149166
`

pkg/internal/informergen/groupinterface.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ type GroupInterface struct {
3636
// e.g. "github.com/kcp-dev/client-go/clients/informers"
3737
// TODO(skuznets) we should be able to figure this out from the output dir, ideally
3838
PackagePath string
39+
40+
// UseUpstreamInterfaces determines if we're generating against existing single-cluster informer interfaces or not.
41+
UseUpstreamInterfaces bool
3942
}
4043

4144
func (g GroupInterface) WriteContent(w io.Writer) error {
@@ -45,9 +48,10 @@ func (g GroupInterface) WriteContent(w io.Writer) error {
4548
}
4649

4750
m := map[string]interface{}{
48-
"group": g.Group,
49-
"packagePath": g.PackagePath,
50-
"versions": g.Versions,
51+
"group": g.Group,
52+
"packagePath": g.PackagePath,
53+
"versions": g.Versions,
54+
"useUpstreamInterfaces": g.UseUpstreamInterfaces,
5155
}
5256
return templ.Execute(w, m)
5357
}
@@ -88,4 +92,29 @@ func (g *group) {{.String}}() {{.PackageName}}.ClusterInterface {
8892
return {{.PackageName}}.New(g.factory, g.tweakListOptions)
8993
}
9094
{{end -}}
95+
96+
{{if not .useUpstreamInterfaces -}}
97+
type Interface interface {
98+
{{range .versions}} // {{.String}} provides access to the shared informers in {{.String}}.
99+
{{.String}}() {{.PackageName}}.Interface
100+
{{end -}}
101+
}
102+
103+
type scopedGroup struct {
104+
factory internalinterfaces.SharedScopedInformerFactory
105+
tweakListOptions internalinterfaces.TweakListOptionsFunc
106+
namespace string
107+
}
108+
109+
// New returns a new Interface.
110+
func NewScoped(f internalinterfaces.SharedScopedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
111+
return &scopedGroup{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
112+
}
113+
114+
{{range .versions}}// {{.String}} returns a new {{.PackageName}}.ClusterInterface.
115+
func (g *scopedGroup) {{.String}}() {{.PackageName}}.Interface {
116+
return {{.PackageName}}.NewScoped(g.factory, g.namespace, g.tweakListOptions)
117+
}
118+
{{end -}}
119+
{{end}}
91120
`

0 commit comments

Comments
 (0)