diff --git a/pkg/controller/BUILD.bazel b/pkg/controller/BUILD.bazel index 9aaa973ae4f9..f49d433669e5 100644 --- a/pkg/controller/BUILD.bazel +++ b/pkg/controller/BUILD.bazel @@ -14,13 +14,23 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/apis/ela/v1alpha1:go_default_library", + "//pkg/client/clientset/versioned:go_default_library", "//pkg/client/clientset/versioned/scheme:go_default_library", + "//pkg/client/informers/externalversions:go_default_library", "//vendor/github.com/ghodss/yaml:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", + "//vendor/k8s.io/client-go/tools/record:go_default_library", + "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) diff --git a/pkg/controller/configuration/BUILD.bazel b/pkg/controller/configuration/BUILD.bazel index ab0547a585c6..aa1326d24414 100644 --- a/pkg/controller/configuration/BUILD.bazel +++ b/pkg/controller/configuration/BUILD.bazel @@ -19,15 +19,10 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", - "//vendor/k8s.io/client-go/tools/record:go_default_library", - "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) diff --git a/pkg/controller/configuration/configuration.go b/pkg/controller/configuration/configuration.go index e7d8144c1be1..2963232cf842 100644 --- a/pkg/controller/configuration/configuration.go +++ b/pkg/controller/configuration/configuration.go @@ -18,67 +18,47 @@ package configuration import ( "fmt" - "time" + buildv1alpha1 "github.com/elafros/build/pkg/apis/build/v1alpha1" + buildclientset "github.com/elafros/build/pkg/client/clientset/versioned" + "github.com/elafros/elafros/pkg/apis/ela" + "github.com/elafros/elafros/pkg/apis/ela/v1alpha1" + clientset "github.com/elafros/elafros/pkg/client/clientset/versioned" + informers "github.com/elafros/elafros/pkg/client/informers/externalversions" + listers "github.com/elafros/elafros/pkg/client/listers/ela/v1alpha1" "github.com/elafros/elafros/pkg/controller" - "github.com/golang/glog" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" - - buildv1alpha1 "github.com/elafros/build/pkg/apis/build/v1alpha1" - buildclientset "github.com/elafros/build/pkg/client/clientset/versioned" - "github.com/elafros/elafros/pkg/apis/ela" - "github.com/elafros/elafros/pkg/apis/ela/v1alpha1" - clientset "github.com/elafros/elafros/pkg/client/clientset/versioned" - informers "github.com/elafros/elafros/pkg/client/informers/externalversions" - listers "github.com/elafros/elafros/pkg/client/listers/ela/v1alpha1" ) const controllerAgentName = "configuration-controller" // Controller implements the controller for Configuration resources type Controller struct { - // kubeclientset is a standard kubernetes clientset - kubeclientset kubernetes.Interface - elaclientset clientset.Interface - buildclientset buildclientset.Interface + *controller.Base + + buildClientSet buildclientset.Interface // lister indexes properties about Configuration lister listers.ConfigurationLister synced cache.InformerSynced - // workqueue is a rate limited work queue. This is used to queue work to be - // processed instead of performing it as soon as a change happens. This - // means we can ensure we only process a fixed amount of resources at a - // time, and makes it easy to ensure we are never processing the same item - // simultaneously in two different workers. - workqueue workqueue.RateLimitingInterface - // recorder is an event recorder for recording Event resources to the - // Kubernetes API. - recorder record.EventRecorder - // don't start the workers until revisions cache have been synced revisionsSynced cache.InformerSynced } // NewController creates a new Configuration controller -//TODO(grantr): somewhat generic (generic behavior) func NewController( - kubeclientset kubernetes.Interface, - elaclientset clientset.Interface, - buildclientset buildclientset.Interface, + kubeClientSet kubernetes.Interface, + elaClientSet clientset.Interface, + buildClientSet buildclientset.Interface, kubeInformerFactory kubeinformers.SharedInformerFactory, elaInformerFactory informers.SharedInformerFactory, config *rest.Config, @@ -89,33 +69,16 @@ func NewController( informer := elaInformerFactory.Elafros().V1alpha1().Configurations() revisionInformer := elaInformerFactory.Elafros().V1alpha1().Revisions() - // Create event broadcaster - glog.V(4).Info("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) - controller := &Controller{ - kubeclientset: kubeclientset, - elaclientset: elaclientset, - buildclientset: buildclientset, + Base: controller.NewBase(kubeClientSet, elaClientSet, kubeInformerFactory, + elaInformerFactory, informer.Informer(), controllerAgentName, "Configurations"), + buildClientSet: buildClientSet, lister: informer.Lister(), synced: informer.Informer().HasSynced, revisionsSynced: revisionInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Configurations"), - recorder: recorder, } glog.Info("Setting up event handlers") - // Set up an event handler for when Configuration resources change - informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueueConfiguration, - UpdateFunc: func(old, new interface{}) { - controller.enqueueConfiguration(new) - }, - }) - revisionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.addRevisionEvent, UpdateFunc: controller.updateRevisionEvent, @@ -127,121 +90,14 @@ func NewController( // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. -//TODO(grantr): generic func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { - defer runtime.HandleCrash() - defer c.workqueue.ShutDown() - - // Start the informer factories to begin populating the informer caches - glog.Info("Starting Configuration controller") - - // Wait for the caches to be synced before starting workers - glog.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.synced); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - // Wait for the revisions caches to be synced before starting workers - glog.Info("Waiting for revisions informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.revisionsSynced); !ok { - return fmt.Errorf("failed to wait for revisions caches to sync") - } - - glog.Info("Starting workers") - // Launch workers to process Configuration resources - for i := 0; i < threadiness; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) - } - - glog.Info("Started workers") - <-stopCh - glog.Info("Shutting down workers") - - return nil -} - -// runWorker is a long-running function that will continually call the -// processNextWorkItem function in order to read and process a message on the -// workqueue. -//TODO(grantr): generic -func (c *Controller) runWorker() { - for c.processNextWorkItem() { - } -} - -// processNextWorkItem will read a single work item off the workqueue and -// attempt to process it, by calling the syncHandler. -//TODO(grantr): generic -func (c *Controller) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() - - if shutdown { - return false - } - - // We wrap this block in a func so we can defer c.workqueue.Done. - err := func(obj interface{}) error { - // We call Done here so the workqueue knows we have finished - // processing this item. We also must remember to call Forget if we - // do not want this work item being re-queued. For example, we do - // not call Forget if a transient error occurs, instead the item is - // put back on the workqueue and attempted again after a back-off - // period. - defer c.workqueue.Done(obj) - var key string - var ok bool - // We expect strings to come off the workqueue. These are of the - // form namespace/name. We do this as the delayed nature of the - // workqueue means the items in the informer cache may actually be - // more up to date that when the item was initially put onto the - // workqueue. - if key, ok = obj.(string); !ok { - // As the item in the workqueue is actually invalid, we call - // Forget here else we'd go into a loop of attempting to - // process a work item that is invalid. - c.workqueue.Forget(obj) - runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) - return nil - } - // Run the syncHandler, passing it the namespace/name string of the - // Configuration resource to be synced. - if err := c.syncHandler(key); err != nil { - return fmt.Errorf("error syncing %q: %v", key, err) - } - // Finally, if no error occurs we Forget this item so it does not - // get queued again until another change happens. - c.workqueue.Forget(obj) - glog.Infof("Successfully synced %q", key) - return nil - }(obj) - - if err != nil { - runtime.HandleError(err) - return true - } - - return true -} - -// enqueueConfiguration takes a Configuration resource and -// converts it into a namespace/name string which is then put onto the work -// queue. This method should *not* be passed resources of any type other than -// Configuration. -//TODO(grantr): generic -func (c *Controller) enqueueConfiguration(obj interface{}) { - var key string - var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - runtime.HandleError(err) - return - } - c.workqueue.AddRateLimited(key) + return c.RunController(threadiness, stopCh, + []cache.InformerSynced{c.synced, c.revisionsSynced}, c.syncHandler, "Configuration") } // syncHandler compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the Configuration // resource with the current status of the resource. -//TODO(grantr): not generic func (c *Controller) syncHandler(key string) error { // Convert the namespace/name string into a distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -288,14 +144,14 @@ func (c *Controller) syncHandler(key string) error { Spec: *config.Spec.Build, } build.OwnerReferences = append(build.OwnerReferences, *controllerRef) - created, err := c.buildclientset.BuildV1alpha1().Builds(build.Namespace).Create(build) + created, err := c.buildClientSet.BuildV1alpha1().Builds(build.Namespace).Create(build) if err != nil { glog.Errorf("Failed to create Build:\n%+v\n%s", build, err) - c.recorder.Eventf(config, corev1.EventTypeWarning, "CreationFailed", "Failed to create Build %q: %v", build.Name, err) + c.Recorder.Eventf(config, corev1.EventTypeWarning, "CreationFailed", "Failed to create Build %q: %v", build.Name, err) return err } glog.Infof("Created Build:\n%+v", created.Name) - c.recorder.Eventf(config, corev1.EventTypeNormal, "Created", "Created Build %q", created.Name) + c.Recorder.Eventf(config, corev1.EventTypeNormal, "Created", "Created Build %q", created.Name) spec.BuildName = created.Name } @@ -304,7 +160,7 @@ func (c *Controller) syncHandler(key string) error { return err } - revClient := c.elaclientset.ElafrosV1alpha1().Revisions(config.Namespace) + revClient := c.ElaClientSet.ElafrosV1alpha1().Revisions(config.Namespace) created, err := revClient.Get(revName, metav1.GetOptions{}) if err != nil { if !errors.IsNotFound(err) { @@ -339,10 +195,10 @@ func (c *Controller) syncHandler(key string) error { created, err = revClient.Create(rev) if err != nil { glog.Errorf("Failed to create Revision:\n%+v\n%s", rev, err) - c.recorder.Eventf(config, corev1.EventTypeWarning, "CreationFailed", "Failed to create Revision %q: %v", rev.Name, err) + c.Recorder.Eventf(config, corev1.EventTypeWarning, "CreationFailed", "Failed to create Revision %q: %v", rev.Name, err) return err } - c.recorder.Eventf(config, corev1.EventTypeNormal, "Created", "Created Revision %q", rev.Name) + c.Recorder.Eventf(config, corev1.EventTypeNormal, "Created", "Created Revision %q", rev.Name) glog.Infof("Created Revision:\n%+v", created) } else { glog.Infof("Revision already created %s: %s", created.ObjectMeta.Name, err) @@ -374,7 +230,7 @@ func generateRevisionName(u *v1alpha1.Configuration) (string, error) { } func (c *Controller) updateStatus(u *v1alpha1.Configuration) (*v1alpha1.Configuration, error) { - configClient := c.elaclientset.ElafrosV1alpha1().Configurations(u.Namespace) + configClient := c.ElaClientSet.ElafrosV1alpha1().Configurations(u.Namespace) newu, err := configClient.Get(u.Name, metav1.GetOptions{}) if err != nil { return nil, err @@ -423,7 +279,7 @@ func (c *Controller) addRevisionEvent(obj interface{}) { glog.Errorf("Error updating configuration '%s/%s': %v", namespace, configName, err) } - c.recorder.Eventf(config, corev1.EventTypeNormal, "LatestRevisionUpdate", + c.Recorder.Eventf(config, corev1.EventTypeNormal, "LatestRevisionUpdate", "Latest revision of configuration is not ready") } else { @@ -442,10 +298,10 @@ func (c *Controller) addRevisionEvent(obj interface{}) { namespace, configName, err) } if !alreadyReady { - c.recorder.Eventf(config, corev1.EventTypeNormal, "ConfigurationReady", + c.Recorder.Eventf(config, corev1.EventTypeNormal, "ConfigurationReady", "Configuration becomes ready") } - c.recorder.Eventf(config, corev1.EventTypeNormal, "LatestReadyUpdate", + c.Recorder.Eventf(config, corev1.EventTypeNormal, "LatestReadyUpdate", "LatestReadyRevisionName updated to %q", revision.Name) } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index ff249fd3274d..8887f05450c7 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -17,11 +17,26 @@ limitations under the License. package controller import ( - "k8s.io/client-go/kubernetes/scheme" + "fmt" + "time" + clientset "github.com/elafros/elafros/pkg/client/clientset/versioned" elascheme "github.com/elafros/elafros/pkg/client/clientset/versioned/scheme" + informers "github.com/elafros/elafros/pkg/client/informers/externalversions" + "github.com/golang/glog" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" ) +// Interface defines the controller interface type Interface interface { Run(threadiness int, stopCh <-chan struct{}) error } @@ -31,3 +46,175 @@ func init() { // logged for ela types. elascheme.AddToScheme(scheme.Scheme) } + +// Base implements most of the boilerplate and common code +// we have in our controllers. +type Base struct { + // KubeClientSet allows us to talk to the k8s for core APIs + KubeClientSet kubernetes.Interface + + // ElaClientSet allows us to configure Ela objects + ElaClientSet clientset.Interface + + // KubeInformerFactory provides shared informers for resources + // in all known API group versions + KubeInformerFactory kubeinformers.SharedInformerFactory + + // ElaInformerFactory provides shared informers for resources + // in all known API group versions + ElaInformerFactory informers.SharedInformerFactory + + // Recorder is an event recorder for recording Event resources to the + // Kubernetes API. + Recorder record.EventRecorder + + // WorkQueue is a rate limited work queue. This is used to queue work to be + // processed instead of performing it as soon as a change happens. This + // means we can ensure we only process a fixed amount of resources at a + // time, and makes it easy to ensure we are never processing the same item + // simultaneously in two different workers. + WorkQueue workqueue.RateLimitingInterface +} + +// NewBase instantiates a new instance of Base implementing +// the common & boilerplate code between our controllers. +func NewBase( + kubeClientSet kubernetes.Interface, + elaClientSet clientset.Interface, + kubeInformerFactory kubeinformers.SharedInformerFactory, + elaInformerFactory informers.SharedInformerFactory, + informer cache.SharedIndexInformer, + controllerAgentName string, + workQueueName string) *Base { + + // Create event broadcaster + glog.V(4).Info("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClientSet.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + + base := &Base{ + KubeClientSet: kubeClientSet, + ElaClientSet: elaClientSet, + KubeInformerFactory: kubeInformerFactory, + ElaInformerFactory: elaInformerFactory, + Recorder: recorder, + WorkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName), + } + + // Set up an event handler for when the resource types of interest change + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: base.enqueueWork, + UpdateFunc: func(old, new interface{}) { + base.enqueueWork(new) + }, + }) + + return base +} + +// enqueueWork takes a resource and converts it into a +// namespace/name string which is then put onto the work queue. +func (c *Base) enqueueWork(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + runtime.HandleError(err) + return + } + c.WorkQueue.AddRateLimited(key) +} + +// RunController will set up the event handlers for types we are interested in, as well +// as syncing informer caches and starting workers. It will block until stopCh +// is closed, at which point it will shutdown the workqueue and wait for +// workers to finish processing their current work items. +func (c *Base) RunController( + threadiness int, + stopCh <-chan struct{}, + informersSynced []cache.InformerSynced, + syncHandler func(string) error, + controllerName string) error { + + defer runtime.HandleCrash() + defer c.WorkQueue.ShutDown() + + glog.Infof("Starting %s controller", controllerName) + + // Wait for the caches to be synced before starting workers + glog.Info("Waiting for informer caches to sync") + for i, synced := range informersSynced { + if ok := cache.WaitForCacheSync(stopCh, synced); !ok { + return fmt.Errorf("failed to wait for cache at index %v to sync", i) + } + } + + // Launch workers to process Revision resources + glog.Info("Starting workers") + for i := 0; i < threadiness; i++ { + go wait.Until(func() { + for c.processNextWorkItem(syncHandler) { + } + }, time.Second, stopCh) + } + + glog.Info("Started workers") + <-stopCh + glog.Info("Shutting down workers") + + return nil +} + +// processNextWorkItem will read a single work item off the workqueue and +// attempt to process it, by calling the syncHandler. +func (c *Base) processNextWorkItem(syncHandler func(string) error) bool { + obj, shutdown := c.WorkQueue.Get() + + if shutdown { + return false + } + + // We wrap this block in a func so we can defer c.base.WorkQueue.Done. + err := func(obj interface{}) error { + // We call Done here so the workqueue knows we have finished + // processing this item. We also must remember to call Forget if we + // do not want this work item being re-queued. For example, we do + // not call Forget if a transient error occurs, instead the item is + // put back on the workqueue and attempted again after a back-off + // period. + defer c.WorkQueue.Done(obj) + var key string + var ok bool + // We expect strings to come off the workqueue. These are of the + // form namespace/name. We do this as the delayed nature of the + // workqueue means the items in the informer cache may actually be + // more up to date that when the item was initially put onto the + // workqueue. + if key, ok = obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call + // Forget here else we'd go into a loop of attempting to + // process a work item that is invalid. + c.WorkQueue.Forget(obj) + runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + // Run the syncHandler, passing it the namespace/name string of the + // Configuration resource to be synced. + if err := syncHandler(key); err != nil { + return fmt.Errorf("error syncing %q: %v", key, err) + } + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + c.WorkQueue.Forget(obj) + glog.Infof("Successfully synced %q", key) + return nil + }(obj) + + if err != nil { + runtime.HandleError(err) + return true + } + + return true +} diff --git a/pkg/controller/revision/BUILD.bazel b/pkg/controller/revision/BUILD.bazel index f2b56a9ab58f..ed747bec250c 100644 --- a/pkg/controller/revision/BUILD.bazel +++ b/pkg/controller/revision/BUILD.bazel @@ -29,9 +29,6 @@ go_library( "//vendor/github.com/google/go-containerregistry/v1/remote:go_default_library", "//vendor/github.com/josephburnett/k8sflag/pkg/k8sflag:go_default_library", "//vendor/github.com/mattmoor/k8schain:go_default_library", - "//vendor/go.opencensus.io/stats:go_default_library", - "//vendor/go.opencensus.io/stats/view:go_default_library", - "//vendor/go.opencensus.io/tag:go_default_library", "//vendor/k8s.io/api/apps/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", @@ -40,15 +37,10 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", - "//vendor/k8s.io/client-go/tools/record:go_default_library", - "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) @@ -67,6 +59,7 @@ go_test( "//pkg/client/informers/externalversions:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/testing:go_default_library", + "//pkg/queue:go_default_library", "//vendor/github.com/elafros/build/pkg/apis/build/v1alpha1:go_default_library", "//vendor/github.com/elafros/build/pkg/client/clientset/versioned/fake:go_default_library", "//vendor/github.com/elafros/build/pkg/client/informers/externalversions:go_default_library", diff --git a/pkg/controller/revision/revision.go b/pkg/controller/revision/revision.go index ab1ce1fe7b64..a7144bfba08b 100644 --- a/pkg/controller/revision/revision.go +++ b/pkg/controller/revision/revision.go @@ -17,7 +17,6 @@ limitations under the License. package revision import ( - "context" "fmt" "log" "net/http" @@ -27,36 +26,28 @@ import ( "github.com/elafros/elafros/pkg/apis/ela" "github.com/josephburnett/k8sflag/pkg/k8sflag" + clientset "github.com/elafros/elafros/pkg/client/clientset/versioned" "github.com/golang/glog" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + kubeinformers "k8s.io/client-go/informers" + informers "github.com/elafros/elafros/pkg/client/informers/externalversions" "k8s.io/apimachinery/pkg/api/errors" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" buildv1alpha1 "github.com/elafros/build/pkg/apis/build/v1alpha1" buildinformers "github.com/elafros/build/pkg/client/informers/externalversions" "github.com/elafros/elafros/pkg/apis/ela/v1alpha1" - clientset "github.com/elafros/elafros/pkg/client/clientset/versioned" - informers "github.com/elafros/elafros/pkg/client/informers/externalversions" listers "github.com/elafros/elafros/pkg/client/listers/ela/v1alpha1" "github.com/elafros/elafros/pkg/controller" - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" ) const ( @@ -84,11 +75,6 @@ var ( elaPodReplicaCount = int32(1) elaPodMaxUnavailable = intstr.IntOrString{Type: intstr.Int, IntVal: 1} elaPodMaxSurge = intstr.IntOrString{Type: intstr.Int, IntVal: 1} - processItemCount = stats.Int64( - "controller_revision_queue_process_count", - "Counter to keep track of items in the revision work queue.", - stats.UnitNone) - statusTagKey tag.Key ) // Helper to make sure we log error messages returned by Reconcile(). @@ -106,26 +92,12 @@ type resolver interface { // Controller implements the controller for Revision resources. // +controller:group=ela,version=v1alpha1,kind=Revision,resource=revisions type Controller struct { - // kubeClient allows us to talk to the k8s for core APIs - kubeclientset kubernetes.Interface - - // elaClient allows us to configure Ela objects - elaclientset clientset.Interface + *controller.Base // lister indexes properties about Revision lister listers.RevisionLister synced cache.InformerSynced - // workqueue is a rate limited work queue. This is used to queue work to be - // processed instead of performing it as soon as a change happens. This - // means we can ensure we only process a fixed amount of resources at a - // time, and makes it easy to ensure we are never processing the same item - // simultaneously in two different workers. - workqueue workqueue.RateLimitingInterface - // recorder is an event recorder for recording Event resources to the - // Kubernetes API. - recorder record.EventRecorder - buildtracker *buildTracker resolver resolver @@ -177,51 +149,32 @@ type ControllerConfig struct { // config - client configuration for talking to the apiserver // si - informer factory shared across all controllers for listening to events and indexing resource properties // queue - message queue for handling new events. unique to this controller. -//TODO(vaikas): somewhat generic (generic behavior) func NewController( - kubeclientset kubernetes.Interface, - elaclientset clientset.Interface, + kubeClientSet kubernetes.Interface, + elaClientSet clientset.Interface, kubeInformerFactory kubeinformers.SharedInformerFactory, elaInformerFactory informers.SharedInformerFactory, buildInformerFactory buildinformers.SharedInformerFactory, config *rest.Config, controllerConfig *ControllerConfig) controller.Interface { - // obtain references to a shared index informer for the Revision and - // Endpoint type. + // obtain references to a shared index informer for the Revision and Endpoint type. informer := elaInformerFactory.Elafros().V1alpha1().Revisions() endpointsInformer := kubeInformerFactory.Core().V1().Endpoints() deploymentInformer := kubeInformerFactory.Apps().V1().Deployments() - // Create event broadcaster - glog.V(4).Info("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) - controller := &Controller{ - kubeclientset: kubeclientset, - elaclientset: elaclientset, + Base: controller.NewBase(kubeClientSet, elaClientSet, kubeInformerFactory, + elaInformerFactory, informer.Informer(), controllerAgentName, "Revisions"), lister: informer.Lister(), synced: informer.Informer().HasSynced, endpointsSynced: endpointsInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Revisions"), - recorder: recorder, buildtracker: &buildTracker{builds: map[key]set{}}, - resolver: &digestResolver{client: kubeclientset, transport: http.DefaultTransport}, + resolver: &digestResolver{client: kubeClientSet, transport: http.DefaultTransport}, controllerConfig: controllerConfig, } glog.Info("Setting up event handlers") - // Set up an event handler for when Revision resources change - informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueueRevision, - UpdateFunc: func(old, new interface{}) { - controller.enqueueRevision(new) - }, - }) - // Obtain a reference to a shared index informer for the Build type. buildInformer := buildInformerFactory.Build().V1alpha1().Builds() buildInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -246,145 +199,14 @@ func NewController( // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. -//TODO(grantr): generic func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { - defer runtime.HandleCrash() - defer c.workqueue.ShutDown() - - // Start the informer factories to begin populating the informer caches - glog.Info("Starting Revision controller") - - // Metrics setup: begin - // Create the tag keys that will be used to add tags to our measurements. - var err error - if statusTagKey, err = tag.NewKey("status"); err != nil { - return fmt.Errorf("failed to create tag key in OpenCensus: %v", err) - } - // Create view to see our measurements cumulatively. - countView := &view.View{ - Description: "Counter to keep track of items in the revision work queue.", - Measure: processItemCount, - Aggregation: view.Count(), - TagKeys: []tag.Key{statusTagKey}, - } - if err = view.Register(countView); err != nil { - return fmt.Errorf("failed to register the views in OpenCensus: %v", err) - } - defer view.Unregister(countView) - // Metrics setup: end - - // Wait for the caches to be synced before starting workers - glog.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.synced); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - // Wait for the caches to be synced before starting workers - glog.Info("Waiting for endpoints informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.endpointsSynced); !ok { - return fmt.Errorf("failed to wait for endpoints caches to sync") - } - - glog.Info("Starting workers") - // Launch workers to process Revision resources - for i := 0; i < threadiness; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) - } - - glog.Info("Started workers") - <-stopCh - glog.Info("Shutting down workers") - - return nil -} - -// runWorker is a long-running function that will continually call the -// processNextWorkItem function in order to read and process a message on the -// workqueue. -//TODO(grantr): generic -func (c *Controller) runWorker() { - for c.processNextWorkItem() { - } -} - -// processNextWorkItem will read a single work item off the workqueue and -// attempt to process it, by calling the syncHandler. -//TODO(vaikas): generic -func (c *Controller) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() - - if shutdown { - return false - } - - // We wrap this block in a func so we can defer c.workqueue.Done. - err, processStatus := func(obj interface{}) (error, string) { - // We call Done here so the workqueue knows we have finished - // processing this item. We also must remember to call Forget if we - // do not want this work item being re-queued. For example, we do - // not call Forget if a transient error occurs, instead the item is - // put back on the workqueue and attempted again after a back-off - // period. - defer c.workqueue.Done(obj) - var key string - var ok bool - // We expect strings to come off the workqueue. These are of the - // form namespace/name. We do this as the delayed nature of the - // workqueue means the items in the informer cache may actually be - // more up to date that when the item was initially put onto the - // workqueue. - if key, ok = obj.(string); !ok { - // As the item in the workqueue is actually invalid, we call - // Forget here else we'd go into a loop of attempting to - // process a work item that is invalid. - c.workqueue.Forget(obj) - runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) - return nil, controller.PromLabelValueInvalid - } - // Run the syncHandler, passing it the namespace/name string of the - // Revision resource to be synced. - if err := c.syncHandler(key); err != nil { - return fmt.Errorf("error syncing %q: %v", key, err), controller.PromLabelValueFailure - } - // Finally, if no error occurs we Forget this item so it does not - // get queued again until another change happens. - c.workqueue.Forget(obj) - glog.Infof("Successfully synced %q", key) - return nil, controller.PromLabelValueSuccess - }(obj) - - if ctx, tagError := tag.New(context.Background(), tag.Insert(statusTagKey, processStatus)); tagError == nil { - // Increment the request count by one. - stats.Record(ctx, processItemCount.M(1)) - } - - if err != nil { - runtime.HandleError(err) - return true - } - - return true -} - -// enqueueRevision takes a Revision resource and -// converts it into a namespace/name string which is then put onto the work -// queue. This method should *not* be passed resources of any type other than -// Revision. -//TODO(grantr): generic -func (c *Controller) enqueueRevision(obj interface{}) { - var key string - var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - runtime.HandleError(err) - return - } - c.workqueue.AddRateLimited(key) + return c.RunController(threadiness, stopCh, []cache.InformerSynced{c.synced, c.endpointsSynced}, + c.syncHandler, "Revision") } // syncHandler compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the Revision resource // with the current status of the resource. -//TODO(grantr): not generic func (c *Controller) syncHandler(key string) error { // Convert the namespace/name string into a distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -432,7 +254,7 @@ func (c *Controller) syncHandler(key string) error { } } - ns, err := controller.GetOrCreateRevisionNamespace(namespace, c.kubeclientset) + ns, err := controller.GetOrCreateRevisionNamespace(namespace, c.KubeClientSet) if err != nil { log.Printf("Failed to create namespace: %s", err) panic("Failed to create namespace") @@ -546,7 +368,7 @@ func (c *Controller) markBuildComplete(rev *v1alpha1.Revision, bc *buildv1alpha1 Type: v1alpha1.RevisionConditionBuildSucceeded, Status: corev1.ConditionTrue, }) - c.recorder.Event(rev, corev1.EventTypeNormal, "BuildComplete", bc.Message) + c.Recorder.Event(rev, corev1.EventTypeNormal, "BuildComplete", bc.Message) case buildv1alpha1.BuildFailed, buildv1alpha1.BuildInvalid: rev.Status.SetCondition( &v1alpha1.RevisionCondition{ @@ -562,7 +384,7 @@ func (c *Controller) markBuildComplete(rev *v1alpha1.Revision, bc *buildv1alpha1 Reason: bc.Reason, Message: bc.Message, }) - c.recorder.Event(rev, corev1.EventTypeWarning, "BuildFailed", bc.Message) + c.Recorder.Event(rev, corev1.EventTypeWarning, "BuildFailed", bc.Message) } // This will trigger a reconciliation that will cause us to stop tracking the build. _, err := c.updateStatus(rev) @@ -676,7 +498,7 @@ func (c *Controller) addDeploymentProgressEvent(obj interface{}) { glog.Errorf("Error recording revision completion: %s", err) return } - c.recorder.Eventf(rev, corev1.EventTypeNormal, "ProgressDeadlineExceeded", "Revision %s not ready due to Deployment timeout", revName) + c.Recorder.Eventf(rev, corev1.EventTypeNormal, "ProgressDeadlineExceeded", "Revision %s not ready due to Deployment timeout", revName) return } @@ -721,7 +543,7 @@ func (c *Controller) addEndpointsEvent(obj interface{}) { glog.Errorf("Error marking revision ready for '%s/%s': %v", namespace, revName, err) return } - c.recorder.Eventf(rev, corev1.EventTypeNormal, "RevisionReady", "Revision becomes ready upon endpoint %q becoming ready", endpoint.Name) + c.Recorder.Eventf(rev, corev1.EventTypeNormal, "RevisionReady", "Revision becomes ready upon endpoint %q becoming ready", endpoint.Name) return } @@ -734,7 +556,7 @@ func (c *Controller) addEndpointsEvent(obj interface{}) { glog.Errorf("Error marking revision failed for '%s/%s': %v", namespace, revName, err) return } - c.recorder.Eventf(rev, corev1.EventTypeWarning, "RevisionFailed", "Revision did not become ready due to endpoint %q", endpoint.Name) + c.Recorder.Eventf(rev, corev1.EventTypeWarning, "RevisionFailed", "Revision did not become ready due to endpoint %q", endpoint.Name) return } @@ -872,7 +694,7 @@ func (c *Controller) createK8SResources(rev *v1alpha1.Revision, ns string) error func (c *Controller) deleteDeployment(rev *v1alpha1.Revision, ns string) error { deploymentName := controller.GetRevisionDeploymentName(rev) - dc := c.kubeclientset.AppsV1().Deployments(ns) + dc := c.KubeClientSet.AppsV1().Deployments(ns) if _, err := dc.Get(deploymentName, metav1.GetOptions{}); err != nil && apierrs.IsNotFound(err) { return nil } @@ -890,7 +712,7 @@ func (c *Controller) deleteDeployment(rev *v1alpha1.Revision, ns string) error { } func (c *Controller) reconcileDeployment(rev *v1alpha1.Revision, ns string) error { - dc := c.kubeclientset.AppsV1().Deployments(ns) + dc := c.KubeClientSet.AppsV1().Deployments(ns) // First, check if deployment exists already. deploymentName := controller.GetRevisionDeploymentName(rev) @@ -952,7 +774,7 @@ func (c *Controller) reconcileDeployment(rev *v1alpha1.Revision, ns string) erro } func (c *Controller) deleteService(rev *v1alpha1.Revision, ns string) error { - sc := c.kubeclientset.Core().Services(ns) + sc := c.KubeClientSet.Core().Services(ns) serviceName := controller.GetElaK8SServiceNameForRevision(rev) log.Printf("Deleting service %q", serviceName) @@ -968,7 +790,7 @@ func (c *Controller) deleteService(rev *v1alpha1.Revision, ns string) error { } func (c *Controller) reconcileService(rev *v1alpha1.Revision, ns string) (string, error) { - sc := c.kubeclientset.Core().Services(ns) + sc := c.KubeClientSet.Core().Services(ns) serviceName := controller.GetElaK8SServiceNameForRevision(rev) if _, err := sc.Get(serviceName, metav1.GetOptions{}); err != nil { @@ -994,7 +816,7 @@ func (c *Controller) reconcileService(rev *v1alpha1.Revision, ns string) (string func (c *Controller) reconcileFluentdConfigMap(rev *v1alpha1.Revision) error { ns := rev.Namespace - cmc := c.kubeclientset.Core().ConfigMaps(ns) + cmc := c.KubeClientSet.Core().ConfigMaps(ns) _, err := cmc.Get(fluentdConfigMapName, metav1.GetOptions{}) if err != nil { if !apierrs.IsNotFound(err) { @@ -1017,7 +839,7 @@ func (c *Controller) reconcileFluentdConfigMap(rev *v1alpha1.Revision) error { func (c *Controller) deleteAutoscalerService(rev *v1alpha1.Revision) error { autoscalerName := controller.GetRevisionAutoscalerName(rev) - sc := c.kubeclientset.Core().Services(AutoscalerNamespace) + sc := c.KubeClientSet.Core().Services(AutoscalerNamespace) if _, err := sc.Get(autoscalerName, metav1.GetOptions{}); err != nil && apierrs.IsNotFound(err) { return nil } @@ -1035,7 +857,7 @@ func (c *Controller) deleteAutoscalerService(rev *v1alpha1.Revision) error { func (c *Controller) reconcileAutoscalerService(rev *v1alpha1.Revision) error { autoscalerName := controller.GetRevisionAutoscalerName(rev) - sc := c.kubeclientset.Core().Services(AutoscalerNamespace) + sc := c.KubeClientSet.Core().Services(AutoscalerNamespace) _, err := sc.Get(autoscalerName, metav1.GetOptions{}) if err != nil { if !apierrs.IsNotFound(err) { @@ -1058,7 +880,7 @@ func (c *Controller) reconcileAutoscalerService(rev *v1alpha1.Revision) error { func (c *Controller) deleteAutoscalerDeployment(rev *v1alpha1.Revision) error { autoscalerName := controller.GetRevisionAutoscalerName(rev) - dc := c.kubeclientset.AppsV1().Deployments(AutoscalerNamespace) + dc := c.KubeClientSet.AppsV1().Deployments(AutoscalerNamespace) _, err := dc.Get(autoscalerName, metav1.GetOptions{}) if err != nil && apierrs.IsNotFound(err) { return nil @@ -1077,7 +899,7 @@ func (c *Controller) deleteAutoscalerDeployment(rev *v1alpha1.Revision) error { func (c *Controller) reconcileAutoscalerDeployment(rev *v1alpha1.Revision) error { autoscalerName := controller.GetRevisionAutoscalerName(rev) - dc := c.kubeclientset.AppsV1().Deployments(AutoscalerNamespace) + dc := c.KubeClientSet.AppsV1().Deployments(AutoscalerNamespace) _, err := dc.Get(autoscalerName, metav1.GetOptions{}) if err != nil { if !apierrs.IsNotFound(err) { @@ -1112,7 +934,7 @@ func (c *Controller) removeFinalizers(rev *v1alpha1.Revision, ns string) error { } } accessor.SetFinalizers(finalizers) - prClient := c.elaclientset.ElafrosV1alpha1().Revisions(rev.Namespace) + prClient := c.ElaClientSet.ElafrosV1alpha1().Revisions(rev.Namespace) prClient.Update(rev) log.Printf("The finalizer 'controller' is removed.") @@ -1120,7 +942,7 @@ func (c *Controller) removeFinalizers(rev *v1alpha1.Revision, ns string) error { } func (c *Controller) updateStatus(rev *v1alpha1.Revision) (*v1alpha1.Revision, error) { - prClient := c.elaclientset.ElafrosV1alpha1().Revisions(rev.Namespace) + prClient := c.ElaClientSet.ElafrosV1alpha1().Revisions(rev.Namespace) newRev, err := prClient.Get(rev.Name, metav1.GetOptions{}) if err != nil { return nil, err diff --git a/pkg/controller/route/BUILD.bazel b/pkg/controller/route/BUILD.bazel index 686152ed3aad..e612a0b155bb 100644 --- a/pkg/controller/route/BUILD.bazel +++ b/pkg/controller/route/BUILD.bazel @@ -21,7 +21,6 @@ go_library( "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/josephburnett/k8sflag/pkg/k8sflag:go_default_library", "//vendor/go.opencensus.io/stats:go_default_library", - "//vendor/go.opencensus.io/stats/view:go_default_library", "//vendor/go.opencensus.io/tag:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", @@ -29,15 +28,10 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", - "//vendor/k8s.io/client-go/tools/record:go_default_library", - "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) diff --git a/pkg/controller/route/route.go b/pkg/controller/route/route.go index 931b50761ba1..e3c7bc12fc45 100644 --- a/pkg/controller/route/route.go +++ b/pkg/controller/route/route.go @@ -17,11 +17,9 @@ limitations under the License. package route import ( - "context" "errors" "fmt" "reflect" - "time" "github.com/golang/glog" "github.com/josephburnett/k8sflag/pkg/k8sflag" @@ -30,15 +28,10 @@ import ( apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" "github.com/elafros/elafros/pkg/apis/ela" "github.com/elafros/elafros/pkg/apis/ela/v1alpha1" @@ -47,7 +40,6 @@ import ( listers "github.com/elafros/elafros/pkg/client/listers/ela/v1alpha1" "github.com/elafros/elafros/pkg/controller" "go.opencensus.io/stats" - "go.opencensus.io/stats/view" "go.opencensus.io/tag" ) @@ -84,24 +76,12 @@ type RevisionRoute struct { // Controller implements the controller for Route resources. // +controller:group=ela,version=v1alpha1,kind=Route,resource=routes type Controller struct { - // kubeclientset is a standard kubernetes clientset - kubeclientset kubernetes.Interface - elaclientset clientset.Interface + *controller.Base // lister indexes properties about Route lister listers.RouteLister synced cache.InformerSynced - // workqueue is a rate limited work queue. This is used to queue work to be - // processed instead of performing it as soon as a change happens. This - // means we can ensure we only process a fixed amount of resources at a - // time, and makes it easy to ensure we are never processing the same item - // simultaneously in two different workers. - workqueue workqueue.RateLimitingInterface - // recorder is an event recorder for recording Event resources to the - // Kubernetes API. - recorder record.EventRecorder - // don't start the workers until configuration cache have been synced configSynced cache.InformerSynced @@ -119,8 +99,8 @@ type Controller struct { // reconcileKey - function for mapping queue keys to resource names //TODO(vaikas): somewhat generic (generic behavior) func NewController( - kubeclientset kubernetes.Interface, - elaclientset clientset.Interface, + kubeClientSet kubernetes.Interface, + elaClientSet clientset.Interface, kubeInformerFactory kubeinformers.SharedInformerFactory, elaInformerFactory informers.SharedInformerFactory, config *rest.Config, @@ -135,33 +115,17 @@ func NewController( configInformer := elaInformerFactory.Elafros().V1alpha1().Configurations() ingressInformer := kubeInformerFactory.Extensions().V1beta1().Ingresses() - // Create event broadcaster - glog.V(4).Info("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) - controller := &Controller{ - kubeclientset: kubeclientset, - elaclientset: elaclientset, + Base: controller.NewBase(kubeClientSet, elaClientSet, kubeInformerFactory, + elaInformerFactory, informer.Informer(), controllerAgentName, "Routes"), lister: informer.Lister(), synced: informer.Informer().HasSynced, configSynced: configInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Routes"), - recorder: recorder, controllerConfig: controllerConfig, enableScaleToZero: enableScaleToZero, } glog.Info("Setting up event handlers") - // Set up an event handler for when Route resources change - informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueueRoute, - UpdateFunc: func(old, new interface{}) { - controller.enqueueRoute(new) - }, - }) configInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.addConfigurationEvent, UpdateFunc: controller.updateConfigurationEvent, @@ -176,145 +140,14 @@ func NewController( // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. -//TODO(grantr): generic func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { - defer runtime.HandleCrash() - defer c.workqueue.ShutDown() - - // Start the informer factories to begin populating the informer caches - glog.Info("Starting Route controller") - - // Metrics setup: begin - // Create the tag keys that will be used to add tags to our measurements. - var err error - if statusTagKey, err = tag.NewKey("status"); err != nil { - return fmt.Errorf("failed to create tag key in OpenCensus: %v", err) - } - // Create view to see our measurements cumulatively. - countView := &view.View{ - Description: "Counter to keep track of items in the route work queue.", - Measure: processItemCount, - Aggregation: view.Count(), - TagKeys: []tag.Key{statusTagKey}, - } - if err = view.Register(countView); err != nil { - return fmt.Errorf("failed to register the views in OpenCensus: %v", err) - } - defer view.Unregister(countView) - // Metrics setup: end - - // Wait for the caches to be synced before starting workers - glog.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.synced); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - // Wait for the configuration caches to be synced before starting workers - glog.Info("Waiting for configuration informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.configSynced); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - glog.Info("Starting workers") - // Launch workers to process Route resources - for i := 0; i < threadiness; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) - } - - glog.Info("Started workers") - <-stopCh - glog.Info("Shutting down workers") - - return nil -} - -// runWorker is a long-running function that will continually call the -// processNextWorkItem function in order to read and process a message on the -// workqueue. -//TODO(grantr): generic -func (c *Controller) runWorker() { - for c.processNextWorkItem() { - } -} - -// processNextWorkItem will read a single work item off the workqueue and -// attempt to process it, by calling the updateRouteEvent. -//TODO(grantr): generic -func (c *Controller) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() - - if shutdown { - return false - } - - // We wrap this block in a func so we can defer c.workqueue.Done. - err, processStatus := func(obj interface{}) (error, string) { - // We call Done here so the workqueue knows we have finished - // processing this item. We also must remember to call Forget if we - // do not want this work item being re-queued. For example, we do - // not call Forget if a transient error occurs, instead the item is - // put back on the workqueue and attempted again after a back-off - // period. - defer c.workqueue.Done(obj) - var key string - var ok bool - // We expect strings to come off the workqueue. These are of the - // form namespace/name. We do this as the delayed nature of the - // workqueue means the items in the informer cache may actually be - // more up to date that when the item was initially put onto the - // workqueue. - if key, ok = obj.(string); !ok { - // As the item in the workqueue is actually invalid, we call - // Forget here else we'd go into a loop of attempting to - // process a work item that is invalid. - c.workqueue.Forget(obj) - runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) - return nil, controller.PromLabelValueInvalid - } - // Run the updateRouteEvent passing it the namespace/name string of the - // Route resource to be synced. - if err := c.updateRouteEvent(key); err != nil { - return fmt.Errorf("error syncing %q: %v", key, err), controller.PromLabelValueFailure - } - // Finally, if no error occurs we Forget this item so it does not - // get queued again until another change happens. - c.workqueue.Forget(obj) - glog.Infof("Successfully synced %q", key) - return nil, controller.PromLabelValueSuccess - }(obj) - - if ctx, tagError := tag.New(context.Background(), tag.Insert(statusTagKey, processStatus)); tagError == nil { - // Increment the request count by one. - stats.Record(ctx, processItemCount.M(1)) - } - - if err != nil { - runtime.HandleError(err) - return true - } - - return true -} - -// enqueueRoute takes a Route resource and -// converts it into a namespace/name string which is then put onto the work -// queue. This method should *not* be passed resources of any type other than -// Route. -//TODO(grantr): generic -func (c *Controller) enqueueRoute(obj interface{}) { - var key string - var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - runtime.HandleError(err) - return - } - c.workqueue.AddRateLimited(key) + return c.RunController(threadiness, stopCh, []cache.InformerSynced{c.synced, c.configSynced}, + c.updateRouteEvent, "Route") } // updateRouteEvent compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the Route resource // with the current status of the resource. -//TODO(grantr): not generic func (c *Controller) updateRouteEvent(key string) error { // Convert the namespace/name string into a distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -429,26 +262,26 @@ func (c *Controller) syncTrafficTargetsAndUpdateRouteStatus(route *v1alpha1.Rout updated, err := c.updateStatus(route) if err != nil { glog.Warningf("Failed to update service status: %s", err) - c.recorder.Eventf(route, corev1.EventTypeWarning, "UpdateFailed", "Failed to update status for route %q: %v", route.Name, err) + c.Recorder.Eventf(route, corev1.EventTypeWarning, "UpdateFailed", "Failed to update status for route %q: %v", route.Name, err) return nil, err } - c.recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated status for route %q", route.Name) + c.Recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated status for route %q", route.Name) return updated, nil } func (c *Controller) reconcilePlaceholderService(route *v1alpha1.Route) error { service := MakeRouteK8SService(route) - if _, err := c.kubeclientset.Core().Services(route.Namespace).Create(service); err != nil { + if _, err := c.KubeClientSet.Core().Services(route.Namespace).Create(service); err != nil { if apierrs.IsAlreadyExists(err) { // Service already exist. return nil } glog.Infof("Failed to create service: %s", err) - c.recorder.Eventf(route, corev1.EventTypeWarning, "CreationFailed", "Failed to create service %q: %v", service.Name, err) + c.Recorder.Eventf(route, corev1.EventTypeWarning, "CreationFailed", "Failed to create service %q: %v", service.Name, err) return err } glog.Infof("Created service: %q", service.Name) - c.recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created service %q", service.Name) + c.Recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created service %q", service.Name) return nil } @@ -456,13 +289,13 @@ func (c *Controller) reconcileIngress(route *v1alpha1.Route) error { ingressNamespace := route.Namespace ingressName := controller.GetElaK8SIngressName(route) ingress := MakeRouteIngress(route) - ingressClient := c.kubeclientset.Extensions().Ingresses(ingressNamespace) + ingressClient := c.KubeClientSet.Extensions().Ingresses(ingressNamespace) existing, err := ingressClient.Get(ingressName, metav1.GetOptions{}) if err != nil { if apierrs.IsNotFound(err) { if _, err = ingressClient.Create(ingress); err == nil { glog.Infof("Created ingress %q in namespace %q", ingressName, ingressNamespace) - c.recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created Ingress %q in namespace %q", ingressName, ingressNamespace) + c.Recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created Ingress %q in namespace %q", ingressName, ingressNamespace) } } return err @@ -472,7 +305,7 @@ func (c *Controller) reconcileIngress(route *v1alpha1.Route) error { existing.Spec = ingress.Spec if _, err = ingressClient.Update(existing); err == nil { glog.Infof("Updated ingress %q in namespace %q", ingressName, ingressNamespace) - c.recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated Ingress %q in namespace %q", ingressName, ingressNamespace) + c.Recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated Ingress %q in namespace %q", ingressName, ingressNamespace) } return err } @@ -482,8 +315,8 @@ func (c *Controller) reconcileIngress(route *v1alpha1.Route) error { func (c *Controller) getDirectTrafficTargets(route *v1alpha1.Route) ( map[string]*v1alpha1.Configuration, map[string]*v1alpha1.Revision, error) { ns := route.Namespace - configClient := c.elaclientset.ElafrosV1alpha1().Configurations(ns) - revClient := c.elaclientset.ElafrosV1alpha1().Revisions(ns) + configClient := c.ElaClientSet.ElafrosV1alpha1().Configurations(ns) + revClient := c.ElaClientSet.ElafrosV1alpha1().Revisions(ns) configMap := map[string]*v1alpha1.Configuration{} revMap := map[string]*v1alpha1.Revision{} @@ -513,7 +346,7 @@ func (c *Controller) getDirectTrafficTargets(route *v1alpha1.Route) ( func (c *Controller) extendConfigurationsWithIndirectTrafficTargets( route *v1alpha1.Route, configMap map[string]*v1alpha1.Configuration, revMap map[string]*v1alpha1.Revision) error { ns := route.Namespace - configClient := c.elaclientset.ElafrosV1alpha1().Configurations(ns) + configClient := c.ElaClientSet.ElafrosV1alpha1().Configurations(ns) // Get indirect configurations. for _, rev := range revMap { @@ -538,7 +371,7 @@ func (c *Controller) extendConfigurationsWithIndirectTrafficTargets( func (c *Controller) extendRevisionsWithIndirectTrafficTargets( route *v1alpha1.Route, configMap map[string]*v1alpha1.Configuration, revMap map[string]*v1alpha1.Revision) error { ns := route.Namespace - revisionClient := c.elaclientset.ElafrosV1alpha1().Revisions(ns) + revisionClient := c.ElaClientSet.ElafrosV1alpha1().Revisions(ns) for _, tt := range route.Spec.Traffic { if tt.ConfigurationName != "" { @@ -569,7 +402,7 @@ func (c *Controller) extendRevisionsWithIndirectTrafficTargets( func (c *Controller) setLabelForGivenConfigurations( route *v1alpha1.Route, configMap map[string]*v1alpha1.Configuration) error { - configClient := c.elaclientset.ElafrosV1alpha1().Configurations(route.Namespace) + configClient := c.ElaClientSet.ElafrosV1alpha1().Configurations(route.Namespace) // Validate for _, config := range configMap { @@ -578,7 +411,7 @@ func (c *Controller) setLabelForGivenConfigurations( if routeName != route.Name { errMsg := fmt.Sprintf("Configuration %q is already in use by %q, and cannot be used by %q", config.Name, routeName, route.Name) - c.recorder.Event(route, corev1.EventTypeWarning, "ConfigurationInUse", errMsg) + c.Recorder.Event(route, corev1.EventTypeWarning, "ConfigurationInUse", errMsg) return errors.New(errMsg) } } @@ -603,7 +436,7 @@ func (c *Controller) setLabelForGivenConfigurations( func (c *Controller) setLabelForGivenRevisions( route *v1alpha1.Route, revMap map[string]*v1alpha1.Revision) error { - revisionClient := c.elaclientset.ElafrosV1alpha1().Revisions(route.Namespace) + revisionClient := c.ElaClientSet.ElafrosV1alpha1().Revisions(route.Namespace) // Validate revision if it already has a route label for _, rev := range revMap { @@ -611,7 +444,7 @@ func (c *Controller) setLabelForGivenRevisions( if routeName != route.Name { errMsg := fmt.Sprintf("Revision %q is already in use by %q, and cannot be used by %q", rev.Name, routeName, route.Name) - c.recorder.Event(route, corev1.EventTypeWarning, "RevisionInUse", errMsg) + c.Recorder.Event(route, corev1.EventTypeWarning, "RevisionInUse", errMsg) return errors.New(errMsg) } } @@ -635,7 +468,7 @@ func (c *Controller) setLabelForGivenRevisions( func (c *Controller) deleteLabelForOutsideOfGivenConfigurations( route *v1alpha1.Route, configMap map[string]*v1alpha1.Configuration) error { - configClient := c.elaclientset.ElafrosV1alpha1().Configurations(route.Namespace) + configClient := c.ElaClientSet.ElafrosV1alpha1().Configurations(route.Namespace) // Get Configurations set as traffic target before this sync. oldConfigsList, err := configClient.List( metav1.ListOptions{ @@ -664,7 +497,7 @@ func (c *Controller) deleteLabelForOutsideOfGivenConfigurations( func (c *Controller) deleteLabelForOutsideOfGivenRevisions( route *v1alpha1.Route, revMap map[string]*v1alpha1.Revision) error { - revClient := c.elaclientset.ElafrosV1alpha1().Revisions(route.Namespace) + revClient := c.ElaClientSet.ElafrosV1alpha1().Revisions(route.Namespace) oldRevList, err := revClient.List( metav1.ListOptions{ @@ -790,7 +623,7 @@ func (c *Controller) computeEmptyRevisionRoutes( route *v1alpha1.Route, configMap map[string]*v1alpha1.Configuration, revMap map[string]*v1alpha1.Revision) ([]RevisionRoute, error) { ns := route.Namespace elaNS := controller.GetElaNamespaceName(ns) - revClient := c.elaclientset.ElafrosV1alpha1().Revisions(ns) + revClient := c.ElaClientSet.ElafrosV1alpha1().Revisions(ns) revRoutes := []RevisionRoute{} for _, tt := range route.Spec.Traffic { configName := tt.ConfigurationName @@ -826,7 +659,7 @@ func (c *Controller) createOrUpdateRouteRules(route *v1alpha1.Route, configMap m revMap map[string]*v1alpha1.Revision) ([]RevisionRoute, error) { // grab a client that's specific to RouteRule. ns := route.Namespace - routeClient := c.elaclientset.ConfigV1alpha2().RouteRules(ns) + routeClient := c.ElaClientSet.ConfigV1alpha2().RouteRules(ns) if routeClient == nil { glog.Errorf("Failed to create resource client") return nil, fmt.Errorf("Couldn't get a routeClient") @@ -858,17 +691,17 @@ func (c *Controller) createOrUpdateRouteRules(route *v1alpha1.Route, configMap m } routeRules = MakeIstioRoutes(route, nil, ns, revisionRoutes, c.routeDomain(route), inactiveRev) if _, err := routeClient.Create(routeRules); err != nil { - c.recorder.Eventf(route, corev1.EventTypeWarning, "CreationFailed", "Failed to create Istio route rule %q: %s", routeRules.Name, err) + c.Recorder.Eventf(route, corev1.EventTypeWarning, "CreationFailed", "Failed to create Istio route rule %q: %s", routeRules.Name, err) return nil, err } - c.recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created Istio route rule %q", routeRules.Name) + c.Recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created Istio route rule %q", routeRules.Name) } else { routeRules.Spec = makeIstioRouteSpec(route, nil, ns, revisionRoutes, c.routeDomain(route), inactiveRev) if _, err := routeClient.Update(routeRules); err != nil { - c.recorder.Eventf(route, corev1.EventTypeWarning, "UpdateFailed", "Failed to update Istio route rule %q: %s", routeRules.Name, err) + c.Recorder.Eventf(route, corev1.EventTypeWarning, "UpdateFailed", "Failed to update Istio route rule %q: %s", routeRules.Name, err) return nil, err } - c.recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated Istio route rule %q", routeRules.Name) + c.Recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated Istio route rule %q", routeRules.Name) } // Create route rule for named traffic targets @@ -884,16 +717,16 @@ func (c *Controller) createOrUpdateRouteRules(route *v1alpha1.Route, configMap m } routeRules = MakeIstioRoutes(route, &tt, ns, revisionRoutes, c.routeDomain(route), inactiveRev) if _, err := routeClient.Create(routeRules); err != nil { - c.recorder.Eventf(route, corev1.EventTypeWarning, "CreationFailed", "Failed to create Istio route rule %q: %s", routeRules.Name, err) + c.Recorder.Eventf(route, corev1.EventTypeWarning, "CreationFailed", "Failed to create Istio route rule %q: %s", routeRules.Name, err) return nil, err } - c.recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created Istio route rule %q", routeRules.Name) + c.Recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created Istio route rule %q", routeRules.Name) } else { routeRules.Spec = makeIstioRouteSpec(route, &tt, ns, revisionRoutes, c.routeDomain(route), inactiveRev) if _, err := routeClient.Update(routeRules); err != nil { return nil, err } - c.recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated Istio route rule %q", routeRules.Name) + c.Recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated Istio route rule %q", routeRules.Name) } } if err := c.removeOutdatedRouteRules(route); err != nil { @@ -903,7 +736,7 @@ func (c *Controller) createOrUpdateRouteRules(route *v1alpha1.Route, configMap m } func (c *Controller) updateStatus(route *v1alpha1.Route) (*v1alpha1.Route, error) { - routeClient := c.elaclientset.ElafrosV1alpha1().Routes(route.Namespace) + routeClient := c.ElaClientSet.ElafrosV1alpha1().Routes(route.Namespace) existing, err := routeClient.Get(route.Name, metav1.GetOptions{}) if err != nil { return nil, err @@ -971,7 +804,7 @@ func (c *Controller) consolidateTrafficTargets(route *v1alpha1.Route) { func (c *Controller) removeOutdatedRouteRules(u *v1alpha1.Route) error { ns := u.Namespace - routeClient := c.elaclientset.ConfigV1alpha2().RouteRules(ns) + routeClient := c.ElaClientSet.ConfigV1alpha2().RouteRules(ns) if routeClient == nil { glog.Error("Failed to create resource client") return errors.New("Couldn't get a routeClient") @@ -1061,7 +894,7 @@ func (c *Controller) updateIngressEvent(old, new interface{}) { } } ns := ingress.Namespace - routeClient := c.elaclientset.ElafrosV1alpha1().Routes(ns) + routeClient := c.ElaClientSet.ElafrosV1alpha1().Routes(ns) route, err := routeClient.Get(routeName, metav1.GetOptions{}) if err != nil { glog.Errorf("Error fetching route '%s/%s' upon ingress becoming: %v", diff --git a/pkg/controller/service/BUILD.bazel b/pkg/controller/service/BUILD.bazel index c71930f75bb9..cb94c9c09387 100644 --- a/pkg/controller/service/BUILD.bazel +++ b/pkg/controller/service/BUILD.bazel @@ -19,17 +19,12 @@ go_library( "//pkg/controller:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/go.opencensus.io/stats:go_default_library", - "//vendor/go.opencensus.io/stats/view:go_default_library", "//vendor/go.opencensus.io/tag:go_default_library", - "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", diff --git a/pkg/controller/service/service.go b/pkg/controller/service/service.go index a616f4848c15..9ec913e2e7e2 100644 --- a/pkg/controller/service/service.go +++ b/pkg/controller/service/service.go @@ -17,21 +17,15 @@ limitations under the License. package service import ( - "context" "fmt" "reflect" - "time" "github.com/golang/glog" - corev1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" @@ -43,7 +37,6 @@ import ( listers "github.com/elafros/elafros/pkg/client/listers/ela/v1alpha1" "github.com/elafros/elafros/pkg/controller" "go.opencensus.io/stats" - "go.opencensus.io/stats/view" "go.opencensus.io/tag" ) @@ -62,9 +55,7 @@ const ( // Controller implements the controller for Service resources. // +controller:group=ela,version=v1alpha1,kind=Service,resource=services type Controller struct { - // kubeclientset is a standard kubernetes clientset - kubeclientset kubernetes.Interface - elaclientset clientset.Interface + *controller.Base // lister indexes properties about Services lister listers.ServiceLister @@ -84,8 +75,8 @@ type Controller struct { // NewController initializes the controller and is called by the generated code // Registers eventhandlers to enqueue events func NewController( - kubeclientset kubernetes.Interface, - elaclientset clientset.Interface, + kubeClientSet kubernetes.Interface, + elaClientSet clientset.Interface, kubeInformerFactory kubeinformers.SharedInformerFactory, elaInformerFactory informers.SharedInformerFactory, config *rest.Config, @@ -96,30 +87,13 @@ func NewController( // obtain references to a shared index informer for the Services. informer := elaInformerFactory.Elafros().V1alpha1().Services() - // Create event broadcaster - glog.V(4).Info("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) - controller := &Controller{ - kubeclientset: kubeclientset, - elaclientset: elaclientset, - lister: informer.Lister(), - synced: informer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Services"), - recorder: recorder, + Base: controller.NewBase(kubeClientSet, elaClientSet, kubeInformerFactory, + elaInformerFactory, informer.Informer(), controllerAgentName, "Revisions"), + lister: informer.Lister(), + synced: informer.Informer().HasSynced, } - glog.Info("Setting up event handlers") - // Set up an event handler for when Service resources change - informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueueService, - UpdateFunc: func(old, new interface{}) { - controller.enqueueService(new) - }, - }) return controller } @@ -128,130 +102,8 @@ func NewController( // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { - defer runtime.HandleCrash() - defer c.workqueue.ShutDown() - - // Start the informer factories to begin populating the informer caches - glog.Info("Starting Service controller") - - // Metrics setup: begin - // Create the tag keys that will be used to add tags to our measurements. - var err error - if statusTagKey, err = tag.NewKey("status"); err != nil { - return fmt.Errorf("failed to create tag key in OpenCensus: %v", err) - } - // Create view to see our measurements cumulatively. - countView := &view.View{ - Description: "Counter to keep track of items in the service work queue.", - Measure: processItemCount, - Aggregation: view.Count(), - TagKeys: []tag.Key{statusTagKey}, - } - if err = view.Register(countView); err != nil { - return fmt.Errorf("failed to register the views in OpenCensus: %v", err) - } - defer view.Unregister(countView) - // Metrics setup: end - - // Wait for the caches to be synced before starting workers - glog.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.synced); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - glog.Info("Starting workers") - // Launch workers to process Service resources - for i := 0; i < threadiness; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) - } - - glog.Info("Started workers") - <-stopCh - glog.Info("Shutting down workers") - - return nil -} - -// runWorker is a long-running function that will continually call the -// processNextWorkItem function in order to read and process a message on the -// workqueue. -func (c *Controller) runWorker() { - for c.processNextWorkItem() { - } -} - -// processNextWorkItem will read a single work item off the workqueue and -// attempt to process it, by calling the updateServiceEvent. -func (c *Controller) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() - - if shutdown { - return false - } - - // We wrap this block in a func so we can defer c.workqueue.Done. - err, processStatus := func(obj interface{}) (error, string) { - // We call Done here so the workqueue knows we have finished - // processing this item. We also must remember to call Forget if we - // do not want this work item being re-queued. For example, we do - // not call Forget if a transient error occurs, instead the item is - // put back on the workqueue and attempted again after a back-off - // period. - defer c.workqueue.Done(obj) - var key string - var ok bool - // We expect strings to come off the workqueue. These are of the - // form namespace/name. We do this as the delayed nature of the - // workqueue means the items in the informer cache may actually be - // more up to date that when the item was initially put onto the - // workqueue. - if key, ok = obj.(string); !ok { - // As the item in the workqueue is actually invalid, we call - // Forget here else we'd go into a loop of attempting to - // process a work item that is invalid. - c.workqueue.Forget(obj) - runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) - return nil, controller.PromLabelValueInvalid - } - // Run the updateServiceEvent passing it the namespace/name string of the - // Service resource to be synced. - if err := c.updateServiceEvent(key); err != nil { - return fmt.Errorf("error syncing %q: %v", key, err), controller.PromLabelValueFailure - } - // Finally, if no error occurs we Forget this item so it does not - // get queued again until another change happens. - c.workqueue.Forget(obj) - glog.Infof("Successfully synced %q", key) - return nil, controller.PromLabelValueSuccess - }(obj) - - if ctx, tagError := tag.New(context.Background(), tag.Insert(statusTagKey, processStatus)); tagError == nil { - // Increment the request count by one. - stats.Record(ctx, processItemCount.M(1)) - } - - if err != nil { - runtime.HandleError(err) - return true - } - - return true -} - -// enqueueService takes a Service resource and -// converts it into a namespace/name string which is then put onto the work -// queue. This method should *not* be passed resources of any type other than -// Service. -// TODO(grantr): generic -// TODO: assert the object passed in is of the correct type. -func (c *Controller) enqueueService(obj interface{}) { - var key string - var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - runtime.HandleError(err) - return - } - c.workqueue.AddRateLimited(key) + return c.RunController(threadiness, stopCh, []cache.InformerSynced{c.synced}, + c.updateServiceEvent, "Service") } // updateServiceEvent compares the actual state with the desired, and attempts to @@ -298,7 +150,7 @@ func (c *Controller) updateServiceEvent(key string) error { } func (c *Controller) updateStatus(service *v1alpha1.Service) (*v1alpha1.Service, error) { - serviceClient := c.elaclientset.ElafrosV1alpha1().Services(service.Namespace) + serviceClient := c.ElaClientSet.ElafrosV1alpha1().Services(service.Namespace) existing, err := serviceClient.Get(service.Name, metav1.GetOptions{}) if err != nil { return nil, err @@ -313,7 +165,7 @@ func (c *Controller) updateStatus(service *v1alpha1.Service) (*v1alpha1.Service, } func (c *Controller) reconcileConfiguration(config *v1alpha1.Configuration) error { - configClient := c.elaclientset.ElafrosV1alpha1().Configurations(config.Namespace) + configClient := c.ElaClientSet.ElafrosV1alpha1().Configurations(config.Namespace) existing, err := configClient.Get(config.Name, metav1.GetOptions{}) if err != nil { @@ -331,7 +183,7 @@ func (c *Controller) reconcileConfiguration(config *v1alpha1.Configuration) erro } func (c *Controller) reconcileRoute(route *v1alpha1.Route) error { - routeClient := c.elaclientset.ElafrosV1alpha1().Routes(route.Namespace) + routeClient := c.ElaClientSet.ElafrosV1alpha1().Routes(route.Namespace) existing, err := routeClient.Get(route.Name, metav1.GetOptions{}) if err != nil {