From 79030d68cb887916b2c1760d86328c31667162ec Mon Sep 17 00:00:00 2001 From: Paul Morie Date: Thu, 19 Jul 2018 18:37:18 -0400 Subject: [PATCH] WIP: add configmap for flow controller --- pkg/controller/flow/flow.go | 45 ++++++++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/pkg/controller/flow/flow.go b/pkg/controller/flow/flow.go index 624b80e4589..ca850ca7e99 100644 --- a/pkg/controller/flow/flow.go +++ b/pkg/controller/flow/flow.go @@ -19,6 +19,7 @@ package flow import ( "fmt" "log" + "sync" "time" "github.com/golang/glog" @@ -43,6 +44,7 @@ import ( // TODO: Get rid of these, but needed as other controllers use them. servingclientset "github.com/knative/serving/pkg/client/clientset/versioned" servinginformers "github.com/knative/serving/pkg/client/informers/externalversions" + servingconfigmaps "github.com/knative/serving/pkg/configmap" "github.com/knative/eventing/pkg/controller" @@ -60,8 +62,10 @@ import ( const controllerAgentName = "flow-controller" -// TODO: This should come from a configmap -const defaultBusName = "stub" +// defaultClusterBusConfigMapKey is the name of the key in this controller's +// ConfigMap that contains the name of the default cluster bus for the flow +// controller to use. +const defaultClusterBusConfigMapKey = "default-cluster-bus" // What field do we assume Object Reference exports as a resolvable target const targetFieldName = "domainInternal" @@ -109,6 +113,18 @@ type Controller struct { // time, and makes it easy to ensure we are never processing the same item // simultaneously in two different workers. workqueue workqueue.RateLimitingInterface + + // configMutex guards the controller state that comes from the controller's configmap. + configMutex sync.RWMutex + + // controllerConfigMapWatcher is used to watch the knative-system/flow-controller-config configMap. + controllerConfigMapWatcher servingconfigmaps.Watcher + + // defaultBusName is the default bus name to use to create channels; it is + // updated if the knative-system/flow-controller-config ConfigMap exists and + // contains the 'default-cluster-bus-name' key. + defaultClusterBusName string + // recorder is an event recorder for recording Event resources to the // Kubernetes API. recorder record.EventRecorder @@ -170,6 +186,9 @@ func NewController( }, }) + // TODO: const for knative-system + controller.controllerConfigMapWatcher = servingconfigmaps.NewDefaultWatcher(kubeclientset, "knative-system") + return controller } @@ -184,6 +203,9 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { // Start the informer factories to begin populating the informer caches glog.Info("Starting Flow controller") + glog.Info("Watching controller config") + c.controllerConfigMapWatcher.Watch("flow-controller-config", c.receiveControllerConfig) + // Wait for the caches to be synced before starting workers glog.Info("Waiting for Flow informer caches to sync") if ok := cache.WaitForCacheSync(stopCh, c.flowsSynced); !ok { @@ -213,6 +235,23 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { return nil } +func (c *Controller) receiveControllerConfig(configMap *corev1.ConfigMap) { + c.configMutex.Lock() + defer c.configMutex.Unlock() + + if value, ok := configMap.Data[defaultClusterBusConfigMapKey]; ok { + c.defaultClusterBusName = value + } else { + c.defaultClusterBusName = "stub" + } +} + +func (c *Controller) getDefaultClusterBusName() string { + c.configMutex.RLock() + defer c.configMutex.RUnlock() + return c.defaultClusterBusName +} + // runWorker is a long-running function that will continually call the // processNextWorkItem function in order to read and process a message on the // workqueue. @@ -474,7 +513,7 @@ func (c *Controller) createChannel(flow *v1alpha1.Flow) (*channelsv1alpha1.Chann }, }, Spec: channelsv1alpha1.ChannelSpec{ - ClusterBus: defaultBusName, + ClusterBus: c.getDefaultClusterBusName(), }, } return c.clientset.ChannelsV1alpha1().Channels(flow.Namespace).Create(channel)