Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,23 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/ela/v1alpha1:go_default_library",
"//pkg/client/clientset/versioned:go_default_library",
"//pkg/client/clientset/versioned/scheme:go_default_library",
"//pkg/client/informers/externalversions:go_default_library",
"//vendor/github.com/ghodss/yaml:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)

Expand Down
5 changes: 0 additions & 5 deletions pkg/controller/configuration/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,10 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)

Expand Down
200 changes: 28 additions & 172 deletions pkg/controller/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,67 +18,47 @@ package configuration

import (
"fmt"
"time"

buildv1alpha1 "github.com/elafros/build/pkg/apis/build/v1alpha1"
buildclientset "github.com/elafros/build/pkg/client/clientset/versioned"
"github.com/elafros/elafros/pkg/apis/ela"
"github.com/elafros/elafros/pkg/apis/ela/v1alpha1"
clientset "github.com/elafros/elafros/pkg/client/clientset/versioned"
informers "github.com/elafros/elafros/pkg/client/informers/externalversions"
listers "github.com/elafros/elafros/pkg/client/listers/ela/v1alpha1"
"github.com/elafros/elafros/pkg/controller"

"github.com/golang/glog"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

buildv1alpha1 "github.com/elafros/build/pkg/apis/build/v1alpha1"
buildclientset "github.com/elafros/build/pkg/client/clientset/versioned"
"github.com/elafros/elafros/pkg/apis/ela"
"github.com/elafros/elafros/pkg/apis/ela/v1alpha1"
clientset "github.com/elafros/elafros/pkg/client/clientset/versioned"
informers "github.com/elafros/elafros/pkg/client/informers/externalversions"
listers "github.com/elafros/elafros/pkg/client/listers/ela/v1alpha1"
)

const controllerAgentName = "configuration-controller"

// Controller implements the controller for Configuration resources
type Controller struct {
// kubeclientset is a standard kubernetes clientset
kubeclientset kubernetes.Interface
elaclientset clientset.Interface
buildclientset buildclientset.Interface
*controller.Base

buildClientSet buildclientset.Interface

// lister indexes properties about Configuration
lister listers.ConfigurationLister
synced cache.InformerSynced

// workqueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
// means we can ensure we only process a fixed amount of resources at a
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
workqueue workqueue.RateLimitingInterface
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder

// don't start the workers until revisions cache have been synced
revisionsSynced cache.InformerSynced
}

// NewController creates a new Configuration controller
//TODO(grantr): somewhat generic (generic behavior)
func NewController(
kubeclientset kubernetes.Interface,
elaclientset clientset.Interface,
buildclientset buildclientset.Interface,
kubeClientSet kubernetes.Interface,
elaClientSet clientset.Interface,
buildClientSet buildclientset.Interface,
kubeInformerFactory kubeinformers.SharedInformerFactory,
elaInformerFactory informers.SharedInformerFactory,
config *rest.Config,
Expand All @@ -89,33 +69,16 @@ func NewController(
informer := elaInformerFactory.Elafros().V1alpha1().Configurations()
revisionInformer := elaInformerFactory.Elafros().V1alpha1().Revisions()

// Create event broadcaster
glog.V(4).Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

controller := &Controller{
kubeclientset: kubeclientset,
elaclientset: elaclientset,
buildclientset: buildclientset,
Base: controller.NewBase(kubeClientSet, elaClientSet, kubeInformerFactory,
elaInformerFactory, informer.Informer(), controllerAgentName, "Configurations"),
buildClientSet: buildClientSet,
lister: informer.Lister(),
synced: informer.Informer().HasSynced,
revisionsSynced: revisionInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Configurations"),
recorder: recorder,
}

glog.Info("Setting up event handlers")
// Set up an event handler for when Configuration resources change
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueConfiguration,
UpdateFunc: func(old, new interface{}) {
controller.enqueueConfiguration(new)
},
})

revisionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.addRevisionEvent,
UpdateFunc: controller.updateRevisionEvent,
Expand All @@ -127,121 +90,14 @@ func NewController(
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
//TODO(grantr): generic
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
defer runtime.HandleCrash()
defer c.workqueue.ShutDown()

// Start the informer factories to begin populating the informer caches
glog.Info("Starting Configuration controller")

// Wait for the caches to be synced before starting workers
glog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.synced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

// Wait for the revisions caches to be synced before starting workers
glog.Info("Waiting for revisions informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.revisionsSynced); !ok {
return fmt.Errorf("failed to wait for revisions caches to sync")
}

glog.Info("Starting workers")
// Launch workers to process Configuration resources
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}

glog.Info("Started workers")
<-stopCh
glog.Info("Shutting down workers")

return nil
}

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
//TODO(grantr): generic
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
//TODO(grantr): generic
func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()

if shutdown {
return false
}

// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
// Run the syncHandler, passing it the namespace/name string of the
// Configuration resource to be synced.
if err := c.syncHandler(key); err != nil {
return fmt.Errorf("error syncing %q: %v", key, err)
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
glog.Infof("Successfully synced %q", key)
return nil
}(obj)

if err != nil {
runtime.HandleError(err)
return true
}

return true
}

// enqueueConfiguration takes a Configuration resource and
// converts it into a namespace/name string which is then put onto the work
// queue. This method should *not* be passed resources of any type other than
// Configuration.
//TODO(grantr): generic
func (c *Controller) enqueueConfiguration(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
runtime.HandleError(err)
return
}
c.workqueue.AddRateLimited(key)
return c.RunController(threadiness, stopCh,
[]cache.InformerSynced{c.synced, c.revisionsSynced}, c.syncHandler, "Configuration")
}

// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Configuration
// resource with the current status of the resource.
//TODO(grantr): not generic
func (c *Controller) syncHandler(key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
Expand Down Expand Up @@ -288,14 +144,14 @@ func (c *Controller) syncHandler(key string) error {
Spec: *config.Spec.Build,
}
build.OwnerReferences = append(build.OwnerReferences, *controllerRef)
created, err := c.buildclientset.BuildV1alpha1().Builds(build.Namespace).Create(build)
created, err := c.buildClientSet.BuildV1alpha1().Builds(build.Namespace).Create(build)
if err != nil {
glog.Errorf("Failed to create Build:\n%+v\n%s", build, err)
c.recorder.Eventf(config, corev1.EventTypeWarning, "CreationFailed", "Failed to create Build %q: %v", build.Name, err)
c.Recorder.Eventf(config, corev1.EventTypeWarning, "CreationFailed", "Failed to create Build %q: %v", build.Name, err)
return err
}
glog.Infof("Created Build:\n%+v", created.Name)
c.recorder.Eventf(config, corev1.EventTypeNormal, "Created", "Created Build %q", created.Name)
c.Recorder.Eventf(config, corev1.EventTypeNormal, "Created", "Created Build %q", created.Name)
spec.BuildName = created.Name
}

Expand All @@ -304,7 +160,7 @@ func (c *Controller) syncHandler(key string) error {
return err
}

revClient := c.elaclientset.ElafrosV1alpha1().Revisions(config.Namespace)
revClient := c.ElaClientSet.ElafrosV1alpha1().Revisions(config.Namespace)
created, err := revClient.Get(revName, metav1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) {
Expand Down Expand Up @@ -339,10 +195,10 @@ func (c *Controller) syncHandler(key string) error {
created, err = revClient.Create(rev)
if err != nil {
glog.Errorf("Failed to create Revision:\n%+v\n%s", rev, err)
c.recorder.Eventf(config, corev1.EventTypeWarning, "CreationFailed", "Failed to create Revision %q: %v", rev.Name, err)
c.Recorder.Eventf(config, corev1.EventTypeWarning, "CreationFailed", "Failed to create Revision %q: %v", rev.Name, err)
return err
}
c.recorder.Eventf(config, corev1.EventTypeNormal, "Created", "Created Revision %q", rev.Name)
c.Recorder.Eventf(config, corev1.EventTypeNormal, "Created", "Created Revision %q", rev.Name)
glog.Infof("Created Revision:\n%+v", created)
} else {
glog.Infof("Revision already created %s: %s", created.ObjectMeta.Name, err)
Expand Down Expand Up @@ -374,7 +230,7 @@ func generateRevisionName(u *v1alpha1.Configuration) (string, error) {
}

func (c *Controller) updateStatus(u *v1alpha1.Configuration) (*v1alpha1.Configuration, error) {
configClient := c.elaclientset.ElafrosV1alpha1().Configurations(u.Namespace)
configClient := c.ElaClientSet.ElafrosV1alpha1().Configurations(u.Namespace)
newu, err := configClient.Get(u.Name, metav1.GetOptions{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -423,7 +279,7 @@ func (c *Controller) addRevisionEvent(obj interface{}) {
glog.Errorf("Error updating configuration '%s/%s': %v",
namespace, configName, err)
}
c.recorder.Eventf(config, corev1.EventTypeNormal, "LatestRevisionUpdate",
c.Recorder.Eventf(config, corev1.EventTypeNormal, "LatestRevisionUpdate",
"Latest revision of configuration is not ready")

} else {
Expand All @@ -442,10 +298,10 @@ func (c *Controller) addRevisionEvent(obj interface{}) {
namespace, configName, err)
}
if !alreadyReady {
c.recorder.Eventf(config, corev1.EventTypeNormal, "ConfigurationReady",
c.Recorder.Eventf(config, corev1.EventTypeNormal, "ConfigurationReady",
"Configuration becomes ready")
}
c.recorder.Eventf(config, corev1.EventTypeNormal, "LatestReadyUpdate",
c.Recorder.Eventf(config, corev1.EventTypeNormal, "LatestReadyUpdate",
"LatestReadyRevisionName updated to %q", revision.Name)

}
Expand Down
Loading