Skip to content

Commit 10f1e74

Browse files
committed
Add shutdown support to shared informer factory
Signed-off-by: Andy Goldstein <andy.goldstein@redhat.com>
1 parent 2f8ab8a commit 10f1e74

File tree

1 file changed

+87
-12
lines changed

1 file changed

+87
-12
lines changed

pkg/internal/informergen/factory.go

Lines changed: 87 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@ type sharedInformerFactory struct {
120120
// startedInformers is used for tracking which informers have been started.
121121
// This allows Start() to be called multiple times safely.
122122
startedInformers map[reflect.Type]bool
123+
// wg tracks how many goroutines were started.
124+
wg sync.WaitGroup
125+
// shuttingDown is true when Shutdown has been called. It may still be running
126+
// because it needs to wait for goroutines.
127+
shuttingDown bool
123128
}
124129
125130
// WithCustomResyncConfig sets a custom resync period for the specified informer types.
@@ -173,15 +178,36 @@ func NewSharedInformerFactoryWithOptions(client clientset.ClusterInterface, defa
173178
174179
// Start initializes all requested informers.
175180
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
176-
f.lock.Lock()
177-
defer f.lock.Unlock()
181+
f.lock.Lock()
182+
defer f.lock.Unlock()
178183
179-
for informerType, informer := range f.informers {
180-
if !f.startedInformers[informerType] {
181-
go informer.Run(stopCh)
182-
f.startedInformers[informerType] = true
183-
}
184-
}
184+
if f.shuttingDown {
185+
return
186+
}
187+
188+
for informerType, informer := range f.informers {
189+
if !f.startedInformers[informerType] {
190+
f.wg.Add(1)
191+
// We need a new variable in each loop iteration,
192+
// otherwise the goroutine would use the loop variable
193+
// and that keeps changing.
194+
informer := informer
195+
go func() {
196+
defer f.wg.Done()
197+
informer.Run(stopCh)
198+
}()
199+
f.startedInformers[informerType] = true
200+
}
201+
}
202+
}
203+
204+
func (f *sharedInformerFactory) Shutdown() {
205+
f.lock.Lock()
206+
f.shuttingDown = true
207+
f.lock.Unlock()
208+
209+
// Will return immediately if there is nothing to wait for.
210+
f.wg.Wait()
185211
}
186212
187213
// WaitForCacheSync waits for all started informers' cache were synced.
@@ -206,8 +232,7 @@ func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[ref
206232
return res
207233
}
208234
209-
// InformerFor returns the SharedIndexInformer for obj using an internal
210-
// client.
235+
// InformerFor returns the SharedIndexInformer for obj.
211236
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) kcpcache.ScopeableSharedIndexInformer {
212237
f.lock.Lock()
213238
defer f.lock.Unlock()
@@ -230,18 +255,69 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal
230255
}
231256
232257
type ScopedDynamicSharedInformerFactory interface {
258+
// ForResource gives generic access to a shared informer of the matching type.
233259
ForResource(resource schema.GroupVersionResource) ({{if .useUpstreamInterfaces}}upstreaminformers.{{end}}GenericInformer, error)
260+
261+
// Start initializes all requested informers. They are handled in goroutines
262+
// which run until the stop channel gets closed.
234263
Start(stopCh <-chan struct{})
235264
}
236265
237266
// SharedInformerFactory provides shared informers for resources in all known
238267
// API group versions.
268+
//
269+
// It is typically used like this:
270+
//
271+
// ctx, cancel := context.Background()
272+
// defer cancel()
273+
// factory := NewSharedInformerFactoryWithOptions(client, resyncPeriod)
274+
// defer factory.Shutdown() // Returns immediately if nothing was started.
275+
// genericInformer := factory.ForResource(resource)
276+
// typedInformer := factory.SomeAPIGroup().V1().SomeType()
277+
// factory.Start(ctx.Done()) // Start processing these informers.
278+
// synced := factory.WaitForCacheSync(ctx.Done())
279+
// for v, ok := range synced {
280+
// if !ok {
281+
// fmt.Fprintf(os.Stderr, "caches failed to sync: %v", v)
282+
// return
283+
// }
284+
// }
285+
//
286+
// // Creating informers can also be created after Start, but then
287+
// // Start must be called again:
288+
// anotherGenericInformer := factory.ForResource(resource)
289+
// factory.Start(ctx.Done())
239290
type SharedInformerFactory interface {
240291
internalinterfaces.SharedInformerFactory
292+
241293
Cluster(logicalcluster.Name) ScopedDynamicSharedInformerFactory
294+
295+
// Start initializes all requested informers. They are handled in goroutines
296+
// which run until the stop channel gets closed.
297+
Start(stopCh <-chan struct{})
298+
299+
// Shutdown marks a factory as shutting down. At that point no new
300+
// informers can be started anymore and Start will return without
301+
// doing anything.
302+
//
303+
// In addition, Shutdown blocks until all goroutines have terminated. For that
304+
// to happen, the close channel(s) that they were started with must be closed,
305+
// either before Shutdown gets called or while it is waiting.
306+
//
307+
// Shutdown may be called multiple times, even concurrently. All such calls will
308+
// block until all goroutines have terminated.
309+
Shutdown()
310+
311+
// ForResource gives generic access to a shared informer of the matching type.
242312
ForResource(resource schema.GroupVersionResource) (GenericClusterInformer, error)
313+
314+
// WaitForCacheSync blocks until all started informers' caches were synced
315+
// or the stop channel gets closed.
243316
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
244317
318+
// InformerFor returns the SharedIndexInformer for obj.
319+
InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) kcpcache.ScopeableSharedIndexInformer
320+
245321
{{range .groups}} {{.GoName}}() {{.Group.PackageName}}informers.ClusterInterface
246322
{{end -}}
247323
}
@@ -366,8 +442,7 @@ func (f *sharedScopedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) m
366442
return res
367443
}
368444
369-
// InformerFor returns the SharedIndexInformer for obj using an internal
370-
// client.
445+
// InformerFor returns the SharedIndexInformer for obj.
371446
func (f *sharedScopedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewScopedInformerFunc) cache.SharedIndexInformer {
372447
f.lock.Lock()
373448
defer f.lock.Unlock()

0 commit comments

Comments
 (0)