From 6b0fbffb77c7dcabfad7fae2ab37faa12ddbcf99 Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Fri, 8 Mar 2019 12:32:34 -0800 Subject: [PATCH 1/6] Move the logging util out of contrib, in preparation for use by other Eventing controllers using it. --- contrib/gcppubsub/pkg/controller/channel/reconcile.go | 2 +- .../pkg/controller/clusterchannelprovisioner/reconcile.go | 2 +- contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go | 2 +- contrib/gcppubsub/pkg/dispatcher/receiver/receiver.go | 2 +- contrib/gcppubsub/pkg/util/creds.go | 2 +- contrib/gcppubsub/pkg/util/status.go | 2 +- {contrib/gcppubsub/pkg/util => pkg/utils}/logging/logging.go | 0 7 files changed, 6 insertions(+), 6 deletions(-) rename {contrib/gcppubsub/pkg/util => pkg/utils}/logging/logging.go (100%) diff --git a/contrib/gcppubsub/pkg/controller/channel/reconcile.go b/contrib/gcppubsub/pkg/controller/channel/reconcile.go index f1f6c384575..a49d8b8bfd2 100644 --- a/contrib/gcppubsub/pkg/controller/channel/reconcile.go +++ b/contrib/gcppubsub/pkg/controller/channel/reconcile.go @@ -22,10 +22,10 @@ import ( ccpcontroller "github.com/knative/eventing/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner" pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util" - "github.com/knative/eventing/contrib/gcppubsub/pkg/util/logging" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" util "github.com/knative/eventing/pkg/provisioners" "github.com/knative/eventing/pkg/reconciler/names" + "github.com/knative/eventing/pkg/utils/logging" "go.uber.org/zap" "golang.org/x/oauth2/google" v1 "k8s.io/api/core/v1" diff --git a/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile.go b/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile.go index bf388ca648a..f6a3bdc6086 100644 --- a/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile.go +++ b/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile.go @@ -19,9 +19,9 @@ package clusterchannelprovisioner import ( "context" - "github.com/knative/eventing/contrib/gcppubsub/pkg/util/logging" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" util "github.com/knative/eventing/pkg/provisioners" + "github.com/knative/eventing/pkg/utils/logging" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" diff --git a/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go b/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go index 7e369e45072..978923e3ece 100644 --- a/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go +++ b/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go @@ -28,10 +28,10 @@ import ( ccpcontroller "github.com/knative/eventing/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner" pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util" - "github.com/knative/eventing/contrib/gcppubsub/pkg/util/logging" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/provisioners" util "github.com/knative/eventing/pkg/provisioners" + "github.com/knative/eventing/pkg/utils/logging" "go.uber.org/zap" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" diff --git a/contrib/gcppubsub/pkg/dispatcher/receiver/receiver.go b/contrib/gcppubsub/pkg/dispatcher/receiver/receiver.go index 6178f42228b..87a18dcc467 100644 --- a/contrib/gcppubsub/pkg/dispatcher/receiver/receiver.go +++ b/contrib/gcppubsub/pkg/dispatcher/receiver/receiver.go @@ -24,9 +24,9 @@ import ( "cloud.google.com/go/pubsub" "github.com/knative/eventing/contrib/gcppubsub/pkg/dispatcher/receiver/cache" "github.com/knative/eventing/contrib/gcppubsub/pkg/util" - "github.com/knative/eventing/contrib/gcppubsub/pkg/util/logging" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/provisioners" + "github.com/knative/eventing/pkg/utils/logging" "go.uber.org/zap" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" diff --git a/contrib/gcppubsub/pkg/util/creds.go b/contrib/gcppubsub/pkg/util/creds.go index 89f73118bf3..3bc8bfedaef 100644 --- a/contrib/gcppubsub/pkg/util/creds.go +++ b/contrib/gcppubsub/pkg/util/creds.go @@ -21,7 +21,7 @@ import ( "errors" "fmt" - "github.com/knative/eventing/contrib/gcppubsub/pkg/util/logging" + "github.com/knative/eventing/pkg/utils/logging" "sigs.k8s.io/controller-runtime/pkg/client" diff --git a/contrib/gcppubsub/pkg/util/status.go b/contrib/gcppubsub/pkg/util/status.go index c4b5713f104..2e1ee827e25 100644 --- a/contrib/gcppubsub/pkg/util/status.go +++ b/contrib/gcppubsub/pkg/util/status.go @@ -20,8 +20,8 @@ import ( "context" "encoding/json" - "github.com/knative/eventing/contrib/gcppubsub/pkg/util/logging" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/utils/logging" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" diff --git a/contrib/gcppubsub/pkg/util/logging/logging.go b/pkg/utils/logging/logging.go similarity index 100% rename from contrib/gcppubsub/pkg/util/logging/logging.go rename to pkg/utils/logging/logging.go From c662187eecad4067a4d70d33f2b19a22ad29c1f8 Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Fri, 8 Mar 2019 12:42:56 -0800 Subject: [PATCH 2/6] Thread through context. --- .../v1alpha1/subscription/subscription.go | 45 +++++++++---------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/pkg/reconciler/v1alpha1/subscription/subscription.go b/pkg/reconciler/v1alpha1/subscription/subscription.go index 98f6ab4c6a2..240a06bb32c 100644 --- a/pkg/reconciler/v1alpha1/subscription/subscription.go +++ b/pkg/reconciler/v1alpha1/subscription/subscription.go @@ -21,13 +21,6 @@ import ( "fmt" "net/url" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/record" - "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/source" - "github.com/golang/glog" eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" @@ -42,8 +35,14 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" ) const ( @@ -96,9 +95,10 @@ func ProvideController(mgr manager.Manager) (controller.Controller, error) { // converge the two. It then updates the Status block of the Subscription resource // with the current status of the resource. func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { + ctx := context.TODO() glog.Infof("Reconciling subscription %v", request) subscription := &v1alpha1.Subscription{} - err := r.client.Get(context.TODO(), request.NamespacedName, subscription) + err := r.client.Get(ctx, request.NamespacedName, subscription) if errors.IsNotFound(err) { glog.Errorf("could not find subscription %v\n", request) @@ -112,7 +112,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err // Reconcile this copy of the Subscription and then write back any status // updates regardless of whether the reconcile error out. - err = r.reconcile(subscription) + err = r.reconcile(ctx, subscription) if err != nil { glog.Warningf("Error reconciling Subscription: %v", err) } else { @@ -120,7 +120,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err r.recorder.Eventf(subscription, corev1.EventTypeNormal, subscriptionReconciled, "Subscription reconciled: %q", subscription.Name) } - if _, updateStatusErr := r.updateStatus(subscription.DeepCopy()); updateStatusErr != nil { + if _, updateStatusErr := r.updateStatus(ctx, subscription.DeepCopy()); updateStatusErr != nil { glog.Warningf("Failed to update subscription status: %v", updateStatusErr) r.recorder.Eventf(subscription, corev1.EventTypeWarning, subscriptionUpdateStatusFailed, "Failed to update Subscription's status: %v", err) return reconcile.Result{}, updateStatusErr @@ -130,7 +130,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err return reconcile.Result{}, err } -func (r *reconciler) reconcile(subscription *v1alpha1.Subscription) error { +func (r *reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subscription) error { subscription.Status.InitializeConditions() // See if the subscription has been deleted @@ -139,7 +139,7 @@ func (r *reconciler) reconcile(subscription *v1alpha1.Subscription) error { // If the subscription is Ready, then we have to remove it // from the channel's subscriber list. if subscription.Status.IsReady() { - err := r.syncPhysicalChannel(subscription, true) + err := r.syncPhysicalChannel(ctx, subscription, true) if err != nil { glog.Warningf("Failed to sync physical from Channel : %s", err) r.recorder.Eventf(subscription, corev1.EventTypeWarning, physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err) @@ -158,7 +158,7 @@ func (r *reconciler) reconcile(subscription *v1alpha1.Subscription) error { return err } - if subscriberURI, err := ResolveSubscriberSpec(context.TODO(), r.client, r.dynamicClient, subscription.Namespace, subscription.Spec.Subscriber); err != nil { + if subscriberURI, err := ResolveSubscriberSpec(ctx, r.client, r.dynamicClient, subscription.Namespace, subscription.Spec.Subscriber); err != nil { glog.Warningf("Failed to resolve Subscriber %+v : %s", *subscription.Spec.Subscriber, err) r.recorder.Eventf(subscription, corev1.EventTypeWarning, subscriberResolveFailed, "Failed to resolve spec.subscriber: %v", err) return err @@ -181,7 +181,7 @@ func (r *reconciler) reconcile(subscription *v1alpha1.Subscription) error { // Ok, now that we have the Channel and at least one of the Call/Result, let's reconcile // the Channel with this information. - err = r.syncPhysicalChannel(subscription, false) + err = r.syncPhysicalChannel(ctx, subscription, false) if err != nil { glog.Warningf("Failed to sync physical Channel : %s", err) r.recorder.Eventf(subscription, corev1.EventTypeWarning, physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err) @@ -202,11 +202,11 @@ func isNilOrEmptyReply(reply *v1alpha1.ReplyStrategy) bool { } // updateStatus may in fact update the subscription's finalizers in addition to the status -func (r *reconciler) updateStatus(subscription *v1alpha1.Subscription) (*v1alpha1.Subscription, error) { +func (r *reconciler) updateStatus(ctx context.Context, subscription *v1alpha1.Subscription) (*v1alpha1.Subscription, error) { objectKey := client.ObjectKey{Namespace: subscription.Namespace, Name: subscription.Name} latestSubscription := &v1alpha1.Subscription{} - if err := r.client.Get(context.TODO(), objectKey, latestSubscription); err != nil { + if err := r.client.Get(ctx, objectKey, latestSubscription); err != nil { return nil, err } @@ -214,7 +214,7 @@ func (r *reconciler) updateStatus(subscription *v1alpha1.Subscription) (*v1alpha if !equality.Semantic.DeepEqual(latestSubscription.Finalizers, subscription.Finalizers) { latestSubscription.SetFinalizers(subscription.ObjectMeta.Finalizers) - if err := r.client.Update(context.TODO(), latestSubscription); err != nil { + if err := r.client.Update(ctx, latestSubscription); err != nil { return nil, err } subscriptionChanged = true @@ -227,13 +227,13 @@ func (r *reconciler) updateStatus(subscription *v1alpha1.Subscription) (*v1alpha if subscriptionChanged { // Refetch latestSubscription = &v1alpha1.Subscription{} - if err := r.client.Get(context.TODO(), objectKey, latestSubscription); err != nil { + if err := r.client.Get(ctx, objectKey, latestSubscription); err != nil { return nil, err } } latestSubscription.Status = subscription.Status - if err := r.client.Status().Update(context.TODO(), latestSubscription); err != nil { + if err := r.client.Status().Update(ctx, latestSubscription); err != nil { return nil, err } @@ -333,10 +333,10 @@ func domainToURL(domain string) string { return u.String() } -func (r *reconciler) syncPhysicalChannel(sub *v1alpha1.Subscription, isDeleted bool) error { +func (r *reconciler) syncPhysicalChannel(ctx context.Context, sub *v1alpha1.Subscription, isDeleted bool) error { glog.Infof("Reconciling Physical From Channel: %+v", sub) - subs, err := r.listAllSubscriptionsWithPhysicalChannel(sub) + subs, err := r.listAllSubscriptionsWithPhysicalChannel(ctx, sub) if err != nil { glog.Infof("Unable to list all subscriptions with physical channel: %+v", err) return err @@ -360,7 +360,7 @@ func (r *reconciler) syncPhysicalChannel(sub *v1alpha1.Subscription, isDeleted b return nil } -func (r *reconciler) listAllSubscriptionsWithPhysicalChannel(sub *v1alpha1.Subscription) ([]v1alpha1.Subscription, error) { +func (r *reconciler) listAllSubscriptionsWithPhysicalChannel(ctx context.Context, sub *v1alpha1.Subscription) ([]v1alpha1.Subscription, error) { subs := make([]v1alpha1.Subscription, 0) opts := &client.ListOptions{ @@ -369,7 +369,6 @@ func (r *reconciler) listAllSubscriptionsWithPhysicalChannel(sub *v1alpha1.Subsc // into opts.Raw.Continue. Raw: &metav1.ListOptions{}, } - ctx := context.TODO() for { sl := &v1alpha1.SubscriptionList{} err := r.client.List(ctx, opts, sl) From 3456ed870a762c960715c3ce814b1088518178ec Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Fri, 8 Mar 2019 13:09:40 -0800 Subject: [PATCH 3/6] Swap glog for zap.Logger in the Subscription reconciler. --- .../v1alpha1/subscription/subscription.go | 87 +++++++++++-------- 1 file changed, 49 insertions(+), 38 deletions(-) diff --git a/pkg/reconciler/v1alpha1/subscription/subscription.go b/pkg/reconciler/v1alpha1/subscription/subscription.go index 240a06bb32c..82b059ff1a7 100644 --- a/pkg/reconciler/v1alpha1/subscription/subscription.go +++ b/pkg/reconciler/v1alpha1/subscription/subscription.go @@ -21,13 +21,14 @@ import ( "fmt" "net/url" - "github.com/golang/glog" eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/reconciler/names" + "github.com/knative/eventing/pkg/utils/logging" duckapis "github.com/knative/pkg/apis" "github.com/knative/pkg/apis/duck" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" @@ -95,18 +96,18 @@ func ProvideController(mgr manager.Manager) (controller.Controller, error) { // converge the two. It then updates the Status block of the Subscription resource // with the current status of the resource. func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { - ctx := context.TODO() - glog.Infof("Reconciling subscription %v", request) + ctx := logging.With(context.TODO(), zap.Any("request", request)) + logging.FromContext(ctx).Debug("Reconciling Subscription") subscription := &v1alpha1.Subscription{} err := r.client.Get(ctx, request.NamespacedName, subscription) if errors.IsNotFound(err) { - glog.Errorf("could not find subscription %v\n", request) + logging.FromContext(ctx).Error("Could not find Subscription") return reconcile.Result{}, nil } if err != nil { - glog.Errorf("could not fetch Subscription %v for %+v\n", err, request) + logging.FromContext(ctx).Error("Error getting Subscription", zap.Error(err)) return reconcile.Result{}, err } @@ -114,14 +115,14 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err // updates regardless of whether the reconcile error out. err = r.reconcile(ctx, subscription) if err != nil { - glog.Warningf("Error reconciling Subscription: %v", err) + logging.FromContext(ctx).Warn("Error reconciling Subscription", zap.Error(err)) } else { - glog.Info("Subscription reconciled") + logging.FromContext(ctx).Debug("Subscription reconciled") r.recorder.Eventf(subscription, corev1.EventTypeNormal, subscriptionReconciled, "Subscription reconciled: %q", subscription.Name) } if _, updateStatusErr := r.updateStatus(ctx, subscription.DeepCopy()); updateStatusErr != nil { - glog.Warningf("Failed to update subscription status: %v", updateStatusErr) + logging.FromContext(ctx).Warn("Failed to update the Subscription", zap.Error(err)) r.recorder.Eventf(subscription, corev1.EventTypeWarning, subscriptionUpdateStatusFailed, "Failed to update Subscription's status: %v", err) return reconcile.Result{}, updateStatusErr } @@ -134,14 +135,13 @@ func (r *reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc subscription.Status.InitializeConditions() // See if the subscription has been deleted - glog.Infof("DeletionTimestamp: %v", subscription.DeletionTimestamp) if subscription.DeletionTimestamp != nil { // If the subscription is Ready, then we have to remove it // from the channel's subscriber list. if subscription.Status.IsReady() { err := r.syncPhysicalChannel(ctx, subscription, true) if err != nil { - glog.Warningf("Failed to sync physical from Channel : %s", err) + logging.FromContext(ctx).Warn("Failed to sync physical from Channel", zap.Error(err)) r.recorder.Eventf(subscription, corev1.EventTypeWarning, physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err) return err } @@ -151,29 +151,35 @@ func (r *reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc } // Verify that `channel` exists. - _, err := fetchObjectReference(r.dynamicClient, subscription.Namespace, &subscription.Spec.Channel) + _, err := fetchObjectReference(ctx, r.dynamicClient, subscription.Namespace, &subscription.Spec.Channel) if err != nil { - glog.Warningf("Failed to validate `channel` exists: %+v, %v", subscription.Spec.Channel, err) + logging.FromContext(ctx).Warn("Failed to validate Channel exists", + zap.Error(err), + zap.Any("channel", subscription.Spec.Channel)) r.recorder.Eventf(subscription, corev1.EventTypeWarning, channelReferenceFetchFailed, "Failed to validate spec.channel exists: %v", err) return err } if subscriberURI, err := ResolveSubscriberSpec(ctx, r.client, r.dynamicClient, subscription.Namespace, subscription.Spec.Subscriber); err != nil { - glog.Warningf("Failed to resolve Subscriber %+v : %s", *subscription.Spec.Subscriber, err) + logging.FromContext(ctx).Warn("Failed to resolve Subscriber", + zap.Error(err), + zap.Any("subscriber", subscription.Spec.Subscriber)) r.recorder.Eventf(subscription, corev1.EventTypeWarning, subscriberResolveFailed, "Failed to resolve spec.subscriber: %v", err) return err } else { subscription.Status.PhysicalSubscription.SubscriberURI = subscriberURI - glog.Infof("Resolved subscriber to: %q", subscriberURI) + logging.FromContext(ctx).Debug("Resolved Subscriber", zap.String("subscriberURI", subscriberURI)) } - if replyURI, err := r.resolveResult(subscription.Namespace, subscription.Spec.Reply); err != nil { - glog.Warningf("Failed to resolve Result %v : %v", subscription.Spec.Reply, err) + if replyURI, err := r.resolveResult(ctx, subscription.Namespace, subscription.Spec.Reply); err != nil { + logging.FromContext(ctx).Warn("Failed to resolve reply", + zap.Error(err), + zap.Any("reply", subscription.Spec.Reply)) r.recorder.Eventf(subscription, corev1.EventTypeWarning, resultResolveFailed, "Failed to resolve spec.reply: %v", err) return err } else { subscription.Status.PhysicalSubscription.ReplyURI = replyURI - glog.Infof("Resolved reply to: %q", replyURI) + logging.FromContext(ctx).Debug("Resolved reply", zap.String("replyURI", replyURI)) } // Everything that was supposed to be resolved was, so flip the status bit on that. @@ -183,7 +189,7 @@ func (r *reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc // the Channel with this information. err = r.syncPhysicalChannel(ctx, subscription, false) if err != nil { - glog.Warningf("Failed to sync physical Channel : %s", err) + logging.FromContext(ctx).Warn("Failed to sync physical Channel", zap.Error(err)) r.recorder.Eventf(subscription, corev1.EventTypeWarning, physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err) return err } @@ -263,15 +269,19 @@ func ResolveSubscriberSpec(ctx context.Context, client client.Client, dynamicCli } err := client.Get(ctx, svcKey, svc) if err != nil { - glog.Warningf("Failed to fetch SubscriberSpec target as a K8s Service %+v: %s", s.Ref, err) + logging.FromContext(ctx).Warn("Failed to fetch SubscriberSpec target as a K8s Service", + zap.Error(err), + zap.Any("subscriberSpec.Ref", s.Ref)) return "", err } return domainToURL(names.ServiceHostName(svc.Name, svc.Namespace)), nil } - obj, err := fetchObjectReference(dynamicClient, namespace, s.Ref) + obj, err := fetchObjectReference(ctx, dynamicClient, namespace, s.Ref) if err != nil { - glog.Warningf("Failed to fetch SubscriberSpec target %+v: %s", s.Ref, err) + logging.FromContext(ctx).Warn("Failed to fetch SubscriberSpec target", + zap.Error(err), + zap.Any("subscriberSpec.Ref", s.Ref)) return "", err } t := duckv1alpha1.AddressableType{} @@ -292,19 +302,21 @@ func ResolveSubscriberSpec(ctx context.Context, client client.Client, dynamicCli } // resolveResult resolves the Spec.Result object. -func (r *reconciler) resolveResult(namespace string, replyStrategy *v1alpha1.ReplyStrategy) (string, error) { +func (r *reconciler) resolveResult(ctx context.Context, namespace string, replyStrategy *v1alpha1.ReplyStrategy) (string, error) { if isNilOrEmptyReply(replyStrategy) { return "", nil } - obj, err := fetchObjectReference(r.dynamicClient, namespace, replyStrategy.Channel) + obj, err := fetchObjectReference(ctx, r.dynamicClient, namespace, replyStrategy.Channel) if err != nil { - glog.Warningf("Failed to fetch ReplyStrategy channel %+v: %s", replyStrategy, err) + logging.FromContext(ctx).Warn("Failed to fetch ReplyStrategy Channel", + zap.Error(err), + zap.Any("replyStrategy", replyStrategy)) return "", err } s := duckv1alpha1.AddressableType{} err = duck.FromUnstructured(obj, &s) if err != nil { - glog.Warningf("Failed to deserialize Addressable target: %s", err) + logging.FromContext(ctx).Warn("Failed to deserialize Addressable target", zap.Error(err)) return "", err } if s.Status.Address != nil { @@ -314,10 +326,10 @@ func (r *reconciler) resolveResult(namespace string, replyStrategy *v1alpha1.Rep } // fetchObjectReference fetches an object based on ObjectReference. -func fetchObjectReference(dynamicClient dynamic.Interface, namespace string, ref *corev1.ObjectReference) (duck.Marshalable, error) { +func fetchObjectReference(ctx context.Context, dynamicClient dynamic.Interface, namespace string, ref *corev1.ObjectReference) (duck.Marshalable, error) { resourceClient, err := createResourceInterface(dynamicClient, namespace, ref) if err != nil { - glog.Warningf("failed to create dynamic client resource: %v", err) + logging.FromContext(ctx).Warn("Failed to create dynamic resource client", zap.Error(err)) return nil, err } @@ -334,11 +346,11 @@ func domainToURL(domain string) string { } func (r *reconciler) syncPhysicalChannel(ctx context.Context, sub *v1alpha1.Subscription, isDeleted bool) error { - glog.Infof("Reconciling Physical From Channel: %+v", sub) + logging.FromContext(ctx).Debug("Reconciling physical from Channel", zap.Any("sub", sub)) subs, err := r.listAllSubscriptionsWithPhysicalChannel(ctx, sub) if err != nil { - glog.Infof("Unable to list all subscriptions with physical channel: %+v", err) + logging.FromContext(ctx).Info("Unable to list all Subscriptions with physical Channel", zap.Error(err)) return err } @@ -350,9 +362,9 @@ func (r *reconciler) syncPhysicalChannel(ctx context.Context, sub *v1alpha1.Subs } subscribable := r.createSubscribable(subs) - if patchErr := r.patchPhysicalFrom(sub.Namespace, sub.Spec.Channel, subscribable); patchErr != nil { + if patchErr := r.patchPhysicalFrom(ctx, sub.Namespace, sub.Spec.Channel, subscribable); patchErr != nil { if isDeleted && errors.IsNotFound(patchErr) { - glog.Infof("could not find channel %v\n", sub.Spec.Channel) + logging.FromContext(ctx).Warn("Could not find Channel", zap.Any("channel", sub.Spec.Channel)) return nil } return patchErr @@ -412,9 +424,9 @@ func (r *reconciler) createSubscribable(subs []v1alpha1.Subscription) *eventingd return rv } -func (r *reconciler) patchPhysicalFrom(namespace string, physicalFrom corev1.ObjectReference, subs *eventingduck.Subscribable) error { +func (r *reconciler) patchPhysicalFrom(ctx context.Context, namespace string, physicalFrom corev1.ObjectReference, subs *eventingduck.Subscribable) error { // First get the original object and convert it to only the bits we care about - s, err := fetchObjectReference(r.dynamicClient, namespace, &physicalFrom) + s, err := fetchObjectReference(ctx, r.dynamicClient, namespace, &physicalFrom) if err != nil { return err } @@ -434,22 +446,21 @@ func (r *reconciler) patchPhysicalFrom(namespace string, physicalFrom corev1.Obj patchBytes, err := patch.MarshalJSON() if err != nil { - glog.Warningf("failed to marshal json patch: %s", err) + logging.FromContext(ctx).Warn("Failed to marshal JSON patch", zap.Error(err), zap.Any("patch", patch)) return err } resourceClient, err := createResourceInterface(r.dynamicClient, namespace, &physicalFrom) if err != nil { - glog.Warningf("failed to create dynamic client resource: %v", err) + logging.FromContext(ctx).Warn("Failed to create dynamic resource client", zap.Error(err)) return err } patched, err := resourceClient.Patch(original.Name, types.JSONPatchType, patchBytes, metav1.UpdateOptions{}) if err != nil { - glog.Warningf("Failed to patch the object: %s", err) - glog.Warningf("Patch was: %+v", patch) + logging.FromContext(ctx).Warn("Failed to patch the Channel", zap.Error(err), zap.Any("patch", patch)) return err } - glog.Warningf("Patched resource: %+v", patched) + logging.FromContext(ctx).Debug("Patched resource", zap.Any("patched", patched)) return nil } From c586868066880a7ba44698b31df9f7e75a8fc0c7 Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Fri, 8 Mar 2019 13:28:06 -0800 Subject: [PATCH 4/6] Pass in the logger form main(). --- cmd/controller/main.go | 4 ++-- pkg/reconciler/v1alpha1/channel/channel.go | 21 +++++++++---------- .../v1alpha1/subscription/subscription.go | 6 ++++-- .../subscription/subscription_test.go | 3 +++ 4 files changed, 19 insertions(+), 15 deletions(-) diff --git a/cmd/controller/main.go b/cmd/controller/main.go index d8759939de0..888686f109d 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -60,7 +60,7 @@ var ( type SchemeFunc func(*runtime.Scheme) error // ProvideFunc adds a controller to a Manager. -type ProvideFunc func(manager.Manager) (controller.Controller, error) +type ProvideFunc func(manager.Manager, *zap.Logger) (controller.Controller, error) func main() { flag.Parse() @@ -123,7 +123,7 @@ func main() { channel.ProvideController, } for _, provider := range providers { - if _, err := provider(mgr); err != nil { + if _, err := provider(mgr, logger.Desugar()); err != nil { logger.Fatalf("Error adding controller to manager: %v", err) } } diff --git a/pkg/reconciler/v1alpha1/channel/channel.go b/pkg/reconciler/v1alpha1/channel/channel.go index c02fb49dbfd..7f7515f4e36 100644 --- a/pkg/reconciler/v1alpha1/channel/channel.go +++ b/pkg/reconciler/v1alpha1/channel/channel.go @@ -19,23 +19,22 @@ package channel import ( "context" + "github.com/golang/glog" + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/predicate" - + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/source" - - "github.com/golang/glog" - "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/client-go/dynamic" - "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" ) const ( @@ -58,7 +57,7 @@ var _ reconcile.Reconciler = &reconciler{} // ProvideController returns a Channel controller. // This Channel controller is a default controller for channels of all provisioner kinds -func ProvideController(mgr manager.Manager) (controller.Controller, error) { +func ProvideController(mgr manager.Manager, _ *zap.Logger) (controller.Controller, error) { // Setup a new controller to Reconcile channel c, err := controller.New(controllerAgentName, mgr, controller.Options{ Reconciler: &reconciler{ diff --git a/pkg/reconciler/v1alpha1/subscription/subscription.go b/pkg/reconciler/v1alpha1/subscription/subscription.go index 82b059ff1a7..26b3698b687 100644 --- a/pkg/reconciler/v1alpha1/subscription/subscription.go +++ b/pkg/reconciler/v1alpha1/subscription/subscription.go @@ -67,17 +67,19 @@ type reconciler struct { restConfig *rest.Config dynamicClient dynamic.Interface recorder record.EventRecorder + logger *zap.Logger } // Verify the struct implements reconcile.Reconciler var _ reconcile.Reconciler = &reconciler{} // ProvideController returns a Subscription controller. -func ProvideController(mgr manager.Manager) (controller.Controller, error) { +func ProvideController(mgr manager.Manager, logger *zap.Logger) (controller.Controller, error) { // Setup a new controller to Reconcile Subscriptions. c, err := controller.New(controllerAgentName, mgr, controller.Options{ Reconciler: &reconciler{ recorder: mgr.GetRecorder(controllerAgentName), + logger: logger, }, }) if err != nil { @@ -96,7 +98,7 @@ func ProvideController(mgr manager.Manager) (controller.Controller, error) { // converge the two. It then updates the Status block of the Subscription resource // with the current status of the resource. func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { - ctx := logging.With(context.TODO(), zap.Any("request", request)) + ctx := logging.WithLogger(context.TODO(), r.logger.With(zap.Any("request", request))) logging.FromContext(ctx).Debug("Reconciling Subscription") subscription := &v1alpha1.Subscription{} err := r.client.Get(ctx, request.NamespacedName, subscription) diff --git a/pkg/reconciler/v1alpha1/subscription/subscription_test.go b/pkg/reconciler/v1alpha1/subscription/subscription_test.go index df5a81034c9..a7863032544 100644 --- a/pkg/reconciler/v1alpha1/subscription/subscription_test.go +++ b/pkg/reconciler/v1alpha1/subscription/subscription_test.go @@ -20,6 +20,8 @@ import ( "fmt" "testing" + "go.uber.org/zap" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" @@ -1100,6 +1102,7 @@ func TestAllCases(t *testing.T) { dynamicClient: dc, restConfig: &rest.Config{}, recorder: recorder, + logger: zap.NewNop(), } tc.ReconcileKey = fmt.Sprintf("%s/%s", testNS, subscriptionName) tc.IgnoreTimes = true From e063553c092d51bdd666dac61b70b783e58898c5 Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Mon, 11 Mar 2019 15:24:34 -0700 Subject: [PATCH 5/6] Move ResolveSubscriberSpec out of Subscription Controller, as it will be used by multiple Controllers. --- .../v1alpha1/subscription/subscription.go | 111 +- .../subscription/subscription_test.go | 1759 ++++++++--------- pkg/utils/resolve/subscriber.go | 114 ++ pkg/utils/resolve/subscriber_test.go | 254 +++ 4 files changed, 1248 insertions(+), 990 deletions(-) create mode 100644 pkg/utils/resolve/subscriber.go create mode 100644 pkg/utils/resolve/subscriber_test.go diff --git a/pkg/reconciler/v1alpha1/subscription/subscription.go b/pkg/reconciler/v1alpha1/subscription/subscription.go index 26b3698b687..6ce4f6a5706 100644 --- a/pkg/reconciler/v1alpha1/subscription/subscription.go +++ b/pkg/reconciler/v1alpha1/subscription/subscription.go @@ -19,13 +19,11 @@ package subscription import ( "context" "fmt" - "net/url" eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/reconciler/names" "github.com/knative/eventing/pkg/utils/logging" - duckapis "github.com/knative/pkg/apis" + "github.com/knative/eventing/pkg/utils/resolve" "github.com/knative/pkg/apis/duck" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "go.uber.org/zap" @@ -87,7 +85,7 @@ func ProvideController(mgr manager.Manager, logger *zap.Logger) (controller.Cont } // Watch Subscription events and enqueue Subscription object key. - if err := c.Watch(&source.Kind{Type: &v1alpha1.Subscription{}}, &handler.EnqueueRequestForObject{}); err != nil { + if err = c.Watch(&source.Kind{Type: &v1alpha1.Subscription{}}, &handler.EnqueueRequestForObject{}); err != nil { return nil, err } @@ -153,8 +151,7 @@ func (r *reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc } // Verify that `channel` exists. - _, err := fetchObjectReference(ctx, r.dynamicClient, subscription.Namespace, &subscription.Spec.Channel) - if err != nil { + if _, err := resolve.ObjectReference(ctx, r.dynamicClient, subscription.Namespace, &subscription.Spec.Channel); err != nil { logging.FromContext(ctx).Warn("Failed to validate Channel exists", zap.Error(err), zap.Any("channel", subscription.Spec.Channel)) @@ -162,7 +159,7 @@ func (r *reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc return err } - if subscriberURI, err := ResolveSubscriberSpec(ctx, r.client, r.dynamicClient, subscription.Namespace, subscription.Spec.Subscriber); err != nil { + if subscriberURI, err := resolve.SubscriberSpec(ctx, r.dynamicClient, subscription.Namespace, subscription.Spec.Subscriber); err != nil { logging.FromContext(ctx).Warn("Failed to resolve Subscriber", zap.Error(err), zap.Any("subscriber", subscription.Spec.Subscriber)) @@ -189,8 +186,7 @@ func (r *reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc // Ok, now that we have the Channel and at least one of the Call/Result, let's reconcile // the Channel with this information. - err = r.syncPhysicalChannel(ctx, subscription, false) - if err != nil { + if err := r.syncPhysicalChannel(ctx, subscription, false); err != nil { logging.FromContext(ctx).Warn("Failed to sync physical Channel", zap.Error(err)) r.recorder.Eventf(subscription, corev1.EventTypeWarning, physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err) return err @@ -201,10 +197,6 @@ func (r *reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc return nil } -func isNilOrEmptySubscriber(sub *v1alpha1.SubscriberSpec) bool { - return sub == nil || equality.Semantic.DeepEqual(sub, &v1alpha1.SubscriberSpec{}) -} - func isNilOrEmptyReply(reply *v1alpha1.ReplyStrategy) bool { return reply == nil || equality.Semantic.DeepEqual(reply, &v1alpha1.ReplyStrategy{}) } @@ -248,67 +240,12 @@ func (r *reconciler) updateStatus(ctx context.Context, subscription *v1alpha1.Su return latestSubscription, nil } -// ResolveSubscriberSpec resolves the Spec.Call object. If it's an -// ObjectReference will resolve the object and treat it as an Addressable. If -// it's DNSName then it's used as is. -// TODO: Once Service Routes, etc. support Callable, use that. -// -func ResolveSubscriberSpec(ctx context.Context, client client.Client, dynamicClient dynamic.Interface, namespace string, s *v1alpha1.SubscriberSpec) (string, error) { - if isNilOrEmptySubscriber(s) { - return "", nil - } - if s.DNSName != nil && *s.DNSName != "" { - return *s.DNSName, nil - } - - // K8s services are special cased. They can be called, even though they do not satisfy the - // Callable interface. - if s.Ref != nil && s.Ref.APIVersion == "v1" && s.Ref.Kind == "Service" { - svc := &corev1.Service{} - svcKey := types.NamespacedName{ - Namespace: namespace, - Name: s.Ref.Name, - } - err := client.Get(ctx, svcKey, svc) - if err != nil { - logging.FromContext(ctx).Warn("Failed to fetch SubscriberSpec target as a K8s Service", - zap.Error(err), - zap.Any("subscriberSpec.Ref", s.Ref)) - return "", err - } - return domainToURL(names.ServiceHostName(svc.Name, svc.Namespace)), nil - } - - obj, err := fetchObjectReference(ctx, dynamicClient, namespace, s.Ref) - if err != nil { - logging.FromContext(ctx).Warn("Failed to fetch SubscriberSpec target", - zap.Error(err), - zap.Any("subscriberSpec.Ref", s.Ref)) - return "", err - } - t := duckv1alpha1.AddressableType{} - if err := duck.FromUnstructured(obj, &t); err == nil { - if t.Status.Address != nil { - return domainToURL(t.Status.Address.Hostname), nil - } - } - - legacy := duckv1alpha1.LegacyTarget{} - if err := duck.FromUnstructured(obj, &legacy); err == nil { - if legacy.Status.DomainInternal != "" { - return domainToURL(legacy.Status.DomainInternal), nil - } - } - - return "", fmt.Errorf("status does not contain address") -} - // resolveResult resolves the Spec.Result object. func (r *reconciler) resolveResult(ctx context.Context, namespace string, replyStrategy *v1alpha1.ReplyStrategy) (string, error) { if isNilOrEmptyReply(replyStrategy) { return "", nil } - obj, err := fetchObjectReference(ctx, r.dynamicClient, namespace, replyStrategy.Channel) + obj, err := resolve.ObjectReference(ctx, r.dynamicClient, namespace, replyStrategy.Channel) if err != nil { logging.FromContext(ctx).Warn("Failed to fetch ReplyStrategy Channel", zap.Error(err), @@ -322,31 +259,11 @@ func (r *reconciler) resolveResult(ctx context.Context, namespace string, replyS return "", err } if s.Status.Address != nil { - return domainToURL(s.Status.Address.Hostname), nil + return resolve.DomainToURL(s.Status.Address.Hostname), nil } return "", fmt.Errorf("status does not contain address") } -// fetchObjectReference fetches an object based on ObjectReference. -func fetchObjectReference(ctx context.Context, dynamicClient dynamic.Interface, namespace string, ref *corev1.ObjectReference) (duck.Marshalable, error) { - resourceClient, err := createResourceInterface(dynamicClient, namespace, ref) - if err != nil { - logging.FromContext(ctx).Warn("Failed to create dynamic resource client", zap.Error(err)) - return nil, err - } - - return resourceClient.Get(ref.Name, metav1.GetOptions{}) -} - -func domainToURL(domain string) string { - u := url.URL{ - Scheme: "http", - Host: domain, - Path: "/", - } - return u.String() -} - func (r *reconciler) syncPhysicalChannel(ctx context.Context, sub *v1alpha1.Subscription, isDeleted bool) error { logging.FromContext(ctx).Debug("Reconciling physical from Channel", zap.Any("sub", sub)) @@ -428,7 +345,7 @@ func (r *reconciler) createSubscribable(subs []v1alpha1.Subscription) *eventingd func (r *reconciler) patchPhysicalFrom(ctx context.Context, namespace string, physicalFrom corev1.ObjectReference, subs *eventingduck.Subscribable) error { // First get the original object and convert it to only the bits we care about - s, err := fetchObjectReference(ctx, r.dynamicClient, namespace, &physicalFrom) + s, err := resolve.ObjectReference(ctx, r.dynamicClient, namespace, &physicalFrom) if err != nil { return err } @@ -452,7 +369,7 @@ func (r *reconciler) patchPhysicalFrom(ctx context.Context, namespace string, ph return err } - resourceClient, err := createResourceInterface(r.dynamicClient, namespace, &physicalFrom) + resourceClient, err := resolve.ResourceInterface(r.dynamicClient, namespace, &physicalFrom) if err != nil { logging.FromContext(ctx).Warn("Failed to create dynamic resource client", zap.Error(err)) return err @@ -466,16 +383,6 @@ func (r *reconciler) patchPhysicalFrom(ctx context.Context, namespace string, ph return nil } -func createResourceInterface(dynamicClient dynamic.Interface, namespace string, ref *corev1.ObjectReference) (dynamic.ResourceInterface, error) { - rc := dynamicClient.Resource(duckapis.KindToResource(ref.GroupVersionKind())) - - if rc == nil { - return nil, fmt.Errorf("failed to create dynamic client resource") - } - return rc.Namespace(namespace), nil - -} - func addFinalizer(sub *v1alpha1.Subscription) { finalizers := sets.NewString(sub.Finalizers...) finalizers.Insert(finalizerName) diff --git a/pkg/reconciler/v1alpha1/subscription/subscription_test.go b/pkg/reconciler/v1alpha1/subscription/subscription_test.go index a7863032544..1d89263ea92 100644 --- a/pkg/reconciler/v1alpha1/subscription/subscription_test.go +++ b/pkg/reconciler/v1alpha1/subscription/subscription_test.go @@ -26,6 +26,7 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" "github.com/knative/eventing/pkg/utils" + "github.com/knative/eventing/pkg/utils/resolve" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -83,1002 +84,987 @@ func init() { duckv1alpha1.AddToScheme(scheme.Scheme) } -var testCases = []controllertesting.TestCase{ - { - Name: "subscription does not exist", - WantErr: false, - }, { - Name: "subscription but From channel does not exist", - InitialState: []runtime.Object{ - Subscription(), - }, - WantErrMsg: `channels.eventing.knative.dev "fromchannel" not found`, - WantEvent: []corev1.Event{ - events[channelReferenceFetchFailed], - }, - }, { - Name: "subscription, but From is not subscribable", - InitialState: []runtime.Object{ - Subscription().FromSource(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. It should actually fail saying that there is no - // Spec.Subscribers field. - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": sourceKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": sourceName, - }, - "spec": map[string]interface{}{}, - }, +func TestAllCases(t *testing.T) { + testCases := []controllertesting.TestCase{ + { + Name: "subscription does not exist", + WantErr: false, + }, { + Name: "subscription but From channel does not exist", + InitialState: []runtime.Object{ + Subscription(), }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": targetDNS, + WantErrMsg: `channels.eventing.knative.dev "fromchannel" not found`, + WantEvent: []corev1.Event{ + events[channelReferenceFetchFailed], + }, + }, { + Name: "subscription, but From is not subscribable", + InitialState: []runtime.Object{ + Subscription().FromSource(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. It should actually fail saying that there is no + // Spec.Subscribers field. + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": sourceKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": sourceName, }, + "spec": map[string]interface{}{}, }, }, - }, - // Reply channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": resultChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": sinkableDNS, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": targetDNS, + }, }, }, }, - }, - }, - }, { - Name: "Valid channel, subscriber does not exist", - InitialState: []runtime.Object{ - Subscription(), - }, - WantErrMsg: `routes.serving.knative.dev "subscriberroute" not found`, - WantPresent: []runtime.Object{ - Subscription().UnknownConditions(), - }, - WantEvent: []corev1.Event{ - events[subscriberResolveFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + // Reply channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": resultChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": sinkableDNS, + }, + }, }, }, }, - }, - }, { - Name: "Valid channel, subscriber is not callable", - InitialState: []runtime.Object{ - Subscription(), - }, - WantPresent: []runtime.Object{ - Subscription().UnknownConditions(), - }, - WantErrMsg: "status does not contain address", - WantEvent: []corev1.Event{ - events[subscriberResolveFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - }, + }, { + Name: "Valid channel, subscriber does not exist", + InitialState: []runtime.Object{ + Subscription(), }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "someotherstuff": targetDNS, - }, - }, + WantErrMsg: `routes.serving.knative.dev "subscriberroute" not found`, + WantPresent: []runtime.Object{ + Subscription().UnknownConditions(), }, - }, - }, { - Name: "Valid channel and subscriber, result does not exist", - InitialState: []runtime.Object{ - Subscription(), - }, - WantPresent: []runtime.Object{ - Subscription().UnknownConditions().PhysicalSubscriber(targetDNS), - }, - WantErrMsg: `channels.eventing.knative.dev "resultchannel" not found`, - WantEvent: []corev1.Event{ - events[resultResolveFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - }, + WantEvent: []corev1.Event{ + events[subscriberResolveFailed], }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": targetDNS, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, }, }, }, }, - }, - }, { - Name: "valid channel, subscriber, result is not addressable", - InitialState: []runtime.Object{ - Subscription(), - }, - WantErrMsg: "status does not contain address", - WantPresent: []runtime.Object{ - // TODO: Again this works on gke cluster, but I need to set - // something else up here. later... - // Subscription().ReferencesResolved(), - Subscription().UnknownConditions().PhysicalSubscriber(targetDNS), - }, - WantEvent: []corev1.Event{ - events[resultResolveFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - }, + }, { + Name: "Valid channel, subscriber is not callable", + InitialState: []runtime.Object{ + Subscription(), }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": targetDNS, + WantPresent: []runtime.Object{ + Subscription().UnknownConditions(), + }, + WantErrMsg: "status does not contain address", + WantEvent: []corev1.Event{ + events[subscriberResolveFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, }, }, }, - }, - // Reply channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": resultChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "someotherstuff": targetDNS, + }, }, }, }, - }, - }, { - Name: "new subscription: adds status, all targets resolved, subscribers modified", - InitialState: []runtime.Object{ - Subscription(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), - }, - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - }, + }, { + Name: "Valid channel and subscriber, result does not exist", + InitialState: []runtime.Object{ + Subscription(), }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": targetDNS, + WantPresent: []runtime.Object{ + Subscription().UnknownConditions().PhysicalSubscriber(targetDNS), + }, + WantErrMsg: `channels.eventing.knative.dev "resultchannel" not found`, + WantEvent: []corev1.Event{ + events[resultResolveFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, }, }, }, - }, - // Reply channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": resultChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": sinkableDNS, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": targetDNS, + }, }, }, }, }, - }, - }, { - Name: "new subscription: adds status, all targets resolved, subscribers modified -- nil reply", - InitialState: []runtime.Object{ - Subscription().NilReply(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().NilReply().ReferencesResolved().PhysicalSubscriber(targetDNS), - }, - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - }, + }, { + Name: "valid channel, subscriber, result is not addressable", + InitialState: []runtime.Object{ + Subscription(), }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": targetDNS, + WantErrMsg: "status does not contain address", + WantPresent: []runtime.Object{ + // TODO: Again this works on gke cluster, but I need to set + // something else up here. later... + // Subscription().ReferencesResolved(), + Subscription().UnknownConditions().PhysicalSubscriber(targetDNS), + }, + WantEvent: []corev1.Event{ + events[resultResolveFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, }, }, }, - }, - }, - }, { - Name: "new subscription: adds status, all targets resolved, subscribers modified -- empty but non-nil reply", - InitialState: []runtime.Object{ - Subscription().EmptyNonNilReply(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).EmptyNonNilReply(), - }, - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": targetDNS, + }, + }, }, }, - }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": targetDNS, + // Reply channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": resultChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, }, }, }, }, - }, - }, { - Name: "new subscription: adds status, target points to the legacy targetable interface", - InitialState: []runtime.Object{ - Subscription().EmptyNonNilReply(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).EmptyNonNilReply(), - }, - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - }, + }, { + Name: "new subscription: adds status, all targets resolved, subscribers modified", + InitialState: []runtime.Object{ + Subscription(), }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "domainInternal": targetDNS, - }, - }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), }, - }, - }, { - Name: "old subscription: updates status, removing the no longer present Subscriber", - InitialState: []runtime.Object{ - // This will have no Subscriber in the spec, but will have one in the status. - Subscription().NilSubscriber().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().NilSubscriber().ReferencesResolved().Reply(), - }, - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - }, + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], }, - // Reply channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": resultChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": sinkableDNS, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, }, }, }, - }, - }, - }, { - Name: "old subscription: updates status, removing the no longer present reply", - InitialState: []runtime.Object{ - // This will have no Reply in the spec, but will have one in the status. - Subscription().NilReply().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().NilReply().ReferencesResolved().PhysicalSubscriber(targetDNS), - }, - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": targetDNS, + }, + }, }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, + // Reply channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": resultChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": sinkableDNS, + }, + }, }, }, }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, + }, { + Name: "new subscription: adds status, all targets resolved, subscribers modified -- nil reply", + InitialState: []runtime.Object{ + Subscription().NilReply(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().NilReply().ReferencesResolved().PhysicalSubscriber(targetDNS), + }, + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, - "status": map[string]interface{}{ - "domainInternal": targetDNS, + }, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": targetDNS, + }, + }, }, }, }, - }, - }, { - Name: "new subscription: adds status, all targets resolved, subscribers modified -- nil subscriber", - InitialState: []runtime.Object{ - Subscription().NilSubscriber(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().NilSubscriber().ReferencesResolved().Reply(), - }, - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, + }, { + Name: "new subscription: adds status, all targets resolved, subscribers modified -- empty but non-nil reply", + InitialState: []runtime.Object{ + Subscription().EmptyNonNilReply(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).EmptyNonNilReply(), + }, + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": targetDNS, + }, + }, }, }, }, - // Reply channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": resultChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, { + Name: "new subscription: adds status, target points to the legacy targetable interface", + InitialState: []runtime.Object{ + Subscription().EmptyNonNilReply(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).EmptyNonNilReply(), + }, + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": sinkableDNS, + }, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "domainInternal": targetDNS, }, }, }, }, - }, - }, { - Name: "new subscription: adds status, all targets resolved, subscribers modified -- empty but non-nil subscriber", - InitialState: []runtime.Object{ - Subscription().EmptyNonNilSubscriber(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().EmptyNonNilSubscriber().ReferencesResolved().Reply(), - }, - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, + }, { + Name: "old subscription: updates status, removing the no longer present Subscriber", + InitialState: []runtime.Object{ + // This will have no Subscriber in the spec, but will have one in the status. + Subscription().NilSubscriber().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().NilSubscriber().ReferencesResolved().Reply(), + }, + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, + // Reply channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": resultChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": sinkableDNS, + }, + }, }, }, }, - // Reply channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": resultChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, { + Name: "old subscription: updates status, removing the no longer present reply", + InitialState: []runtime.Object{ + // This will have no Reply in the spec, but will have one in the status. + Subscription().NilReply().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().NilReply().ReferencesResolved().PhysicalSubscriber(targetDNS), + }, + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": sinkableDNS, + }, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "domainInternal": targetDNS, }, }, }, }, - }, - }, { - Name: "new subscription to non-existent K8s Service: fails with no service found", - InitialState: []runtime.Object{ - Subscription().ToK8sService(), - }, - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().ToK8sService().UnknownConditions(), - }, - WantErrMsg: "services \"testk8sservice\" not found", - WantEvent: []corev1.Event{ - events[subscriberResolveFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, + }, { + Name: "new subscription: adds status, all targets resolved, subscribers modified -- nil subscriber", + InitialState: []runtime.Object{ + Subscription().NilSubscriber(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().NilSubscriber().ReferencesResolved().Reply(), + }, + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, + // Reply channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": resultChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": sinkableDNS, + }, + }, }, }, }, - }, - }, { - Name: "new subscription to K8s Service: adds status, all targets resolved, subscribers modified", - InitialState: []runtime.Object{ - Subscription().ToK8sService(), - getK8sService(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().ToK8sService().ReferencesResolved().PhysicalSubscriber(k8sServiceDNS).Reply(), - }, - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, { + Name: "new subscription: adds status, all targets resolved, subscribers modified -- empty but non-nil subscriber", + InitialState: []runtime.Object{ + Subscription().EmptyNonNilSubscriber(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().EmptyNonNilSubscriber().ReferencesResolved().Reply(), + }, + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, }, - }, - // Subscriber (using K8s Service) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "v1", - "kind": "Service", - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": k8sServiceName, + // Reply channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": resultChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": sinkableDNS, + }, + }, }, }, }, - // Reply channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": resultChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": sinkableDNS, + }, { + Name: "new subscription to non-existent K8s Service: fails with no service found", + InitialState: []runtime.Object{ + Subscription().ToK8sService(), + }, + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().ToK8sService().UnknownConditions(), + }, + WantErrMsg: "services \"testk8sservice\" not found", + WantEvent: []corev1.Event{ + events[subscriberResolveFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, }, }, }, }, - }, - }, { - Name: "new subscription with from channel: adds status, all targets resolved, subscribers modified", - InitialState: []runtime.Object{ - Subscription(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantErrMsg: "invalid JSON document", - WantPresent: []runtime.Object{ - Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), - }, - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source with a reference to the From Channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": sourceKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": sourceName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, { + Name: "new subscription to K8s Service: adds status, all targets resolved, subscribers modified", + InitialState: []runtime.Object{ + Subscription().ToK8sService(), + getK8sService(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().ToK8sService().ReferencesResolved().PhysicalSubscriber(k8sServiceDNS).Reply(), + }, + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, }, - }, - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, + // Subscriber (using K8s Service) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Service", + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": k8sServiceName, + }, }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, + // Reply channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": resultChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": sinkableDNS, + }, + }, }, }, }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": targetDNS, + }, { + Name: "new subscription with from channel: adds status, all targets resolved, subscribers modified", + InitialState: []runtime.Object{ + Subscription(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantErrMsg: "invalid JSON document", + WantPresent: []runtime.Object{ + Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), + }, + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source with a reference to the From Channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": sourceKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": sourceName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, }, }, }, - }, - // Reply channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": resultChannelName, + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": targetDNS, + }, + }, }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": sinkableDNS, + }, + // Reply channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": resultChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": sinkableDNS, + }, }, }, }, }, }, - }, - { - Name: "sync multiple Subscriptions to one channel", - InitialState: []runtime.Object{ - // The first two Subscriptions both have the same physical From, so we should see that - // Channel updated with both Subscriptions. - Subscription(), - Subscription().Renamed().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), - // This subscription has a different physical From, so we should not see it in the same - // Channel as the first two. - Subscription().DifferentChannel(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantErrMsg: "invalid JSON document", - WantPresent: []runtime.Object{ + { + Name: "sync multiple Subscriptions to one channel", + InitialState: []runtime.Object{ + // The first two Subscriptions both have the same physical From, so we should see that + // Channel updated with both Subscriptions. + Subscription(), + Subscription().Renamed().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), + // This subscription has a different physical From, so we should not see it in the same + // Channel as the first two. + Subscription().DifferentChannel(), + }, // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. The entire test is really to - // verify the following, but can't be done because the call to Patch fails (it assumes - // a Strategic Merge Patch, whereas we are doing a JSON Patch). so for now, comment it - // out. - //getChannelWithMultipleSubscriptions(), - Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), - // Unaltered because this Subscription was not reconciled. - Subscription().Renamed().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), - Subscription().DifferentChannel(), - }, - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source with a reference to the From Channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": sourceKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": sourceName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - }, + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantErrMsg: "invalid JSON document", + WantPresent: []runtime.Object{ + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. The entire test is really to + // verify the following, but can't be done because the call to Patch fails (it assumes + // a Strategic Merge Patch, whereas we are doing a JSON Patch). so for now, comment it + // out. + //getChannelWithMultipleSubscriptions(), + Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), + // Unaltered because this Subscription was not reconciled. + Subscription().Renamed().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), + Subscription().DifferentChannel(), }, - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - }, + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": targetDNS, + Objects: []runtime.Object{ + // Source with a reference to the From Channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": sourceKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": sourceName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, }, }, }, - }, - // Reply channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": resultChannelName, + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": targetDNS, + }, + }, }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": sinkableDNS, + }, + // Reply channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": resultChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": sinkableDNS, + }, }, }, }, }, }, - }, - { - Name: "delete subscription with from channel: subscribers modified", - InitialState: []runtime.Object{ - Subscription().Deleted().ChannelReady(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantErrMsg: "invalid JSON document", - WantAbsent: []runtime.Object{ - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. The entire test is really to - // verify the following, but can't be done because the call to Patch fails (it assumes - // a Strategic Merge Patch, whereas we are doing a JSON Patch). so for now, comment it - // out. - //getNewDeletedSubscriptionWithChannelReady(), - }, - WantPresent: []runtime.Object{ + { + Name: "delete subscription with from channel: subscribers modified", + InitialState: []runtime.Object{ + Subscription().Deleted().ChannelReady(), + }, // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. The entire test is really to - // verify the following, but can't be done because the call to Patch fails (it assumes - // a Strategic Merge Patch, whereas we are doing a JSON Patch). so for now, comment it - // out. - //getChannelWithOtherSubscription(), - }, - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "channelable": map[string]interface{}{ - "subscribers": []interface{}{ - map[string]interface{}{ - "subscriberURI": targetDNS, - "replyURI": sinkableDNS, - }, - map[string]interface{}{ - "replyURI": otherAddressableDNS, + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantErrMsg: "invalid JSON document", + WantAbsent: []runtime.Object{ + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. The entire test is really to + // verify the following, but can't be done because the call to Patch fails (it assumes + // a Strategic Merge Patch, whereas we are doing a JSON Patch). so for now, comment it + // out. + //getNewDeletedSubscriptionWithChannelReady(), + }, + WantPresent: []runtime.Object{ + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. The entire test is really to + // verify the following, but can't be done because the call to Patch fails (it assumes + // a Strategic Merge Patch, whereas we are doing a JSON Patch). so for now, comment it + // out. + //getChannelWithOtherSubscription(), + }, + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "channelable": map[string]interface{}{ + "subscribers": []interface{}{ + map[string]interface{}{ + "subscriberURI": targetDNS, + "replyURI": sinkableDNS, + }, + map[string]interface{}{ + "replyURI": otherAddressableDNS, + }, }, }, }, @@ -1086,13 +1072,10 @@ var testCases = []controllertesting.TestCase{ }, }, }, - Scheme: scheme.Scheme, - }, -} - -func TestAllCases(t *testing.T) { + } for _, tc := range testCases { + tc.Scheme = scheme.Scheme c := tc.GetClient() dc := tc.GetDynamicClient() recorder := tc.GetEventRecorder() @@ -1111,7 +1094,7 @@ func TestAllCases(t *testing.T) { } func TestFinalizers(t *testing.T) { - var testcases = []struct { + testCases := []struct { name string original sets.String add bool @@ -1150,7 +1133,7 @@ func TestFinalizers(t *testing.T) { }, } - for _, tc := range testcases { + for _, tc := range testCases { original := &eventingv1alpha1.Subscription{} original.Finalizers = tc.original.List() if tc.add { @@ -1280,7 +1263,7 @@ func (s *SubscriptionBuilder) UnknownConditions() *SubscriptionBuilder { } func (s *SubscriptionBuilder) PhysicalSubscriber(dns string) *SubscriptionBuilder { - s.Status.PhysicalSubscription.SubscriberURI = domainToURL(dns) + s.Status.PhysicalSubscription.SubscriberURI = resolve.DomainToURL(dns) return s } @@ -1291,7 +1274,7 @@ func (s *SubscriptionBuilder) ReferencesResolved() *SubscriptionBuilder { } func (s *SubscriptionBuilder) Reply() *SubscriptionBuilder { - s.Status.PhysicalSubscription.ReplyURI = domainToURL(sinkableDNS) + s.Status.PhysicalSubscription.ReplyURI = resolve.DomainToURL(sinkableDNS) return s } diff --git a/pkg/utils/resolve/subscriber.go b/pkg/utils/resolve/subscriber.go new file mode 100644 index 00000000000..0454a567a64 --- /dev/null +++ b/pkg/utils/resolve/subscriber.go @@ -0,0 +1,114 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resolve + +import ( + "context" + "fmt" + "net/url" + + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/reconciler/names" + "github.com/knative/eventing/pkg/utils/logging" + duckapis "github.com/knative/pkg/apis" + "github.com/knative/pkg/apis/duck" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/dynamic" +) + +// DomainToURL converts a domain into an HTTP URL. +func DomainToURL(domain string) string { + u := url.URL{ + Scheme: "http", + Host: domain, + Path: "/", + } + return u.String() +} + +// ResourceInterface creates a resource interface for the given ObjectReference. +func ResourceInterface(dynamicClient dynamic.Interface, namespace string, ref *corev1.ObjectReference) (dynamic.ResourceInterface, error) { + rc := dynamicClient.Resource(duckapis.KindToResource(ref.GroupVersionKind())) + + if rc == nil { + return nil, fmt.Errorf("failed to create dynamic client resource") + } + return rc.Namespace(namespace), nil +} + +// ObjectReference resolves an object based on an ObjectReference. +func ObjectReference(ctx context.Context, dynamicClient dynamic.Interface, namespace string, ref *corev1.ObjectReference) (duck.Marshalable, error) { + resourceClient, err := ResourceInterface(dynamicClient, namespace, ref) + if err != nil { + logging.FromContext(ctx).Warn("Failed to create dynamic resource client", zap.Error(err)) + return nil, err + } + + return resourceClient.Get(ref.Name, metav1.GetOptions{}) +} + +// ResolveSubscriberSpec resolves the Spec.Call object. If it's an +// ObjectReference will resolve the object and treat it as an Addressable. If +// it's DNSName then it's used as is. +// TODO: Once Service Routes, etc. support Callable, use that. +func SubscriberSpec(ctx context.Context, dynamicClient dynamic.Interface, namespace string, s *v1alpha1.SubscriberSpec) (string, error) { + if isNilOrEmptySubscriber(s) { + return "", nil + } + if s.DNSName != nil && *s.DNSName != "" { + return *s.DNSName, nil + } + + obj, err := ObjectReference(ctx, dynamicClient, namespace, s.Ref) + if err != nil { + logging.FromContext(ctx).Warn("Failed to fetch SubscriberSpec target", + zap.Error(err), + zap.Any("subscriberSpec.Ref", s.Ref)) + return "", err + } + + // K8s services are special cased. They can be called, even though they do not satisfy the + // Callable interface. + if s.Ref != nil && s.Ref.APIVersion == "v1" && s.Ref.Kind == "Service" { + // This Service must exist because ObjectReference did not return an error. + return DomainToURL(names.ServiceHostName(s.Ref.Name, namespace)), nil + } + + t := duckv1alpha1.AddressableType{} + if err = duck.FromUnstructured(obj, &t); err == nil { + if t.Status.Address != nil { + return DomainToURL(t.Status.Address.Hostname), nil + } + } + + legacy := duckv1alpha1.LegacyTarget{} + if err = duck.FromUnstructured(obj, &legacy); err == nil { + if legacy.Status.DomainInternal != "" { + return DomainToURL(legacy.Status.DomainInternal), nil + } + } + + return "", fmt.Errorf("status does not contain address") +} + +func isNilOrEmptySubscriber(sub *v1alpha1.SubscriberSpec) bool { + return sub == nil || equality.Semantic.DeepEqual(sub, &v1alpha1.SubscriberSpec{}) +} diff --git a/pkg/utils/resolve/subscriber_test.go b/pkg/utils/resolve/subscriber_test.go new file mode 100644 index 00000000000..4b9ee9c6912 --- /dev/null +++ b/pkg/utils/resolve/subscriber_test.go @@ -0,0 +1,254 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resolve + +import ( + "context" + "fmt" + "testing" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/kubernetes/scheme" +) + +const ( + testNS = "test-namespace" +) + +var ( + dnsName = "dns-name" + + channelAddress = "test-channel.hostname" + channelURL = fmt.Sprintf("http://%s/", channelAddress) + + legacyCallableAddress = "legacy-callable.domain-internal" + legacyCallableURL = fmt.Sprintf("http://%s/", legacyCallableAddress) +) + +func init() { + // Add types to scheme + _ = eventingv1alpha1.AddToScheme(scheme.Scheme) + _ = duckv1alpha1.AddToScheme(scheme.Scheme) +} + +func TestDomainToURL(t *testing.T) { + d := "default-broker.default.svc.cluster.local" + e := fmt.Sprintf("http://%s/", d) + if actual := DomainToURL(d); e != actual { + t.Fatalf("Unexpected domain. Expected '%v', actually '%v'", e, actual) + } +} + +func TestResourceInterface_BadDynamicInterface(t *testing.T) { + actual, err := ResourceInterface(&badDynamicInterface{}, testNS, &corev1.ObjectReference{}) + if err.Error() != "failed to create dynamic client resource" { + t.Fatalf("Unexpected error '%v'", err) + } + if actual != nil { + t.Fatalf("Unexpected actual. Expected nil. Actual '%v'", actual) + } +} + +type badDynamicInterface struct{} + +var _ dynamic.Interface = &badDynamicInterface{} + +func (badDynamicInterface) Resource(_ schema.GroupVersionResource) dynamic.NamespaceableResourceInterface { + return nil +} + +func TestObjectReference_BadDynamicInterface(t *testing.T) { + actual, err := ObjectReference(context.TODO(), &badDynamicInterface{}, testNS, &corev1.ObjectReference{}) + if err.Error() != "failed to create dynamic client resource" { + t.Fatalf("Unexpected error '%v'", err) + } + if actual != nil { + t.Fatalf("Unexpected actual. Expected nil. Actual '%v'", actual) + } +} + +func TestSubscriberSpec(t *testing.T) { + testCases := map[string]struct { + Sub *v1alpha1.SubscriberSpec + Objects []runtime.Object + Expected string + ExpectedErr string + }{ + "nil": { + Sub: nil, + Expected: "", + }, + "empty": { + Sub: &v1alpha1.SubscriberSpec{}, + Expected: "", + }, + "DNS Name": { + Sub: &v1alpha1.SubscriberSpec{ + DNSName: &dnsName, + }, + Expected: dnsName, + }, + "Doesn't exist": { + Sub: &v1alpha1.SubscriberSpec{ + Ref: &corev1.ObjectReference{ + APIVersion: "v1", + Kind: "Service", + Name: "doesnt-exist", + }, + }, + ExpectedErr: "services \"doesnt-exist\" not found", + }, + "K8s Service": { + Sub: &v1alpha1.SubscriberSpec{ + Ref: &corev1.ObjectReference{ + APIVersion: "v1", + Kind: "Service", + Name: "does-exist", + }, + }, + Objects: []runtime.Object{ + k8sService("does-exist"), + }, + Expected: fmt.Sprintf("http://does-exist.%s.svc.cluster.local/", testNS), + }, + "Addressable": { + Sub: &v1alpha1.SubscriberSpec{ + Ref: &corev1.ObjectReference{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Channel", + Name: "does-exist", + }, + }, + Objects: []runtime.Object{ + channel("does-exist"), + }, + Expected: channelURL, + }, + "Legacy Callable": { + Sub: &v1alpha1.SubscriberSpec{ + Ref: &corev1.ObjectReference{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "LegacyCallable", + Name: "does-exist", + }, + }, + Objects: []runtime.Object{ + legacyCallable("does-exist"), + }, + Expected: legacyCallableURL, + }, + "Non-Addressable": { + Sub: &v1alpha1.SubscriberSpec{ + Ref: &corev1.ObjectReference{ + APIVersion: "v1", + Kind: "ConfigMap", + Name: "does-exist", + }, + }, + Objects: []runtime.Object{ + configMap("does-exist"), + }, + ExpectedErr: "status does not contain address", + }, + } + + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + dc := fake.NewSimpleDynamicClient(scheme.Scheme, tc.Objects...) + + actual, err := SubscriberSpec(context.TODO(), dc, testNS, tc.Sub) + if err != nil { + if tc.ExpectedErr == "" || tc.ExpectedErr != err.Error() { + t.Fatalf("Unexpected error. Expected '%s'. Actual '%s'.", tc.ExpectedErr, err.Error()) + } + } + if tc.Expected != actual { + t.Fatalf("Unexpected URL. Expected '%s'. Actual '%s'", tc.Expected, actual) + } + }) + } +} + +func k8sService(name string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Service", + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": name, + }, + }, + } +} + +func channel(name string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "eventing.knative.dev/v1alpha1", + "kind": "Channel", + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": name, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": channelAddress, + }, + }, + }, + } +} + +func legacyCallable(name string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "eventing.knative.dev/v1alpha1", + "kind": "LegacyCallable", + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": name, + }, + "status": map[string]interface{}{ + "domainInternal": legacyCallableAddress, + }, + }, + } +} + +func configMap(name string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": name, + }, + }, + } +} From 7a3782751aac772456039b493f8f7f7d35c849c3 Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Tue, 12 Mar 2019 09:20:01 -0700 Subject: [PATCH 6/6] Fix bad merge of gcp-pubsub. --- contrib/gcppubsub/pkg/controller/channel/reconcile.go | 1 - .../pkg/controller/clusterchannelprovisioner/reconcile.go | 1 - contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go | 1 - contrib/gcppubsub/pkg/dispatcher/receiver/receiver.go | 1 - 4 files changed, 4 deletions(-) diff --git a/contrib/gcppubsub/pkg/controller/channel/reconcile.go b/contrib/gcppubsub/pkg/controller/channel/reconcile.go index 05d58a06fad..b446f11de3c 100644 --- a/contrib/gcppubsub/pkg/controller/channel/reconcile.go +++ b/contrib/gcppubsub/pkg/controller/channel/reconcile.go @@ -26,7 +26,6 @@ import ( "github.com/knative/eventing/pkg/logging" util "github.com/knative/eventing/pkg/provisioners" "github.com/knative/eventing/pkg/reconciler/names" - "github.com/knative/eventing/pkg/utils/logging" "go.uber.org/zap" "golang.org/x/oauth2/google" v1 "k8s.io/api/core/v1" diff --git a/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile.go b/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile.go index b979bc58960..ed42c57533f 100644 --- a/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile.go +++ b/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile.go @@ -22,7 +22,6 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/logging" util "github.com/knative/eventing/pkg/provisioners" - "github.com/knative/eventing/pkg/utils/logging" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" diff --git a/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go b/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go index 5aba65930a9..0186fbfbbe3 100644 --- a/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go +++ b/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go @@ -32,7 +32,6 @@ import ( "github.com/knative/eventing/pkg/logging" "github.com/knative/eventing/pkg/provisioners" util "github.com/knative/eventing/pkg/provisioners" - "github.com/knative/eventing/pkg/utils/logging" "go.uber.org/zap" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" diff --git a/contrib/gcppubsub/pkg/dispatcher/receiver/receiver.go b/contrib/gcppubsub/pkg/dispatcher/receiver/receiver.go index fefb4144a48..702ee7fd5f4 100644 --- a/contrib/gcppubsub/pkg/dispatcher/receiver/receiver.go +++ b/contrib/gcppubsub/pkg/dispatcher/receiver/receiver.go @@ -27,7 +27,6 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/logging" "github.com/knative/eventing/pkg/provisioners" - "github.com/knative/eventing/pkg/utils/logging" "go.uber.org/zap" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client"