From 1192e0feaeb7030c12d8749d8c565a76c0b09d19 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Wed, 25 Jul 2018 18:22:04 +0000 Subject: [PATCH 01/11] first pass, convert to provider/reconcile model --- cmd/controller-manager/main.go | 8 +- cmd/controller/main.go | 4 +- pkg/apis/flows/v1alpha1/flow_types.go | 13 + pkg/controller/flow/provider.go | 70 +++++ pkg/controller/flow/{flow.go => reconcile.go} | 296 ++++++------------ pkg/controller/flow/reconcile_test.go | 91 ++++++ 6 files changed, 280 insertions(+), 202 deletions(-) create mode 100644 pkg/controller/flow/provider.go rename pkg/controller/flow/{flow.go => reconcile.go} (60%) create mode 100644 pkg/controller/flow/reconcile_test.go diff --git a/cmd/controller-manager/main.go b/cmd/controller-manager/main.go index fb88adffb1c..beab6651289 100644 --- a/cmd/controller-manager/main.go +++ b/cmd/controller-manager/main.go @@ -22,7 +22,9 @@ import ( buildv1alpha1 "github.com/knative/build/pkg/apis/build/v1alpha1" channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" feedsv1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" + flowsv1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1" "github.com/knative/eventing/pkg/controller/feed" + "github.com/knative/eventing/pkg/controller/flow" istiov1alpha3 "github.com/knative/serving/pkg/apis/istio/v1alpha3" servingv1alpha1 "github.com/knative/serving/pkg/apis/serving/v1alpha1" "k8s.io/apimachinery/pkg/runtime" @@ -53,10 +55,11 @@ func main() { // Add custom types to this array to get them into the manager's scheme. schemeFuncs := []SchemeFunc{ buildv1alpha1.AddToScheme, - servingv1alpha1.AddToScheme, - feedsv1alpha1.AddToScheme, channelsv1alpha1.AddToScheme, + feedsv1alpha1.AddToScheme, + flowsv1alpha1.AddToScheme, istiov1alpha3.AddToScheme, + servingv1alpha1.AddToScheme, } for _, schemeFunc := range schemeFuncs { schemeFunc(mrg.GetScheme()) @@ -66,6 +69,7 @@ func main() { // manager run it. providers := []ProvideFunc{ feed.ProvideController, + flow.ProvideController, } for _, provider := range providers { diff --git a/cmd/controller/main.go b/cmd/controller/main.go index ab1e6fa215f..7ca6591b23d 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -39,7 +39,6 @@ import ( "github.com/knative/eventing/pkg/controller/bus" "github.com/knative/eventing/pkg/controller/channel" "github.com/knative/eventing/pkg/controller/clusterbus" - "github.com/knative/eventing/pkg/controller/flow" "github.com/knative/eventing/pkg/signals" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -82,6 +81,8 @@ func main() { glog.Fatalf("Error building serving clientset: %s", err.Error()) } + // TODO: Rip this out from all the controllers since we can get it + // from provider. // Build a rest.Config from configuration injected into the Pod by // Kubernetes. Clients will use the Pod's ServiceAccount principal. restConfig, err := rest.InClusterConfig() @@ -95,7 +96,6 @@ func main() { // Add new controllers here. ctors := []controller.Constructor{ - flow.NewController, bus.NewController, clusterbus.NewController, channel.NewController, diff --git a/pkg/apis/flows/v1alpha1/flow_types.go b/pkg/apis/flows/v1alpha1/flow_types.go index 42060d41d52..2e1e8a93462 100644 --- a/pkg/apis/flows/v1alpha1/flow_types.go +++ b/pkg/apis/flows/v1alpha1/flow_types.go @@ -278,6 +278,19 @@ func (fs *FlowStatus) PropagateActionTargetResolved(status corev1.ConditionStatu fs.checkAndMarkReady() } +func (fs *FlowStatus) InitializeConditions() { + for _, cond := range []FlowConditionType{ + FlowConditionReady, + } { + if fc := fs.GetCondition(cond); fc == nil { + fs.setCondition(&FlowCondition{ + Type: cond, + Status: corev1.ConditionUnknown, + }) + } + } +} + func (fs *FlowStatus) PropagateChannelStatus(cs channelsv1alpha1.ChannelStatus) { cc := cs.GetCondition(channelsv1alpha1.ChannelReady) diff --git a/pkg/controller/flow/provider.go b/pkg/controller/flow/provider.go new file mode 100644 index 00000000000..363d751cf03 --- /dev/null +++ b/pkg/controller/flow/provider.go @@ -0,0 +1,70 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flow + +import ( + "github.com/knative/eventing/pkg/apis/flows/v1alpha1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +const controllerAgentName = "flow-controller" + +type reconciler struct { + client client.Client + restConfig *rest.Config + recorder record.EventRecorder +} + +// Verify the struct implements reconcile.Reconciler +var _ reconcile.Reconciler = &reconciler{} + +// ProvideController returns a flow controller. +func ProvideController(mrg manager.Manager) (controller.Controller, error) { + // Setup a new controller to Reconcile Flows. + c, err := controller.New(controllerAgentName, mrg, controller.Options{ + Reconciler: &reconciler{ + recorder: mrg.GetRecorder(controllerAgentName), + }, + }) + if err != nil { + return nil, err + } + + // Watch Flow events and enqueue Flow object key. + if err := c.Watch(&source.Kind{Type: &v1alpha1.Flow{}}, &handler.EnqueueRequestForObject{}); err != nil { + return nil, err + } + + return c, nil +} + +func (r *reconciler) InjectClient(c client.Client) error { + r.client = c + return nil +} + +func (r *reconciler) InjectConfig(c *rest.Config) error { + r.restConfig = c + return nil +} diff --git a/pkg/controller/flow/flow.go b/pkg/controller/flow/reconcile.go similarity index 60% rename from pkg/controller/flow/flow.go rename to pkg/controller/flow/reconcile.go index a2138a07d27..e22c04777d2 100644 --- a/pkg/controller/flow/flow.go +++ b/pkg/controller/flow/reconcile.go @@ -17,9 +17,10 @@ limitations under the License. package flow import ( + "context" "fmt" "log" - "time" + // "time" "github.com/golang/glog" corev1 "k8s.io/api/core/v1" @@ -27,39 +28,42 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - runtimetypes "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" - kubeinformers "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" + // runtimetypes "k8s.io/apimachinery/pkg/runtime" + // "k8s.io/apimachinery/pkg/util/runtime" + // "k8s.io/apimachinery/pkg/util/sets" + // "k8s.io/apimachinery/pkg/util/wait" + // kubeinformers "k8s.io/client-go/informers" + // "k8s.io/client-go/kubernetes" + // "k8s.io/client-go/kubernetes/scheme" + // typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + // "k8s.io/client-go/rest" + // "k8s.io/client-go/tools/cache" + // "k8s.io/client-go/tools/record" + // "k8s.io/client-go/util/workqueue" // TODO: Get rid of these, but needed as other controllers use them. - servingclientset "github.com/knative/serving/pkg/client/clientset/versioned" - servinginformers "github.com/knative/serving/pkg/client/informers/externalversions" + // servingclientset "github.com/knative/serving/pkg/client/clientset/versioned" + // servinginformers "github.com/knative/serving/pkg/client/informers/externalversions" - "github.com/knative/eventing/pkg/controller" + // "github.com/knative/eventing/pkg/controller" channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" feedsv1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" v1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1" - clientset "github.com/knative/eventing/pkg/client/clientset/versioned" - flowscheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" - informers "github.com/knative/eventing/pkg/client/informers/externalversions" - channelListers "github.com/knative/eventing/pkg/client/listers/channels/v1alpha1" - feedListers "github.com/knative/eventing/pkg/client/listers/feeds/v1alpha1" - listers "github.com/knative/eventing/pkg/client/listers/flows/v1alpha1" + /* + clientset "github.com/knative/eventing/pkg/client/clientset/versioned" + flowscheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" + informers "github.com/knative/eventing/pkg/client/informers/externalversions" + channelListers "github.com/knative/eventing/pkg/client/listers/channels/v1alpha1" + feedListers "github.com/knative/eventing/pkg/client/listers/feeds/v1alpha1" + listers "github.com/knative/eventing/pkg/client/listers/flows/v1alpha1" + */ + + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) -const controllerAgentName = "flow-controller" - // TODO: This should come from a configmap const defaultBusName = "stub" @@ -80,6 +84,7 @@ var ( ) // Controller is the controller implementation for Flow resources +/* type Controller struct { // kubeclientset is a standard kubernetes clientset kubeclientset kubernetes.Interface @@ -113,8 +118,10 @@ type Controller struct { // Kubernetes API. recorder record.EventRecorder } +*/ // NewController returns a new flow controller +/* func NewController( kubeclientset kubernetes.Interface, clientset clientset.Interface, @@ -172,159 +179,45 @@ func NewController( return controller } - -// Run will set up the event handlers for types we are interested in, as well -// as syncing informer caches and starting workers. It will block until stopCh -// is closed, at which point it will shutdown the workqueue and wait for -// workers to finish processing their current work items. -func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { - defer runtime.HandleCrash() - defer c.workqueue.ShutDown() - - // Start the informer factories to begin populating the informer caches - glog.Info("Starting Flow controller") - - // Wait for the caches to be synced before starting workers - glog.Info("Waiting for Flow informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.flowsSynced); !ok { - return fmt.Errorf("failed to wait for Flow caches to sync") - } - - glog.Info("Waiting for Feed informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.feedsSynced); !ok { - return fmt.Errorf("failed to wait for Feed caches to sync") - } - - glog.Info("Waiting for channel informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.channelsSynced); !ok { - return fmt.Errorf("failed to wait for Channel caches to sync") - } - - glog.Info("Starting workers") - // Launch two workers to process Flow resources - for i := 0; i < threadiness; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) - } - - glog.Info("Started workers") - <-stopCh - glog.Info("Shutting down workers") - - return nil -} - -// runWorker is a long-running function that will continually call the -// processNextWorkItem function in order to read and process a message on the -// workqueue. -func (c *Controller) runWorker() { - for c.processNextWorkItem() { - } -} - -// processNextWorkItem will read a single work item off the workqueue and -// attempt to process it, by calling Reconcile. -func (c *Controller) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() - - if shutdown { - return false - } - - // We wrap this block in a func so we can defer c.workqueue.Done. - if err := func(obj interface{}) error { - // We call Done here so the workqueue knows we have finished - // processing this item. We also must remember to call Forget if we - // do not want this work item being re-queued. For example, we do - // not call Forget if a transient error occurs, instead the item is - // put back on the workqueue and attempted again after a back-off - // period. - defer c.workqueue.Done(obj) - // We expect strings to come off the workqueue. These are of the - // form namespace/name. We do this as the delayed nature of the - // workqueue means the items in the informer cache may actually be - // more up to date that when the item was initially put onto the - // workqueue. - key, ok := obj.(string) - if !ok { - // As the item in the workqueue is actually invalid, we call - // Forget here else we'd go into a loop of attempting to - // process a work item that is invalid. - c.workqueue.Forget(obj) - runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) - return nil - } - // Run the Reconcile, passing it the namespace/name string of the - // Flow resource to be synced. - if err := c.Reconcile(key); err != nil { - return fmt.Errorf("error syncing '%s': %s", key, err.Error()) - } - // Finally, if no error occurs we Forget this item so it does not - // get queued again until another change happens. - c.workqueue.Forget(obj) - glog.Infof("Successfully synced '%s'", key) - return nil - }(obj); err != nil { - runtime.HandleError(err) - } - - return true -} - -// enqueueFlow takes a Flow resource and converts it into a namespace/name -// string which is then put onto the work queue. This method should *not* be -// passed resources of any type other than Flow. -func (c *Controller) enqueueFlow(obj interface{}) { - var key string - var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - runtime.HandleError(err) - return - } - c.workqueue.AddRateLimited(key) -} +*/ // Reconcile compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the Flow resource // with the current status of the resource. -func (c *Controller) Reconcile(key string) error { - // Convert the namespace/name string into a distinct namespace and name - namespace, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) - return nil +func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { + flow := &v1alpha1.Flow{} + err := r.client.Get(context.TODO(), request.NamespacedName, flow) + + if errors.IsNotFound(err) { + glog.Errorf("could not find flow %v\n", request) + return reconcile.Result{}, nil } - // Get the Flow resource with this namespace/name - original, err := c.flowsLister.Flows(namespace).Get(name) if err != nil { - // The Flow resource may no longer exist, in which case we stop - // processing. - if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("flow '%s' in work queue no longer exists", key)) - return nil - } - return err + glog.Errorf("could not fetch Flow %v for %+v\n", err, request) + return reconcile.Result{}, err } - // Don't mutate the informer's copy of our object. - flow := original.DeepCopy() + original := flow.DeepCopy() + + flow.Status.InitializeConditions() // Reconcile this copy of the Flow and then write back any status // updates regardless of whether the reconcile error out. - err = c.reconcile(flow) + err = r.reconcile(flow) if equality.Semantic.DeepEqual(original.Status, flow.Status) { // If we didn't change anything then don't call updateStatus. // This is important because the copy we loaded from the informer's // cache may be stale and we don't want to overwrite a prior update // to status with this stale state. - } else if _, err := c.updateStatus(flow); err != nil { + } else if _, err := r.updateStatus(flow); err != nil { glog.Warningf("Failed to update flow status: %v", err) - return err + return reconcile.Result{}, err } - return err + return reconcile.Result{}, err } -func (c *Controller) reconcile(flow *v1alpha1.Flow) error { +func (r *reconciler) reconcile(flow *v1alpha1.Flow) error { // See if the flow has been deleted accessor, err := meta.Accessor(flow) if err != nil { @@ -333,7 +226,7 @@ func (c *Controller) reconcile(flow *v1alpha1.Flow) error { deletionTimestamp := accessor.GetDeletionTimestamp() glog.Infof("DeletionTimestamp: %v", deletionTimestamp) - target, err := c.resolveActionTarget(flow.Namespace, flow.Spec.Action) + target, err := r.resolveActionTarget(flow.Namespace, flow.Spec.Action) if err != nil { glog.Warningf("Failed to resolve target %v : %v", flow.Spec.Action, err) flow.Status.PropagateActionTargetResolved(corev1.ConditionFalse, "ActionTargetNotResolved", err.Error()) @@ -347,14 +240,14 @@ func (c *Controller) reconcile(flow *v1alpha1.Flow) error { // Reconcile the Channel. Creates a channel that is the target that the Feed will use. // TODO: We should reuse channels possibly. - channel, err := c.reconcileChannel(flow) + channel, err := r.reconcileChannel(flow) if err != nil { glog.Warningf("Failed to reconcile channel : %v", err) return err } flow.Status.PropagateChannelStatus(channel.Status) - subscription, err := c.reconcileSubscription(channel.Name, target, flow) + subscription, err := r.reconcileSubscription(channel.Name, target, flow) if err != nil { glog.Warningf("Failed to reconcile subscription : %v", err) return err @@ -364,7 +257,7 @@ func (c *Controller) reconcile(flow *v1alpha1.Flow) error { channelDNS := channel.Status.DomainInternal if channelDNS != "" { glog.Infof("Reconciling feed for flow %q targeting %q", flow.Name, channelDNS) - feed, err := c.reconcileFeed(channelDNS, flow) + feed, err := r.reconcileFeed(channelDNS, flow) if err != nil { glog.Warningf("Failed to reconcile feed: %v", err) } @@ -373,41 +266,34 @@ func (c *Controller) reconcile(flow *v1alpha1.Flow) error { return nil } -func (c *Controller) updateStatus(u *v1alpha1.Flow) (*v1alpha1.Flow, error) { - flowClient := c.clientset.FlowsV1alpha1().Flows(u.Namespace) - newu, err := flowClient.Get(u.Name, metav1.GetOptions{}) +func (r *reconciler) updateStatus(flow *v1alpha1.Flow) (*v1alpha1.Flow, error) { + newFlow := &v1alpha1.Flow{} + err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: flow.Namespace, Name: flow.Name}, newFlow) + if err != nil { return nil, err } - newu.Status = u.Status + newFlow.Status = flow.Status // Until #38113 is merged, we must use Update instead of UpdateStatus to // update the Status block of the Flow resource. UpdateStatus will not // allow changes to the Spec of the resource, which is ideal for ensuring // nothing other than resource status has been updated. - return flowClient.Update(newu) -} - -// AddFinalizer adds value to the list of finalizers on obj -func AddFinalizer(obj runtimetypes.Object, value string) error { - accessor, err := meta.Accessor(obj) + err = r.client.Update(context.TODO(), newFlow) if err != nil { - return err + return nil, err } - finalizers := sets.NewString(accessor.GetFinalizers()...) - finalizers.Insert(value) - accessor.SetFinalizers(finalizers.List()) - return nil + return newFlow, nil } // syncHandler compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the Flow resource // with the current status of the resource. -func (c *Controller) resolveActionTarget(namespace string, action v1alpha1.FlowAction) (string, error) { +func (r *reconciler) resolveActionTarget(namespace string, action v1alpha1.FlowAction) (string, error) { glog.Infof("Resolving target: %v", action) if action.Target != nil { - return c.resolveObjectReference(namespace, action.Target) + return r.resolveObjectReference(namespace, action.Target) } if action.TargetURI != nil { return *action.TargetURI, nil @@ -418,8 +304,8 @@ func (c *Controller) resolveActionTarget(namespace string, action v1alpha1.FlowA // resolveObjectReference fetches an object based on ObjectRefence. It assumes the // object has a status["domainInternal"] string in it and returns it. -func (c *Controller) resolveObjectReference(namespace string, ref *corev1.ObjectReference) (string, error) { - resourceClient, err := CreateResourceInterface(c.restConfig, ref, namespace) +func (r *reconciler) resolveObjectReference(namespace string, ref *corev1.ObjectReference) (string, error) { + resourceClient, err := CreateResourceInterface(r.restConfig, ref, namespace) if err != nil { glog.Warningf("failed to create dynamic client resource: %v", err) return "", err @@ -446,12 +332,13 @@ func (c *Controller) resolveObjectReference(namespace string, ref *corev1.Object return serviceNameStr, nil } -func (c *Controller) reconcileChannel(flow *v1alpha1.Flow) (*channelsv1alpha1.Channel, error) { +func (r *reconciler) reconcileChannel(flow *v1alpha1.Flow) (*channelsv1alpha1.Channel, error) { channelName := flow.Name - channel, err := c.channelsLister.Channels(flow.Namespace).Get(channelName) + channel := &channelsv1alpha1.Channel{} + err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: flow.Namespace, Name: channelName}, channel) if errors.IsNotFound(err) { - channel, err = c.createChannel(flow) + channel, err = r.createChannel(flow) if err != nil { glog.Errorf("Failed to create channel %q : %v", channelName, err) return nil, err @@ -466,28 +353,33 @@ func (c *Controller) reconcileChannel(flow *v1alpha1.Flow) (*channelsv1alpha1.Ch return channel, err } -func (c *Controller) createChannel(flow *v1alpha1.Flow) (*channelsv1alpha1.Channel, error) { +func (r *reconciler) createChannel(flow *v1alpha1.Flow) (*channelsv1alpha1.Channel, error) { channelName := flow.Name channel := &channelsv1alpha1.Channel{ ObjectMeta: metav1.ObjectMeta{ Name: channelName, Namespace: flow.Namespace, OwnerReferences: []metav1.OwnerReference{ - *c.NewControllerRef(flow), + *r.NewControllerRef(flow), }, }, Spec: channelsv1alpha1.ChannelSpec{ ClusterBus: defaultBusName, }, } - return c.clientset.ChannelsV1alpha1().Channels(flow.Namespace).Create(channel) + if err := r.client.Create(context.TODO(), channel); err != nil { + return nil, err + } + return channel, nil } -func (c *Controller) reconcileSubscription(channelName string, target string, flow *v1alpha1.Flow) (*channelsv1alpha1.Subscription, error) { +func (r *reconciler) reconcileSubscription(channelName string, target string, flow *v1alpha1.Flow) (*channelsv1alpha1.Subscription, error) { subscriptionName := flow.Name - subscription, err := c.subscriptionsLister.Subscriptions(flow.Namespace).Get(subscriptionName) + + subscription := &channelsv1alpha1.Subscription{} + err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: flow.Namespace, Name: subscriptionName}, subscription) if errors.IsNotFound(err) { - subscription, err = c.createSubscription(channelName, target, flow) + subscription, err = r.createSubscription(channelName, target, flow) if err != nil { glog.Errorf("Failed to create subscription %q : %v", subscriptionName, err) return nil, err @@ -502,14 +394,14 @@ func (c *Controller) reconcileSubscription(channelName string, target string, fl return subscription, err } -func (c *Controller) createSubscription(channelName string, target string, flow *v1alpha1.Flow) (*channelsv1alpha1.Subscription, error) { +func (r *reconciler) createSubscription(channelName string, target string, flow *v1alpha1.Flow) (*channelsv1alpha1.Subscription, error) { subscriptionName := flow.Name subscription := &channelsv1alpha1.Subscription{ ObjectMeta: metav1.ObjectMeta{ Name: subscriptionName, Namespace: flow.Namespace, OwnerReferences: []metav1.OwnerReference{ - *c.NewControllerRef(flow), + *r.NewControllerRef(flow), }, }, Spec: channelsv1alpha1.SubscriptionSpec{ @@ -517,14 +409,19 @@ func (c *Controller) createSubscription(channelName string, target string, flow Subscriber: target, }, } - return c.clientset.ChannelsV1alpha1().Subscriptions(flow.Namespace).Create(subscription) + if err := r.client.Create(context.TODO(), subscription); err != nil { + return nil, err + } + return subscription, nil } -func (c *Controller) reconcileFeed(channelDNS string, flow *v1alpha1.Flow) (*feedsv1alpha1.Feed, error) { +func (r *reconciler) reconcileFeed(channelDNS string, flow *v1alpha1.Flow) (*feedsv1alpha1.Feed, error) { feedName := flow.Name - feed, err := c.feedsLister.Feeds(flow.Namespace).Get(feedName) + + feed := &feedsv1alpha1.Feed{} + err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: flow.Namespace, Name: feedName}, feed) if errors.IsNotFound(err) { - feed, err = c.createFeed(channelDNS, flow) + feed, err = r.createFeed(channelDNS, flow) if err != nil { glog.Errorf("Failed to create feed %q : %v", feedName, err) return nil, err @@ -540,14 +437,14 @@ func (c *Controller) reconcileFeed(channelDNS string, flow *v1alpha1.Flow) (*fee } -func (c *Controller) createFeed(channelDNS string, flow *v1alpha1.Flow) (*feedsv1alpha1.Feed, error) { +func (r *reconciler) createFeed(channelDNS string, flow *v1alpha1.Flow) (*feedsv1alpha1.Feed, error) { feedName := flow.Name feed := &feedsv1alpha1.Feed{ ObjectMeta: metav1.ObjectMeta{ Name: feedName, Namespace: flow.Namespace, OwnerReferences: []metav1.OwnerReference{ - *c.NewControllerRef(flow), + *r.NewControllerRef(flow), }, }, Spec: feedsv1alpha1.FeedSpec{ @@ -570,10 +467,13 @@ func (c *Controller) createFeed(channelDNS string, flow *v1alpha1.Flow) (*feedsv feed.Spec.Trigger.ParametersFrom = flow.Spec.Trigger.ParametersFrom } - return c.clientset.FeedsV1alpha1().Feeds(flow.Namespace).Create(feed) + if err := r.client.Create(context.TODO(), feed); err != nil { + return nil, err + } + return feed, nil } -func (c *Controller) NewControllerRef(flow *v1alpha1.Flow) *metav1.OwnerReference { +func (r *reconciler) NewControllerRef(flow *v1alpha1.Flow) *metav1.OwnerReference { blockOwnerDeletion := false isController := false revRef := metav1.NewControllerRef(flow, flowControllerKind) diff --git a/pkg/controller/flow/reconcile_test.go b/pkg/controller/flow/reconcile_test.go new file mode 100644 index 00000000000..d6e2d3fba73 --- /dev/null +++ b/pkg/controller/flow/reconcile_test.go @@ -0,0 +1,91 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Veroute.on 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flow + +import ( + // "encoding/base64" + // "encoding/json" + // "fmt" + "testing" + + // channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + feedsv1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" + controllertesting "github.com/knative/eventing/pkg/controller/testing" + servingv1alpha1 "github.com/knative/serving/pkg/apis/serving/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/record" + /* + "github.com/knative/eventing/pkg/controller/feed/resources" + "github.com/knative/eventing/pkg/sources" + batchv1 "k8s.io/api/batch/v1" + */) + +/* +TODO +- initial: feed with job deadline exceeded + reconciled: feed failure, job exists, finalizer +*/ + +var ( + trueVal = true + falseVal = false + // deletionTime is used when objects are marked as deleted. Rfc3339Copy() + // truncates to seconds to match the loss of precision during serialization. + deletionTime = metav1.Now().Rfc3339Copy() +) + +const ( + targetDNS = "myservice.mynamespace.svc.cluster.local" +) + +func init() { + // Add types to scheme + feedsv1alpha1.AddToScheme(scheme.Scheme) + servingv1alpha1.AddToScheme(scheme.Scheme) +} + +var testCases = []controllertesting.TestCase{ + { + Name: "new feed: adds status, finalizer, creates job", + InitialState: []runtime.Object{ + // getEventSource(), + // getEventType(), + // getNewFeed(), + }, + ReconcileKey: "test/test-feed", + WantPresent: []runtime.Object{ + // getStartInProgressFeed(), + // getNewStartJob(), + //TODO job created event + }, + }, +} + +func TestAllCases(t *testing.T) { + recorder := record.NewBroadcaster().NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + + for _, tc := range testCases { + r := &reconciler{ + client: tc.GetClient(), + recorder: recorder, + } + t.Run(tc.Name, tc.Runner(t, r, r.client)) + } +} From 188e6e5a99f184a7a19a6c2be1555a0d0698a655 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Wed, 25 Jul 2018 19:36:12 +0000 Subject: [PATCH 02/11] remove old cruft, dump feed on failure --- pkg/controller/flow/reconcile.go | 135 +------------------------------ test/e2e-tests.sh | 1 + 2 files changed, 5 insertions(+), 131 deletions(-) diff --git a/pkg/controller/flow/reconcile.go b/pkg/controller/flow/reconcile.go index e22c04777d2..a22148b8fa4 100644 --- a/pkg/controller/flow/reconcile.go +++ b/pkg/controller/flow/reconcile.go @@ -20,46 +20,16 @@ import ( "context" "fmt" "log" - // "time" "github.com/golang/glog" + channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + feedsv1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" + v1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - // runtimetypes "k8s.io/apimachinery/pkg/runtime" - // "k8s.io/apimachinery/pkg/util/runtime" - // "k8s.io/apimachinery/pkg/util/sets" - // "k8s.io/apimachinery/pkg/util/wait" - // kubeinformers "k8s.io/client-go/informers" - // "k8s.io/client-go/kubernetes" - // "k8s.io/client-go/kubernetes/scheme" - // typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" - // "k8s.io/client-go/rest" - // "k8s.io/client-go/tools/cache" - // "k8s.io/client-go/tools/record" - // "k8s.io/client-go/util/workqueue" - - // TODO: Get rid of these, but needed as other controllers use them. - // servingclientset "github.com/knative/serving/pkg/client/clientset/versioned" - // servinginformers "github.com/knative/serving/pkg/client/informers/externalversions" - - // "github.com/knative/eventing/pkg/controller" - - channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" - feedsv1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" - v1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1" - - /* - clientset "github.com/knative/eventing/pkg/client/clientset/versioned" - flowscheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" - informers "github.com/knative/eventing/pkg/client/informers/externalversions" - channelListers "github.com/knative/eventing/pkg/client/listers/channels/v1alpha1" - feedListers "github.com/knative/eventing/pkg/client/listers/feeds/v1alpha1" - listers "github.com/knative/eventing/pkg/client/listers/flows/v1alpha1" - */ - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -83,104 +53,6 @@ var ( flowControllerKind = v1alpha1.SchemeGroupVersion.WithKind("Flow") ) -// Controller is the controller implementation for Flow resources -/* -type Controller struct { - // kubeclientset is a standard kubernetes clientset - kubeclientset kubernetes.Interface - - // restConfig is used to create dynamic clients for - // resolving ObjectReference targets. - restConfig *rest.Config - - // clientset is a clientset for our own API group - clientset clientset.Interface - - flowsLister listers.FlowLister - flowsSynced cache.InformerSynced - - feedsLister feedListers.FeedLister - feedsSynced cache.InformerSynced - - channelsLister channelListers.ChannelLister - channelsSynced cache.InformerSynced - - subscriptionsLister channelListers.SubscriptionLister - subscriptionsSynced cache.InformerSynced - - // workqueue is a rate limited work queue. This is used to queue work to be - // processed instead of performing it as soon as a change happens. This - // means we can ensure we only process a fixed amount of resources at a - // time, and makes it easy to ensure we are never processing the same item - // simultaneously in two different workers. - workqueue workqueue.RateLimitingInterface - // recorder is an event recorder for recording Event resources to the - // Kubernetes API. - recorder record.EventRecorder -} -*/ - -// NewController returns a new flow controller -/* -func NewController( - kubeclientset kubernetes.Interface, - clientset clientset.Interface, - servingclientset servingclientset.Interface, - restConfig *rest.Config, - kubeInformerFactory kubeinformers.SharedInformerFactory, - flowsInformerFactory informers.SharedInformerFactory, - routeInformerFactory servinginformers.SharedInformerFactory) controller.Interface { - - // obtain a reference to a shared index informer for the Flow types. - flowInformer := flowsInformerFactory.Flows().V1alpha1() - - // obtain a reference to a shared index informer for the Feed types. - feedInformer := flowsInformerFactory.Feeds().V1alpha1() - - channelInformer := flowsInformerFactory.Channels().V1alpha1().Channels() - - subscriptionInformer := flowsInformerFactory.Channels().V1alpha1().Subscriptions() - - // Create event broadcaster - // Add flow-controller types to the default Kubernetes Scheme so Events can be - // logged for flow-controller types. - flowscheme.AddToScheme(scheme.Scheme) - glog.V(4).Info("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) - - controller := &Controller{ - kubeclientset: kubeclientset, - restConfig: restConfig, - clientset: clientset, - flowsLister: flowInformer.Flows().Lister(), - flowsSynced: flowInformer.Flows().Informer().HasSynced, - feedsLister: feedInformer.Feeds().Lister(), - feedsSynced: feedInformer.Feeds().Informer().HasSynced, - channelsLister: channelInformer.Lister(), - channelsSynced: channelInformer.Informer().HasSynced, - subscriptionsLister: subscriptionInformer.Lister(), - subscriptionsSynced: subscriptionInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Flows"), - recorder: recorder, - } - - glog.Info("Setting up event handlers") - - // Set up an event handler for when Flow resources change - flowInformer.Flows().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueueFlow, - UpdateFunc: func(old, new interface{}) { - controller.enqueueFlow(new) - }, - }) - - return controller -} -*/ - // Reconcile compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the Flow resource // with the current status of the resource. @@ -260,6 +132,7 @@ func (r *reconciler) reconcile(flow *v1alpha1.Flow) error { feed, err := r.reconcileFeed(channelDNS, flow) if err != nil { glog.Warningf("Failed to reconcile feed: %v", err) + return err } flow.Status.PropagateFeedStatus(feed.Status) } diff --git a/test/e2e-tests.sh b/test/e2e-tests.sh index 15a19259fea..7d2b7c7254e 100755 --- a/test/e2e-tests.sh +++ b/test/e2e-tests.sh @@ -71,6 +71,7 @@ function wait_until_flow_ready() { echo -e "\n\nERROR: timeout waiting for flow $NAMESPACE/$NAME to be ready" kubectl get -n $NAMESPACE flows $NAME -oyaml kubectl get -n $NAMESPACE jobs $NAME-start -oyaml + kubectl get -n $NAMESPACE feeds $NAME -oyaml return 1 } From dd1ee0382af520dcc499dc9ab8e0113b0b8a4a60 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Wed, 25 Jul 2018 19:55:22 +0000 Subject: [PATCH 03/11] Add more unit tests --- pkg/apis/flows/v1alpha1/flow_types_test.go | 15 +++++++++++++++ pkg/controller/flow/reconcile.go | 3 +-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/pkg/apis/flows/v1alpha1/flow_types_test.go b/pkg/apis/flows/v1alpha1/flow_types_test.go index 1bdaaa031ff..a8551ba70c0 100644 --- a/pkg/apis/flows/v1alpha1/flow_types_test.go +++ b/pkg/apis/flows/v1alpha1/flow_types_test.go @@ -339,3 +339,18 @@ func TestFlowCondition_PropagateStatus(t *testing.T) { }) } } + +func TestFlowCondition_InitializeConditions(t *testing.T) { + flow := Flow{} + flow.Status.InitializeConditions() + cond := flow.Status.GetCondition(FlowConditionReady) + if cond == nil { + t.Fatalf("InitializeConditions didn't set FlowConditionReady") + } + if want, got := cond.Status, corev1.ConditionUnknown; want != got { + t.Fatalf("Status was not set correctly : \nwant:\t%#v\ngot:\t%#v", want, got) + } + if flow.Status.IsReady() { + t.Fatalf("InitializeConditions marked the flow Ready") + } +} diff --git a/pkg/controller/flow/reconcile.go b/pkg/controller/flow/reconcile.go index a22148b8fa4..9ee0598a30b 100644 --- a/pkg/controller/flow/reconcile.go +++ b/pkg/controller/flow/reconcile.go @@ -152,8 +152,7 @@ func (r *reconciler) updateStatus(flow *v1alpha1.Flow) (*v1alpha1.Flow, error) { // update the Status block of the Flow resource. UpdateStatus will not // allow changes to the Spec of the resource, which is ideal for ensuring // nothing other than resource status has been updated. - err = r.client.Update(context.TODO(), newFlow) - if err != nil { + if err = r.client.Update(context.TODO(), newFlow); err != nil { return nil, err } return newFlow, nil From f2e313276f657a35a4e3698f2517655f4729dbbe Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Wed, 25 Jul 2018 20:50:02 +0000 Subject: [PATCH 04/11] dump controller logs in failures --- pkg/controller/flow/reconcile_test.go | 57 ++++++++++++++++++++++++--- test/e2e-tests.sh | 4 ++ 2 files changed, 55 insertions(+), 6 deletions(-) diff --git a/pkg/controller/flow/reconcile_test.go b/pkg/controller/flow/reconcile_test.go index d6e2d3fba73..9b4fa02718b 100644 --- a/pkg/controller/flow/reconcile_test.go +++ b/pkg/controller/flow/reconcile_test.go @@ -19,11 +19,12 @@ package flow import ( // "encoding/base64" // "encoding/json" - // "fmt" + "fmt" "testing" - // channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" feedsv1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" + flowsv1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1" controllertesting "github.com/knative/eventing/pkg/controller/testing" servingv1alpha1 "github.com/knative/serving/pkg/apis/serving/v1alpha1" corev1 "k8s.io/api/core/v1" @@ -52,24 +53,28 @@ var ( ) const ( - targetDNS = "myservice.mynamespace.svc.cluster.local" + targetDNS = "myservice.mynamespace.svc.cluster.local" + eventType = "myeventtype" + eventSource = "myeventsource" ) func init() { // Add types to scheme feedsv1alpha1.AddToScheme(scheme.Scheme) + flowsv1alpha1.AddToScheme(scheme.Scheme) servingv1alpha1.AddToScheme(scheme.Scheme) + channelsv1alpha1.AddToScheme(scheme.Scheme) } var testCases = []controllertesting.TestCase{ { - Name: "new feed: adds status, finalizer, creates job", + Name: "new flow: adds status, finalizer, creates job", InitialState: []runtime.Object{ // getEventSource(), // getEventType(), - // getNewFeed(), + getNewFlow(), }, - ReconcileKey: "test/test-feed", + ReconcileKey: "test/test-flow", WantPresent: []runtime.Object{ // getStartInProgressFeed(), // getNewStartJob(), @@ -89,3 +94,43 @@ func TestAllCases(t *testing.T) { t.Run(tc.Name, tc.Runner(t, r, r.client)) } } + +func getNewFlow() *feedsv1alpha1.Feed { + return &feedsv1alpha1.Feed{ + TypeMeta: flowType(), + ObjectMeta: om("test", "test-feed"), + Spec: feedsv1alpha1.FeedSpec{ + Action: feedsv1alpha1.FeedAction{ + DNSName: targetDNS, + }, + Trigger: feedsv1alpha1.EventTrigger{ + EventType: eventType, + Resource: "", + Service: "", + Parameters: nil, + ParametersFrom: nil, + }, + }, + } +} + +func flowType() metav1.TypeMeta { + return metav1.TypeMeta{ + APIVersion: flowsv1alpha1.SchemeGroupVersion.String(), + Kind: "Flow", + } +} + +func om(namespace, name string) metav1.ObjectMeta { + return metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + SelfLink: fmt.Sprintf("/apis/eventing/v1alpha1/namespaces/%s/object/%s", namespace, name), + } +} + +func omDeleting(namespace, name string) metav1.ObjectMeta { + om := om(namespace, name) + om.DeletionTimestamp = &deletionTime + return om +} diff --git a/test/e2e-tests.sh b/test/e2e-tests.sh index 7d2b7c7254e..957d3329015 100755 --- a/test/e2e-tests.sh +++ b/test/e2e-tests.sh @@ -72,6 +72,10 @@ function wait_until_flow_ready() { kubectl get -n $NAMESPACE flows $NAME -oyaml kubectl get -n $NAMESPACE jobs $NAME-start -oyaml kubectl get -n $NAMESPACE feeds $NAME -oyaml + echo -e "Dumping controller manager logs" + kubectl -n knative-eventing logs `kubectl -n knative-eventing get pods -oname | grep controller-manager` controller-manager + echo -e "Dumping controller logs" + kubectl -n knative-eventing logs `kubectl -n knative-eventing get pods -oname | grep eventing-controller` return 1 } From 321b43e10791b8a2b34ab299167ecf85a76ed1a8 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Wed, 25 Jul 2018 21:33:42 +0000 Subject: [PATCH 05/11] moar logs --- pkg/controller/flow/reconcile.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/controller/flow/reconcile.go b/pkg/controller/flow/reconcile.go index 9ee0598a30b..f9aeabd9587 100644 --- a/pkg/controller/flow/reconcile.go +++ b/pkg/controller/flow/reconcile.go @@ -57,6 +57,7 @@ var ( // converge the two. It then updates the Status block of the Flow resource // with the current status of the resource. func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { + glog.Infof("Reconciling flow %v", request) flow := &v1alpha1.Flow{} err := r.client.Get(context.TODO(), request.NamespacedName, flow) @@ -162,7 +163,7 @@ func (r *reconciler) updateStatus(flow *v1alpha1.Flow) (*v1alpha1.Flow, error) { // converge the two. It then updates the Status block of the Flow resource // with the current status of the resource. func (r *reconciler) resolveActionTarget(namespace string, action v1alpha1.FlowAction) (string, error) { - glog.Infof("Resolving target: %v", action) + glog.Infof("Resolving target: %+v", action) if action.Target != nil { return r.resolveObjectReference(namespace, action.Target) From a22bac62dd003ba74b6e2573eaa1c88d43a976dd Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Wed, 25 Jul 2018 22:26:30 +0000 Subject: [PATCH 06/11] add first unit test, more debug logs in e2e --- pkg/controller/flow/reconcile.go | 2 + pkg/controller/flow/reconcile_test.go | 56 ++++++++++++++++++++------- 2 files changed, 43 insertions(+), 15 deletions(-) diff --git a/pkg/controller/flow/reconcile.go b/pkg/controller/flow/reconcile.go index f9aeabd9587..3ef4a4e6c7e 100644 --- a/pkg/controller/flow/reconcile.go +++ b/pkg/controller/flow/reconcile.go @@ -135,6 +135,7 @@ func (r *reconciler) reconcile(flow *v1alpha1.Flow) error { glog.Warningf("Failed to reconcile feed: %v", err) return err } + glog.Infof("Reconciled feed status for flow %q targeting %q : %+v", flow.Name, channelDNS, feed.Status) flow.Status.PropagateFeedStatus(feed.Status) } return nil @@ -304,6 +305,7 @@ func (r *reconciler) reconcileFeed(channelDNS string, flow *v1alpha1.Flow) (*fee return nil, err } + glog.Infof("Reconciled feed: %+v", feed) // Should make sure feed is what it should be. For now, just assume it's fine // if it exists. return feed, err diff --git a/pkg/controller/flow/reconcile_test.go b/pkg/controller/flow/reconcile_test.go index 9b4fa02718b..4a1c033c12c 100644 --- a/pkg/controller/flow/reconcile_test.go +++ b/pkg/controller/flow/reconcile_test.go @@ -50,6 +50,7 @@ var ( // deletionTime is used when objects are marked as deleted. Rfc3339Copy() // truncates to seconds to match the loss of precision during serialization. deletionTime = metav1.Now().Rfc3339Copy() + targetURI = "http://target.example.com" ) const ( @@ -68,17 +69,13 @@ func init() { var testCases = []controllertesting.TestCase{ { - Name: "new flow: adds status, finalizer, creates job", + Name: "new flow: adds status, action target resolved", InitialState: []runtime.Object{ - // getEventSource(), - // getEventType(), getNewFlow(), }, ReconcileKey: "test/test-flow", - WantPresent: []runtime.Object{ - // getStartInProgressFeed(), - // getNewStartJob(), - //TODO job created event + WantPresent: []runtime.Object{ + getActionTargetResolvedFlow(), }, }, } @@ -95,17 +92,46 @@ func TestAllCases(t *testing.T) { } } -func getNewFlow() *feedsv1alpha1.Feed { - return &feedsv1alpha1.Feed{ +func getActionTargetResolvedFlow() *flowsv1alpha1.Flow { + newFlow := getNewFlow() + newFlow.Status = flowsv1alpha1.FlowStatus{ + Conditions: []flowsv1alpha1.FlowCondition{{ + Type: flowsv1alpha1.FlowConditionReady, + Status: corev1.ConditionUnknown, + }, { + Type: flowsv1alpha1.FlowConditionActionTargetResolved, + Status: corev1.ConditionTrue, + Reason: "ActionTargetResolved", + Message: fmt.Sprintf("Resolved to: %q", targetURI), + }}, + } + /* + newFlow.Status = flowsv1alpha1.FlowStatus{ + Conditions: []flowsv1alpha1.FlowCondition{{ + Type: flowsv1alpha1.FlowConditionActionTargetResolved, + Status: corev1.ConditionTrue, + Reason: "ActionTargetResolved", + Message: fmt.Sprintf("Resolved to: %q", targetURI), + }, { + Type: flowsv1alpha1.FlowConditionReady, + Status: corev1.ConditionUnknown, + }}, + } + */ + return newFlow +} + +func getNewFlow() *flowsv1alpha1.Flow { + return &flowsv1alpha1.Flow{ TypeMeta: flowType(), - ObjectMeta: om("test", "test-feed"), - Spec: feedsv1alpha1.FeedSpec{ - Action: feedsv1alpha1.FeedAction{ - DNSName: targetDNS, + ObjectMeta: om("test", "test-flow"), + Spec: flowsv1alpha1.FlowSpec{ + Action: flowsv1alpha1.FlowAction{ + TargetURI: &targetURI, }, - Trigger: feedsv1alpha1.EventTrigger{ + Trigger: flowsv1alpha1.EventTrigger{ EventType: eventType, - Resource: "", + Resource: "myresource", Service: "", Parameters: nil, ParametersFrom: nil, From 73c19f2c8a1a4283b514eddd27dbef9460ee1f32 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Thu, 26 Jul 2018 18:03:30 +0000 Subject: [PATCH 07/11] requeue if flow is not ready --- pkg/controller/flow/reconcile.go | 4 ++- pkg/controller/flow/reconcile_test.go | 49 +++++++++++++++++++-------- 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/pkg/controller/flow/reconcile.go b/pkg/controller/flow/reconcile.go index 3ef4a4e6c7e..409d95eb272 100644 --- a/pkg/controller/flow/reconcile.go +++ b/pkg/controller/flow/reconcile.go @@ -87,7 +87,9 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err glog.Warningf("Failed to update flow status: %v", err) return reconcile.Result{}, err } - return reconcile.Result{}, err + + // Requeue if the resource is not ready: + return reconcile.Result{Requeue: !flow.Status.IsReady()}, err } func (r *reconciler) reconcile(flow *v1alpha1.Flow) error { diff --git a/pkg/controller/flow/reconcile_test.go b/pkg/controller/flow/reconcile_test.go index 4a1c033c12c..68e2b6b8fcf 100644 --- a/pkg/controller/flow/reconcile_test.go +++ b/pkg/controller/flow/reconcile_test.go @@ -57,6 +57,7 @@ const ( targetDNS = "myservice.mynamespace.svc.cluster.local" eventType = "myeventtype" eventSource = "myeventsource" + flowName = "test-flow" ) func init() { @@ -76,6 +77,7 @@ var testCases = []controllertesting.TestCase{ ReconcileKey: "test/test-flow", WantPresent: []runtime.Object{ getActionTargetResolvedFlow(), + getNewChannel(), }, }, } @@ -105,26 +107,13 @@ func getActionTargetResolvedFlow() *flowsv1alpha1.Flow { Message: fmt.Sprintf("Resolved to: %q", targetURI), }}, } - /* - newFlow.Status = flowsv1alpha1.FlowStatus{ - Conditions: []flowsv1alpha1.FlowCondition{{ - Type: flowsv1alpha1.FlowConditionActionTargetResolved, - Status: corev1.ConditionTrue, - Reason: "ActionTargetResolved", - Message: fmt.Sprintf("Resolved to: %q", targetURI), - }, { - Type: flowsv1alpha1.FlowConditionReady, - Status: corev1.ConditionUnknown, - }}, - } - */ return newFlow } func getNewFlow() *flowsv1alpha1.Flow { return &flowsv1alpha1.Flow{ TypeMeta: flowType(), - ObjectMeta: om("test", "test-flow"), + ObjectMeta: om("test", flowName), Spec: flowsv1alpha1.FlowSpec{ Action: flowsv1alpha1.FlowAction{ TargetURI: &targetURI, @@ -140,6 +129,21 @@ func getNewFlow() *flowsv1alpha1.Flow { } } +func getNewChannel() *channelsv1alpha1.Channel { + channel := &channelsv1alpha1.Channel{ + TypeMeta: channelType(), + ObjectMeta: om("test", flowName), + Spec: channelsv1alpha1.ChannelSpec{ + ClusterBus: "stub", + }, + } + channel.ObjectMeta.OwnerReferences = append(channel.ObjectMeta.OwnerReferences, getOwnerReference()) + + // selflink is not filled in when we create the object, so clear it + channel.ObjectMeta.SelfLink = "" + return channel +} + func flowType() metav1.TypeMeta { return metav1.TypeMeta{ APIVersion: flowsv1alpha1.SchemeGroupVersion.String(), @@ -147,6 +151,13 @@ func flowType() metav1.TypeMeta { } } +func channelType() metav1.TypeMeta { + return metav1.TypeMeta{ + APIVersion: channelsv1alpha1.SchemeGroupVersion.String(), + Kind: "Channel", + } +} + func om(namespace, name string) metav1.ObjectMeta { return metav1.ObjectMeta{ Namespace: namespace, @@ -160,3 +171,13 @@ func omDeleting(namespace, name string) metav1.ObjectMeta { om.DeletionTimestamp = &deletionTime return om } + +func getOwnerReference() metav1.OwnerReference { + return metav1.OwnerReference{ + APIVersion: flowsv1alpha1.SchemeGroupVersion.String(), + Kind: "Flow", + Name: flowName, + Controller: &falseVal, + BlockOwnerDeletion: &falseVal, + } +} From 5e82d659e6b38b0cdf43bcb82664c4fe270f3112 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Thu, 26 Jul 2018 18:36:11 +0000 Subject: [PATCH 08/11] add a unit test --- pkg/controller/flow/reconcile_test.go | 68 ++++++++++++++++++++++----- 1 file changed, 55 insertions(+), 13 deletions(-) diff --git a/pkg/controller/flow/reconcile_test.go b/pkg/controller/flow/reconcile_test.go index 68e2b6b8fcf..5c580869844 100644 --- a/pkg/controller/flow/reconcile_test.go +++ b/pkg/controller/flow/reconcile_test.go @@ -17,8 +17,6 @@ limitations under the License. package flow import ( - // "encoding/base64" - // "encoding/json" "fmt" "testing" @@ -29,20 +27,12 @@ import ( servingv1alpha1 "github.com/knative/serving/pkg/apis/serving/v1alpha1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/record" - /* - "github.com/knative/eventing/pkg/controller/feed/resources" - "github.com/knative/eventing/pkg/sources" - batchv1 "k8s.io/api/batch/v1" - */) - -/* -TODO -- initial: feed with job deadline exceeded - reconciled: feed failure, job exists, finalizer -*/ +) var ( trueVal = true @@ -75,9 +65,12 @@ var testCases = []controllertesting.TestCase{ getNewFlow(), }, ReconcileKey: "test/test-flow", + // Not ready, requeue. + WantResult: reconcile.Result{Requeue: true}, WantPresent: []runtime.Object{ getActionTargetResolvedFlow(), getNewChannel(), + getNewSubscription(), }, }, } @@ -144,6 +137,41 @@ func getNewChannel() *channelsv1alpha1.Channel { return channel } +func getNewSubscription() *channelsv1alpha1.Subscription { + subscription := &channelsv1alpha1.Subscription{ + TypeMeta: subscriptionType(), + ObjectMeta: om("test", flowName), + Spec: channelsv1alpha1.SubscriptionSpec{ + Channel: flowName, + Subscriber: targetURI, + }, + } + subscription.ObjectMeta.OwnerReferences = append(subscription.ObjectMeta.OwnerReferences, getOwnerReference()) + + // selflink is not filled in when we create the object, so clear it + subscription.ObjectMeta.SelfLink = "" + return subscription +} + +func getNewFeed() *feedsv1alpha1.Feed { + return &feedsv1alpha1.Feed{ + TypeMeta: feedType(), + ObjectMeta: om("test", "test-flow"), + Spec: feedsv1alpha1.FeedSpec{ + Action: feedsv1alpha1.FeedAction{ + DNSName: targetURI, + }, + Trigger: feedsv1alpha1.EventTrigger{ + EventType: eventType, + Resource: "", + Service: "", + Parameters: nil, + ParametersFrom: nil, + }, + }, + } +} + func flowType() metav1.TypeMeta { return metav1.TypeMeta{ APIVersion: flowsv1alpha1.SchemeGroupVersion.String(), @@ -151,6 +179,13 @@ func flowType() metav1.TypeMeta { } } +func feedType() metav1.TypeMeta { + return metav1.TypeMeta{ + APIVersion: feedsv1alpha1.SchemeGroupVersion.String(), + Kind: "Feed", + } +} + func channelType() metav1.TypeMeta { return metav1.TypeMeta{ APIVersion: channelsv1alpha1.SchemeGroupVersion.String(), @@ -158,6 +193,13 @@ func channelType() metav1.TypeMeta { } } +func subscriptionType() metav1.TypeMeta { + return metav1.TypeMeta{ + APIVersion: channelsv1alpha1.SchemeGroupVersion.String(), + Kind: "Subscription", + } +} + func om(namespace, name string) metav1.ObjectMeta { return metav1.ObjectMeta{ Namespace: namespace, From 98b56fe7baaba0fa4348168f16393b47b8123f6f Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Fri, 27 Jul 2018 01:45:12 +0000 Subject: [PATCH 09/11] watch for channel,subscription and feed and reconcile if they change --- pkg/controller/flow/provider.go | 15 +++++++++++++++ pkg/controller/flow/reconcile.go | 4 ++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/pkg/controller/flow/provider.go b/pkg/controller/flow/provider.go index 363d751cf03..f6bbfea6e40 100644 --- a/pkg/controller/flow/provider.go +++ b/pkg/controller/flow/provider.go @@ -56,6 +56,21 @@ func ProvideController(mrg manager.Manager) (controller.Controller, error) { return nil, err } + // In addition to watching Flow objects, watch for objects that a Flow creates and own and when changes + // are made to them, enqueue owning Flow object for reconcile loop. + if err := c.Watch(&source.Kind{Type: &channelsv1alpha1.Channel{}}, + &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.Flow{}, IsController: true}); err != nil { + return nil, err + } + if err := c.Watch(&source.Kind{Type: &channelsv1alpha1.Subscription{}}, + &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.Flow{}, IsController: true}); err != nil { + return nil, err + } + if err := c.Watch(&source.Kind{Type: &feedsv1alpha1.Feed{}}, + &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.Flow{}, IsController: true}); err != nil { + return nil, err + } + return c, nil } diff --git a/pkg/controller/flow/reconcile.go b/pkg/controller/flow/reconcile.go index 409d95eb272..36745a64ac6 100644 --- a/pkg/controller/flow/reconcile.go +++ b/pkg/controller/flow/reconcile.go @@ -89,7 +89,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err } // Requeue if the resource is not ready: - return reconcile.Result{Requeue: !flow.Status.IsReady()}, err + return reconcile.Result{}, err } func (r *reconciler) reconcile(flow *v1alpha1.Flow) error { @@ -352,7 +352,7 @@ func (r *reconciler) createFeed(channelDNS string, flow *v1alpha1.Flow) (*feedsv func (r *reconciler) NewControllerRef(flow *v1alpha1.Flow) *metav1.OwnerReference { blockOwnerDeletion := false - isController := false + isController := true revRef := metav1.NewControllerRef(flow, flowControllerKind) revRef.BlockOwnerDeletion = &blockOwnerDeletion revRef.Controller = &isController From 949cea267f24e857d3ac25ec9e1efc1e49c85e30 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Fri, 27 Jul 2018 01:52:36 +0000 Subject: [PATCH 10/11] fix unit tests --- pkg/controller/flow/provider.go | 2 ++ pkg/controller/flow/reconcile_test.go | 5 ++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/controller/flow/provider.go b/pkg/controller/flow/provider.go index f6bbfea6e40..23bf71edce6 100644 --- a/pkg/controller/flow/provider.go +++ b/pkg/controller/flow/provider.go @@ -17,6 +17,8 @@ limitations under the License. package flow import ( + channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + feedsv1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" "github.com/knative/eventing/pkg/apis/flows/v1alpha1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" diff --git a/pkg/controller/flow/reconcile_test.go b/pkg/controller/flow/reconcile_test.go index 5c580869844..828abe5db9f 100644 --- a/pkg/controller/flow/reconcile_test.go +++ b/pkg/controller/flow/reconcile_test.go @@ -65,8 +65,7 @@ var testCases = []controllertesting.TestCase{ getNewFlow(), }, ReconcileKey: "test/test-flow", - // Not ready, requeue. - WantResult: reconcile.Result{Requeue: true}, + WantResult: reconcile.Result{}, WantPresent: []runtime.Object{ getActionTargetResolvedFlow(), getNewChannel(), @@ -219,7 +218,7 @@ func getOwnerReference() metav1.OwnerReference { APIVersion: flowsv1alpha1.SchemeGroupVersion.String(), Kind: "Flow", Name: flowName, - Controller: &falseVal, + Controller: &trueVal, BlockOwnerDeletion: &falseVal, } } From c82b13d623df92a5bb3be502776f5697e9ec3943 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Fri, 27 Jul 2018 02:11:06 +0000 Subject: [PATCH 11/11] address review comments --- pkg/controller/flow/reconcile.go | 32 ++++++++++++-------------------- 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/pkg/controller/flow/reconcile.go b/pkg/controller/flow/reconcile.go index 36745a64ac6..106b0b7b753 100644 --- a/pkg/controller/flow/reconcile.go +++ b/pkg/controller/flow/reconcile.go @@ -40,15 +40,6 @@ const defaultBusName = "stub" // What field do we assume Object Reference exports as a resolvable target const targetFieldName = "domainInternal" -const ( - // SuccessSynced is used as part of the Event 'reason' when a Flow is synced - SuccessSynced = "Synced" - - // MessageResourceSynced is the message used for an Event fired when a Flow - // is synced successfully - MessageResourceSynced = "Flow synced successfully" -) - var ( flowControllerKind = v1alpha1.SchemeGroupVersion.WithKind("Flow") ) @@ -114,7 +105,9 @@ func (r *reconciler) reconcile(flow *v1alpha1.Flow) error { glog.Infof("Resolved Target to: %q", target) // Reconcile the Channel. Creates a channel that is the target that the Feed will use. - // TODO: We should reuse channels possibly. + // TODO: We should reuse channels possibly. By this I mean that instead of creating a + // channel for each subdscription, we could look at existing channels and reuse one + // and only create a subscription to a channel instead. channel, err := r.reconcileChannel(flow) if err != nil { glog.Warningf("Failed to reconcile channel : %v", err) @@ -162,9 +155,8 @@ func (r *reconciler) updateStatus(flow *v1alpha1.Flow) (*v1alpha1.Flow, error) { return newFlow, nil } -// syncHandler compares the actual state with the desired, and attempts to -// converge the two. It then updates the Status block of the Flow resource -// with the current status of the resource. +// resolveActionTarget resolves the Action.Target. If it's an ObjectReference +// will resolve it, and if it's an TargetURI will just return it. func (r *reconciler) resolveActionTarget(namespace string, action v1alpha1.FlowAction) (string, error) { glog.Infof("Resolving target: %+v", action) @@ -178,7 +170,7 @@ func (r *reconciler) resolveActionTarget(namespace string, action v1alpha1.FlowA return "", fmt.Errorf("No resolvable action target: %+v", action) } -// resolveObjectReference fetches an object based on ObjectRefence. It assumes the +// resolveObjectReference fetches an object based on ObjectReference. It assumes the // object has a status["domainInternal"] string in it and returns it. func (r *reconciler) resolveObjectReference(namespace string, ref *corev1.ObjectReference) (string, error) { resourceClient, err := CreateResourceInterface(r.restConfig, ref, namespace) @@ -224,9 +216,9 @@ func (r *reconciler) reconcileChannel(flow *v1alpha1.Flow) (*channelsv1alpha1.Ch return nil, err } - // Should make sure channel is what it should be. For now, just assume it's fine + // TODO: Make sure channel is what it should be. For now, just assume it's fine // if it exists. - return channel, err + return channel, nil } func (r *reconciler) createChannel(flow *v1alpha1.Flow) (*channelsv1alpha1.Channel, error) { @@ -265,9 +257,9 @@ func (r *reconciler) reconcileSubscription(channelName string, target string, fl return nil, err } - // Should make sure subscription is what it should be. For now, just assume it's fine + // TODO: Make sure subscription is what it should be. For now, just assume it's fine // if it exists. - return subscription, err + return subscription, nil } func (r *reconciler) createSubscription(channelName string, target string, flow *v1alpha1.Flow) (*channelsv1alpha1.Subscription, error) { @@ -308,9 +300,9 @@ func (r *reconciler) reconcileFeed(channelDNS string, flow *v1alpha1.Flow) (*fee } glog.Infof("Reconciled feed: %+v", feed) - // Should make sure feed is what it should be. For now, just assume it's fine + // TODO: Make sure feed is what it should be. For now, just assume it's fine // if it exists. - return feed, err + return feed, nil }