From ffbebb454d0e03d1afce01690db5b4fcb7e6d312 Mon Sep 17 00:00:00 2001 From: mdemirhan Date: Mon, 14 May 2018 18:51:11 -0700 Subject: [PATCH 1/5] Refactor common functionality in controllers into controller.go and reduce the repetition in various controllers. --- pkg/controller/BUILD.bazel | 10 + pkg/controller/configuration/BUILD.bazel | 9 - pkg/controller/configuration/configuration.go | 205 ++------------- .../configuration/configuration_test.go | 11 +- pkg/controller/controller.go | 190 +++++++++++++- pkg/controller/revision/BUILD.bazel | 10 +- pkg/controller/revision/revision.go | 241 ++---------------- pkg/controller/revision/revision_test.go | 11 +- 8 files changed, 259 insertions(+), 428 deletions(-) diff --git a/pkg/controller/BUILD.bazel b/pkg/controller/BUILD.bazel index 9aaa973ae4f9..f49d433669e5 100644 --- a/pkg/controller/BUILD.bazel +++ b/pkg/controller/BUILD.bazel @@ -14,13 +14,23 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/apis/ela/v1alpha1:go_default_library", + "//pkg/client/clientset/versioned:go_default_library", "//pkg/client/clientset/versioned/scheme:go_default_library", + "//pkg/client/informers/externalversions:go_default_library", "//vendor/github.com/ghodss/yaml:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", + "//vendor/k8s.io/client-go/tools/record:go_default_library", + "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) diff --git a/pkg/controller/configuration/BUILD.bazel b/pkg/controller/configuration/BUILD.bazel index ab0547a585c6..8d7a2f4ca778 100644 --- a/pkg/controller/configuration/BUILD.bazel +++ b/pkg/controller/configuration/BUILD.bazel @@ -8,8 +8,6 @@ go_library( deps = [ "//pkg/apis/ela:go_default_library", "//pkg/apis/ela/v1alpha1:go_default_library", - "//pkg/client/clientset/versioned:go_default_library", - "//pkg/client/informers/externalversions:go_default_library", "//pkg/client/listers/ela/v1alpha1:go_default_library", "//pkg/controller:go_default_library", "//vendor/github.com/elafros/build/pkg/apis/build/v1alpha1:go_default_library", @@ -19,15 +17,8 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//vendor/k8s.io/client-go/informers:go_default_library", - "//vendor/k8s.io/client-go/kubernetes:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", - "//vendor/k8s.io/client-go/tools/record:go_default_library", - "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) diff --git a/pkg/controller/configuration/configuration.go b/pkg/controller/configuration/configuration.go index e7d8144c1be1..b8d117bb174a 100644 --- a/pkg/controller/configuration/configuration.go +++ b/pkg/controller/configuration/configuration.go @@ -18,104 +18,60 @@ package configuration import ( "fmt" - "time" + buildv1alpha1 "github.com/elafros/build/pkg/apis/build/v1alpha1" + buildclientset "github.com/elafros/build/pkg/client/clientset/versioned" + "github.com/elafros/elafros/pkg/apis/ela" + "github.com/elafros/elafros/pkg/apis/ela/v1alpha1" + listers "github.com/elafros/elafros/pkg/client/listers/ela/v1alpha1" "github.com/elafros/elafros/pkg/controller" - "github.com/golang/glog" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - kubeinformers "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" - - buildv1alpha1 "github.com/elafros/build/pkg/apis/build/v1alpha1" - buildclientset "github.com/elafros/build/pkg/client/clientset/versioned" - "github.com/elafros/elafros/pkg/apis/ela" - "github.com/elafros/elafros/pkg/apis/ela/v1alpha1" - clientset "github.com/elafros/elafros/pkg/client/clientset/versioned" - informers "github.com/elafros/elafros/pkg/client/informers/externalversions" - listers "github.com/elafros/elafros/pkg/client/listers/ela/v1alpha1" ) const controllerAgentName = "configuration-controller" // Controller implements the controller for Configuration resources type Controller struct { - // kubeclientset is a standard kubernetes clientset - kubeclientset kubernetes.Interface - elaclientset clientset.Interface - buildclientset buildclientset.Interface + base *controller.ControllerBase + + buildClientSet buildclientset.Interface // lister indexes properties about Configuration lister listers.ConfigurationLister synced cache.InformerSynced - // workqueue is a rate limited work queue. This is used to queue work to be - // processed instead of performing it as soon as a change happens. This - // means we can ensure we only process a fixed amount of resources at a - // time, and makes it easy to ensure we are never processing the same item - // simultaneously in two different workers. - workqueue workqueue.RateLimitingInterface - // recorder is an event recorder for recording Event resources to the - // Kubernetes API. - recorder record.EventRecorder - // don't start the workers until revisions cache have been synced revisionsSynced cache.InformerSynced } // NewController creates a new Configuration controller -//TODO(grantr): somewhat generic (generic behavior) func NewController( - kubeclientset kubernetes.Interface, - elaclientset clientset.Interface, - buildclientset buildclientset.Interface, - kubeInformerFactory kubeinformers.SharedInformerFactory, - elaInformerFactory informers.SharedInformerFactory, + base *controller.ControllerBase, + buildClientSet buildclientset.Interface, config *rest.Config, controllerConfig controller.Config) controller.Interface { // obtain references to a shared index informer for the Configuration // and Revision type. - informer := elaInformerFactory.Elafros().V1alpha1().Configurations() - revisionInformer := elaInformerFactory.Elafros().V1alpha1().Revisions() - - // Create event broadcaster - glog.V(4).Info("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + informer := base.ElaInformerFactory.Elafros().V1alpha1().Configurations() + revisionInformer := base.ElaInformerFactory.Elafros().V1alpha1().Revisions() + base.Init(controllerAgentName, "Configurations", informer.Informer()) controller := &Controller{ - kubeclientset: kubeclientset, - elaclientset: elaclientset, - buildclientset: buildclientset, + base: base, + buildClientSet: buildClientSet, lister: informer.Lister(), synced: informer.Informer().HasSynced, revisionsSynced: revisionInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Configurations"), - recorder: recorder, } glog.Info("Setting up event handlers") - // Set up an event handler for when Configuration resources change - informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueueConfiguration, - UpdateFunc: func(old, new interface{}) { - controller.enqueueConfiguration(new) - }, - }) - revisionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.addRevisionEvent, UpdateFunc: controller.updateRevisionEvent, @@ -127,121 +83,14 @@ func NewController( // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. -//TODO(grantr): generic func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { - defer runtime.HandleCrash() - defer c.workqueue.ShutDown() - - // Start the informer factories to begin populating the informer caches - glog.Info("Starting Configuration controller") - - // Wait for the caches to be synced before starting workers - glog.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.synced); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - // Wait for the revisions caches to be synced before starting workers - glog.Info("Waiting for revisions informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.revisionsSynced); !ok { - return fmt.Errorf("failed to wait for revisions caches to sync") - } - - glog.Info("Starting workers") - // Launch workers to process Configuration resources - for i := 0; i < threadiness; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) - } - - glog.Info("Started workers") - <-stopCh - glog.Info("Shutting down workers") - - return nil -} - -// runWorker is a long-running function that will continually call the -// processNextWorkItem function in order to read and process a message on the -// workqueue. -//TODO(grantr): generic -func (c *Controller) runWorker() { - for c.processNextWorkItem() { - } -} - -// processNextWorkItem will read a single work item off the workqueue and -// attempt to process it, by calling the syncHandler. -//TODO(grantr): generic -func (c *Controller) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() - - if shutdown { - return false - } - - // We wrap this block in a func so we can defer c.workqueue.Done. - err := func(obj interface{}) error { - // We call Done here so the workqueue knows we have finished - // processing this item. We also must remember to call Forget if we - // do not want this work item being re-queued. For example, we do - // not call Forget if a transient error occurs, instead the item is - // put back on the workqueue and attempted again after a back-off - // period. - defer c.workqueue.Done(obj) - var key string - var ok bool - // We expect strings to come off the workqueue. These are of the - // form namespace/name. We do this as the delayed nature of the - // workqueue means the items in the informer cache may actually be - // more up to date that when the item was initially put onto the - // workqueue. - if key, ok = obj.(string); !ok { - // As the item in the workqueue is actually invalid, we call - // Forget here else we'd go into a loop of attempting to - // process a work item that is invalid. - c.workqueue.Forget(obj) - runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) - return nil - } - // Run the syncHandler, passing it the namespace/name string of the - // Configuration resource to be synced. - if err := c.syncHandler(key); err != nil { - return fmt.Errorf("error syncing %q: %v", key, err) - } - // Finally, if no error occurs we Forget this item so it does not - // get queued again until another change happens. - c.workqueue.Forget(obj) - glog.Infof("Successfully synced %q", key) - return nil - }(obj) - - if err != nil { - runtime.HandleError(err) - return true - } - - return true -} - -// enqueueConfiguration takes a Configuration resource and -// converts it into a namespace/name string which is then put onto the work -// queue. This method should *not* be passed resources of any type other than -// Configuration. -//TODO(grantr): generic -func (c *Controller) enqueueConfiguration(obj interface{}) { - var key string - var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - runtime.HandleError(err) - return - } - c.workqueue.AddRateLimited(key) + return c.base.Run(threadiness, stopCh, []cache.InformerSynced{c.synced, c.revisionsSynced}, + c.syncHandler, "Configuration") } // syncHandler compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the Configuration // resource with the current status of the resource. -//TODO(grantr): not generic func (c *Controller) syncHandler(key string) error { // Convert the namespace/name string into a distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -288,14 +137,14 @@ func (c *Controller) syncHandler(key string) error { Spec: *config.Spec.Build, } build.OwnerReferences = append(build.OwnerReferences, *controllerRef) - created, err := c.buildclientset.BuildV1alpha1().Builds(build.Namespace).Create(build) + created, err := c.buildClientSet.BuildV1alpha1().Builds(build.Namespace).Create(build) if err != nil { glog.Errorf("Failed to create Build:\n%+v\n%s", build, err) - c.recorder.Eventf(config, corev1.EventTypeWarning, "CreationFailed", "Failed to create Build %q: %v", build.Name, err) + c.base.Recorder.Eventf(config, corev1.EventTypeWarning, "CreationFailed", "Failed to create Build %q: %v", build.Name, err) return err } glog.Infof("Created Build:\n%+v", created.Name) - c.recorder.Eventf(config, corev1.EventTypeNormal, "Created", "Created Build %q", created.Name) + c.base.Recorder.Eventf(config, corev1.EventTypeNormal, "Created", "Created Build %q", created.Name) spec.BuildName = created.Name } @@ -304,7 +153,7 @@ func (c *Controller) syncHandler(key string) error { return err } - revClient := c.elaclientset.ElafrosV1alpha1().Revisions(config.Namespace) + revClient := c.base.ElaClientSet.ElafrosV1alpha1().Revisions(config.Namespace) created, err := revClient.Get(revName, metav1.GetOptions{}) if err != nil { if !errors.IsNotFound(err) { @@ -339,10 +188,10 @@ func (c *Controller) syncHandler(key string) error { created, err = revClient.Create(rev) if err != nil { glog.Errorf("Failed to create Revision:\n%+v\n%s", rev, err) - c.recorder.Eventf(config, corev1.EventTypeWarning, "CreationFailed", "Failed to create Revision %q: %v", rev.Name, err) + c.base.Recorder.Eventf(config, corev1.EventTypeWarning, "CreationFailed", "Failed to create Revision %q: %v", rev.Name, err) return err } - c.recorder.Eventf(config, corev1.EventTypeNormal, "Created", "Created Revision %q", rev.Name) + c.base.Recorder.Eventf(config, corev1.EventTypeNormal, "Created", "Created Revision %q", rev.Name) glog.Infof("Created Revision:\n%+v", created) } else { glog.Infof("Revision already created %s: %s", created.ObjectMeta.Name, err) @@ -374,7 +223,7 @@ func generateRevisionName(u *v1alpha1.Configuration) (string, error) { } func (c *Controller) updateStatus(u *v1alpha1.Configuration) (*v1alpha1.Configuration, error) { - configClient := c.elaclientset.ElafrosV1alpha1().Configurations(u.Namespace) + configClient := c.base.ElaClientSet.ElafrosV1alpha1().Configurations(u.Namespace) newu, err := configClient.Get(u.Name, metav1.GetOptions{}) if err != nil { return nil, err @@ -423,7 +272,7 @@ func (c *Controller) addRevisionEvent(obj interface{}) { glog.Errorf("Error updating configuration '%s/%s': %v", namespace, configName, err) } - c.recorder.Eventf(config, corev1.EventTypeNormal, "LatestRevisionUpdate", + c.base.Recorder.Eventf(config, corev1.EventTypeNormal, "LatestRevisionUpdate", "Latest revision of configuration is not ready") } else { @@ -442,10 +291,10 @@ func (c *Controller) addRevisionEvent(obj interface{}) { namespace, configName, err) } if !alreadyReady { - c.recorder.Eventf(config, corev1.EventTypeNormal, "ConfigurationReady", + c.base.Recorder.Eventf(config, corev1.EventTypeNormal, "ConfigurationReady", "Configuration becomes ready") } - c.recorder.Eventf(config, corev1.EventTypeNormal, "LatestReadyUpdate", + c.base.Recorder.Eventf(config, corev1.EventTypeNormal, "LatestReadyUpdate", "LatestReadyRevisionName updated to %q", revision.Name) } diff --git a/pkg/controller/configuration/configuration_test.go b/pkg/controller/configuration/configuration_test.go index f84f9ecb8206..1a0deacda68a 100644 --- a/pkg/controller/configuration/configuration_test.go +++ b/pkg/controller/configuration/configuration_test.go @@ -143,15 +143,8 @@ func newTestController(t *testing.T, elaObjects ...runtime.Object) ( kubeInformer = kubeinformers.NewSharedInformerFactory(kubeClient, 0) elaInformer = informers.NewSharedInformerFactory(elaClient, 0) - controller = NewController( - kubeClient, - elaClient, - buildClient, - kubeInformer, - elaInformer, - &rest.Config{}, - ctrl.Config{}, - ).(*Controller) + base := ctrl.NewControllerBase(kubeClient, elaClient, kubeInformer, elaInformer) + controller = NewController(base, buildClient, &rest.Config{}, ctrl.Config{}).(*Controller) return } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index ff249fd3274d..bc8184ff34f6 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -17,9 +17,24 @@ limitations under the License. package controller import ( - "k8s.io/client-go/kubernetes/scheme" + "errors" + "fmt" + "time" + clientset "github.com/elafros/elafros/pkg/client/clientset/versioned" elascheme "github.com/elafros/elafros/pkg/client/clientset/versioned/scheme" + informers "github.com/elafros/elafros/pkg/client/informers/externalversions" + "github.com/golang/glog" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" ) type Interface interface { @@ -31,3 +46,176 @@ func init() { // logged for ela types. elascheme.AddToScheme(scheme.Scheme) } + +type ControllerBase struct { + // kubeClient allows us to talk to the k8s for core APIs + KubeClientSet kubernetes.Interface + + // elaClient allows us to configure Ela objects + ElaClientSet clientset.Interface + + KubeInformerFactory kubeinformers.SharedInformerFactory + + ElaInformerFactory informers.SharedInformerFactory + + // recorder is an event recorder for recording Event resources to the + // Kubernetes API. + Recorder record.EventRecorder + + // workqueue is a rate limited work queue. This is used to queue work to be + // processed instead of performing it as soon as a change happens. This + // means we can ensure we only process a fixed amount of resources at a + // time, and makes it easy to ensure we are never processing the same item + // simultaneously in two different workers. + WorkQueue workqueue.RateLimitingInterface + + initialized bool +} + +func NewControllerBase( + kubeclientset kubernetes.Interface, + elaclientset clientset.Interface, + kubeInformerFactory kubeinformers.SharedInformerFactory, + elaInformerFactory informers.SharedInformerFactory) *ControllerBase { + + return &ControllerBase{ + KubeClientSet: kubeclientset, + ElaClientSet: elaclientset, + KubeInformerFactory: kubeInformerFactory, + ElaInformerFactory: elaInformerFactory, + } +} + +func (c *ControllerBase) Init( + controllerAgentName string, workQueueName string, + informer cache.SharedIndexInformer) error { + if c.initialized { + return errors.New("This instance of ControllerBase is already initialized") + } + + // Create event broadcaster + glog.V(4).Info("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.KubeClientSet.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + + c.Recorder = recorder + c.WorkQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName) + + // Set up an event handler for when the resource types of interest change + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.enqueueWork, + UpdateFunc: func(old, new interface{}) { + c.enqueueWork(new) + }, + }) + + c.initialized = true + return nil +} + +// enqueueWork takes a resource and converts it into a +// namespace/name string which is then put onto the work queue. +func (c *ControllerBase) enqueueWork(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + runtime.HandleError(err) + return + } + c.WorkQueue.AddRateLimited(key) +} + +// Run will set up the event handlers for types we are interested in, as well +// as syncing informer caches and starting workers. It will block until stopCh +// is closed, at which point it will shutdown the workqueue and wait for +// workers to finish processing their current work items. +func (c *ControllerBase) Run( + threadiness int, + stopCh <-chan struct{}, + informersSynced []cache.InformerSynced, + syncHandler func(string) error, + controllerName string) error { + + defer runtime.HandleCrash() + defer c.WorkQueue.ShutDown() + + glog.Infof("Starting %s controller", controllerName) + + // Wait for the caches to be synced before starting workers + glog.Info("Waiting for informer caches to sync") + for i, synced := range informersSynced { + if ok := cache.WaitForCacheSync(stopCh, synced); !ok { + return fmt.Errorf("failed to wait for cache at index %v to sync", i) + } + } + + // Launch workers to process Revision resources + glog.Info("Starting workers") + for i := 0; i < threadiness; i++ { + go wait.Until(func() { + for c.processNextWorkItem(syncHandler) { + } + }, time.Second, stopCh) + } + + glog.Info("Started workers") + <-stopCh + glog.Info("Shutting down workers") + + return nil +} + +// processNextWorkItem will read a single work item off the workqueue and +// attempt to process it, by calling the syncHandler. +func (c *ControllerBase) processNextWorkItem(syncHandler func(string) error) bool { + obj, shutdown := c.WorkQueue.Get() + + if shutdown { + return false + } + + // We wrap this block in a func so we can defer c.base.WorkQueue.Done. + err := func(obj interface{}) error { + // We call Done here so the workqueue knows we have finished + // processing this item. We also must remember to call Forget if we + // do not want this work item being re-queued. For example, we do + // not call Forget if a transient error occurs, instead the item is + // put back on the workqueue and attempted again after a back-off + // period. + defer c.WorkQueue.Done(obj) + var key string + var ok bool + // We expect strings to come off the workqueue. These are of the + // form namespace/name. We do this as the delayed nature of the + // workqueue means the items in the informer cache may actually be + // more up to date that when the item was initially put onto the + // workqueue. + if key, ok = obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call + // Forget here else we'd go into a loop of attempting to + // process a work item that is invalid. + c.WorkQueue.Forget(obj) + runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + // Run the syncHandler, passing it the namespace/name string of the + // Configuration resource to be synced. + if err := syncHandler(key); err != nil { + return fmt.Errorf("error syncing %q: %v", key, err) + } + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + c.WorkQueue.Forget(obj) + glog.Infof("Successfully synced %q", key) + return nil + }(obj) + + if err != nil { + runtime.HandleError(err) + return true + } + + return true +} diff --git a/pkg/controller/revision/BUILD.bazel b/pkg/controller/revision/BUILD.bazel index f2b56a9ab58f..d944f588c14d 100644 --- a/pkg/controller/revision/BUILD.bazel +++ b/pkg/controller/revision/BUILD.bazel @@ -17,8 +17,6 @@ go_library( deps = [ "//pkg/apis/ela:go_default_library", "//pkg/apis/ela/v1alpha1:go_default_library", - "//pkg/client/clientset/versioned:go_default_library", - "//pkg/client/informers/externalversions:go_default_library", "//pkg/client/listers/ela/v1alpha1:go_default_library", "//pkg/controller:go_default_library", "//pkg/queue:go_default_library", @@ -30,7 +28,6 @@ go_library( "//vendor/github.com/josephburnett/k8sflag/pkg/k8sflag:go_default_library", "//vendor/github.com/mattmoor/k8schain:go_default_library", "//vendor/go.opencensus.io/stats:go_default_library", - "//vendor/go.opencensus.io/stats/view:go_default_library", "//vendor/go.opencensus.io/tag:go_default_library", "//vendor/k8s.io/api/apps/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", @@ -40,15 +37,9 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", - "//vendor/k8s.io/client-go/tools/record:go_default_library", - "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) @@ -67,6 +58,7 @@ go_test( "//pkg/client/informers/externalversions:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/testing:go_default_library", + "//pkg/queue:go_default_library", "//vendor/github.com/elafros/build/pkg/apis/build/v1alpha1:go_default_library", "//vendor/github.com/elafros/build/pkg/client/clientset/versioned/fake:go_default_library", "//vendor/github.com/elafros/build/pkg/client/informers/externalversions:go_default_library", diff --git a/pkg/controller/revision/revision.go b/pkg/controller/revision/revision.go index ab1ce1fe7b64..8a3cba709caf 100644 --- a/pkg/controller/revision/revision.go +++ b/pkg/controller/revision/revision.go @@ -17,7 +17,6 @@ limitations under the License. package revision import ( - "context" "fmt" "log" "net/http" @@ -37,26 +36,14 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - kubeinformers "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" buildv1alpha1 "github.com/elafros/build/pkg/apis/build/v1alpha1" buildinformers "github.com/elafros/build/pkg/client/informers/externalversions" "github.com/elafros/elafros/pkg/apis/ela/v1alpha1" - clientset "github.com/elafros/elafros/pkg/client/clientset/versioned" - informers "github.com/elafros/elafros/pkg/client/informers/externalversions" listers "github.com/elafros/elafros/pkg/client/listers/ela/v1alpha1" "github.com/elafros/elafros/pkg/controller" - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" ) const ( @@ -84,11 +71,6 @@ var ( elaPodReplicaCount = int32(1) elaPodMaxUnavailable = intstr.IntOrString{Type: intstr.Int, IntVal: 1} elaPodMaxSurge = intstr.IntOrString{Type: intstr.Int, IntVal: 1} - processItemCount = stats.Int64( - "controller_revision_queue_process_count", - "Counter to keep track of items in the revision work queue.", - stats.UnitNone) - statusTagKey tag.Key ) // Helper to make sure we log error messages returned by Reconcile(). @@ -106,26 +88,12 @@ type resolver interface { // Controller implements the controller for Revision resources. // +controller:group=ela,version=v1alpha1,kind=Revision,resource=revisions type Controller struct { - // kubeClient allows us to talk to the k8s for core APIs - kubeclientset kubernetes.Interface - - // elaClient allows us to configure Ela objects - elaclientset clientset.Interface + base *controller.ControllerBase // lister indexes properties about Revision lister listers.RevisionLister synced cache.InformerSynced - // workqueue is a rate limited work queue. This is used to queue work to be - // processed instead of performing it as soon as a change happens. This - // means we can ensure we only process a fixed amount of resources at a - // time, and makes it easy to ensure we are never processing the same item - // simultaneously in two different workers. - workqueue workqueue.RateLimitingInterface - // recorder is an event recorder for recording Event resources to the - // Kubernetes API. - recorder record.EventRecorder - buildtracker *buildTracker resolver resolver @@ -177,51 +145,29 @@ type ControllerConfig struct { // config - client configuration for talking to the apiserver // si - informer factory shared across all controllers for listening to events and indexing resource properties // queue - message queue for handling new events. unique to this controller. -//TODO(vaikas): somewhat generic (generic behavior) func NewController( - kubeclientset kubernetes.Interface, - elaclientset clientset.Interface, - kubeInformerFactory kubeinformers.SharedInformerFactory, - elaInformerFactory informers.SharedInformerFactory, + base *controller.ControllerBase, buildInformerFactory buildinformers.SharedInformerFactory, config *rest.Config, controllerConfig *ControllerConfig) controller.Interface { - // obtain references to a shared index informer for the Revision and - // Endpoint type. - informer := elaInformerFactory.Elafros().V1alpha1().Revisions() - endpointsInformer := kubeInformerFactory.Core().V1().Endpoints() - deploymentInformer := kubeInformerFactory.Apps().V1().Deployments() - - // Create event broadcaster - glog.V(4).Info("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + // obtain references to a shared index informer for the Revision and Endpoint type. + informer := base.ElaInformerFactory.Elafros().V1alpha1().Revisions() + endpointsInformer := base.KubeInformerFactory.Core().V1().Endpoints() + deploymentInformer := base.KubeInformerFactory.Apps().V1().Deployments() + base.Init(controllerAgentName, "Revisions", informer.Informer()) controller := &Controller{ - kubeclientset: kubeclientset, - elaclientset: elaclientset, + base: base, lister: informer.Lister(), synced: informer.Informer().HasSynced, endpointsSynced: endpointsInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Revisions"), - recorder: recorder, buildtracker: &buildTracker{builds: map[key]set{}}, - resolver: &digestResolver{client: kubeclientset, transport: http.DefaultTransport}, + resolver: &digestResolver{client: base.KubeClientSet, transport: http.DefaultTransport}, controllerConfig: controllerConfig, } glog.Info("Setting up event handlers") - // Set up an event handler for when Revision resources change - informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueueRevision, - UpdateFunc: func(old, new interface{}) { - controller.enqueueRevision(new) - }, - }) - // Obtain a reference to a shared index informer for the Build type. buildInformer := buildInformerFactory.Build().V1alpha1().Builds() buildInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -246,145 +192,14 @@ func NewController( // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. -//TODO(grantr): generic func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { - defer runtime.HandleCrash() - defer c.workqueue.ShutDown() - - // Start the informer factories to begin populating the informer caches - glog.Info("Starting Revision controller") - - // Metrics setup: begin - // Create the tag keys that will be used to add tags to our measurements. - var err error - if statusTagKey, err = tag.NewKey("status"); err != nil { - return fmt.Errorf("failed to create tag key in OpenCensus: %v", err) - } - // Create view to see our measurements cumulatively. - countView := &view.View{ - Description: "Counter to keep track of items in the revision work queue.", - Measure: processItemCount, - Aggregation: view.Count(), - TagKeys: []tag.Key{statusTagKey}, - } - if err = view.Register(countView); err != nil { - return fmt.Errorf("failed to register the views in OpenCensus: %v", err) - } - defer view.Unregister(countView) - // Metrics setup: end - - // Wait for the caches to be synced before starting workers - glog.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.synced); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - // Wait for the caches to be synced before starting workers - glog.Info("Waiting for endpoints informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.endpointsSynced); !ok { - return fmt.Errorf("failed to wait for endpoints caches to sync") - } - - glog.Info("Starting workers") - // Launch workers to process Revision resources - for i := 0; i < threadiness; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) - } - - glog.Info("Started workers") - <-stopCh - glog.Info("Shutting down workers") - - return nil -} - -// runWorker is a long-running function that will continually call the -// processNextWorkItem function in order to read and process a message on the -// workqueue. -//TODO(grantr): generic -func (c *Controller) runWorker() { - for c.processNextWorkItem() { - } -} - -// processNextWorkItem will read a single work item off the workqueue and -// attempt to process it, by calling the syncHandler. -//TODO(vaikas): generic -func (c *Controller) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() - - if shutdown { - return false - } - - // We wrap this block in a func so we can defer c.workqueue.Done. - err, processStatus := func(obj interface{}) (error, string) { - // We call Done here so the workqueue knows we have finished - // processing this item. We also must remember to call Forget if we - // do not want this work item being re-queued. For example, we do - // not call Forget if a transient error occurs, instead the item is - // put back on the workqueue and attempted again after a back-off - // period. - defer c.workqueue.Done(obj) - var key string - var ok bool - // We expect strings to come off the workqueue. These are of the - // form namespace/name. We do this as the delayed nature of the - // workqueue means the items in the informer cache may actually be - // more up to date that when the item was initially put onto the - // workqueue. - if key, ok = obj.(string); !ok { - // As the item in the workqueue is actually invalid, we call - // Forget here else we'd go into a loop of attempting to - // process a work item that is invalid. - c.workqueue.Forget(obj) - runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) - return nil, controller.PromLabelValueInvalid - } - // Run the syncHandler, passing it the namespace/name string of the - // Revision resource to be synced. - if err := c.syncHandler(key); err != nil { - return fmt.Errorf("error syncing %q: %v", key, err), controller.PromLabelValueFailure - } - // Finally, if no error occurs we Forget this item so it does not - // get queued again until another change happens. - c.workqueue.Forget(obj) - glog.Infof("Successfully synced %q", key) - return nil, controller.PromLabelValueSuccess - }(obj) - - if ctx, tagError := tag.New(context.Background(), tag.Insert(statusTagKey, processStatus)); tagError == nil { - // Increment the request count by one. - stats.Record(ctx, processItemCount.M(1)) - } - - if err != nil { - runtime.HandleError(err) - return true - } - - return true -} - -// enqueueRevision takes a Revision resource and -// converts it into a namespace/name string which is then put onto the work -// queue. This method should *not* be passed resources of any type other than -// Revision. -//TODO(grantr): generic -func (c *Controller) enqueueRevision(obj interface{}) { - var key string - var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - runtime.HandleError(err) - return - } - c.workqueue.AddRateLimited(key) + return c.base.Run(threadiness, stopCh, []cache.InformerSynced{c.synced, c.endpointsSynced}, + c.syncHandler, "Revision") } // syncHandler compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the Revision resource // with the current status of the resource. -//TODO(grantr): not generic func (c *Controller) syncHandler(key string) error { // Convert the namespace/name string into a distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -432,7 +247,7 @@ func (c *Controller) syncHandler(key string) error { } } - ns, err := controller.GetOrCreateRevisionNamespace(namespace, c.kubeclientset) + ns, err := controller.GetOrCreateRevisionNamespace(namespace, c.base.KubeClientSet) if err != nil { log.Printf("Failed to create namespace: %s", err) panic("Failed to create namespace") @@ -546,7 +361,7 @@ func (c *Controller) markBuildComplete(rev *v1alpha1.Revision, bc *buildv1alpha1 Type: v1alpha1.RevisionConditionBuildSucceeded, Status: corev1.ConditionTrue, }) - c.recorder.Event(rev, corev1.EventTypeNormal, "BuildComplete", bc.Message) + c.base.Recorder.Event(rev, corev1.EventTypeNormal, "BuildComplete", bc.Message) case buildv1alpha1.BuildFailed, buildv1alpha1.BuildInvalid: rev.Status.SetCondition( &v1alpha1.RevisionCondition{ @@ -562,7 +377,7 @@ func (c *Controller) markBuildComplete(rev *v1alpha1.Revision, bc *buildv1alpha1 Reason: bc.Reason, Message: bc.Message, }) - c.recorder.Event(rev, corev1.EventTypeWarning, "BuildFailed", bc.Message) + c.base.Recorder.Event(rev, corev1.EventTypeWarning, "BuildFailed", bc.Message) } // This will trigger a reconciliation that will cause us to stop tracking the build. _, err := c.updateStatus(rev) @@ -676,7 +491,7 @@ func (c *Controller) addDeploymentProgressEvent(obj interface{}) { glog.Errorf("Error recording revision completion: %s", err) return } - c.recorder.Eventf(rev, corev1.EventTypeNormal, "ProgressDeadlineExceeded", "Revision %s not ready due to Deployment timeout", revName) + c.base.Recorder.Eventf(rev, corev1.EventTypeNormal, "ProgressDeadlineExceeded", "Revision %s not ready due to Deployment timeout", revName) return } @@ -721,7 +536,7 @@ func (c *Controller) addEndpointsEvent(obj interface{}) { glog.Errorf("Error marking revision ready for '%s/%s': %v", namespace, revName, err) return } - c.recorder.Eventf(rev, corev1.EventTypeNormal, "RevisionReady", "Revision becomes ready upon endpoint %q becoming ready", endpoint.Name) + c.base.Recorder.Eventf(rev, corev1.EventTypeNormal, "RevisionReady", "Revision becomes ready upon endpoint %q becoming ready", endpoint.Name) return } @@ -734,7 +549,7 @@ func (c *Controller) addEndpointsEvent(obj interface{}) { glog.Errorf("Error marking revision failed for '%s/%s': %v", namespace, revName, err) return } - c.recorder.Eventf(rev, corev1.EventTypeWarning, "RevisionFailed", "Revision did not become ready due to endpoint %q", endpoint.Name) + c.base.Recorder.Eventf(rev, corev1.EventTypeWarning, "RevisionFailed", "Revision did not become ready due to endpoint %q", endpoint.Name) return } @@ -872,7 +687,7 @@ func (c *Controller) createK8SResources(rev *v1alpha1.Revision, ns string) error func (c *Controller) deleteDeployment(rev *v1alpha1.Revision, ns string) error { deploymentName := controller.GetRevisionDeploymentName(rev) - dc := c.kubeclientset.AppsV1().Deployments(ns) + dc := c.base.KubeClientSet.AppsV1().Deployments(ns) if _, err := dc.Get(deploymentName, metav1.GetOptions{}); err != nil && apierrs.IsNotFound(err) { return nil } @@ -890,7 +705,7 @@ func (c *Controller) deleteDeployment(rev *v1alpha1.Revision, ns string) error { } func (c *Controller) reconcileDeployment(rev *v1alpha1.Revision, ns string) error { - dc := c.kubeclientset.AppsV1().Deployments(ns) + dc := c.base.KubeClientSet.AppsV1().Deployments(ns) // First, check if deployment exists already. deploymentName := controller.GetRevisionDeploymentName(rev) @@ -952,7 +767,7 @@ func (c *Controller) reconcileDeployment(rev *v1alpha1.Revision, ns string) erro } func (c *Controller) deleteService(rev *v1alpha1.Revision, ns string) error { - sc := c.kubeclientset.Core().Services(ns) + sc := c.base.KubeClientSet.Core().Services(ns) serviceName := controller.GetElaK8SServiceNameForRevision(rev) log.Printf("Deleting service %q", serviceName) @@ -968,7 +783,7 @@ func (c *Controller) deleteService(rev *v1alpha1.Revision, ns string) error { } func (c *Controller) reconcileService(rev *v1alpha1.Revision, ns string) (string, error) { - sc := c.kubeclientset.Core().Services(ns) + sc := c.base.KubeClientSet.Core().Services(ns) serviceName := controller.GetElaK8SServiceNameForRevision(rev) if _, err := sc.Get(serviceName, metav1.GetOptions{}); err != nil { @@ -994,7 +809,7 @@ func (c *Controller) reconcileService(rev *v1alpha1.Revision, ns string) (string func (c *Controller) reconcileFluentdConfigMap(rev *v1alpha1.Revision) error { ns := rev.Namespace - cmc := c.kubeclientset.Core().ConfigMaps(ns) + cmc := c.base.KubeClientSet.Core().ConfigMaps(ns) _, err := cmc.Get(fluentdConfigMapName, metav1.GetOptions{}) if err != nil { if !apierrs.IsNotFound(err) { @@ -1017,7 +832,7 @@ func (c *Controller) reconcileFluentdConfigMap(rev *v1alpha1.Revision) error { func (c *Controller) deleteAutoscalerService(rev *v1alpha1.Revision) error { autoscalerName := controller.GetRevisionAutoscalerName(rev) - sc := c.kubeclientset.Core().Services(AutoscalerNamespace) + sc := c.base.KubeClientSet.Core().Services(AutoscalerNamespace) if _, err := sc.Get(autoscalerName, metav1.GetOptions{}); err != nil && apierrs.IsNotFound(err) { return nil } @@ -1035,7 +850,7 @@ func (c *Controller) deleteAutoscalerService(rev *v1alpha1.Revision) error { func (c *Controller) reconcileAutoscalerService(rev *v1alpha1.Revision) error { autoscalerName := controller.GetRevisionAutoscalerName(rev) - sc := c.kubeclientset.Core().Services(AutoscalerNamespace) + sc := c.base.KubeClientSet.Core().Services(AutoscalerNamespace) _, err := sc.Get(autoscalerName, metav1.GetOptions{}) if err != nil { if !apierrs.IsNotFound(err) { @@ -1058,7 +873,7 @@ func (c *Controller) reconcileAutoscalerService(rev *v1alpha1.Revision) error { func (c *Controller) deleteAutoscalerDeployment(rev *v1alpha1.Revision) error { autoscalerName := controller.GetRevisionAutoscalerName(rev) - dc := c.kubeclientset.AppsV1().Deployments(AutoscalerNamespace) + dc := c.base.KubeClientSet.AppsV1().Deployments(AutoscalerNamespace) _, err := dc.Get(autoscalerName, metav1.GetOptions{}) if err != nil && apierrs.IsNotFound(err) { return nil @@ -1077,7 +892,7 @@ func (c *Controller) deleteAutoscalerDeployment(rev *v1alpha1.Revision) error { func (c *Controller) reconcileAutoscalerDeployment(rev *v1alpha1.Revision) error { autoscalerName := controller.GetRevisionAutoscalerName(rev) - dc := c.kubeclientset.AppsV1().Deployments(AutoscalerNamespace) + dc := c.base.KubeClientSet.AppsV1().Deployments(AutoscalerNamespace) _, err := dc.Get(autoscalerName, metav1.GetOptions{}) if err != nil { if !apierrs.IsNotFound(err) { @@ -1112,7 +927,7 @@ func (c *Controller) removeFinalizers(rev *v1alpha1.Revision, ns string) error { } } accessor.SetFinalizers(finalizers) - prClient := c.elaclientset.ElafrosV1alpha1().Revisions(rev.Namespace) + prClient := c.base.ElaClientSet.ElafrosV1alpha1().Revisions(rev.Namespace) prClient.Update(rev) log.Printf("The finalizer 'controller' is removed.") @@ -1120,7 +935,7 @@ func (c *Controller) removeFinalizers(rev *v1alpha1.Revision, ns string) error { } func (c *Controller) updateStatus(rev *v1alpha1.Revision) (*v1alpha1.Revision, error) { - prClient := c.elaclientset.ElafrosV1alpha1().Revisions(rev.Namespace) + prClient := c.base.ElaClientSet.ElafrosV1alpha1().Revisions(rev.Namespace) newRev, err := prClient.Get(rev.Name, metav1.GetOptions{}) if err != nil { return nil, err diff --git a/pkg/controller/revision/revision_test.go b/pkg/controller/revision/revision_test.go index 790a797c6fbd..699def4af4e8 100644 --- a/pkg/controller/revision/revision_test.go +++ b/pkg/controller/revision/revision_test.go @@ -225,15 +225,8 @@ func newTestControllerWithConfig(t *testing.T, controllerConfig *ControllerConfi buildInformer := buildinformers.NewSharedInformerFactory(buildClient, 0) elaInformer = informers.NewSharedInformerFactory(elaClient, 0) - controller = NewController( - kubeClient, - elaClient, - kubeInformer, - elaInformer, - buildInformer, - &rest.Config{}, - controllerConfig, - ).(*Controller) + base := ctrl.NewControllerBase(kubeClient, elaClient, kubeInformer, elaInformer) + controller = NewController(base, buildInformer, &rest.Config{}, controllerConfig).(*Controller) controller.resolver = &nopResolver{} From cffd0948644634d695f80731be6fdda9e97929a3 Mon Sep 17 00:00:00 2001 From: mdemirhan Date: Tue, 15 May 2018 11:39:06 -0700 Subject: [PATCH 2/5] Address PR comments. --- pkg/controller/configuration/BUILD.bazel | 4 ++ pkg/controller/configuration/configuration.go | 41 +++++++------ .../configuration/configuration_test.go | 11 +++- pkg/controller/controller.go | 48 +++++++-------- pkg/controller/revision/BUILD.bazel | 5 +- pkg/controller/revision/revision.go | 59 +++++++++++-------- pkg/controller/revision/revision_test.go | 11 +++- 7 files changed, 102 insertions(+), 77 deletions(-) diff --git a/pkg/controller/configuration/BUILD.bazel b/pkg/controller/configuration/BUILD.bazel index 8d7a2f4ca778..aa1326d24414 100644 --- a/pkg/controller/configuration/BUILD.bazel +++ b/pkg/controller/configuration/BUILD.bazel @@ -8,6 +8,8 @@ go_library( deps = [ "//pkg/apis/ela:go_default_library", "//pkg/apis/ela/v1alpha1:go_default_library", + "//pkg/client/clientset/versioned:go_default_library", + "//pkg/client/informers/externalversions:go_default_library", "//pkg/client/listers/ela/v1alpha1:go_default_library", "//pkg/controller:go_default_library", "//vendor/github.com/elafros/build/pkg/apis/build/v1alpha1:go_default_library", @@ -17,6 +19,8 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/client-go/informers:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", ], diff --git a/pkg/controller/configuration/configuration.go b/pkg/controller/configuration/configuration.go index b8d117bb174a..d6f0dfe7fdbd 100644 --- a/pkg/controller/configuration/configuration.go +++ b/pkg/controller/configuration/configuration.go @@ -23,6 +23,8 @@ import ( buildclientset "github.com/elafros/build/pkg/client/clientset/versioned" "github.com/elafros/elafros/pkg/apis/ela" "github.com/elafros/elafros/pkg/apis/ela/v1alpha1" + clientset "github.com/elafros/elafros/pkg/client/clientset/versioned" + informers "github.com/elafros/elafros/pkg/client/informers/externalversions" listers "github.com/elafros/elafros/pkg/client/listers/ela/v1alpha1" "github.com/elafros/elafros/pkg/controller" "github.com/golang/glog" @@ -30,6 +32,8 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" ) @@ -38,7 +42,7 @@ const controllerAgentName = "configuration-controller" // Controller implements the controller for Configuration resources type Controller struct { - base *controller.ControllerBase + *controller.ControllerBase buildClientSet buildclientset.Interface @@ -52,19 +56,22 @@ type Controller struct { // NewController creates a new Configuration controller func NewController( - base *controller.ControllerBase, + kubeClientSet kubernetes.Interface, + elaClientSet clientset.Interface, buildClientSet buildclientset.Interface, + kubeInformerFactory kubeinformers.SharedInformerFactory, + elaInformerFactory informers.SharedInformerFactory, config *rest.Config, controllerConfig controller.Config) controller.Interface { // obtain references to a shared index informer for the Configuration // and Revision type. - informer := base.ElaInformerFactory.Elafros().V1alpha1().Configurations() - revisionInformer := base.ElaInformerFactory.Elafros().V1alpha1().Revisions() + informer := elaInformerFactory.Elafros().V1alpha1().Configurations() + revisionInformer := elaInformerFactory.Elafros().V1alpha1().Revisions() - base.Init(controllerAgentName, "Configurations", informer.Informer()) controller := &Controller{ - base: base, + ControllerBase: controller.NewControllerBase(kubeClientSet, elaClientSet, kubeInformerFactory, + elaInformerFactory, informer.Informer(), controllerAgentName, "Configurations"), buildClientSet: buildClientSet, lister: informer.Lister(), synced: informer.Informer().HasSynced, @@ -84,8 +91,8 @@ func NewController( // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { - return c.base.Run(threadiness, stopCh, []cache.InformerSynced{c.synced, c.revisionsSynced}, - c.syncHandler, "Configuration") + return c.RunController(threadiness, stopCh, + []cache.InformerSynced{c.synced, c.revisionsSynced}, c.syncHandler, "Configuration") } // syncHandler compares the actual state with the desired, and attempts to @@ -140,11 +147,11 @@ func (c *Controller) syncHandler(key string) error { created, err := c.buildClientSet.BuildV1alpha1().Builds(build.Namespace).Create(build) if err != nil { glog.Errorf("Failed to create Build:\n%+v\n%s", build, err) - c.base.Recorder.Eventf(config, corev1.EventTypeWarning, "CreationFailed", "Failed to create Build %q: %v", build.Name, err) + c.Recorder.Eventf(config, corev1.EventTypeWarning, "CreationFailed", "Failed to create Build %q: %v", build.Name, err) return err } glog.Infof("Created Build:\n%+v", created.Name) - c.base.Recorder.Eventf(config, corev1.EventTypeNormal, "Created", "Created Build %q", created.Name) + c.Recorder.Eventf(config, corev1.EventTypeNormal, "Created", "Created Build %q", created.Name) spec.BuildName = created.Name } @@ -153,7 +160,7 @@ func (c *Controller) syncHandler(key string) error { return err } - revClient := c.base.ElaClientSet.ElafrosV1alpha1().Revisions(config.Namespace) + revClient := c.ElaClientSet.ElafrosV1alpha1().Revisions(config.Namespace) created, err := revClient.Get(revName, metav1.GetOptions{}) if err != nil { if !errors.IsNotFound(err) { @@ -188,10 +195,10 @@ func (c *Controller) syncHandler(key string) error { created, err = revClient.Create(rev) if err != nil { glog.Errorf("Failed to create Revision:\n%+v\n%s", rev, err) - c.base.Recorder.Eventf(config, corev1.EventTypeWarning, "CreationFailed", "Failed to create Revision %q: %v", rev.Name, err) + c.Recorder.Eventf(config, corev1.EventTypeWarning, "CreationFailed", "Failed to create Revision %q: %v", rev.Name, err) return err } - c.base.Recorder.Eventf(config, corev1.EventTypeNormal, "Created", "Created Revision %q", rev.Name) + c.Recorder.Eventf(config, corev1.EventTypeNormal, "Created", "Created Revision %q", rev.Name) glog.Infof("Created Revision:\n%+v", created) } else { glog.Infof("Revision already created %s: %s", created.ObjectMeta.Name, err) @@ -223,7 +230,7 @@ func generateRevisionName(u *v1alpha1.Configuration) (string, error) { } func (c *Controller) updateStatus(u *v1alpha1.Configuration) (*v1alpha1.Configuration, error) { - configClient := c.base.ElaClientSet.ElafrosV1alpha1().Configurations(u.Namespace) + configClient := c.ElaClientSet.ElafrosV1alpha1().Configurations(u.Namespace) newu, err := configClient.Get(u.Name, metav1.GetOptions{}) if err != nil { return nil, err @@ -272,7 +279,7 @@ func (c *Controller) addRevisionEvent(obj interface{}) { glog.Errorf("Error updating configuration '%s/%s': %v", namespace, configName, err) } - c.base.Recorder.Eventf(config, corev1.EventTypeNormal, "LatestRevisionUpdate", + c.Recorder.Eventf(config, corev1.EventTypeNormal, "LatestRevisionUpdate", "Latest revision of configuration is not ready") } else { @@ -291,10 +298,10 @@ func (c *Controller) addRevisionEvent(obj interface{}) { namespace, configName, err) } if !alreadyReady { - c.base.Recorder.Eventf(config, corev1.EventTypeNormal, "ConfigurationReady", + c.Recorder.Eventf(config, corev1.EventTypeNormal, "ConfigurationReady", "Configuration becomes ready") } - c.base.Recorder.Eventf(config, corev1.EventTypeNormal, "LatestReadyUpdate", + c.Recorder.Eventf(config, corev1.EventTypeNormal, "LatestReadyUpdate", "LatestReadyRevisionName updated to %q", revision.Name) } diff --git a/pkg/controller/configuration/configuration_test.go b/pkg/controller/configuration/configuration_test.go index 1a0deacda68a..f84f9ecb8206 100644 --- a/pkg/controller/configuration/configuration_test.go +++ b/pkg/controller/configuration/configuration_test.go @@ -143,8 +143,15 @@ func newTestController(t *testing.T, elaObjects ...runtime.Object) ( kubeInformer = kubeinformers.NewSharedInformerFactory(kubeClient, 0) elaInformer = informers.NewSharedInformerFactory(elaClient, 0) - base := ctrl.NewControllerBase(kubeClient, elaClient, kubeInformer, elaInformer) - controller = NewController(base, buildClient, &rest.Config{}, ctrl.Config{}).(*Controller) + controller = NewController( + kubeClient, + elaClient, + buildClient, + kubeInformer, + elaInformer, + &rest.Config{}, + ctrl.Config{}, + ).(*Controller) return } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index bc8184ff34f6..b386fd1111f0 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -17,7 +17,6 @@ limitations under the License. package controller import ( - "errors" "fmt" "time" @@ -73,46 +72,39 @@ type ControllerBase struct { } func NewControllerBase( - kubeclientset kubernetes.Interface, - elaclientset clientset.Interface, + kubeClientSet kubernetes.Interface, + elaClientSet clientset.Interface, kubeInformerFactory kubeinformers.SharedInformerFactory, - elaInformerFactory informers.SharedInformerFactory) *ControllerBase { - - return &ControllerBase{ - KubeClientSet: kubeclientset, - ElaClientSet: elaclientset, - KubeInformerFactory: kubeInformerFactory, - ElaInformerFactory: elaInformerFactory, - } -} - -func (c *ControllerBase) Init( - controllerAgentName string, workQueueName string, - informer cache.SharedIndexInformer) error { - if c.initialized { - return errors.New("This instance of ControllerBase is already initialized") - } + elaInformerFactory informers.SharedInformerFactory, + informer cache.SharedIndexInformer, + controllerAgentName string, + workQueueName string) *ControllerBase { // Create event broadcaster glog.V(4).Info("Creating event broadcaster") eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.KubeClientSet.CoreV1().Events("")}) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClientSet.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) - c.Recorder = recorder - c.WorkQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName) + base := &ControllerBase{ + KubeClientSet: kubeClientSet, + ElaClientSet: elaClientSet, + KubeInformerFactory: kubeInformerFactory, + ElaInformerFactory: elaInformerFactory, + Recorder: recorder, + WorkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName), + } // Set up an event handler for when the resource types of interest change informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.enqueueWork, + AddFunc: base.enqueueWork, UpdateFunc: func(old, new interface{}) { - c.enqueueWork(new) + base.enqueueWork(new) }, }) - c.initialized = true - return nil + return base } // enqueueWork takes a resource and converts it into a @@ -127,11 +119,11 @@ func (c *ControllerBase) enqueueWork(obj interface{}) { c.WorkQueue.AddRateLimited(key) } -// Run will set up the event handlers for types we are interested in, as well +// RunSyncHandler will set up the event handlers for types we are interested in, as well // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. -func (c *ControllerBase) Run( +func (c *ControllerBase) RunController( threadiness int, stopCh <-chan struct{}, informersSynced []cache.InformerSynced, diff --git a/pkg/controller/revision/BUILD.bazel b/pkg/controller/revision/BUILD.bazel index d944f588c14d..ed747bec250c 100644 --- a/pkg/controller/revision/BUILD.bazel +++ b/pkg/controller/revision/BUILD.bazel @@ -17,6 +17,8 @@ go_library( deps = [ "//pkg/apis/ela:go_default_library", "//pkg/apis/ela/v1alpha1:go_default_library", + "//pkg/client/clientset/versioned:go_default_library", + "//pkg/client/informers/externalversions:go_default_library", "//pkg/client/listers/ela/v1alpha1:go_default_library", "//pkg/controller:go_default_library", "//pkg/queue:go_default_library", @@ -27,8 +29,6 @@ go_library( "//vendor/github.com/google/go-containerregistry/v1/remote:go_default_library", "//vendor/github.com/josephburnett/k8sflag/pkg/k8sflag:go_default_library", "//vendor/github.com/mattmoor/k8schain:go_default_library", - "//vendor/go.opencensus.io/stats:go_default_library", - "//vendor/go.opencensus.io/tag:go_default_library", "//vendor/k8s.io/api/apps/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", @@ -37,6 +37,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", diff --git a/pkg/controller/revision/revision.go b/pkg/controller/revision/revision.go index 8a3cba709caf..0896194c5f2d 100644 --- a/pkg/controller/revision/revision.go +++ b/pkg/controller/revision/revision.go @@ -26,16 +26,20 @@ import ( "github.com/elafros/elafros/pkg/apis/ela" "github.com/josephburnett/k8sflag/pkg/k8sflag" + clientset "github.com/elafros/elafros/pkg/client/clientset/versioned" "github.com/golang/glog" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + kubeinformers "k8s.io/client-go/informers" + informers "github.com/elafros/elafros/pkg/client/informers/externalversions" "k8s.io/apimachinery/pkg/api/errors" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -88,7 +92,7 @@ type resolver interface { // Controller implements the controller for Revision resources. // +controller:group=ela,version=v1alpha1,kind=Revision,resource=revisions type Controller struct { - base *controller.ControllerBase + *controller.ControllerBase // lister indexes properties about Revision lister listers.RevisionLister @@ -146,24 +150,27 @@ type ControllerConfig struct { // si - informer factory shared across all controllers for listening to events and indexing resource properties // queue - message queue for handling new events. unique to this controller. func NewController( - base *controller.ControllerBase, + kubeClientSet kubernetes.Interface, + elaClientSet clientset.Interface, + kubeInformerFactory kubeinformers.SharedInformerFactory, + elaInformerFactory informers.SharedInformerFactory, buildInformerFactory buildinformers.SharedInformerFactory, config *rest.Config, controllerConfig *ControllerConfig) controller.Interface { // obtain references to a shared index informer for the Revision and Endpoint type. - informer := base.ElaInformerFactory.Elafros().V1alpha1().Revisions() - endpointsInformer := base.KubeInformerFactory.Core().V1().Endpoints() - deploymentInformer := base.KubeInformerFactory.Apps().V1().Deployments() + informer := elaInformerFactory.Elafros().V1alpha1().Revisions() + endpointsInformer := kubeInformerFactory.Core().V1().Endpoints() + deploymentInformer := kubeInformerFactory.Apps().V1().Deployments() - base.Init(controllerAgentName, "Revisions", informer.Informer()) controller := &Controller{ - base: base, + ControllerBase: controller.NewControllerBase(kubeClientSet, elaClientSet, kubeInformerFactory, + elaInformerFactory, informer.Informer(), controllerAgentName, "Revisions"), lister: informer.Lister(), synced: informer.Informer().HasSynced, endpointsSynced: endpointsInformer.Informer().HasSynced, buildtracker: &buildTracker{builds: map[key]set{}}, - resolver: &digestResolver{client: base.KubeClientSet, transport: http.DefaultTransport}, + resolver: &digestResolver{client: kubeClientSet, transport: http.DefaultTransport}, controllerConfig: controllerConfig, } @@ -193,7 +200,7 @@ func NewController( // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { - return c.base.Run(threadiness, stopCh, []cache.InformerSynced{c.synced, c.endpointsSynced}, + return c.RunController(threadiness, stopCh, []cache.InformerSynced{c.synced, c.endpointsSynced}, c.syncHandler, "Revision") } @@ -247,7 +254,7 @@ func (c *Controller) syncHandler(key string) error { } } - ns, err := controller.GetOrCreateRevisionNamespace(namespace, c.base.KubeClientSet) + ns, err := controller.GetOrCreateRevisionNamespace(namespace, c.KubeClientSet) if err != nil { log.Printf("Failed to create namespace: %s", err) panic("Failed to create namespace") @@ -361,7 +368,7 @@ func (c *Controller) markBuildComplete(rev *v1alpha1.Revision, bc *buildv1alpha1 Type: v1alpha1.RevisionConditionBuildSucceeded, Status: corev1.ConditionTrue, }) - c.base.Recorder.Event(rev, corev1.EventTypeNormal, "BuildComplete", bc.Message) + c.Recorder.Event(rev, corev1.EventTypeNormal, "BuildComplete", bc.Message) case buildv1alpha1.BuildFailed, buildv1alpha1.BuildInvalid: rev.Status.SetCondition( &v1alpha1.RevisionCondition{ @@ -377,7 +384,7 @@ func (c *Controller) markBuildComplete(rev *v1alpha1.Revision, bc *buildv1alpha1 Reason: bc.Reason, Message: bc.Message, }) - c.base.Recorder.Event(rev, corev1.EventTypeWarning, "BuildFailed", bc.Message) + c.Recorder.Event(rev, corev1.EventTypeWarning, "BuildFailed", bc.Message) } // This will trigger a reconciliation that will cause us to stop tracking the build. _, err := c.updateStatus(rev) @@ -491,7 +498,7 @@ func (c *Controller) addDeploymentProgressEvent(obj interface{}) { glog.Errorf("Error recording revision completion: %s", err) return } - c.base.Recorder.Eventf(rev, corev1.EventTypeNormal, "ProgressDeadlineExceeded", "Revision %s not ready due to Deployment timeout", revName) + c.Recorder.Eventf(rev, corev1.EventTypeNormal, "ProgressDeadlineExceeded", "Revision %s not ready due to Deployment timeout", revName) return } @@ -536,7 +543,7 @@ func (c *Controller) addEndpointsEvent(obj interface{}) { glog.Errorf("Error marking revision ready for '%s/%s': %v", namespace, revName, err) return } - c.base.Recorder.Eventf(rev, corev1.EventTypeNormal, "RevisionReady", "Revision becomes ready upon endpoint %q becoming ready", endpoint.Name) + c.Recorder.Eventf(rev, corev1.EventTypeNormal, "RevisionReady", "Revision becomes ready upon endpoint %q becoming ready", endpoint.Name) return } @@ -549,7 +556,7 @@ func (c *Controller) addEndpointsEvent(obj interface{}) { glog.Errorf("Error marking revision failed for '%s/%s': %v", namespace, revName, err) return } - c.base.Recorder.Eventf(rev, corev1.EventTypeWarning, "RevisionFailed", "Revision did not become ready due to endpoint %q", endpoint.Name) + c.Recorder.Eventf(rev, corev1.EventTypeWarning, "RevisionFailed", "Revision did not become ready due to endpoint %q", endpoint.Name) return } @@ -687,7 +694,7 @@ func (c *Controller) createK8SResources(rev *v1alpha1.Revision, ns string) error func (c *Controller) deleteDeployment(rev *v1alpha1.Revision, ns string) error { deploymentName := controller.GetRevisionDeploymentName(rev) - dc := c.base.KubeClientSet.AppsV1().Deployments(ns) + dc := c.KubeClientSet.AppsV1().Deployments(ns) if _, err := dc.Get(deploymentName, metav1.GetOptions{}); err != nil && apierrs.IsNotFound(err) { return nil } @@ -705,7 +712,7 @@ func (c *Controller) deleteDeployment(rev *v1alpha1.Revision, ns string) error { } func (c *Controller) reconcileDeployment(rev *v1alpha1.Revision, ns string) error { - dc := c.base.KubeClientSet.AppsV1().Deployments(ns) + dc := c.KubeClientSet.AppsV1().Deployments(ns) // First, check if deployment exists already. deploymentName := controller.GetRevisionDeploymentName(rev) @@ -767,7 +774,7 @@ func (c *Controller) reconcileDeployment(rev *v1alpha1.Revision, ns string) erro } func (c *Controller) deleteService(rev *v1alpha1.Revision, ns string) error { - sc := c.base.KubeClientSet.Core().Services(ns) + sc := c.KubeClientSet.Core().Services(ns) serviceName := controller.GetElaK8SServiceNameForRevision(rev) log.Printf("Deleting service %q", serviceName) @@ -783,7 +790,7 @@ func (c *Controller) deleteService(rev *v1alpha1.Revision, ns string) error { } func (c *Controller) reconcileService(rev *v1alpha1.Revision, ns string) (string, error) { - sc := c.base.KubeClientSet.Core().Services(ns) + sc := c.KubeClientSet.Core().Services(ns) serviceName := controller.GetElaK8SServiceNameForRevision(rev) if _, err := sc.Get(serviceName, metav1.GetOptions{}); err != nil { @@ -809,7 +816,7 @@ func (c *Controller) reconcileService(rev *v1alpha1.Revision, ns string) (string func (c *Controller) reconcileFluentdConfigMap(rev *v1alpha1.Revision) error { ns := rev.Namespace - cmc := c.base.KubeClientSet.Core().ConfigMaps(ns) + cmc := c.KubeClientSet.Core().ConfigMaps(ns) _, err := cmc.Get(fluentdConfigMapName, metav1.GetOptions{}) if err != nil { if !apierrs.IsNotFound(err) { @@ -832,7 +839,7 @@ func (c *Controller) reconcileFluentdConfigMap(rev *v1alpha1.Revision) error { func (c *Controller) deleteAutoscalerService(rev *v1alpha1.Revision) error { autoscalerName := controller.GetRevisionAutoscalerName(rev) - sc := c.base.KubeClientSet.Core().Services(AutoscalerNamespace) + sc := c.KubeClientSet.Core().Services(AutoscalerNamespace) if _, err := sc.Get(autoscalerName, metav1.GetOptions{}); err != nil && apierrs.IsNotFound(err) { return nil } @@ -850,7 +857,7 @@ func (c *Controller) deleteAutoscalerService(rev *v1alpha1.Revision) error { func (c *Controller) reconcileAutoscalerService(rev *v1alpha1.Revision) error { autoscalerName := controller.GetRevisionAutoscalerName(rev) - sc := c.base.KubeClientSet.Core().Services(AutoscalerNamespace) + sc := c.KubeClientSet.Core().Services(AutoscalerNamespace) _, err := sc.Get(autoscalerName, metav1.GetOptions{}) if err != nil { if !apierrs.IsNotFound(err) { @@ -873,7 +880,7 @@ func (c *Controller) reconcileAutoscalerService(rev *v1alpha1.Revision) error { func (c *Controller) deleteAutoscalerDeployment(rev *v1alpha1.Revision) error { autoscalerName := controller.GetRevisionAutoscalerName(rev) - dc := c.base.KubeClientSet.AppsV1().Deployments(AutoscalerNamespace) + dc := c.KubeClientSet.AppsV1().Deployments(AutoscalerNamespace) _, err := dc.Get(autoscalerName, metav1.GetOptions{}) if err != nil && apierrs.IsNotFound(err) { return nil @@ -892,7 +899,7 @@ func (c *Controller) deleteAutoscalerDeployment(rev *v1alpha1.Revision) error { func (c *Controller) reconcileAutoscalerDeployment(rev *v1alpha1.Revision) error { autoscalerName := controller.GetRevisionAutoscalerName(rev) - dc := c.base.KubeClientSet.AppsV1().Deployments(AutoscalerNamespace) + dc := c.KubeClientSet.AppsV1().Deployments(AutoscalerNamespace) _, err := dc.Get(autoscalerName, metav1.GetOptions{}) if err != nil { if !apierrs.IsNotFound(err) { @@ -927,7 +934,7 @@ func (c *Controller) removeFinalizers(rev *v1alpha1.Revision, ns string) error { } } accessor.SetFinalizers(finalizers) - prClient := c.base.ElaClientSet.ElafrosV1alpha1().Revisions(rev.Namespace) + prClient := c.ElaClientSet.ElafrosV1alpha1().Revisions(rev.Namespace) prClient.Update(rev) log.Printf("The finalizer 'controller' is removed.") @@ -935,7 +942,7 @@ func (c *Controller) removeFinalizers(rev *v1alpha1.Revision, ns string) error { } func (c *Controller) updateStatus(rev *v1alpha1.Revision) (*v1alpha1.Revision, error) { - prClient := c.base.ElaClientSet.ElafrosV1alpha1().Revisions(rev.Namespace) + prClient := c.ElaClientSet.ElafrosV1alpha1().Revisions(rev.Namespace) newRev, err := prClient.Get(rev.Name, metav1.GetOptions{}) if err != nil { return nil, err diff --git a/pkg/controller/revision/revision_test.go b/pkg/controller/revision/revision_test.go index 699def4af4e8..790a797c6fbd 100644 --- a/pkg/controller/revision/revision_test.go +++ b/pkg/controller/revision/revision_test.go @@ -225,8 +225,15 @@ func newTestControllerWithConfig(t *testing.T, controllerConfig *ControllerConfi buildInformer := buildinformers.NewSharedInformerFactory(buildClient, 0) elaInformer = informers.NewSharedInformerFactory(elaClient, 0) - base := ctrl.NewControllerBase(kubeClient, elaClient, kubeInformer, elaInformer) - controller = NewController(base, buildInformer, &rest.Config{}, controllerConfig).(*Controller) + controller = NewController( + kubeClient, + elaClient, + kubeInformer, + elaInformer, + buildInformer, + &rest.Config{}, + controllerConfig, + ).(*Controller) controller.resolver = &nopResolver{} From e4fcc15a48f25ed5be998684f2f10af7f54d6a52 Mon Sep 17 00:00:00 2001 From: mdemirhan Date: Tue, 15 May 2018 11:54:31 -0700 Subject: [PATCH 3/5] Documentation fixes. --- pkg/controller/controller.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index b386fd1111f0..21ac6ece6d5b 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -36,6 +36,7 @@ import ( "k8s.io/client-go/util/workqueue" ) +// Interface defines the controller interface type Interface interface { Run(threadiness int, stopCh <-chan struct{}) error } @@ -46,31 +47,37 @@ func init() { elascheme.AddToScheme(scheme.Scheme) } +// ControllerBase implements most of the boilerplate and common code +// we have in our controllers. type ControllerBase struct { - // kubeClient allows us to talk to the k8s for core APIs + // KubeClientSet allows us to talk to the k8s for core APIs KubeClientSet kubernetes.Interface - // elaClient allows us to configure Ela objects + // ElaClientSet allows us to configure Ela objects ElaClientSet clientset.Interface + // KubeInformerFactory provides shared informers for resources + // in all known API group versions KubeInformerFactory kubeinformers.SharedInformerFactory + // ElaInformerFactory provides shared informers for resources + // in all known API group versions ElaInformerFactory informers.SharedInformerFactory - // recorder is an event recorder for recording Event resources to the + // Recorder is an event recorder for recording Event resources to the // Kubernetes API. Recorder record.EventRecorder - // workqueue is a rate limited work queue. This is used to queue work to be + // WorkQueue is a rate limited work queue. This is used to queue work to be // processed instead of performing it as soon as a change happens. This // means we can ensure we only process a fixed amount of resources at a // time, and makes it easy to ensure we are never processing the same item // simultaneously in two different workers. WorkQueue workqueue.RateLimitingInterface - - initialized bool } +// NewControllerBase instantiates a new instance of ControllerBase implementing +// the common & boilerplate code between our controllers. func NewControllerBase( kubeClientSet kubernetes.Interface, elaClientSet clientset.Interface, @@ -119,7 +126,7 @@ func (c *ControllerBase) enqueueWork(obj interface{}) { c.WorkQueue.AddRateLimited(key) } -// RunSyncHandler will set up the event handlers for types we are interested in, as well +// RunController will set up the event handlers for types we are interested in, as well // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. From bb3093890a60c04f3b448b6cb1576afd0bf6bd13 Mon Sep 17 00:00:00 2001 From: mdemirhan Date: Tue, 15 May 2018 13:44:15 -0700 Subject: [PATCH 4/5] Refactor the remaining two controllers. --- pkg/controller/route/BUILD.bazel | 6 - pkg/controller/route/route.go | 241 +++++------------------------ pkg/controller/service/BUILD.bazel | 5 - pkg/controller/service/service.go | 172 ++------------------ 4 files changed, 49 insertions(+), 375 deletions(-) diff --git a/pkg/controller/route/BUILD.bazel b/pkg/controller/route/BUILD.bazel index 686152ed3aad..e612a0b155bb 100644 --- a/pkg/controller/route/BUILD.bazel +++ b/pkg/controller/route/BUILD.bazel @@ -21,7 +21,6 @@ go_library( "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/josephburnett/k8sflag/pkg/k8sflag:go_default_library", "//vendor/go.opencensus.io/stats:go_default_library", - "//vendor/go.opencensus.io/stats/view:go_default_library", "//vendor/go.opencensus.io/tag:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", @@ -29,15 +28,10 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", - "//vendor/k8s.io/client-go/tools/record:go_default_library", - "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) diff --git a/pkg/controller/route/route.go b/pkg/controller/route/route.go index 931b50761ba1..8e28bd3d9932 100644 --- a/pkg/controller/route/route.go +++ b/pkg/controller/route/route.go @@ -17,11 +17,9 @@ limitations under the License. package route import ( - "context" "errors" "fmt" "reflect" - "time" "github.com/golang/glog" "github.com/josephburnett/k8sflag/pkg/k8sflag" @@ -30,15 +28,10 @@ import ( apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" "github.com/elafros/elafros/pkg/apis/ela" "github.com/elafros/elafros/pkg/apis/ela/v1alpha1" @@ -47,7 +40,6 @@ import ( listers "github.com/elafros/elafros/pkg/client/listers/ela/v1alpha1" "github.com/elafros/elafros/pkg/controller" "go.opencensus.io/stats" - "go.opencensus.io/stats/view" "go.opencensus.io/tag" ) @@ -84,24 +76,12 @@ type RevisionRoute struct { // Controller implements the controller for Route resources. // +controller:group=ela,version=v1alpha1,kind=Route,resource=routes type Controller struct { - // kubeclientset is a standard kubernetes clientset - kubeclientset kubernetes.Interface - elaclientset clientset.Interface + *controller.ControllerBase // lister indexes properties about Route lister listers.RouteLister synced cache.InformerSynced - // workqueue is a rate limited work queue. This is used to queue work to be - // processed instead of performing it as soon as a change happens. This - // means we can ensure we only process a fixed amount of resources at a - // time, and makes it easy to ensure we are never processing the same item - // simultaneously in two different workers. - workqueue workqueue.RateLimitingInterface - // recorder is an event recorder for recording Event resources to the - // Kubernetes API. - recorder record.EventRecorder - // don't start the workers until configuration cache have been synced configSynced cache.InformerSynced @@ -119,8 +99,8 @@ type Controller struct { // reconcileKey - function for mapping queue keys to resource names //TODO(vaikas): somewhat generic (generic behavior) func NewController( - kubeclientset kubernetes.Interface, - elaclientset clientset.Interface, + kubeClientSet kubernetes.Interface, + elaClientSet clientset.Interface, kubeInformerFactory kubeinformers.SharedInformerFactory, elaInformerFactory informers.SharedInformerFactory, config *rest.Config, @@ -135,33 +115,17 @@ func NewController( configInformer := elaInformerFactory.Elafros().V1alpha1().Configurations() ingressInformer := kubeInformerFactory.Extensions().V1beta1().Ingresses() - // Create event broadcaster - glog.V(4).Info("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) - controller := &Controller{ - kubeclientset: kubeclientset, - elaclientset: elaclientset, + ControllerBase: controller.NewControllerBase(kubeClientSet, elaClientSet, kubeInformerFactory, + elaInformerFactory, informer.Informer(), controllerAgentName, "Routes"), lister: informer.Lister(), synced: informer.Informer().HasSynced, configSynced: configInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Routes"), - recorder: recorder, controllerConfig: controllerConfig, enableScaleToZero: enableScaleToZero, } glog.Info("Setting up event handlers") - // Set up an event handler for when Route resources change - informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueueRoute, - UpdateFunc: func(old, new interface{}) { - controller.enqueueRoute(new) - }, - }) configInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.addConfigurationEvent, UpdateFunc: controller.updateConfigurationEvent, @@ -176,145 +140,14 @@ func NewController( // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. -//TODO(grantr): generic func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { - defer runtime.HandleCrash() - defer c.workqueue.ShutDown() - - // Start the informer factories to begin populating the informer caches - glog.Info("Starting Route controller") - - // Metrics setup: begin - // Create the tag keys that will be used to add tags to our measurements. - var err error - if statusTagKey, err = tag.NewKey("status"); err != nil { - return fmt.Errorf("failed to create tag key in OpenCensus: %v", err) - } - // Create view to see our measurements cumulatively. - countView := &view.View{ - Description: "Counter to keep track of items in the route work queue.", - Measure: processItemCount, - Aggregation: view.Count(), - TagKeys: []tag.Key{statusTagKey}, - } - if err = view.Register(countView); err != nil { - return fmt.Errorf("failed to register the views in OpenCensus: %v", err) - } - defer view.Unregister(countView) - // Metrics setup: end - - // Wait for the caches to be synced before starting workers - glog.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.synced); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - // Wait for the configuration caches to be synced before starting workers - glog.Info("Waiting for configuration informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.configSynced); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - glog.Info("Starting workers") - // Launch workers to process Route resources - for i := 0; i < threadiness; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) - } - - glog.Info("Started workers") - <-stopCh - glog.Info("Shutting down workers") - - return nil -} - -// runWorker is a long-running function that will continually call the -// processNextWorkItem function in order to read and process a message on the -// workqueue. -//TODO(grantr): generic -func (c *Controller) runWorker() { - for c.processNextWorkItem() { - } -} - -// processNextWorkItem will read a single work item off the workqueue and -// attempt to process it, by calling the updateRouteEvent. -//TODO(grantr): generic -func (c *Controller) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() - - if shutdown { - return false - } - - // We wrap this block in a func so we can defer c.workqueue.Done. - err, processStatus := func(obj interface{}) (error, string) { - // We call Done here so the workqueue knows we have finished - // processing this item. We also must remember to call Forget if we - // do not want this work item being re-queued. For example, we do - // not call Forget if a transient error occurs, instead the item is - // put back on the workqueue and attempted again after a back-off - // period. - defer c.workqueue.Done(obj) - var key string - var ok bool - // We expect strings to come off the workqueue. These are of the - // form namespace/name. We do this as the delayed nature of the - // workqueue means the items in the informer cache may actually be - // more up to date that when the item was initially put onto the - // workqueue. - if key, ok = obj.(string); !ok { - // As the item in the workqueue is actually invalid, we call - // Forget here else we'd go into a loop of attempting to - // process a work item that is invalid. - c.workqueue.Forget(obj) - runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) - return nil, controller.PromLabelValueInvalid - } - // Run the updateRouteEvent passing it the namespace/name string of the - // Route resource to be synced. - if err := c.updateRouteEvent(key); err != nil { - return fmt.Errorf("error syncing %q: %v", key, err), controller.PromLabelValueFailure - } - // Finally, if no error occurs we Forget this item so it does not - // get queued again until another change happens. - c.workqueue.Forget(obj) - glog.Infof("Successfully synced %q", key) - return nil, controller.PromLabelValueSuccess - }(obj) - - if ctx, tagError := tag.New(context.Background(), tag.Insert(statusTagKey, processStatus)); tagError == nil { - // Increment the request count by one. - stats.Record(ctx, processItemCount.M(1)) - } - - if err != nil { - runtime.HandleError(err) - return true - } - - return true -} - -// enqueueRoute takes a Route resource and -// converts it into a namespace/name string which is then put onto the work -// queue. This method should *not* be passed resources of any type other than -// Route. -//TODO(grantr): generic -func (c *Controller) enqueueRoute(obj interface{}) { - var key string - var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - runtime.HandleError(err) - return - } - c.workqueue.AddRateLimited(key) + return c.RunController(threadiness, stopCh, []cache.InformerSynced{c.synced, c.configSynced}, + c.updateRouteEvent, "Route") } // updateRouteEvent compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the Route resource // with the current status of the resource. -//TODO(grantr): not generic func (c *Controller) updateRouteEvent(key string) error { // Convert the namespace/name string into a distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -429,26 +262,26 @@ func (c *Controller) syncTrafficTargetsAndUpdateRouteStatus(route *v1alpha1.Rout updated, err := c.updateStatus(route) if err != nil { glog.Warningf("Failed to update service status: %s", err) - c.recorder.Eventf(route, corev1.EventTypeWarning, "UpdateFailed", "Failed to update status for route %q: %v", route.Name, err) + c.Recorder.Eventf(route, corev1.EventTypeWarning, "UpdateFailed", "Failed to update status for route %q: %v", route.Name, err) return nil, err } - c.recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated status for route %q", route.Name) + c.Recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated status for route %q", route.Name) return updated, nil } func (c *Controller) reconcilePlaceholderService(route *v1alpha1.Route) error { service := MakeRouteK8SService(route) - if _, err := c.kubeclientset.Core().Services(route.Namespace).Create(service); err != nil { + if _, err := c.KubeClientSet.Core().Services(route.Namespace).Create(service); err != nil { if apierrs.IsAlreadyExists(err) { // Service already exist. return nil } glog.Infof("Failed to create service: %s", err) - c.recorder.Eventf(route, corev1.EventTypeWarning, "CreationFailed", "Failed to create service %q: %v", service.Name, err) + c.Recorder.Eventf(route, corev1.EventTypeWarning, "CreationFailed", "Failed to create service %q: %v", service.Name, err) return err } glog.Infof("Created service: %q", service.Name) - c.recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created service %q", service.Name) + c.Recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created service %q", service.Name) return nil } @@ -456,13 +289,13 @@ func (c *Controller) reconcileIngress(route *v1alpha1.Route) error { ingressNamespace := route.Namespace ingressName := controller.GetElaK8SIngressName(route) ingress := MakeRouteIngress(route) - ingressClient := c.kubeclientset.Extensions().Ingresses(ingressNamespace) + ingressClient := c.KubeClientSet.Extensions().Ingresses(ingressNamespace) existing, err := ingressClient.Get(ingressName, metav1.GetOptions{}) if err != nil { if apierrs.IsNotFound(err) { if _, err = ingressClient.Create(ingress); err == nil { glog.Infof("Created ingress %q in namespace %q", ingressName, ingressNamespace) - c.recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created Ingress %q in namespace %q", ingressName, ingressNamespace) + c.Recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created Ingress %q in namespace %q", ingressName, ingressNamespace) } } return err @@ -472,7 +305,7 @@ func (c *Controller) reconcileIngress(route *v1alpha1.Route) error { existing.Spec = ingress.Spec if _, err = ingressClient.Update(existing); err == nil { glog.Infof("Updated ingress %q in namespace %q", ingressName, ingressNamespace) - c.recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated Ingress %q in namespace %q", ingressName, ingressNamespace) + c.Recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated Ingress %q in namespace %q", ingressName, ingressNamespace) } return err } @@ -482,8 +315,8 @@ func (c *Controller) reconcileIngress(route *v1alpha1.Route) error { func (c *Controller) getDirectTrafficTargets(route *v1alpha1.Route) ( map[string]*v1alpha1.Configuration, map[string]*v1alpha1.Revision, error) { ns := route.Namespace - configClient := c.elaclientset.ElafrosV1alpha1().Configurations(ns) - revClient := c.elaclientset.ElafrosV1alpha1().Revisions(ns) + configClient := c.ElaClientSet.ElafrosV1alpha1().Configurations(ns) + revClient := c.ElaClientSet.ElafrosV1alpha1().Revisions(ns) configMap := map[string]*v1alpha1.Configuration{} revMap := map[string]*v1alpha1.Revision{} @@ -513,7 +346,7 @@ func (c *Controller) getDirectTrafficTargets(route *v1alpha1.Route) ( func (c *Controller) extendConfigurationsWithIndirectTrafficTargets( route *v1alpha1.Route, configMap map[string]*v1alpha1.Configuration, revMap map[string]*v1alpha1.Revision) error { ns := route.Namespace - configClient := c.elaclientset.ElafrosV1alpha1().Configurations(ns) + configClient := c.ElaClientSet.ElafrosV1alpha1().Configurations(ns) // Get indirect configurations. for _, rev := range revMap { @@ -538,7 +371,7 @@ func (c *Controller) extendConfigurationsWithIndirectTrafficTargets( func (c *Controller) extendRevisionsWithIndirectTrafficTargets( route *v1alpha1.Route, configMap map[string]*v1alpha1.Configuration, revMap map[string]*v1alpha1.Revision) error { ns := route.Namespace - revisionClient := c.elaclientset.ElafrosV1alpha1().Revisions(ns) + revisionClient := c.ElaClientSet.ElafrosV1alpha1().Revisions(ns) for _, tt := range route.Spec.Traffic { if tt.ConfigurationName != "" { @@ -569,7 +402,7 @@ func (c *Controller) extendRevisionsWithIndirectTrafficTargets( func (c *Controller) setLabelForGivenConfigurations( route *v1alpha1.Route, configMap map[string]*v1alpha1.Configuration) error { - configClient := c.elaclientset.ElafrosV1alpha1().Configurations(route.Namespace) + configClient := c.ElaClientSet.ElafrosV1alpha1().Configurations(route.Namespace) // Validate for _, config := range configMap { @@ -578,7 +411,7 @@ func (c *Controller) setLabelForGivenConfigurations( if routeName != route.Name { errMsg := fmt.Sprintf("Configuration %q is already in use by %q, and cannot be used by %q", config.Name, routeName, route.Name) - c.recorder.Event(route, corev1.EventTypeWarning, "ConfigurationInUse", errMsg) + c.Recorder.Event(route, corev1.EventTypeWarning, "ConfigurationInUse", errMsg) return errors.New(errMsg) } } @@ -603,7 +436,7 @@ func (c *Controller) setLabelForGivenConfigurations( func (c *Controller) setLabelForGivenRevisions( route *v1alpha1.Route, revMap map[string]*v1alpha1.Revision) error { - revisionClient := c.elaclientset.ElafrosV1alpha1().Revisions(route.Namespace) + revisionClient := c.ElaClientSet.ElafrosV1alpha1().Revisions(route.Namespace) // Validate revision if it already has a route label for _, rev := range revMap { @@ -611,7 +444,7 @@ func (c *Controller) setLabelForGivenRevisions( if routeName != route.Name { errMsg := fmt.Sprintf("Revision %q is already in use by %q, and cannot be used by %q", rev.Name, routeName, route.Name) - c.recorder.Event(route, corev1.EventTypeWarning, "RevisionInUse", errMsg) + c.Recorder.Event(route, corev1.EventTypeWarning, "RevisionInUse", errMsg) return errors.New(errMsg) } } @@ -635,7 +468,7 @@ func (c *Controller) setLabelForGivenRevisions( func (c *Controller) deleteLabelForOutsideOfGivenConfigurations( route *v1alpha1.Route, configMap map[string]*v1alpha1.Configuration) error { - configClient := c.elaclientset.ElafrosV1alpha1().Configurations(route.Namespace) + configClient := c.ElaClientSet.ElafrosV1alpha1().Configurations(route.Namespace) // Get Configurations set as traffic target before this sync. oldConfigsList, err := configClient.List( metav1.ListOptions{ @@ -664,7 +497,7 @@ func (c *Controller) deleteLabelForOutsideOfGivenConfigurations( func (c *Controller) deleteLabelForOutsideOfGivenRevisions( route *v1alpha1.Route, revMap map[string]*v1alpha1.Revision) error { - revClient := c.elaclientset.ElafrosV1alpha1().Revisions(route.Namespace) + revClient := c.ElaClientSet.ElafrosV1alpha1().Revisions(route.Namespace) oldRevList, err := revClient.List( metav1.ListOptions{ @@ -790,7 +623,7 @@ func (c *Controller) computeEmptyRevisionRoutes( route *v1alpha1.Route, configMap map[string]*v1alpha1.Configuration, revMap map[string]*v1alpha1.Revision) ([]RevisionRoute, error) { ns := route.Namespace elaNS := controller.GetElaNamespaceName(ns) - revClient := c.elaclientset.ElafrosV1alpha1().Revisions(ns) + revClient := c.ElaClientSet.ElafrosV1alpha1().Revisions(ns) revRoutes := []RevisionRoute{} for _, tt := range route.Spec.Traffic { configName := tt.ConfigurationName @@ -826,7 +659,7 @@ func (c *Controller) createOrUpdateRouteRules(route *v1alpha1.Route, configMap m revMap map[string]*v1alpha1.Revision) ([]RevisionRoute, error) { // grab a client that's specific to RouteRule. ns := route.Namespace - routeClient := c.elaclientset.ConfigV1alpha2().RouteRules(ns) + routeClient := c.ElaClientSet.ConfigV1alpha2().RouteRules(ns) if routeClient == nil { glog.Errorf("Failed to create resource client") return nil, fmt.Errorf("Couldn't get a routeClient") @@ -858,17 +691,17 @@ func (c *Controller) createOrUpdateRouteRules(route *v1alpha1.Route, configMap m } routeRules = MakeIstioRoutes(route, nil, ns, revisionRoutes, c.routeDomain(route), inactiveRev) if _, err := routeClient.Create(routeRules); err != nil { - c.recorder.Eventf(route, corev1.EventTypeWarning, "CreationFailed", "Failed to create Istio route rule %q: %s", routeRules.Name, err) + c.Recorder.Eventf(route, corev1.EventTypeWarning, "CreationFailed", "Failed to create Istio route rule %q: %s", routeRules.Name, err) return nil, err } - c.recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created Istio route rule %q", routeRules.Name) + c.Recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created Istio route rule %q", routeRules.Name) } else { routeRules.Spec = makeIstioRouteSpec(route, nil, ns, revisionRoutes, c.routeDomain(route), inactiveRev) if _, err := routeClient.Update(routeRules); err != nil { - c.recorder.Eventf(route, corev1.EventTypeWarning, "UpdateFailed", "Failed to update Istio route rule %q: %s", routeRules.Name, err) + c.Recorder.Eventf(route, corev1.EventTypeWarning, "UpdateFailed", "Failed to update Istio route rule %q: %s", routeRules.Name, err) return nil, err } - c.recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated Istio route rule %q", routeRules.Name) + c.Recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated Istio route rule %q", routeRules.Name) } // Create route rule for named traffic targets @@ -884,16 +717,16 @@ func (c *Controller) createOrUpdateRouteRules(route *v1alpha1.Route, configMap m } routeRules = MakeIstioRoutes(route, &tt, ns, revisionRoutes, c.routeDomain(route), inactiveRev) if _, err := routeClient.Create(routeRules); err != nil { - c.recorder.Eventf(route, corev1.EventTypeWarning, "CreationFailed", "Failed to create Istio route rule %q: %s", routeRules.Name, err) + c.Recorder.Eventf(route, corev1.EventTypeWarning, "CreationFailed", "Failed to create Istio route rule %q: %s", routeRules.Name, err) return nil, err } - c.recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created Istio route rule %q", routeRules.Name) + c.Recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created Istio route rule %q", routeRules.Name) } else { routeRules.Spec = makeIstioRouteSpec(route, &tt, ns, revisionRoutes, c.routeDomain(route), inactiveRev) if _, err := routeClient.Update(routeRules); err != nil { return nil, err } - c.recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated Istio route rule %q", routeRules.Name) + c.Recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated Istio route rule %q", routeRules.Name) } } if err := c.removeOutdatedRouteRules(route); err != nil { @@ -903,7 +736,7 @@ func (c *Controller) createOrUpdateRouteRules(route *v1alpha1.Route, configMap m } func (c *Controller) updateStatus(route *v1alpha1.Route) (*v1alpha1.Route, error) { - routeClient := c.elaclientset.ElafrosV1alpha1().Routes(route.Namespace) + routeClient := c.ElaClientSet.ElafrosV1alpha1().Routes(route.Namespace) existing, err := routeClient.Get(route.Name, metav1.GetOptions{}) if err != nil { return nil, err @@ -971,7 +804,7 @@ func (c *Controller) consolidateTrafficTargets(route *v1alpha1.Route) { func (c *Controller) removeOutdatedRouteRules(u *v1alpha1.Route) error { ns := u.Namespace - routeClient := c.elaclientset.ConfigV1alpha2().RouteRules(ns) + routeClient := c.ElaClientSet.ConfigV1alpha2().RouteRules(ns) if routeClient == nil { glog.Error("Failed to create resource client") return errors.New("Couldn't get a routeClient") @@ -1061,7 +894,7 @@ func (c *Controller) updateIngressEvent(old, new interface{}) { } } ns := ingress.Namespace - routeClient := c.elaclientset.ElafrosV1alpha1().Routes(ns) + routeClient := c.ElaClientSet.ElafrosV1alpha1().Routes(ns) route, err := routeClient.Get(routeName, metav1.GetOptions{}) if err != nil { glog.Errorf("Error fetching route '%s/%s' upon ingress becoming: %v", diff --git a/pkg/controller/service/BUILD.bazel b/pkg/controller/service/BUILD.bazel index c71930f75bb9..cb94c9c09387 100644 --- a/pkg/controller/service/BUILD.bazel +++ b/pkg/controller/service/BUILD.bazel @@ -19,17 +19,12 @@ go_library( "//pkg/controller:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/go.opencensus.io/stats:go_default_library", - "//vendor/go.opencensus.io/stats/view:go_default_library", "//vendor/go.opencensus.io/tag:go_default_library", - "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", diff --git a/pkg/controller/service/service.go b/pkg/controller/service/service.go index a616f4848c15..a4bf92d14bc6 100644 --- a/pkg/controller/service/service.go +++ b/pkg/controller/service/service.go @@ -17,21 +17,15 @@ limitations under the License. package service import ( - "context" "fmt" "reflect" - "time" "github.com/golang/glog" - corev1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" @@ -43,7 +37,6 @@ import ( listers "github.com/elafros/elafros/pkg/client/listers/ela/v1alpha1" "github.com/elafros/elafros/pkg/controller" "go.opencensus.io/stats" - "go.opencensus.io/stats/view" "go.opencensus.io/tag" ) @@ -62,9 +55,7 @@ const ( // Controller implements the controller for Service resources. // +controller:group=ela,version=v1alpha1,kind=Service,resource=services type Controller struct { - // kubeclientset is a standard kubernetes clientset - kubeclientset kubernetes.Interface - elaclientset clientset.Interface + *controller.ControllerBase // lister indexes properties about Services lister listers.ServiceLister @@ -84,8 +75,8 @@ type Controller struct { // NewController initializes the controller and is called by the generated code // Registers eventhandlers to enqueue events func NewController( - kubeclientset kubernetes.Interface, - elaclientset clientset.Interface, + kubeClientSet kubernetes.Interface, + elaClientSet clientset.Interface, kubeInformerFactory kubeinformers.SharedInformerFactory, elaInformerFactory informers.SharedInformerFactory, config *rest.Config, @@ -96,30 +87,13 @@ func NewController( // obtain references to a shared index informer for the Services. informer := elaInformerFactory.Elafros().V1alpha1().Services() - // Create event broadcaster - glog.V(4).Info("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) - controller := &Controller{ - kubeclientset: kubeclientset, - elaclientset: elaclientset, - lister: informer.Lister(), - synced: informer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Services"), - recorder: recorder, + ControllerBase: controller.NewControllerBase(kubeClientSet, elaClientSet, kubeInformerFactory, + elaInformerFactory, informer.Informer(), controllerAgentName, "Revisions"), + lister: informer.Lister(), + synced: informer.Informer().HasSynced, } - glog.Info("Setting up event handlers") - // Set up an event handler for when Service resources change - informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueueService, - UpdateFunc: func(old, new interface{}) { - controller.enqueueService(new) - }, - }) return controller } @@ -128,130 +102,8 @@ func NewController( // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { - defer runtime.HandleCrash() - defer c.workqueue.ShutDown() - - // Start the informer factories to begin populating the informer caches - glog.Info("Starting Service controller") - - // Metrics setup: begin - // Create the tag keys that will be used to add tags to our measurements. - var err error - if statusTagKey, err = tag.NewKey("status"); err != nil { - return fmt.Errorf("failed to create tag key in OpenCensus: %v", err) - } - // Create view to see our measurements cumulatively. - countView := &view.View{ - Description: "Counter to keep track of items in the service work queue.", - Measure: processItemCount, - Aggregation: view.Count(), - TagKeys: []tag.Key{statusTagKey}, - } - if err = view.Register(countView); err != nil { - return fmt.Errorf("failed to register the views in OpenCensus: %v", err) - } - defer view.Unregister(countView) - // Metrics setup: end - - // Wait for the caches to be synced before starting workers - glog.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.synced); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - glog.Info("Starting workers") - // Launch workers to process Service resources - for i := 0; i < threadiness; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) - } - - glog.Info("Started workers") - <-stopCh - glog.Info("Shutting down workers") - - return nil -} - -// runWorker is a long-running function that will continually call the -// processNextWorkItem function in order to read and process a message on the -// workqueue. -func (c *Controller) runWorker() { - for c.processNextWorkItem() { - } -} - -// processNextWorkItem will read a single work item off the workqueue and -// attempt to process it, by calling the updateServiceEvent. -func (c *Controller) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() - - if shutdown { - return false - } - - // We wrap this block in a func so we can defer c.workqueue.Done. - err, processStatus := func(obj interface{}) (error, string) { - // We call Done here so the workqueue knows we have finished - // processing this item. We also must remember to call Forget if we - // do not want this work item being re-queued. For example, we do - // not call Forget if a transient error occurs, instead the item is - // put back on the workqueue and attempted again after a back-off - // period. - defer c.workqueue.Done(obj) - var key string - var ok bool - // We expect strings to come off the workqueue. These are of the - // form namespace/name. We do this as the delayed nature of the - // workqueue means the items in the informer cache may actually be - // more up to date that when the item was initially put onto the - // workqueue. - if key, ok = obj.(string); !ok { - // As the item in the workqueue is actually invalid, we call - // Forget here else we'd go into a loop of attempting to - // process a work item that is invalid. - c.workqueue.Forget(obj) - runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) - return nil, controller.PromLabelValueInvalid - } - // Run the updateServiceEvent passing it the namespace/name string of the - // Service resource to be synced. - if err := c.updateServiceEvent(key); err != nil { - return fmt.Errorf("error syncing %q: %v", key, err), controller.PromLabelValueFailure - } - // Finally, if no error occurs we Forget this item so it does not - // get queued again until another change happens. - c.workqueue.Forget(obj) - glog.Infof("Successfully synced %q", key) - return nil, controller.PromLabelValueSuccess - }(obj) - - if ctx, tagError := tag.New(context.Background(), tag.Insert(statusTagKey, processStatus)); tagError == nil { - // Increment the request count by one. - stats.Record(ctx, processItemCount.M(1)) - } - - if err != nil { - runtime.HandleError(err) - return true - } - - return true -} - -// enqueueService takes a Service resource and -// converts it into a namespace/name string which is then put onto the work -// queue. This method should *not* be passed resources of any type other than -// Service. -// TODO(grantr): generic -// TODO: assert the object passed in is of the correct type. -func (c *Controller) enqueueService(obj interface{}) { - var key string - var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - runtime.HandleError(err) - return - } - c.workqueue.AddRateLimited(key) + return c.RunController(threadiness, stopCh, []cache.InformerSynced{c.synced}, + c.updateServiceEvent, "Service") } // updateServiceEvent compares the actual state with the desired, and attempts to @@ -298,7 +150,7 @@ func (c *Controller) updateServiceEvent(key string) error { } func (c *Controller) updateStatus(service *v1alpha1.Service) (*v1alpha1.Service, error) { - serviceClient := c.elaclientset.ElafrosV1alpha1().Services(service.Namespace) + serviceClient := c.ElaClientSet.ElafrosV1alpha1().Services(service.Namespace) existing, err := serviceClient.Get(service.Name, metav1.GetOptions{}) if err != nil { return nil, err @@ -313,7 +165,7 @@ func (c *Controller) updateStatus(service *v1alpha1.Service) (*v1alpha1.Service, } func (c *Controller) reconcileConfiguration(config *v1alpha1.Configuration) error { - configClient := c.elaclientset.ElafrosV1alpha1().Configurations(config.Namespace) + configClient := c.ElaClientSet.ElafrosV1alpha1().Configurations(config.Namespace) existing, err := configClient.Get(config.Name, metav1.GetOptions{}) if err != nil { @@ -331,7 +183,7 @@ func (c *Controller) reconcileConfiguration(config *v1alpha1.Configuration) erro } func (c *Controller) reconcileRoute(route *v1alpha1.Route) error { - routeClient := c.elaclientset.ElafrosV1alpha1().Routes(route.Namespace) + routeClient := c.ElaClientSet.ElafrosV1alpha1().Routes(route.Namespace) existing, err := routeClient.Get(route.Name, metav1.GetOptions{}) if err != nil { From 26103999ab45742106b37e2a180d14480ceaea54 Mon Sep 17 00:00:00 2001 From: mdemirhan Date: Tue, 15 May 2018 14:31:54 -0700 Subject: [PATCH 5/5] Rename controller.ControllerBase to controller.Base based on PR feedback. --- pkg/controller/configuration/configuration.go | 4 ++-- pkg/controller/controller.go | 18 +++++++++--------- pkg/controller/revision/revision.go | 4 ++-- pkg/controller/route/route.go | 4 ++-- pkg/controller/service/service.go | 4 ++-- 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/pkg/controller/configuration/configuration.go b/pkg/controller/configuration/configuration.go index d6f0dfe7fdbd..2963232cf842 100644 --- a/pkg/controller/configuration/configuration.go +++ b/pkg/controller/configuration/configuration.go @@ -42,7 +42,7 @@ const controllerAgentName = "configuration-controller" // Controller implements the controller for Configuration resources type Controller struct { - *controller.ControllerBase + *controller.Base buildClientSet buildclientset.Interface @@ -70,7 +70,7 @@ func NewController( revisionInformer := elaInformerFactory.Elafros().V1alpha1().Revisions() controller := &Controller{ - ControllerBase: controller.NewControllerBase(kubeClientSet, elaClientSet, kubeInformerFactory, + Base: controller.NewBase(kubeClientSet, elaClientSet, kubeInformerFactory, elaInformerFactory, informer.Informer(), controllerAgentName, "Configurations"), buildClientSet: buildClientSet, lister: informer.Lister(), diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 21ac6ece6d5b..8887f05450c7 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -47,9 +47,9 @@ func init() { elascheme.AddToScheme(scheme.Scheme) } -// ControllerBase implements most of the boilerplate and common code +// Base implements most of the boilerplate and common code // we have in our controllers. -type ControllerBase struct { +type Base struct { // KubeClientSet allows us to talk to the k8s for core APIs KubeClientSet kubernetes.Interface @@ -76,16 +76,16 @@ type ControllerBase struct { WorkQueue workqueue.RateLimitingInterface } -// NewControllerBase instantiates a new instance of ControllerBase implementing +// NewBase instantiates a new instance of Base implementing // the common & boilerplate code between our controllers. -func NewControllerBase( +func NewBase( kubeClientSet kubernetes.Interface, elaClientSet clientset.Interface, kubeInformerFactory kubeinformers.SharedInformerFactory, elaInformerFactory informers.SharedInformerFactory, informer cache.SharedIndexInformer, controllerAgentName string, - workQueueName string) *ControllerBase { + workQueueName string) *Base { // Create event broadcaster glog.V(4).Info("Creating event broadcaster") @@ -94,7 +94,7 @@ func NewControllerBase( eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClientSet.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) - base := &ControllerBase{ + base := &Base{ KubeClientSet: kubeClientSet, ElaClientSet: elaClientSet, KubeInformerFactory: kubeInformerFactory, @@ -116,7 +116,7 @@ func NewControllerBase( // enqueueWork takes a resource and converts it into a // namespace/name string which is then put onto the work queue. -func (c *ControllerBase) enqueueWork(obj interface{}) { +func (c *Base) enqueueWork(obj interface{}) { var key string var err error if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { @@ -130,7 +130,7 @@ func (c *ControllerBase) enqueueWork(obj interface{}) { // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. -func (c *ControllerBase) RunController( +func (c *Base) RunController( threadiness int, stopCh <-chan struct{}, informersSynced []cache.InformerSynced, @@ -168,7 +168,7 @@ func (c *ControllerBase) RunController( // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the syncHandler. -func (c *ControllerBase) processNextWorkItem(syncHandler func(string) error) bool { +func (c *Base) processNextWorkItem(syncHandler func(string) error) bool { obj, shutdown := c.WorkQueue.Get() if shutdown { diff --git a/pkg/controller/revision/revision.go b/pkg/controller/revision/revision.go index 0896194c5f2d..a7144bfba08b 100644 --- a/pkg/controller/revision/revision.go +++ b/pkg/controller/revision/revision.go @@ -92,7 +92,7 @@ type resolver interface { // Controller implements the controller for Revision resources. // +controller:group=ela,version=v1alpha1,kind=Revision,resource=revisions type Controller struct { - *controller.ControllerBase + *controller.Base // lister indexes properties about Revision lister listers.RevisionLister @@ -164,7 +164,7 @@ func NewController( deploymentInformer := kubeInformerFactory.Apps().V1().Deployments() controller := &Controller{ - ControllerBase: controller.NewControllerBase(kubeClientSet, elaClientSet, kubeInformerFactory, + Base: controller.NewBase(kubeClientSet, elaClientSet, kubeInformerFactory, elaInformerFactory, informer.Informer(), controllerAgentName, "Revisions"), lister: informer.Lister(), synced: informer.Informer().HasSynced, diff --git a/pkg/controller/route/route.go b/pkg/controller/route/route.go index 8e28bd3d9932..e3c7bc12fc45 100644 --- a/pkg/controller/route/route.go +++ b/pkg/controller/route/route.go @@ -76,7 +76,7 @@ type RevisionRoute struct { // Controller implements the controller for Route resources. // +controller:group=ela,version=v1alpha1,kind=Route,resource=routes type Controller struct { - *controller.ControllerBase + *controller.Base // lister indexes properties about Route lister listers.RouteLister @@ -116,7 +116,7 @@ func NewController( ingressInformer := kubeInformerFactory.Extensions().V1beta1().Ingresses() controller := &Controller{ - ControllerBase: controller.NewControllerBase(kubeClientSet, elaClientSet, kubeInformerFactory, + Base: controller.NewBase(kubeClientSet, elaClientSet, kubeInformerFactory, elaInformerFactory, informer.Informer(), controllerAgentName, "Routes"), lister: informer.Lister(), synced: informer.Informer().HasSynced, diff --git a/pkg/controller/service/service.go b/pkg/controller/service/service.go index a4bf92d14bc6..9ec913e2e7e2 100644 --- a/pkg/controller/service/service.go +++ b/pkg/controller/service/service.go @@ -55,7 +55,7 @@ const ( // Controller implements the controller for Service resources. // +controller:group=ela,version=v1alpha1,kind=Service,resource=services type Controller struct { - *controller.ControllerBase + *controller.Base // lister indexes properties about Services lister listers.ServiceLister @@ -88,7 +88,7 @@ func NewController( informer := elaInformerFactory.Elafros().V1alpha1().Services() controller := &Controller{ - ControllerBase: controller.NewControllerBase(kubeClientSet, elaClientSet, kubeInformerFactory, + Base: controller.NewBase(kubeClientSet, elaClientSet, kubeInformerFactory, elaInformerFactory, informer.Informer(), controllerAgentName, "Revisions"), lister: informer.Lister(), synced: informer.Informer().HasSynced,