Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/ela-controller/OWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# The OWNERS file is used by prow to automatically merge approved PRs.

approvers:
- mattmoor
- grantr
- tcnghia
6 changes: 3 additions & 3 deletions cmd/ela-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ func main() {
// Build all of our controllers, with the clients constructed above.
// Add new controllers to this array.
controllers := []controller.Interface{
configuration.NewController(kubeClient, elaClient, buildClient, kubeInformerFactory, elaInformerFactory, cfg, *controllerConfig),
revision.NewController(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, buildInformerFactory, cfg, &revControllerConfig),
configuration.NewController(kubeClient, elaClient, buildClient, kubeInformerFactory, elaInformerFactory, cfg, *controllerConfig, logger),
revision.NewController(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, buildInformerFactory, cfg, &revControllerConfig, logger),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to wrap logger into revControllerConfig? Same for Configuration and Route. No ConfigurationControllerConfig nor RouteControllerConfig exist yet.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at revControllerConfig, it seems like it is specifically targeted at user configuration from the config maps. Logger doesn't seem like a natural fit there. Also, adding it to the config might cause confusion within revision controller for example, because the logger is also contained in controller.Base. Let me know if this makes sense.

route.NewController(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, cfg, *controllerConfig, autoscaleEnableScaleToZero, logger),
service.NewController(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, cfg, *controllerConfig),
service.NewController(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, cfg, *controllerConfig, logger),
}

go kubeInformerFactory.Start(stopCh)
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ go_library(
"//pkg/client/clientset/versioned:go_default_library",
"//pkg/client/clientset/versioned/scheme:go_default_library",
"//pkg/client/informers/externalversions:go_default_library",
"//pkg/logging:go_default_library",
"//pkg/logging/logkey:go_default_library",
"//vendor/github.com/ghodss/yaml:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/go.uber.org/zap:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/configuration/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ go_library(
"//pkg/client/informers/externalversions:go_default_library",
"//pkg/client/listers/ela/v1alpha1:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/logging:go_default_library",
"//pkg/logging/logkey:go_default_library",
"//vendor/github.com/elafros/build/pkg/apis/build/v1alpha1:go_default_library",
"//vendor/github.com/elafros/build/pkg/client/clientset/versioned:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/go.uber.org/zap:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
Expand Down Expand Up @@ -43,6 +45,7 @@ go_test(
"//vendor/github.com/elafros/build/pkg/apis/build/v1alpha1:go_default_library",
"//vendor/github.com/elafros/build/pkg/client/clientset/versioned/fake:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
"//vendor/go.uber.org/zap:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
Expand Down
71 changes: 43 additions & 28 deletions pkg/controller/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package configuration

import (
"context"
"fmt"

buildv1alpha1 "github.com/elafros/build/pkg/apis/build/v1alpha1"
Expand All @@ -27,7 +28,9 @@ import (
informers "github.com/elafros/elafros/pkg/client/informers/externalversions"
listers "github.com/elafros/elafros/pkg/client/listers/ela/v1alpha1"
"github.com/elafros/elafros/pkg/controller"
"github.com/golang/glog"
"github.com/elafros/elafros/pkg/logging"
"github.com/elafros/elafros/pkg/logging/logkey"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -62,7 +65,8 @@ func NewController(
kubeInformerFactory kubeinformers.SharedInformerFactory,
elaInformerFactory informers.SharedInformerFactory,
config *rest.Config,
controllerConfig controller.Config) controller.Interface {
controllerConfig controller.Config,
logger *zap.SugaredLogger) controller.Interface {

// obtain references to a shared index informer for the Configuration
// and Revision type.
Expand All @@ -71,14 +75,14 @@ func NewController(

controller := &Controller{
Base: controller.NewBase(kubeClientSet, elaClientSet, kubeInformerFactory,
elaInformerFactory, informer.Informer(), controllerAgentName, "Configurations"),
elaInformerFactory, informer.Informer(), controllerAgentName, "Configurations", logger),
buildClientSet: buildClientSet,
lister: informer.Lister(),
synced: informer.Informer().HasSynced,
revisionsSynced: revisionInformer.Informer().HasSynced,
}

glog.Info("Setting up event handlers")
controller.Logger.Info("Setting up event handlers")
revisionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.addRevisionEvent,
UpdateFunc: controller.updateRevisionEvent,
Expand All @@ -95,6 +99,11 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
[]cache.InformerSynced{c.synced, c.revisionsSynced}, c.syncHandler, "Configuration")
}

// loggerWithConfigInfo enriches the logs with configuration name and namespace.
func loggerWithConfigInfo(logger *zap.SugaredLogger, ns string, name string) *zap.SugaredLogger {
return logger.With(zap.String(logkey.Namespace, ns), zap.String(logkey.Configuration, name))
}

// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Configuration
// resource with the current status of the resource.
Expand All @@ -106,6 +115,8 @@ func (c *Controller) syncHandler(key string) error {
return nil
}

logger := loggerWithConfigInfo(c.Logger, namespace, name)

// Get the Configuration resource with this namespace/name
config, err := c.lister.Configurations(namespace).Get(name)
if err != nil {
Expand All @@ -124,12 +135,12 @@ func (c *Controller) syncHandler(key string) error {
// Configuration business logic
if config.GetGeneration() == config.Status.ObservedGeneration {
// TODO(vaikas): Check to see if Status.LatestCreatedRevisionName is ready and update Status.LatestReady
glog.Infof("Skipping reconcile since already reconciled %d == %d",
logger.Infof("Skipping reconcile since already reconciled %d == %d",
config.Spec.Generation, config.Status.ObservedGeneration)
return nil
}

glog.Infof("Running reconcile Configuration for %s\n%+v\n%+v\n",
logger.Infof("Running reconcile Configuration for %s\n%+v\n%+v\n",
config.Name, config, config.Spec.RevisionTemplate)
spec := config.Spec.RevisionTemplate.Spec
controllerRef := controller.NewConfigurationControllerRef(config)
Expand All @@ -146,11 +157,11 @@ func (c *Controller) syncHandler(key string) error {
build.OwnerReferences = append(build.OwnerReferences, *controllerRef)
created, err := c.buildClientSet.BuildV1alpha1().Builds(build.Namespace).Create(build)
if err != nil {
glog.Errorf("Failed to create Build:\n%+v\n%s", build, err)
logger.Errorf("Failed to create Build:\n%+v\n%s", build, err)
c.Recorder.Eventf(config, corev1.EventTypeWarning, "CreationFailed", "Failed to create Build %q: %v", build.Name, err)
return err
}
glog.Infof("Created Build:\n%+v", created.Name)
logger.Infof("Created Build:\n%+v", created.Name)
c.Recorder.Eventf(config, corev1.EventTypeNormal, "Created", "Created Build %q", created.Name)
spec.BuildName = created.Name
}
Expand All @@ -164,7 +175,7 @@ func (c *Controller) syncHandler(key string) error {
created, err := revClient.Get(revName, metav1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) {
glog.Errorf("Revisions Get for %q failed: %s", revName, err)
logger.Error("Revisions Get failed", zap.Error(err), zap.String(logkey.Revision, revName))
return err
}

Expand Down Expand Up @@ -194,14 +205,14 @@ func (c *Controller) syncHandler(key string) error {

created, err = revClient.Create(rev)
if err != nil {
glog.Errorf("Failed to create Revision:\n%+v\n%s", rev, err)
logger.Errorf("Failed to create Revision:\n%+v\n%s", rev, err)
c.Recorder.Eventf(config, corev1.EventTypeWarning, "CreationFailed", "Failed to create Revision %q: %v", rev.Name, err)
return err
}
c.Recorder.Eventf(config, corev1.EventTypeNormal, "Created", "Created Revision %q", rev.Name)
glog.Infof("Created Revision:\n%+v", created)
logger.Infof("Created Revision:\n%+v", created)
} else {
glog.Infof("Revision already created %s: %s", created.ObjectMeta.Name, err)
logger.Infof("Revision already created %s: %s", created.ObjectMeta.Name, err)
}
// Update the Status of the configuration with the latest generation that
// we just reconciled against so we don't keep generating revisions.
Expand All @@ -210,10 +221,10 @@ func (c *Controller) syncHandler(key string) error {
config.Status.LatestCreatedRevisionName = created.ObjectMeta.Name
config.Status.ObservedGeneration = config.Spec.Generation

glog.Infof("Updating the configuration status:\n%+v", config)
logger.Infof("Updating the configuration status:\n%+v", config)

if _, err = c.updateStatus(config); err != nil {
glog.Errorf("Failed to update the configuration %s", err)
logger.Error("Failed to update the configuration", zap.Error(err))
return err
}

Expand Down Expand Up @@ -253,49 +264,49 @@ func (c *Controller) addRevisionEvent(obj interface{}) {
return
}

logger := loggerWithConfigInfo(c.Logger, namespace, configName).With(zap.String(logkey.Revision, revisionName))
ctx := logging.WithLogger(context.TODO(), logger)

config, err := c.lister.Configurations(namespace).Get(configName)
if err != nil {
glog.Errorf("Error fetching configuration '%s/%s' upon revision becoming ready: %v",
namespace, configName, err)
logger.Error("Error fetching configuration upon revision becoming ready", zap.Error(err))
return
}

if revision.Name != config.Status.LatestCreatedRevisionName {
// The revision isn't the latest created one, so ignore this event.
glog.Infof("Revision %q is not the latest created one", revisionName)
logger.Info("Revision is not the latest created one")
return
}

// Don't modify the informer's copy.
config = config.DeepCopy()

if !revision.Status.IsReady() {
glog.Infof("Revision %q of configuration %q is not ready", revisionName, config.Name)
logger.Infof("Revision %q of configuration %q is not ready", revisionName, config.Name)

//add LatestRevision condition to be false with the status from the revision
c.markConfigurationLatestRevisionStatus(config, revision)
c.markConfigurationLatestRevisionStatus(ctx, config, revision)

if _, err := c.updateStatus(config); err != nil {
glog.Errorf("Error updating configuration '%s/%s': %v",
namespace, configName, err)
logger.Error("Error updating configuration", zap.Error(err))
}
c.Recorder.Eventf(config, corev1.EventTypeNormal, "LatestRevisionUpdate",
"Latest revision of configuration is not ready")

} else {
glog.Infof("Revision %q is ready", revisionName)
logger.Info("Revision is ready")

alreadyReady := config.Status.IsReady()
if !alreadyReady {
c.markConfigurationReady(config, revision)
c.markConfigurationReady(ctx, config, revision)
}
glog.Infof("Setting LatestReadyRevisionName of Configuration %q to revision %q",
logger.Infof("Setting LatestReadyRevisionName of Configuration %q to revision %q",
config.Name, revision.Name)
config.Status.LatestReadyRevisionName = revision.Name

if _, err := c.updateStatus(config); err != nil {
glog.Errorf("Error updating configuration '%s/%s': %v",
namespace, configName, err)
logger.Error("Error updating configuration", zap.Error(err))
}
if !alreadyReady {
c.Recorder.Eventf(config, corev1.EventTypeNormal, "ConfigurationReady",
Expand Down Expand Up @@ -324,8 +335,10 @@ func getLatestRevisionStatusCondition(revision *v1alpha1.Revision) *v1alpha1.Rev
// Mark ConfigurationConditionReady of Configuration ready as the given latest
// created revision is ready.
func (c *Controller) markConfigurationReady(
ctx context.Context,
config *v1alpha1.Configuration, revision *v1alpha1.Revision) {
glog.Infof("Marking Configuration %q ready", config.Name)
logger := logging.FromContext(ctx)
logger.Info("Marking Configuration ready")
config.Status.RemoveCondition(v1alpha1.ConfigurationConditionLatestRevisionReady)
config.Status.SetCondition(
&v1alpha1.ConfigurationCondition{
Expand All @@ -338,11 +351,13 @@ func (c *Controller) markConfigurationReady(
// Mark ConfigurationConditionLatestRevisionReady of Configuration to false with the status
// from the revision
func (c *Controller) markConfigurationLatestRevisionStatus(
ctx context.Context,
config *v1alpha1.Configuration, revision *v1alpha1.Revision) {
logger := logging.FromContext(ctx)
config.Status.RemoveCondition(v1alpha1.ConfigurationConditionReady)
cond := getLatestRevisionStatusCondition(revision)
if cond == nil {
glog.Infof("Revision status is not updated yet")
logger.Info("Revision status is not updated yet")
return
}
config.Status.SetCondition(
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/configuration/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -151,6 +152,7 @@ func newTestController(t *testing.T, elaObjects ...runtime.Object) (
elaInformer,
&rest.Config{},
ctrl.Config{},
zap.NewNop().Sugar(),
).(*Controller)

return
Expand Down
34 changes: 24 additions & 10 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import (
clientset "github.com/elafros/elafros/pkg/client/clientset/versioned"
elascheme "github.com/elafros/elafros/pkg/client/clientset/versioned/scheme"
informers "github.com/elafros/elafros/pkg/client/informers/externalversions"
"github.com/golang/glog"
"github.com/elafros/elafros/pkg/logging/logkey"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -74,6 +75,13 @@ type Base struct {
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
WorkQueue workqueue.RateLimitingInterface

// Sugared logger is easier to use but is not as performant as the
// raw logger. In performance critical paths, call logger.Desugar()
// and use the returned raw logger instead. In addition to the
// performance benefits, raw logger also preserves type-safety at
// the expense of slightly greater verbosity.
Logger *zap.SugaredLogger
}

// NewBase instantiates a new instance of Base implementing
Expand All @@ -85,12 +93,16 @@ func NewBase(
elaInformerFactory informers.SharedInformerFactory,
informer cache.SharedIndexInformer,
controllerAgentName string,
workQueueName string) *Base {
workQueueName string,
logger *zap.SugaredLogger) *Base {

// Enrich the logs with controller name
logger = logger.Named(controllerAgentName).With(zap.String(logkey.ControllerType, controllerAgentName))

// Create event broadcaster
glog.V(4).Info("Creating event broadcaster")
logger.Debug("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClientSet.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

Expand All @@ -101,6 +113,7 @@ func NewBase(
ElaInformerFactory: elaInformerFactory,
Recorder: recorder,
WorkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
Logger: logger,
}

// Set up an event handler for when the resource types of interest change
Expand Down Expand Up @@ -140,28 +153,29 @@ func (c *Base) RunController(
defer runtime.HandleCrash()
defer c.WorkQueue.ShutDown()

glog.Infof("Starting %s controller", controllerName)
logger := c.Logger
logger.Infof("Starting %s controller", controllerName)

// Wait for the caches to be synced before starting workers
glog.Info("Waiting for informer caches to sync")
logger.Info("Waiting for informer caches to sync")
for i, synced := range informersSynced {
if ok := cache.WaitForCacheSync(stopCh, synced); !ok {
return fmt.Errorf("failed to wait for cache at index %v to sync", i)
}
}

// Launch workers to process Revision resources
glog.Info("Starting workers")
logger.Info("Starting workers")
for i := 0; i < threadiness; i++ {
go wait.Until(func() {
for c.processNextWorkItem(syncHandler) {
}
}, time.Second, stopCh)
}

glog.Info("Started workers")
logger.Info("Started workers")
<-stopCh
glog.Info("Shutting down workers")
logger.Info("Shutting down workers")

return nil
}
Expand Down Expand Up @@ -207,7 +221,7 @@ func (c *Base) processNextWorkItem(syncHandler func(string) error) bool {
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.WorkQueue.Forget(obj)
glog.Infof("Successfully synced %q", key)
c.Logger.Infof("Successfully synced %q", key)
return nil
}(obj)

Expand Down
Loading