diff --git a/Gopkg.lock b/Gopkg.lock index 05cfa2c1b6f..2349c0bf3ed 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -261,7 +261,7 @@ revision = "5c1d8c8469d1ed34b2aecf4c2305b3a57fff2ee3" [[projects]] - digest = "1:c0361b10d998857122ab480346b688f7dfabf5e9b34e52d786e8e6fd02b602d9" + digest = "1:8ad05f1ef3232281106ee79f26006479e1623724c6db853b24d6ef127ffcda14" name = "github.com/knative/serving" packages = [ "pkg/apis/istio", @@ -280,6 +280,9 @@ "pkg/client/informers/externalversions/serving/v1alpha1", "pkg/client/listers/istio/v1alpha3", "pkg/client/listers/serving/v1alpha1", + "pkg/configmap", + "pkg/logging", + "pkg/logging/logkey", ] pruneopts = "NUT" revision = "2326173b13899c32896caf526a318a8b30033b3e" @@ -1011,8 +1014,12 @@ "github.com/knative/serving/pkg/client/clientset/versioned", "github.com/knative/serving/pkg/client/informers/externalversions", "github.com/knative/serving/pkg/client/listers/istio/v1alpha3", + "github.com/knative/serving/pkg/configmap", + "github.com/knative/serving/pkg/logging", + "github.com/knative/serving/pkg/logging/logkey", "github.com/mattbaird/jsonpatch", "github.com/prometheus/client_golang/prometheus/promhttp", + "go.uber.org/zap", "golang.org/x/net/context", "golang.org/x/oauth2", "google.golang.org/grpc/codes", diff --git a/cmd/controller/main.go b/cmd/controller/main.go index ab1e6fa215f..d21cfa422a5 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -19,6 +19,7 @@ package main import ( "context" "flag" + "log" "net/http" "time" @@ -42,13 +43,18 @@ import ( "github.com/knative/eventing/pkg/controller/flow" "github.com/knative/eventing/pkg/signals" + "github.com/knative/serving/pkg/configmap" + "github.com/knative/serving/pkg/logging" + "github.com/knative/serving/pkg/logging/logkey" "github.com/prometheus/client_golang/prometheus/promhttp" + "go.uber.org/zap" ) const ( threadsPerController = 2 metricsScrapeAddr = ":9090" metricsScrapePath = "/metrics" + logLevelKey = "controller" ) var ( @@ -57,7 +63,20 @@ var ( ) 
func main() { + flag.Parse() + cm, err := configmap.Load("/etc/config-logging") + if err != nil { + log.Fatalf("Error loading logging configuration: %v", err) + } + config, err := logging.NewConfigFromMap(cm) + if err != nil { + log.Fatalf("Error parsing logging configuration: %v", err) + } + logger, atomicLevel := logging.NewLoggerFromConfig(config, logLevelKey) + defer logger.Sync() + logger = logger.With(zap.String(logkey.ControllerType, "controller")) + logger.Info("Starting the knative controller") // set up signals so we handle the first shutdown signal gracefully stopCh := signals.SetupSignalHandler() @@ -72,7 +91,7 @@ func main() { glog.Fatalf("Error building kubernetes clientset: %s", err.Error()) } - client, err := clientset.NewForConfig(cfg) + eventingClient, err := clientset.NewForConfig(cfg) if err != nil { glog.Fatalf("Error building clientset: %s", err.Error()) } @@ -90,9 +109,17 @@ func main() { } kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) - informerFactory := informers.NewSharedInformerFactory(client, time.Second*30) + informerFactory := informers.NewSharedInformerFactory(eventingClient, time.Second*30) servingInformerFactory := servinginformers.NewSharedInformerFactory(servingClient, time.Second*30) + opt := controller.Options{ + KubeClientSet: kubeClient, + ServingClientSet: servingClient, + BuildClientSet: buildClient, + ConfigMapWatcher: configMapWatcher, + Logger: logger, + } + // Add new controllers here. 
ctors := []controller.Constructor{ flow.NewController, @@ -105,7 +132,7 @@ func main() { controllers := make([]controller.Interface, 0, len(ctors)) for _, ctor := range ctors { controllers = append(controllers, - ctor(kubeClient, client, servingClient, restConfig, kubeInformerFactory, informerFactory, servingInformerFactory)) + ctor(kubeClient, eventingClient, servingClient, restConfig, kubeInformerFactory, informerFactory, servingInformerFactory)) } go kubeInformerFactory.Start(stopCh) @@ -133,6 +160,13 @@ func main() { } }() + // Watch the logging config map and dynamically update logging levels. + configMapWatcher := configmap.NewDefaultWatcher(kubeClient, system.Namespace) + configMapWatcher.Watch(logging.ConfigName, logging.UpdateLevelFromConfigMap(logger, atomicLevel, logLevelKey)) + if err = configMapWatcher.Start(stopCh); err != nil { + logger.Fatalf("failed to start configuration manager: %v", err) + } + <-stopCh // Close the http server gracefully diff --git a/config/500-controller.yaml b/config/500-controller.yaml index 9078d8ce619..7c567d7d7a2 100644 --- a/config/500-controller.yaml +++ b/config/500-controller.yaml @@ -30,6 +30,15 @@ spec: - name: eventing-controller image: github.com/knative/eventing/cmd/controller args: [ - "-logtostderr", - "-stderrthreshold", "INFO", - ] + # Disable glog writing into stderr. Our code doesn't use glog + # and seeing k8s logs in addition to ours is not useful. 
+ - "-logtostderr=false" + - "-stderrthreshold=FATAL" + volumeMounts: + - name: config-logging + mountPath: /etc/config-logging + volumes: + - name: config-logging + configMap: + name: config-logging + \ No newline at end of file diff --git a/config/config-logging.yaml b/config/config-logging.yaml new file mode 100644 index 00000000000..8856471ff92 --- /dev/null +++ b/config/config-logging.yaml @@ -0,0 +1,48 @@ +# Copyright 2018 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-logging + namespace: knative-eventing +data: + # Common configuration for all Knative codebase + zap-logger-config: | + { + "level": "info", + "development": false, + "outputPaths": ["stdout"], + "errorOutputPaths": ["stderr"], + "encoding": "json", + "encoderConfig": { + "timeKey": "ts", + "levelKey": "level", + "nameKey": "logger", + "callerKey": "caller", + "messageKey": "msg", + "stacktraceKey": "stacktrace", + "lineEnding": "", + "levelEncoder": "", + "timeEncoder": "iso8601", + "durationEncoder": "", + "callerEncoder": "" + } + } + + # Log level overrides + # For all components changes are picked up immediately. + loglevel.controller: "info" + loglevel.webhook: "info" + diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index efdc0dfb61d..96b6b1eeb03 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -17,21 +17,196 @@
package controller import ( - kubeinformers "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" + "time" + + clientset "github.com/knative/eventing/pkg/client/clientset/versioned" + "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" + informers "github.com/knative/eventing/pkg/client/informers/externalversions" servingclientset "github.com/knative/serving/pkg/client/clientset/versioned" servinginformers "github.com/knative/serving/pkg/client/informers/externalversions" + "github.com/knative/serving/pkg/configmap" + "github.com/knative/serving/pkg/logging/logkey" - clientset "github.com/knative/eventing/pkg/client/clientset/versioned" - informers "github.com/knative/eventing/pkg/client/informers/externalversions" + "go.uber.org/zap" + + "fmt" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" ) type Interface interface { Run(threadiness int, stopCh <-chan struct{}) error } +// Base implements most of the boilerplate and common code +// we have in our controllers. +type Base struct { + // KubeClientSet allows us to talk to the k8s for core APIs + KubeClientSet kubernetes.Interface + + // ServingClientSet allows us to configure Serving objects + ServingClientSet servingclientset.Interface + + // EventingClientSet allows us to configure Eventing objects + EventingClientSet clientset.Interface + + // ConfigMapWatcher allows us to watch for ConfigMap changes. + ConfigMapWatcher configmap.Watcher + + // Recorder is an event recorder for recording Event resources to the + // Kubernetes API. + Recorder record.EventRecorder + + // WorkQueue is a rate limited work queue. 
This is used to queue work to be + // processed instead of performing it as soon as a change happens. This + // means we can ensure we only process a fixed amount of resources at a + // time, and makes it easy to ensure we are never processing the same item + // simultaneously in two different workers. + WorkQueue workqueue.RateLimitingInterface + + // Sugared logger is easier to use but is not as performant as the + // raw logger. In performance critical paths, call logger.Desugar() + // and use the returned raw logger instead. In addition to the + // performance benefits, raw logger also preserves type-safety at + // the expense of slightly greater verbosity. + Logger *zap.SugaredLogger +} + +// Options defines the common controller options passed to NewBase. +// We define this to reduce the boilerplate argument list when +// creating derivative controllers. +type Options struct { + KubeClientSet kubernetes.Interface + ServingClientSet servingclientset.Interface + EventingClientSet clientset.Interface + ConfigMapWatcher configmap.Watcher + Logger *zap.SugaredLogger +} + +// NewBase instantiates a new instance of Base implementing +// the common & boilerplate code between our controllers. 
+func NewBase(opt Options, controllerAgentName, workQueueName string) *Base { + // Enrich the logs with controller name + logger := opt.Logger.Named(controllerAgentName).With(zap.String(logkey.ControllerType, controllerAgentName)) + + // Create event broadcaster + logger.Debug("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: opt.KubeClientSet.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder( + scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + + base := &Base{ + KubeClientSet: opt.KubeClientSet, + ServingClientSet: opt.ServingClientSet, + EventingClientSet: opt.EventingClientSet, + ConfigMapWatcher: opt.ConfigMapWatcher, + Recorder: recorder, + WorkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName), + Logger: logger, + } + + return base +} + +// RunController starts the controller's worker threads, the number of which is threadiness. It then blocks until stopCh +// is closed, at which point it shuts down its internal work queue and waits for workers to finish processing their +// current work items. 
+func (c *Base) RunController( + threadiness int, + stopCh <-chan struct{}, + syncHandler func(string) error, + controllerName string) error { + + defer runtime.HandleCrash() + defer c.WorkQueue.ShutDown() + + logger := c.Logger + logger.Infof("Starting %s controller", controllerName) + + // Launch workers to process resources + logger.Info("Starting workers") + for i := 0; i < threadiness; i++ { + go wait.Until(func() { + for c.processNextWorkItem(syncHandler) { + } + }, time.Second, stopCh) + } + + logger.Info("Started workers") + <-stopCh + logger.Info("Shutting down workers") + + return nil +} + +// processNextWorkItem will read a single work item off the workqueue and +// attempt to process it, by calling the syncHandler. +func (c *Base) processNextWorkItem(syncHandler func(string) error) bool { + obj, shutdown := c.WorkQueue.Get() + + if shutdown { + return false + } + + // We wrap this block in a func so we can defer c.WorkQueue.Done. + err := func(obj interface{}) error { + // We call Done here so the workqueue knows we have finished + // processing this item. We also must remember to call Forget if we + // do not want this work item being re-queued. For example, we do + // not call Forget if a transient error occurs, instead the item is + // put back on the workqueue and attempted again after a back-off + // period. + defer c.WorkQueue.Done(obj) + var key string + var ok bool + // We expect strings to come off the workqueue. These are of the + // form namespace/name. We do this as the delayed nature of the + // workqueue means the items in the informer cache may actually be + // more up to date than when the item was initially put onto the + // workqueue. + if key, ok = obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call + // Forget here else we'd go into a loop of attempting to + // process a work item that is invalid.
+ c.WorkQueue.Forget(obj) + c.Logger.Errorf("expected string in workqueue but got %#v", obj) + return nil + } + // Run the syncHandler, passing it the namespace/name string of the + // resource to be synced. + if err := syncHandler(key); err != nil { + return fmt.Errorf("error syncing %q: %v", key, err) + } + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + c.WorkQueue.Forget(obj) + c.Logger.Infof("Successfully synced %q", key) + return nil + }(obj) + + if err != nil { + c.Logger.Error(zap.Error(err)) + return true + } + + return true +} + +// GetWorkQueue helps implement Interface for derivatives. +func (b *Base) GetWorkQueue() workqueue.RateLimitingInterface { + return b.WorkQueue +} + type Constructor func( kubernetes.Interface, clientset.Interface, diff --git a/pkg/controller/flow/flow.go b/pkg/controller/flow/flow.go index d7aa8d88f00..275b1b684ab 100644 --- a/pkg/controller/flow/flow.go +++ b/pkg/controller/flow/flow.go @@ -18,10 +18,7 @@ package flow import ( "fmt" - "log" - "time" - "github.com/golang/glog" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" @@ -30,19 +27,13 @@ import ( runtimetypes "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" kubeinformers "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" // TODO: Get rid of these, but needed as other controllers use them. 
- servingclientset "github.com/knative/serving/pkg/client/clientset/versioned" servinginformers "github.com/knative/serving/pkg/client/informers/externalversions" + "github.com/knative/serving/pkg/logging/logkey" "github.com/knative/eventing/pkg/controller" @@ -50,12 +41,14 @@ import ( feedsv1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" v1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1" - clientset "github.com/knative/eventing/pkg/client/clientset/versioned" - flowscheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" + "context" informers "github.com/knative/eventing/pkg/client/informers/externalversions" channelListers "github.com/knative/eventing/pkg/client/listers/channels/v1alpha1" feedListers "github.com/knative/eventing/pkg/client/listers/feeds/v1alpha1" listers "github.com/knative/eventing/pkg/client/listers/flows/v1alpha1" + "github.com/knative/eventing/pkg/controller/flow/resources" + "github.com/knative/serving/pkg/logging" + "go.uber.org/zap" ) const controllerAgentName = "flow-controller" @@ -75,22 +68,14 @@ const ( MessageResourceSynced = "Flow synced successfully" ) -var ( - flowControllerKind = v1alpha1.SchemeGroupVersion.WithKind("Flow") -) - // Controller is the controller implementation for Flow resources type Controller struct { - // kubeclientset is a standard kubernetes clientset - kubeclientset kubernetes.Interface + *controller.Base // restConfig is used to create dynamic clients for // resolving ObjectReference targets. restConfig *rest.Config - // clientset is a clientset for our own API group - clientset clientset.Interface - flowsLister listers.FlowLister flowsSynced cache.InformerSynced @@ -102,27 +87,15 @@ type Controller struct { subscriptionsLister channelListers.SubscriptionLister subscriptionsSynced cache.InformerSynced - - // workqueue is a rate limited work queue. This is used to queue work to be - // processed instead of performing it as soon as a change happens. 
This - // means we can ensure we only process a fixed amount of resources at a - // time, and makes it easy to ensure we are never processing the same item - // simultaneously in two different workers. - workqueue workqueue.RateLimitingInterface - // recorder is an event recorder for recording Event resources to the - // Kubernetes API. - recorder record.EventRecorder } // NewController returns a new flow controller func NewController( - kubeclientset kubernetes.Interface, - clientset clientset.Interface, - servingclientset servingclientset.Interface, + opt controller.Options, restConfig *rest.Config, - kubeInformerFactory kubeinformers.SharedInformerFactory, + _ kubeinformers.SharedInformerFactory, flowsInformerFactory informers.SharedInformerFactory, - routeInformerFactory servinginformers.SharedInformerFactory) controller.Interface { + _ servinginformers.SharedInformerFactory) controller.Interface { // obtain a reference to a shared index informer for the Flow types. flowInformer := flowsInformerFactory.Flows().V1alpha1() @@ -134,20 +107,10 @@ func NewController( subscriptionInformer := flowsInformerFactory.Channels().V1alpha1().Subscriptions() - // Create event broadcaster - // Add flow-controller types to the default Kubernetes Scheme so Events can be - // logged for flow-controller types. 
- flowscheme.AddToScheme(scheme.Scheme) - glog.V(4).Info("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) - - controller := &Controller{ - kubeclientset: kubeclientset, + c := &Controller{ + Base: controller.NewBase(opt, controllerAgentName, "Flows"), + restConfig: restConfig, - clientset: clientset, flowsLister: flowInformer.Flows().Lister(), flowsSynced: flowInformer.Flows().Informer().HasSynced, feedsLister: feedInformer.Feeds().Lister(), @@ -156,21 +119,19 @@ func NewController( channelsSynced: channelInformer.Informer().HasSynced, subscriptionsLister: subscriptionInformer.Lister(), subscriptionsSynced: subscriptionInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Flows"), - recorder: recorder, } - glog.Info("Setting up event handlers") + c.Logger.Info("Setting up event handlers") // Set up an event handler for when Flow resources change flowInformer.Flows().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueueFlow, + AddFunc: c.enqueueFlow, UpdateFunc: func(old, new interface{}) { - controller.enqueueFlow(new) + c.enqueueFlow(new) }, }) - return controller + return c } // Run will set up the event handlers for types we are interested in, as well @@ -178,96 +139,36 @@ func NewController( // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. 
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { - defer runtime.HandleCrash() - defer c.workqueue.ShutDown() - - // Start the informer factories to begin populating the informer caches - glog.Info("Starting Flow controller") - - // Wait for the caches to be synced before starting workers - glog.Info("Waiting for Flow informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.flowsSynced); !ok { - return fmt.Errorf("failed to wait for Flow caches to sync") - } - - glog.Info("Waiting for Feed informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.feedsSynced); !ok { - return fmt.Errorf("failed to wait for Feed caches to sync") - } - - glog.Info("Waiting for channel informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.channelsSynced); !ok { - return fmt.Errorf("failed to wait for Channel caches to sync") - } - - glog.Info("Starting workers") - // Launch two workers to process Flow resources - for i := 0; i < threadiness; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) - } - - glog.Info("Started workers") - <-stopCh - glog.Info("Shutting down workers") - - return nil + return c.RunController(threadiness, stopCh, c.Reconcile, "Flow") + + // TODO(nicholss): is this ok to not wait? the RunController for Base does not wait for cache sync. 
+ + //// Wait for the caches to be synced before starting workers + //glog.Info("Waiting for Flow informer caches to sync") + //if ok := cache.WaitForCacheSync(stopCh, c.flowsSynced); !ok { + // return fmt.Errorf("failed to wait for Flow caches to sync") + //} + // + //glog.Info("Waiting for Feed informer caches to sync") + //if ok := cache.WaitForCacheSync(stopCh, c.feedsSynced); !ok { + // return fmt.Errorf("failed to wait for Feed caches to sync") + //} + // + //glog.Info("Waiting for channel informer caches to sync") + //if ok := cache.WaitForCacheSync(stopCh, c.channelsSynced); !ok { + // return fmt.Errorf("failed to wait for Channel caches to sync") + //} } -// runWorker is a long-running function that will continually call the -// processNextWorkItem function in order to read and process a message on the -// workqueue. -func (c *Controller) runWorker() { - for c.processNextWorkItem() { - } -} - -// processNextWorkItem will read a single work item off the workqueue and -// attempt to process it, by calling Reconcile. -func (c *Controller) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() - - if shutdown { - return false - } - - // We wrap this block in a func so we can defer c.workqueue.Done. - if err := func(obj interface{}) error { - // We call Done here so the workqueue knows we have finished - // processing this item. We also must remember to call Forget if we - // do not want this work item being re-queued. For example, we do - // not call Forget if a transient error occurs, instead the item is - // put back on the workqueue and attempted again after a back-off - // period. - defer c.workqueue.Done(obj) - // We expect strings to come off the workqueue. These are of the - // form namespace/name. We do this as the delayed nature of the - // workqueue means the items in the informer cache may actually be - // more up to date that when the item was initially put onto the - // workqueue. 
- key, ok := obj.(string) - if !ok { - // As the item in the workqueue is actually invalid, we call - // Forget here else we'd go into a loop of attempting to - // process a work item that is invalid. - c.workqueue.Forget(obj) - runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) - return nil - } - // Run the Reconcile, passing it the namespace/name string of the - // Flow resource to be synced. - if err := c.Reconcile(key); err != nil { - return fmt.Errorf("error syncing '%s': %s", key, err.Error()) - } - // Finally, if no error occurs we Forget this item so it does not - // get queued again until another change happens. - c.workqueue.Forget(obj) - glog.Infof("Successfully synced '%s'", key) - return nil - }(obj); err != nil { - runtime.HandleError(err) - } +// TODO: move this to serving logkey +const ( + // Service is the key used for service name in structured logs + logkeyFlow = "knative.dev/service" +) - return true +// loggerWithServiceInfo enriches the logs with service name and namespace. +func loggerWithServiceInfo(logger *zap.SugaredLogger, ns string, name string) *zap.SugaredLogger { + return logger.With(zap.String(logkey.Namespace, ns), zap.String(logkeyFlow, name)) } // enqueueFlow takes a Flow resource and converts it into a namespace/name @@ -280,7 +181,7 @@ func (c *Controller) enqueueFlow(obj interface{}) { runtime.HandleError(err) return } - c.workqueue.AddRateLimited(key) + c.WorkQueue.AddRateLimited(key) } // Reconcile compares the actual state with the desired, and attempts to @@ -290,17 +191,23 @@ func (c *Controller) Reconcile(key string) error { // Convert the namespace/name string into a distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + c.Logger.Errorf("invalid resource key: %s", key) + //runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) // TODO(nicholss) ok to delete? 
return nil } + // Wrap our logger with the additional context of the configuration that we are reconciling. + logger := loggerWithServiceInfo(c.Logger, namespace, name) + ctx := logging.WithLogger(context.TODO(), logger) + // Get the Flow resource with this namespace/name original, err := c.flowsLister.Flows(namespace).Get(name) if err != nil { // The Flow resource may no longer exist, in which case we stop // processing. if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("flow '%s' in work queue no longer exists", key)) + logger.Errorf("flow %q in work queue no longer exists", key) + //runtime.HandleError(fmt.Errorf("flow '%s' in work queue no longer exists", key)) // TODO(nicholss) ok to delete? return nil } return err @@ -311,59 +218,61 @@ func (c *Controller) Reconcile(key string) error { // Reconcile this copy of the Flow and then write back any status // updates regardless of whether the reconcile error out. - err = c.reconcile(flow) + err = c.reconcile(ctx, flow) if equality.Semantic.DeepEqual(original.Status, flow.Status) { // If we didn't change anything then don't call updateStatus. // This is important because the copy we loaded from the informer's // cache may be stale and we don't want to overwrite a prior update // to status with this stale state. 
} else if _, err := c.updateStatus(flow); err != nil { - glog.Warningf("Failed to update flow status: %v", err) + logger.Warnf("Failed to update flow status: %v", zap.Error(err)) return err } return err } -func (c *Controller) reconcile(flow *v1alpha1.Flow) error { +func (c *Controller) reconcile(ctx context.Context, flow *v1alpha1.Flow) error { + logger := logging.FromContext(ctx) + // See if the flow has been deleted accessor, err := meta.Accessor(flow) if err != nil { - log.Fatalf("Failed to get metadata: %s", err) + logger.Fatalf("Failed to get metadata: %s", zap.Error(err)) } deletionTimestamp := accessor.GetDeletionTimestamp() - glog.Infof("DeletionTimestamp: %v", deletionTimestamp) + logger.Infof("DeletionTimestamp: %v", deletionTimestamp) - target, err := c.resolveActionTarget(flow.Namespace, flow.Spec.Action) + target, err := c.resolveActionTarget(ctx, flow.Namespace, flow.Spec.Action) if err != nil { - glog.Warningf("Failed to resolve target %v : %v", flow.Spec.Action, err) + logger.Warnf("Failed to resolve target %v : %v", flow.Spec.Action, zap.Error(err)) return err } // Ok, so target is the underlying k8s service (or URI if so specified) that we want to target - glog.Infof("Resolved Target to: %q", target) + logger.Infof("Resolved Target to: %q", target) // Reconcile the Channel. Creates a channel that is the target that the Feed will use. // TODO: We should reuse channels possibly. 
- channel, err := c.reconcileChannel(flow) + channel, err := c.reconcileChannel(ctx, flow) if err != nil { - glog.Warningf("Failed to reconcile channel : %v", err) + logger.Warnf("Failed to reconcile channel : %v", zap.Error(err)) return err } flow.Status.PropagateChannelStatus(channel.Status) subscription, err := c.reconcileSubscription(channel.Name, target, flow) if err != nil { - glog.Warningf("Failed to reconcile subscription : %v", err) + logger.Warnf("Failed to reconcile subscription : %v", zap.Error(err)) return err } flow.Status.PropagateSubscriptionStatus(subscription.Status) channelDNS := channel.Status.DomainInternal if channelDNS != "" { - glog.Infof("Reconciling feed for flow %q targeting %q", flow.Name, channelDNS) - feed, err := c.reconcileFeed(channelDNS, flow) + logger.Infof("Reconciling feed for flow %q targeting %q", flow.Name, channelDNS) + feed, err := c.reconcileFeed(ctx, channelDNS, flow) if err != nil { - glog.Warningf("Failed to reconcile feed: %v", err) + logger.Warnf("Failed to reconcile feed: %v", zap.Error(err)) } flow.Status.PropagateFeedStatus(feed.Status) } @@ -371,7 +280,7 @@ func (c *Controller) reconcile(flow *v1alpha1.Flow) error { } func (c *Controller) updateStatus(u *v1alpha1.Flow) (*v1alpha1.Flow, error) { - flowClient := c.clientset.FlowsV1alpha1().Flows(u.Namespace) + flowClient := c.EventingClientSet.FlowsV1alpha1().Flows(u.Namespace) newu, err := flowClient.Get(u.Name, metav1.GetOptions{}) if err != nil { return nil, err @@ -400,11 +309,13 @@ func AddFinalizer(obj runtimetypes.Object, value string) error { // syncHandler compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the Flow resource // with the current status of the resource. 
-func (c *Controller) resolveActionTarget(namespace string, action v1alpha1.FlowAction) (string, error) { - glog.Infof("Resolving target: %v", action) +func (c *Controller) resolveActionTarget(ctx context.Context, namespace string, action v1alpha1.FlowAction) (string, error) { + logger := logging.FromContext(ctx) + + logger.Infof("Resolving target: %v", action) if action.Target != nil { - return c.resolveObjectReference(namespace, action.Target) + return c.resolveObjectReference(ctx, namespace, action.Target) } if action.TargetURI != nil { return *action.TargetURI, nil @@ -415,18 +326,21 @@ func (c *Controller) resolveActionTarget(namespace string, action v1alpha1.FlowA // resolveObjectReference fetches an object based on ObjectRefence. It assumes the // object has a status["domainInternal"] string in it and returns it. -func (c *Controller) resolveObjectReference(namespace string, ref *corev1.ObjectReference) (string, error) { +func (c *Controller) resolveObjectReference(ctx context.Context, namespace string, ref *corev1.ObjectReference) (string, error) { + logger := logging.FromContext(ctx) + resourceClient, err := CreateResourceInterface(c.restConfig, ref, namespace) if err != nil { - glog.Warningf("failed to create dynamic client resource: %v", err) + logger.Warnf("failed to create dynamic client resource: %v", zap.Error(err)) return "", err } obj, err := resourceClient.Get(ref.Name, metav1.GetOptions{}) if err != nil { - glog.Warningf("failed to get object: %v", err) + c.Logger.Warnf("failed to get object: %v", zap.Error(err)) return "", err } + status, ok := obj.Object["status"] if !ok { return "", fmt.Errorf("%q does not contain status", ref.Name) @@ -443,18 +357,20 @@ func (c *Controller) resolveObjectReference(namespace string, ref *corev1.Object return serviceNameStr, nil } -func (c *Controller) reconcileChannel(flow *v1alpha1.Flow) (*channelsv1alpha1.Channel, error) { +func (c *Controller) reconcileChannel(ctx context.Context, flow *v1alpha1.Flow) 
(*channelsv1alpha1.Channel, error) { + logger := logging.FromContext(ctx) + channelName := flow.Name channel, err := c.channelsLister.Channels(flow.Namespace).Get(channelName) if errors.IsNotFound(err) { channel, err = c.createChannel(flow) if err != nil { - glog.Errorf("Failed to create channel %q : %v", channelName, err) + logger.Errorf("Failed to create channel %q : %v", channelName, zap.Error(err)) return nil, err } } else if err != nil { - glog.Errorf("Failed to reconcile channel %q failed to get channels : %v", channelName, err) + logger.Errorf("Failed to reconcile channel %q failed to get channels : %v", channelName, zap.Error(err)) return nil, err } @@ -464,20 +380,8 @@ func (c *Controller) reconcileChannel(flow *v1alpha1.Flow) (*channelsv1alpha1.Ch } func (c *Controller) createChannel(flow *v1alpha1.Flow) (*channelsv1alpha1.Channel, error) { - channelName := flow.Name - channel := &channelsv1alpha1.Channel{ - ObjectMeta: metav1.ObjectMeta{ - Name: channelName, - Namespace: flow.Namespace, - OwnerReferences: []metav1.OwnerReference{ - *c.NewControllerRef(flow), - }, - }, - Spec: channelsv1alpha1.ChannelSpec{ - ClusterBus: defaultBusName, - }, - } - return c.clientset.ChannelsV1alpha1().Channels(flow.Namespace).Create(channel) + channel := resources.MakeChannel(defaultBusName, flow) + return c.EventingClientSet.ChannelsV1alpha1().Channels(flow.Namespace).Create(channel) } func (c *Controller) reconcileSubscription(channelName string, target string, flow *v1alpha1.Flow) (*channelsv1alpha1.Subscription, error) { @@ -486,11 +390,11 @@ func (c *Controller) reconcileSubscription(channelName string, target string, fl if errors.IsNotFound(err) { subscription, err = c.createSubscription(channelName, target, flow) if err != nil { - glog.Errorf("Failed to create subscription %q : %v", subscriptionName, err) + c.Logger.Errorf("Failed to create subscription %q : %v", subscriptionName, zap.Error(err)) return nil, err } } else if err != nil { - glog.Errorf("Failed to 
reconcile subscription %q failed to get subscriptions : %v", subscriptionName, err) + c.Logger.Errorf("Failed to reconcile subscription %q failed to get subscriptions : %v", subscriptionName, zap.Error(err)) return nil, err } @@ -500,34 +404,23 @@ func (c *Controller) reconcileSubscription(channelName string, target string, fl } func (c *Controller) createSubscription(channelName string, target string, flow *v1alpha1.Flow) (*channelsv1alpha1.Subscription, error) { - subscriptionName := flow.Name - subscription := &channelsv1alpha1.Subscription{ - ObjectMeta: metav1.ObjectMeta{ - Name: subscriptionName, - Namespace: flow.Namespace, - OwnerReferences: []metav1.OwnerReference{ - *c.NewControllerRef(flow), - }, - }, - Spec: channelsv1alpha1.SubscriptionSpec{ - Channel: channelName, - Subscriber: target, - }, - } - return c.clientset.ChannelsV1alpha1().Subscriptions(flow.Namespace).Create(subscription) + subscription := resources.MakeSubscription(channelName, target, flow) + return c.EventingClientSet.ChannelsV1alpha1().Subscriptions(flow.Namespace).Create(subscription) } -func (c *Controller) reconcileFeed(channelDNS string, flow *v1alpha1.Flow) (*feedsv1alpha1.Feed, error) { +func (c *Controller) reconcileFeed(ctx context.Context, channelDNS string, flow *v1alpha1.Flow) (*feedsv1alpha1.Feed, error) { + logger := logging.FromContext(ctx) + feedName := flow.Name feed, err := c.feedsLister.Feeds(flow.Namespace).Get(feedName) if errors.IsNotFound(err) { feed, err = c.createFeed(channelDNS, flow) if err != nil { - glog.Errorf("Failed to create feed %q : %v", feedName, err) + logger.Errorf("Failed to create feed %q : %v", feedName, zap.Error(err)) return nil, err } } else if err != nil { - glog.Errorf("Failed to reconcile feed %q failed to get feeds : %v", feedName, err) + logger.Errorf("Failed to reconcile feed %q failed to get feeds : %v", feedName, zap.Error(err)) return nil, err } @@ -538,43 +431,6 @@ func (c *Controller) reconcileFeed(channelDNS string, flow 
*v1alpha1.Flow) (*fee } func (c *Controller) createFeed(channelDNS string, flow *v1alpha1.Flow) (*feedsv1alpha1.Feed, error) { - feedName := flow.Name - feed := &feedsv1alpha1.Feed{ - ObjectMeta: metav1.ObjectMeta{ - Name: feedName, - Namespace: flow.Namespace, - OwnerReferences: []metav1.OwnerReference{ - *c.NewControllerRef(flow), - }, - }, - Spec: feedsv1alpha1.FeedSpec{ - Action: feedsv1alpha1.FeedAction{DNSName: channelDNS}, - Trigger: feedsv1alpha1.EventTrigger{ - EventType: flow.Spec.Trigger.EventType, - Resource: flow.Spec.Trigger.Resource, - Service: flow.Spec.Trigger.Service, - }, - }, - } - if flow.Spec.ServiceAccountName != "" { - feed.Spec.ServiceAccountName = flow.Spec.ServiceAccountName - } - - if flow.Spec.Trigger.Parameters != nil { - feed.Spec.Trigger.Parameters = flow.Spec.Trigger.Parameters - } - if flow.Spec.Trigger.ParametersFrom != nil { - feed.Spec.Trigger.ParametersFrom = flow.Spec.Trigger.ParametersFrom - } - - return c.clientset.FeedsV1alpha1().Feeds(flow.Namespace).Create(feed) -} - -func (c *Controller) NewControllerRef(flow *v1alpha1.Flow) *metav1.OwnerReference { - blockOwnerDeletion := false - isController := false - revRef := metav1.NewControllerRef(flow, flowControllerKind) - revRef.BlockOwnerDeletion = &blockOwnerDeletion - revRef.Controller = &isController - return revRef + feed := resources.MakeFeed(channelDNS, flow) + return c.EventingClientSet.FeedsV1alpha1().Feeds(flow.Namespace).Create(feed) } diff --git a/pkg/controller/flow/resources/channel.go b/pkg/controller/flow/resources/channel.go new file mode 100644 index 00000000000..7a6479df378 --- /dev/null +++ b/pkg/controller/flow/resources/channel.go @@ -0,0 +1,42 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + "github.com/knative/eventing/pkg/apis/flows/v1alpha1" + "github.com/knative/eventing/pkg/controller" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func MakeChannel(defaultBusName string, flow *v1alpha1.Flow) *channelsv1alpha1.Channel { + channelName := flow.Name + channel := &channelsv1alpha1.Channel{ + ObjectMeta: metav1.ObjectMeta{ + Name: channelName, + Namespace: flow.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *controller.NewControllerRef(flow), + }, + }, + Spec: channelsv1alpha1.ChannelSpec{ + ClusterBus: defaultBusName, + }, + } + return channel +} diff --git a/pkg/controller/flow/resources/feed.go b/pkg/controller/flow/resources/feed.go new file mode 100644 index 00000000000..7be3091e301 --- /dev/null +++ b/pkg/controller/flow/resources/feed.go @@ -0,0 +1,56 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License.
+*/ + +package resources + +import ( + feedsv1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" + "github.com/knative/eventing/pkg/apis/flows/v1alpha1" + "github.com/knative/eventing/pkg/controller" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func MakeFeed(channelDNS string, flow *v1alpha1.Flow) *feedsv1alpha1.Feed { + feed := &feedsv1alpha1.Feed{ + ObjectMeta: metav1.ObjectMeta{ + Name: flow.Name, + Namespace: flow.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *controller.NewControllerRef(flow), + }, + }, + Spec: feedsv1alpha1.FeedSpec{ + Action: feedsv1alpha1.FeedAction{DNSName: channelDNS}, + Trigger: feedsv1alpha1.EventTrigger{ + EventType: flow.Spec.Trigger.EventType, + Resource: flow.Spec.Trigger.Resource, + Service: flow.Spec.Trigger.Service, + }, + }, + } + if flow.Spec.ServiceAccountName != "" { + feed.Spec.ServiceAccountName = flow.Spec.ServiceAccountName + } + + if flow.Spec.Trigger.Parameters != nil { + feed.Spec.Trigger.Parameters = flow.Spec.Trigger.Parameters + } + if flow.Spec.Trigger.ParametersFrom != nil { + feed.Spec.Trigger.ParametersFrom = flow.Spec.Trigger.ParametersFrom + } + return feed +} diff --git a/pkg/controller/flow/resources/subscription.go b/pkg/controller/flow/resources/subscription.go new file mode 100644 index 00000000000..9885f46f78b --- /dev/null +++ b/pkg/controller/flow/resources/subscription.go @@ -0,0 +1,43 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + "github.com/knative/eventing/pkg/apis/flows/v1alpha1" + "github.com/knative/eventing/pkg/controller" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func MakeSubscription(channelName string, target string, flow *v1alpha1.Flow) *channelsv1alpha1.Subscription { + subscriptionName := flow.Name + subscription := &channelsv1alpha1.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: subscriptionName, + Namespace: flow.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *controller.NewControllerRef(flow), + }, + }, + Spec: channelsv1alpha1.SubscriptionSpec{ + Channel: channelName, + Subscriber: target, + }, + } + return subscription +} diff --git a/pkg/controller/owner_references.go b/pkg/controller/owner_references.go new file mode 100644 index 00000000000..1093c73429f --- /dev/null +++ b/pkg/controller/owner_references.go @@ -0,0 +1,60 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controller + +import ( + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + + channelsv1alpha "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + feedsv1alpha "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" + flowsv1alpha "github.com/knative/eventing/pkg/apis/flows/v1alpha1" +) + +func kind(obj metav1.Object) schema.GroupVersionKind { + switch obj.(type) { + // Channels + case *channelsv1alpha.Bus: + return channelsv1alpha.SchemeGroupVersion.WithKind("Bus") + case *channelsv1alpha.Channel: + return channelsv1alpha.SchemeGroupVersion.WithKind("Channel") + case *channelsv1alpha.ClusterBus: + return channelsv1alpha.SchemeGroupVersion.WithKind("ClusterBus") + // Feeds + case *feedsv1alpha.ClusterEventType: + return feedsv1alpha.SchemeGroupVersion.WithKind("ClusterEventType") + case *feedsv1alpha.ClusterEventSource: + return feedsv1alpha.SchemeGroupVersion.WithKind("ClusterEventSource") + case *feedsv1alpha.EventType: + return feedsv1alpha.SchemeGroupVersion.WithKind("EventType") + case *feedsv1alpha.EventSource: + return feedsv1alpha.SchemeGroupVersion.WithKind("EventSource") + // Flows + case *flowsv1alpha.Flow: + return flowsv1alpha.SchemeGroupVersion.WithKind("Flow") + + default: + panic(fmt.Sprintf("Unsupported object type %T", obj)) + } +} + +// NewControllerRef creates an OwnerReference pointing to the given Resource.
+func NewControllerRef(obj metav1.Object) *metav1.OwnerReference { + return metav1.NewControllerRef(obj, kind(obj)) +} diff --git a/vendor/github.com/knative/serving/pkg/configmap/default.go b/vendor/github.com/knative/serving/pkg/configmap/default.go new file mode 100644 index 00000000000..0f30cf6d928 --- /dev/null +++ b/vendor/github.com/knative/serving/pkg/configmap/default.go @@ -0,0 +1,128 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configmap + +import ( + "errors" + "sync" + + corev1 "k8s.io/api/core/v1" + informers "k8s.io/client-go/informers" + corev1informers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/tools/cache" +) + +// defaultImpl provides a default informer-based implementation of Watcher. +type defaultImpl struct { + sif informers.SharedInformerFactory + informer corev1informers.ConfigMapInformer + ns string + + // Guards mutations to defaultImpl fields + m sync.Mutex + + observers map[string][]Observer + started bool +} + +// Asserts that defaultImpl implements Watcher. 
+var _ Watcher = (*defaultImpl)(nil) + +// Watch implements Watcher +func (di *defaultImpl) Watch(name string, w Observer) { + di.m.Lock() + defer di.m.Unlock() + + if di.observers == nil { + di.observers = make(map[string][]Observer) + } + + wl, _ := di.observers[name] + di.observers[name] = append(wl, w) +} + +// Start implements Watcher +func (di *defaultImpl) Start(stopCh <-chan struct{}) error { + if err := di.registerCallbackAndStartInformer(stopCh); err != nil { + return err + } + + // Wait until it has been synced (WITHOUT holding the mutex, so callbacks happen) + if ok := cache.WaitForCacheSync(stopCh, di.informer.Informer().HasSynced); !ok { + return errors.New("Error waiting for ConfigMap informer to sync.") + } + + return di.checkObservedResourcesExist() +} + +func (di *defaultImpl) registerCallbackAndStartInformer(stopCh <-chan struct{}) error { + di.m.Lock() + defer di.m.Unlock() + if di.started { + return errors.New("Watcher already started!") + } + di.started = true + + di.informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: di.addConfigMapEvent, + UpdateFunc: di.updateConfigMapEvent, + }) + + // Start the shared informer factory (non-blocking) + di.sif.Start(stopCh) + return nil +} + +func (di *defaultImpl) checkObservedResourcesExist() error { + di.m.Lock() + defer di.m.Unlock() + // Check that all objects with Observers exist in our informers. + for k := range di.observers { + _, err := di.informer.Lister().ConfigMaps(di.ns).Get(k) + if err != nil { + return err + } + } + return nil +} + +func (di *defaultImpl) addConfigMapEvent(obj interface{}) { + // If the ConfigMap update is outside of our namespace, then quickly disregard it. + configMap := obj.(*corev1.ConfigMap) + if configMap.Namespace != di.ns { + // Outside of our namespace. + // This shouldn't happen due to our filtered informer. + return + } + + // Within our namespace, take the lock and see if there are any registered observers. 
+ di.m.Lock() + defer di.m.Unlock() + wl, ok := di.observers[configMap.Name] + if !ok { + return // No observers. + } + + // Iterate over the observers and invoke their callbacks. + for _, w := range wl { + w(configMap) + } +} + +func (di *defaultImpl) updateConfigMapEvent(old, new interface{}) { + di.addConfigMapEvent(new) +} diff --git a/vendor/github.com/knative/serving/pkg/configmap/doc.go b/vendor/github.com/knative/serving/pkg/configmap/doc.go new file mode 100644 index 00000000000..d861a3801f5 --- /dev/null +++ b/vendor/github.com/knative/serving/pkg/configmap/doc.go @@ -0,0 +1,21 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package configmap exists to facilitate consuming Kubernetes ConfigMap +// resources in various ways, including: +// - Watching them for changes over time, and +// - Loading them from a VolumeMount. +package configmap diff --git a/vendor/github.com/knative/serving/pkg/configmap/fixed.go b/vendor/github.com/knative/serving/pkg/configmap/fixed.go new file mode 100644 index 00000000000..d915e7ae084 --- /dev/null +++ b/vendor/github.com/knative/serving/pkg/configmap/fixed.go @@ -0,0 +1,55 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configmap + +import ( + "log" + + corev1 "k8s.io/api/core/v1" +) + +// fixedImpl provides a fixed informer-based implementation of Watcher. +type fixedImpl struct { + cfgs map[string]*corev1.ConfigMap +} + +// Asserts that fixedImpl implements Watcher. +var _ Watcher = (*fixedImpl)(nil) + +// Watch implements Watcher +func (di *fixedImpl) Watch(name string, w Observer) { + cm, ok := di.cfgs[name] + if ok { + w(cm) + } else { + log.Printf("Name %q is not found.", name) + } +} + +// Start implements Watcher +func (di *fixedImpl) Start(stopCh <-chan struct{}) error { + return nil +} + +// NewFixedWatcher returns an Watcher that exposes the fixed collection of ConfigMaps. +func NewFixedWatcher(cms ...*corev1.ConfigMap) Watcher { + cmm := make(map[string]*corev1.ConfigMap) + for _, cm := range cms { + cmm[cm.Name] = cm + } + return &fixedImpl{cfgs: cmm} +} diff --git a/vendor/github.com/knative/serving/pkg/configmap/load.go b/vendor/github.com/knative/serving/pkg/configmap/load.go new file mode 100644 index 00000000000..ee40fbf0cc0 --- /dev/null +++ b/vendor/github.com/knative/serving/pkg/configmap/load.go @@ -0,0 +1,58 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configmap + +import ( + "io/ioutil" + "os" + "path" + "path/filepath" +) + +// Load reads the "Data" of a ConfigMap from a particular VolumeMount. +func Load(p string) (map[string]string, error) { + data := make(map[string]string) + err := filepath.Walk(p, func(p string, info os.FileInfo, err error) error { + if err != nil { + return err + } + for info.Mode()&os.ModeSymlink != 0 { + dirname := filepath.Dir(p) + p, err = os.Readlink(p) + if err != nil { + return err + } + if !filepath.IsAbs(p) { + p = path.Join(dirname, p) + } + info, err = os.Lstat(p) + if err != nil { + return err + } + } + if info.IsDir() { + return nil + } + b, err := ioutil.ReadFile(p) + if err != nil { + return err + } + data[info.Name()] = string(b) + return nil + }) + return data, err +} diff --git a/vendor/github.com/knative/serving/pkg/configmap/watcher.go b/vendor/github.com/knative/serving/pkg/configmap/watcher.go new file mode 100644 index 00000000000..ccb75f36ceb --- /dev/null +++ b/vendor/github.com/knative/serving/pkg/configmap/watcher.go @@ -0,0 +1,54 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configmap + +import ( + "time" + + corev1 "k8s.io/api/core/v1" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" +) + +// Observer is the signature of the callbacks that notify an observer of the latest +// state of a particular configuration. An observer should not modify the provided +// ConfigMap, and should `.DeepCopy()` it for persistence (or otherwise process its +// contents). +type Observer func(*corev1.ConfigMap) + +// Watcher defined the interface that a configmap implementation must implement. +type Watcher interface { + // Watch is called to register a callback to be notified when a named ConfigMap changes. + Watch(string, Observer) + + // Start is called to initiate the watches and provide a channel to signal when we should + // stop watching. When Start returns, all registered Observers will be called with the + // initial state of the ConfigMaps they are watching. + Start(<-chan struct{}) error +} + +// NewDefaultWatcher creates a new default configmap.Watcher instance. +func NewDefaultWatcher(kc kubernetes.Interface, ns string) Watcher { + sif := kubeinformers.NewFilteredSharedInformerFactory( + kc, 5*time.Minute, ns, nil) + + return &defaultImpl{ + sif: sif, + informer: sif.Core().V1().ConfigMaps(), + ns: ns, + } +} diff --git a/vendor/github.com/knative/serving/pkg/logging/config.go b/vendor/github.com/knative/serving/pkg/logging/config.go new file mode 100644 index 00000000000..a1086f537b9 --- /dev/null +++ b/vendor/github.com/knative/serving/pkg/logging/config.go @@ -0,0 +1,133 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logging + +import ( + "encoding/json" + "errors" + "fmt" + + "github.com/knative/serving/pkg/logging/logkey" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + corev1 "k8s.io/api/core/v1" +) + +const ( + ConfigName = "config-logging" +) + +// NewLogger creates a logger with the supplied configuration. +// In addition to the logger, it returns AtomicLevel that can +// be used to change the logging level at runtime. +// If configuration is empty, a fallback configuration is used. +// If configuration cannot be used to instantiate a logger, +// the same fallback configuration is used. +func NewLogger(configJSON string, levelOverride string) (*zap.SugaredLogger, zap.AtomicLevel) { + logger, atomicLevel, err := newLoggerFromConfig(configJSON, levelOverride) + if err == nil { + return logger.Sugar(), atomicLevel + } + + loggingCfg := zap.NewProductionConfig() + if len(levelOverride) > 0 { + if level, err := levelFromString(levelOverride); err == nil { + loggingCfg.Level = zap.NewAtomicLevelAt(*level) + } + } + + logger, err2 := loggingCfg.Build() + if err2 != nil { + panic(err2) + } + + logger.Error("Failed to parse the logging config. 
Falling back to default logger.", + zap.Error(err), zap.String(logkey.JSONConfig, configJSON)) + return logger.Sugar(), loggingCfg.Level +} + +// NewLoggerFromConfig creates a logger using the provided Config +func NewLoggerFromConfig(config *Config, name string) (*zap.SugaredLogger, zap.AtomicLevel) { + logger, level := NewLogger(config.LoggingConfig, config.LoggingLevel[name].String()) + return logger.Named(name), level +} + +func newLoggerFromConfig(configJSON string, levelOverride string) (*zap.Logger, zap.AtomicLevel, error) { + if len(configJSON) == 0 { + return nil, zap.AtomicLevel{}, errors.New("empty logging configuration") + } + + var loggingCfg zap.Config + if err := json.Unmarshal([]byte(configJSON), &loggingCfg); err != nil { + return nil, zap.AtomicLevel{}, err + } + + if len(levelOverride) > 0 { + if level, err := levelFromString(levelOverride); err == nil { + loggingCfg.Level = zap.NewAtomicLevelAt(*level) + } + } + + logger, err := loggingCfg.Build() + if err != nil { + return nil, zap.AtomicLevel{}, err + } + + logger.Info("Successfully created the logger.", zap.String(logkey.JSONConfig, configJSON)) + logger.Sugar().Infof("Logging level set to %v", loggingCfg.Level) + return logger, loggingCfg.Level, nil +} + +// Config contains the configuration defined in the logging ConfigMap. 
+type Config struct { + LoggingConfig string + LoggingLevel map[string]zapcore.Level +} + +// NewConfigFromMap creates a LoggingConfig from the supplied map +func NewConfigFromMap(data map[string]string) (*Config, error) { + lc := &Config{} + if zlc, ok := data["zap-logger-config"]; ok { + lc.LoggingConfig = zlc + } + lc.LoggingLevel = make(map[string]zapcore.Level) + for _, component := range []string{"controller", "queueproxy", "webhook", "activator", "autoscaler"} { + if ll, ok := data["loglevel."+component]; ok { + if len(ll) > 0 { + level, err := levelFromString(ll) + if err != nil { + return nil, err + } + lc.LoggingLevel[component] = *level + } + } + } + return lc, nil +} + +// NewConfigFromConfigMap creates a LoggingConfig from the supplied ConfigMap +func NewConfigFromConfigMap(configMap *corev1.ConfigMap) (*Config, error) { + return NewConfigFromMap(configMap.Data) +} + +func levelFromString(level string) (*zapcore.Level, error) { + var zapLevel zapcore.Level + if err := zapLevel.UnmarshalText([]byte(level)); err != nil { + return nil, fmt.Errorf("invalid logging level: %v", level) + } + return &zapLevel, nil +} diff --git a/vendor/github.com/knative/serving/pkg/logging/logger.go b/vendor/github.com/knative/serving/pkg/logging/logger.go new file mode 100644 index 00000000000..903dc5825ef --- /dev/null +++ b/vendor/github.com/knative/serving/pkg/logging/logger.go @@ -0,0 +1,57 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package logging + +import ( + "context" + + "go.uber.org/zap" +) + +type loggerKey struct{} + +// This logger is used when there is no logger attached to the context. +// Rather than returning nil and causing a panic, we will use the fallback +// logger. Fallback logger is tagged with logger=fallback to make sure +// that code that doesn't set the logger correctly can be caught at runtime. +var fallbackLogger *zap.SugaredLogger + +func init() { + if logger, err := zap.NewProduction(); err != nil { + // We failed to create a fallback logger. Our fallback + // unfortunately falls back to noop. + fallbackLogger = zap.NewNop().Sugar() + } else { + fallbackLogger = logger.Named("fallback").Sugar() + } +} + +// WithLogger returns a copy of parent context in which the +// value associated with logger key is the supplied logger. +func WithLogger(ctx context.Context, logger *zap.SugaredLogger) context.Context { + return context.WithValue(ctx, loggerKey{}, logger) +} + +// FromContext returns the logger stored in context. +// Returns nil if no logger is set in context, or if the stored value is +// not of correct type. +func FromContext(ctx context.Context) *zap.SugaredLogger { + if logger, ok := ctx.Value(loggerKey{}).(*zap.SugaredLogger); ok { + return logger + } + return fallbackLogger +} diff --git a/vendor/github.com/knative/serving/pkg/logging/logkey/constants.go b/vendor/github.com/knative/serving/pkg/logging/logkey/constants.go new file mode 100644 index 00000000000..664d4ae5a3a --- /dev/null +++ b/vendor/github.com/knative/serving/pkg/logging/logkey/constants.go @@ -0,0 +1,70 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logkey + +const ( + // ControllerType is the key used for controller type in structured logs + ControllerType = "knative.dev/controller" + + // Namespace is the key used for namespace in structured logs + Namespace = "knative.dev/namespace" + + // Service is the key used for service name in structured logs + Service = "knative.dev/service" + + // Configuration is the key used for configuration name in structured logs + Configuration = "knative.dev/configuration" + + // Revision is the key used for revision name in structured logs + Revision = "knative.dev/revision" + + // Route is the key used for route name in structured logs + Route = "knative.dev/route" + + // Build is the key used for build name in structured logs + Build = "knative.dev/build" + + // JSONConfig is the key used for JSON configurations (not to be confused by the Configuration object) + JSONConfig = "knative.dev/jsonconfig" + + // Kind is the key used to represent kind of an object in logs + Kind = "knative.dev/kind" + + // Name is the key used to represent name of an object in logs + Name = "knative.dev/name" + + // Operation is the key used to represent an operation in logs + Operation = "knative.dev/operation" + + // Resource is the key used to represent a resource in logs + Resource = "knative.dev/resource" + + // SubResource is a generic key used to represent a sub-resource in logs + SubResource = "knative.dev/subresource" + + // UserInfo is the key used to represent a user information in logs + UserInfo = "knative.dev/userinfo" + + // Pod is the key used to represent a pod's name in 
logs + Pod = "knative.dev/pod" + + // Deployment is the key used to represent a deployment's name in logs + Deployment = "knative.dev/deployment" + + // KubernetesService is the key used to represent a Kubernetes service name in logs + KubernetesService = "knative.dev/k8sservice" +)