Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions pkg/controller/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ type Controller struct {

// lister indexes properties about Configuration
lister listers.ConfigurationLister
synced cache.InformerSynced

// don't start the workers until revisions cache have been synced
revisionsSynced cache.InformerSynced
}

// NewController creates a new Configuration controller
Expand All @@ -64,15 +60,23 @@ func NewController(
informer := elaInformerFactory.Serving().V1alpha1().Configurations()
revisionInformer := elaInformerFactory.Serving().V1alpha1().Revisions()

informers := []cache.SharedIndexInformer{informer.Informer(), revisionInformer.Informer()}

controller := &Controller{
Base: controller.NewBase(opt, informer.Informer(), controllerAgentName, "Configurations"),
buildClientSet: buildClientSet,
lister: informer.Lister(),
synced: informer.Informer().HasSynced,
revisionsSynced: revisionInformer.Informer().HasSynced,
Base: controller.NewBase(opt, controllerAgentName, "Configurations", informers),
buildClientSet: buildClientSet,
lister: informer.Lister(),
}

controller.Logger.Info("Setting up event handlers")
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.Enqueue,
UpdateFunc: func(old, new interface{}) {
controller.Enqueue(new)
},
DeleteFunc: controller.Enqueue,
})

revisionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
controller.SyncRevision(obj.(*v1alpha1.Revision))
Expand All @@ -89,8 +93,7 @@ func NewController(
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
return c.RunController(threadiness, stopCh,
[]cache.InformerSynced{c.synced, c.revisionsSynced}, c.syncHandler, "Configuration")
return c.RunController(threadiness, stopCh, c.syncHandler, "Configuration")
}

// loggerWithConfigInfo enriches the logs with configuration name and namespace.
Expand Down
26 changes: 13 additions & 13 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type Base struct {
// performance benefits, raw logger also preserves type-safety at
// the expense of slightly greater verbosity.
Logger *zap.SugaredLogger

// don't start the workers until informers are synced
informersSynced []cache.InformerSynced
}

// Options defines the common controller options passed to NewBase.
Expand All @@ -85,7 +88,8 @@ type Options struct {

// NewBase instantiates a new instance of Base implementing
// the common & boilerplate code between our controllers.
func NewBase(opt Options, informer cache.SharedIndexInformer, controllerAgentName string, workQueueName string) *Base {
func NewBase(opt Options, controllerAgentName, workQueueName string,
informers []cache.SharedIndexInformer) *Base {

// Enrich the logs with controller name
logger := opt.Logger.Named(controllerAgentName).With(zap.String(logkey.ControllerType, controllerAgentName))
Expand All @@ -106,21 +110,16 @@ func NewBase(opt Options, informer cache.SharedIndexInformer, controllerAgentNam
Logger: logger,
}

// Set up an event handler for when the resource types of interest change
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: base.enqueueWork,
UpdateFunc: func(old, new interface{}) {
base.enqueueWork(new)
},
DeleteFunc: base.enqueueWork,
})
for _, i := range informers {
base.informersSynced = append(base.informersSynced, i.HasSynced)
}

return base
}

// enqueueWork takes a resource and converts it into a
// Enqueue takes a resource and converts it into a
// namespace/name string which is then put onto the work queue.
func (c *Base) enqueueWork(obj interface{}) {
func (c *Base) Enqueue(obj interface{}) {
var key string
var err error
if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {
Expand All @@ -130,14 +129,15 @@ func (c *Base) enqueueWork(obj interface{}) {
c.WorkQueue.AddRateLimited(key)
}

// TODO(mattmoor): EnqueueControllerOf

// RunController will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Base) RunController(
threadiness int,
stopCh <-chan struct{},
informersSynced []cache.InformerSynced,
syncHandler func(string) error,
controllerName string) error {

Expand All @@ -149,7 +149,7 @@ func (c *Base) RunController(

// Wait for the caches to be synced before starting workers
logger.Info("Waiting for informer caches to sync")
for i, synced := range informersSynced {
for i, synced := range c.informersSynced {
if ok := cache.WaitForCacheSync(stopCh, synced); !ok {
return fmt.Errorf("failed to wait for cache at index %v to sync", i)
}
Expand Down
34 changes: 18 additions & 16 deletions pkg/controller/revision/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,11 @@ type Controller struct {

// lister indexes properties about Revision
lister listers.RevisionLister
synced cache.InformerSynced

buildtracker *buildTracker

resolver resolver

// don't start the workers until informers are synced
endpointsSynced cache.InformerSynced
configMapSynced cache.InformerSynced

// enableVarLogCollection dedicates whether to set up a fluentd sidecar to
// collect logs under /var/log/.
enableVarLogCollection bool
Expand Down Expand Up @@ -172,30 +167,38 @@ func NewController(

// obtain references to a shared index informer for the Revision and Endpoint type.
informer := elaInformerFactory.Serving().V1alpha1().Revisions()
endpointsInformer := kubeInformerFactory.Core().V1().Endpoints()
buildInformer := buildInformerFactory.Build().V1alpha1().Builds()
configMapInformer := servingSystemInformerFactory.Core().V1().ConfigMaps()
deploymentInformer := kubeInformerFactory.Apps().V1().Deployments()
configMapInformer := servingSystemInformerFactory.Core().V1().ConfigMaps().Informer()
endpointsInformer := kubeInformerFactory.Core().V1().Endpoints()

informers := []cache.SharedIndexInformer{informer.Informer(), buildInformer.Informer(),
configMapInformer.Informer(), deploymentInformer.Informer(), endpointsInformer.Informer()}

networkConfig, err := NewNetworkConfig(opt.KubeClientSet)
if err != nil {
opt.Logger.Fatalf("Error loading network config: %v", err)
}

controller := &Controller{
Base: controller.NewBase(opt, informer.Informer(), controllerAgentName, "Revisions"),
Base: controller.NewBase(opt, controllerAgentName, "Revisions", informers),
lister: informer.Lister(),
synced: informer.Informer().HasSynced,
endpointsSynced: endpointsInformer.Informer().HasSynced,
configMapSynced: configMapInformer.HasSynced,
buildtracker: &buildTracker{builds: map[key]set{}},
resolver: &digestResolver{client: opt.KubeClientSet, transport: http.DefaultTransport},
controllerConfig: controllerConfig,
networkConfig: networkConfig,
}

// Set up an event handler for when the resource types of interest change
controller.Logger.Info("Setting up event handlers")
// Obtain a reference to a shared index informer for the Build type.
buildInformer := buildInformerFactory.Build().V1alpha1().Builds()
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.Enqueue,
UpdateFunc: func(old, new interface{}) {
controller.Enqueue(new)
},
DeleteFunc: controller.Enqueue,
})

buildInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
controller.SyncBuild(obj.(*buildv1alpha1.Build))
Expand Down Expand Up @@ -223,7 +226,7 @@ func NewController(
},
})

configMapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
configMapInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.addConfigMapEvent,
UpdateFunc: controller.updateConfigMapEvent,
})
Expand All @@ -236,8 +239,7 @@ func NewController(
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
return c.RunController(threadiness, stopCh, []cache.InformerSynced{c.synced, c.endpointsSynced, c.configMapSynced},
c.syncHandler, "Revision")
return c.RunController(threadiness, stopCh, c.syncHandler, "Revision")
}

// loggerWithRevisionInfo enriches the logs with revision name and namespace.
Expand Down
Loading