diff --git a/pkg/reconciler/v1alpha1/channel/channel_test.go b/pkg/reconciler/v1alpha1/channel/channel_test.go index 6b4354de1cc..75764342b16 100644 --- a/pkg/reconciler/v1alpha1/channel/channel_test.go +++ b/pkg/reconciler/v1alpha1/channel/channel_test.go @@ -22,8 +22,6 @@ import ( "fmt" "testing" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" @@ -33,6 +31,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) diff --git a/pkg/reconciler/v1alpha1/subscription/subscription.go b/pkg/reconciler/v1alpha1/subscription/subscription.go index 2602e569d41..1ebbec7b117 100644 --- a/pkg/reconciler/v1alpha1/subscription/subscription.go +++ b/pkg/reconciler/v1alpha1/subscription/subscription.go @@ -19,13 +19,11 @@ package subscription import ( "context" "fmt" - "net/url" eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/logging" - "github.com/knative/eventing/pkg/reconciler/names" - duckapis "github.com/knative/pkg/apis" + "github.com/knative/eventing/pkg/utils/resolve" "github.com/knative/pkg/apis/duck" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "go.uber.org/zap" @@ -87,7 +85,7 @@ func ProvideController(mgr manager.Manager, logger *zap.Logger) (controller.Cont } // Watch Subscription events and enqueue Subscription object key. - if err := c.Watch(&source.Kind{Type: &v1alpha1.Subscription{}}, &handler.EnqueueRequestForObject{}); err != nil { + if err = c.Watch(&source.Kind{Type: &v1alpha1.Subscription{}}, &handler.EnqueueRequestForObject{}); err != nil { return nil, err } @@ -153,8 +151,7 @@ func (r *reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc } // Verify that `channel` exists. - _, err := fetchObjectReference(ctx, r.dynamicClient, subscription.Namespace, &subscription.Spec.Channel) - if err != nil { + if _, err := resolve.ObjectReference(ctx, r.dynamicClient, subscription.Namespace, &subscription.Spec.Channel); err != nil { logging.FromContext(ctx).Warn("Failed to validate Channel exists", zap.Error(err), zap.Any("channel", subscription.Spec.Channel)) @@ -162,7 +159,7 @@ func (r *reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc return err } - if subscriberURI, err := ResolveSubscriberSpec(ctx, r.client, r.dynamicClient, subscription.Namespace, subscription.Spec.Subscriber); err != nil { + if subscriberURI, err := resolve.SubscriberSpec(ctx, r.dynamicClient, subscription.Namespace, subscription.Spec.Subscriber); err != nil { logging.FromContext(ctx).Warn("Failed to resolve Subscriber", zap.Error(err), zap.Any("subscriber", subscription.Spec.Subscriber)) @@ -189,8 +186,7 @@ func (r *reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc // Ok, now that we have the Channel and at least one of the Call/Result, let's reconcile // the Channel with this information. - err = r.syncPhysicalChannel(ctx, subscription, false) - if err != nil { + if err := r.syncPhysicalChannel(ctx, subscription, false); err != nil { logging.FromContext(ctx).Warn("Failed to sync physical Channel", zap.Error(err)) r.recorder.Eventf(subscription, corev1.EventTypeWarning, physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err) return err @@ -201,10 +197,6 @@ func (r *reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc return nil } -func isNilOrEmptySubscriber(sub *v1alpha1.SubscriberSpec) bool { - return sub == nil || equality.Semantic.DeepEqual(sub, &v1alpha1.SubscriberSpec{}) -} - func isNilOrEmptyReply(reply *v1alpha1.ReplyStrategy) bool { return reply == nil || equality.Semantic.DeepEqual(reply, &v1alpha1.ReplyStrategy{}) } @@ -248,67 +240,12 @@ func (r *reconciler) updateStatus(ctx context.Context, subscription *v1alpha1.Su return latestSubscription, nil } -// ResolveSubscriberSpec resolves the Spec.Call object. If it's an -// ObjectReference will resolve the object and treat it as an Addressable. If -// it's DNSName then it's used as is. -// TODO: Once Service Routes, etc. support Callable, use that. -// -func ResolveSubscriberSpec(ctx context.Context, client client.Client, dynamicClient dynamic.Interface, namespace string, s *v1alpha1.SubscriberSpec) (string, error) { - if isNilOrEmptySubscriber(s) { - return "", nil - } - if s.DNSName != nil && *s.DNSName != "" { - return *s.DNSName, nil - } - - // K8s services are special cased. They can be called, even though they do not satisfy the - // Callable interface. - if s.Ref != nil && s.Ref.APIVersion == "v1" && s.Ref.Kind == "Service" { - svc := &corev1.Service{} - svcKey := types.NamespacedName{ - Namespace: namespace, - Name: s.Ref.Name, - } - err := client.Get(ctx, svcKey, svc) - if err != nil { - logging.FromContext(ctx).Warn("Failed to fetch SubscriberSpec target as a K8s Service", - zap.Error(err), - zap.Any("subscriberSpec.Ref", s.Ref)) - return "", err - } - return domainToURL(names.ServiceHostName(svc.Name, svc.Namespace)), nil - } - - obj, err := fetchObjectReference(ctx, dynamicClient, namespace, s.Ref) - if err != nil { - logging.FromContext(ctx).Warn("Failed to fetch SubscriberSpec target", - zap.Error(err), - zap.Any("subscriberSpec.Ref", s.Ref)) - return "", err - } - t := duckv1alpha1.AddressableType{} - if err := duck.FromUnstructured(obj, &t); err == nil { - if t.Status.Address != nil { - return domainToURL(t.Status.Address.Hostname), nil - } - } - - legacy := duckv1alpha1.LegacyTarget{} - if err := duck.FromUnstructured(obj, &legacy); err == nil { - if legacy.Status.DomainInternal != "" { - return domainToURL(legacy.Status.DomainInternal), nil - } - } - - return "", fmt.Errorf("status does not contain address") -} - // resolveResult resolves the Spec.Result object. func (r *reconciler) resolveResult(ctx context.Context, namespace string, replyStrategy *v1alpha1.ReplyStrategy) (string, error) { if isNilOrEmptyReply(replyStrategy) { return "", nil } - obj, err := fetchObjectReference(ctx, r.dynamicClient, namespace, replyStrategy.Channel) + obj, err := resolve.ObjectReference(ctx, r.dynamicClient, namespace, replyStrategy.Channel) if err != nil { logging.FromContext(ctx).Warn("Failed to fetch ReplyStrategy Channel", zap.Error(err), @@ -322,31 +259,11 @@ func (r *reconciler) resolveResult(ctx context.Context, namespace string, replyS return "", err } if s.Status.Address != nil { - return domainToURL(s.Status.Address.Hostname), nil + return resolve.DomainToURL(s.Status.Address.Hostname), nil } return "", fmt.Errorf("status does not contain address") } -// fetchObjectReference fetches an object based on ObjectReference. -func fetchObjectReference(ctx context.Context, dynamicClient dynamic.Interface, namespace string, ref *corev1.ObjectReference) (duck.Marshalable, error) { - resourceClient, err := createResourceInterface(dynamicClient, namespace, ref) - if err != nil { - logging.FromContext(ctx).Warn("Failed to create dynamic resource client", zap.Error(err)) - return nil, err - } - - return resourceClient.Get(ref.Name, metav1.GetOptions{}) -} - -func domainToURL(domain string) string { - u := url.URL{ - Scheme: "http", - Host: domain, - Path: "/", - } - return u.String() -} - func (r *reconciler) syncPhysicalChannel(ctx context.Context, sub *v1alpha1.Subscription, isDeleted bool) error { logging.FromContext(ctx).Debug("Reconciling physical from Channel", zap.Any("sub", sub)) @@ -428,7 +345,7 @@ func (r *reconciler) createSubscribable(subs []v1alpha1.Subscription) *eventingd func (r *reconciler) patchPhysicalFrom(ctx context.Context, namespace string, physicalFrom corev1.ObjectReference, subs *eventingduck.Subscribable) error { // First get the original object and convert it to only the bits we care about - s, err := fetchObjectReference(ctx, r.dynamicClient, namespace, &physicalFrom) + s, err := resolve.ObjectReference(ctx, r.dynamicClient, namespace, &physicalFrom) if err != nil { return err } @@ -452,7 +369,7 @@ func (r *reconciler) patchPhysicalFrom(ctx context.Context, namespace string, ph return err } - resourceClient, err := createResourceInterface(r.dynamicClient, namespace, &physicalFrom) + resourceClient, err := resolve.ResourceInterface(r.dynamicClient, namespace, &physicalFrom) if err != nil { logging.FromContext(ctx).Warn("Failed to create dynamic resource client", zap.Error(err)) return err @@ -466,16 +383,6 @@ func (r *reconciler) patchPhysicalFrom(ctx context.Context, namespace string, ph return nil } -func createResourceInterface(dynamicClient dynamic.Interface, namespace string, ref *corev1.ObjectReference) (dynamic.ResourceInterface, error) { - rc := dynamicClient.Resource(duckapis.KindToResource(ref.GroupVersionKind())) - - if rc == nil { - return nil, fmt.Errorf("failed to create dynamic client resource") - } - return rc.Namespace(namespace), nil - -} - func addFinalizer(sub *v1alpha1.Subscription) { finalizers := sets.NewString(sub.Finalizers...) finalizers.Insert(finalizerName) diff --git a/pkg/reconciler/v1alpha1/subscription/subscription_test.go b/pkg/reconciler/v1alpha1/subscription/subscription_test.go index a7863032544..1d89263ea92 100644 --- a/pkg/reconciler/v1alpha1/subscription/subscription_test.go +++ b/pkg/reconciler/v1alpha1/subscription/subscription_test.go @@ -26,6 +26,7 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" "github.com/knative/eventing/pkg/utils" + "github.com/knative/eventing/pkg/utils/resolve" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -83,1002 +84,987 @@ func init() { duckv1alpha1.AddToScheme(scheme.Scheme) } -var testCases = []controllertesting.TestCase{ - { - Name: "subscription does not exist", - WantErr: false, - }, { - Name: "subscription but From channel does not exist", - InitialState: []runtime.Object{ - Subscription(), - }, - WantErrMsg: `channels.eventing.knative.dev "fromchannel" not found`, - WantEvent: []corev1.Event{ - events[channelReferenceFetchFailed], - }, - }, { - Name: "subscription, but From is not subscribable", - InitialState: []runtime.Object{ - Subscription().FromSource(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. It should actually fail saying that there is no - // Spec.Subscribers field. - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": sourceKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": sourceName, - }, - "spec": map[string]interface{}{}, - }, +func TestAllCases(t *testing.T) { + testCases := []controllertesting.TestCase{ + { + Name: "subscription does not exist", + WantErr: false, + }, { + Name: "subscription but From channel does not exist", + InitialState: []runtime.Object{ + Subscription(), }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": targetDNS, + WantErrMsg: `channels.eventing.knative.dev "fromchannel" not found`, + WantEvent: []corev1.Event{ + events[channelReferenceFetchFailed], + }, + }, { + Name: "subscription, but From is not subscribable", + InitialState: []runtime.Object{ + Subscription().FromSource(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. It should actually fail saying that there is no + // Spec.Subscribers field. + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": sourceKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": sourceName, }, + "spec": map[string]interface{}{}, }, }, - }, - // Reply channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": resultChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": sinkableDNS, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": targetDNS, + }, }, }, }, - }, - }, - }, { - Name: "Valid channel, subscriber does not exist", - InitialState: []runtime.Object{ - Subscription(), - }, - WantErrMsg: `routes.serving.knative.dev "subscriberroute" not found`, - WantPresent: []runtime.Object{ - Subscription().UnknownConditions(), - }, - WantEvent: []corev1.Event{ - events[subscriberResolveFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + // Reply channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": resultChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": sinkableDNS, + }, + }, }, }, }, - }, - }, { - Name: "Valid channel, subscriber is not callable", - InitialState: []runtime.Object{ - Subscription(), - }, - WantPresent: []runtime.Object{ - Subscription().UnknownConditions(), - }, - WantErrMsg: "status does not contain address", - WantEvent: []corev1.Event{ - events[subscriberResolveFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - }, + }, { + Name: "Valid channel, subscriber does not exist", + InitialState: []runtime.Object{ + Subscription(), }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "someotherstuff": targetDNS, - }, - }, + WantErrMsg: `routes.serving.knative.dev "subscriberroute" not found`, + WantPresent: []runtime.Object{ + Subscription().UnknownConditions(), }, - }, - }, { - Name: "Valid channel and subscriber, result does not exist", - InitialState: []runtime.Object{ - Subscription(), - }, - WantPresent: []runtime.Object{ - Subscription().UnknownConditions().PhysicalSubscriber(targetDNS), - }, - WantErrMsg: `channels.eventing.knative.dev "resultchannel" not found`, - WantEvent: []corev1.Event{ - events[resultResolveFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - }, + WantEvent: []corev1.Event{ + events[subscriberResolveFailed], }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": targetDNS, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, }, }, }, }, - }, - }, { - Name: "valid channel, subscriber, result is not addressable", - InitialState: []runtime.Object{ - Subscription(), - }, - WantErrMsg: "status does not contain address", - WantPresent: []runtime.Object{ - // TODO: Again this works on gke cluster, but I need to set - // something else up here. later... - // Subscription().ReferencesResolved(), - Subscription().UnknownConditions().PhysicalSubscriber(targetDNS), - }, - WantEvent: []corev1.Event{ - events[resultResolveFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - }, + }, { + Name: "Valid channel, subscriber is not callable", + InitialState: []runtime.Object{ + Subscription(), }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": targetDNS, + WantPresent: []runtime.Object{ + Subscription().UnknownConditions(), + }, + WantErrMsg: "status does not contain address", + WantEvent: []corev1.Event{ + events[subscriberResolveFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, }, }, }, - }, - // Reply channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": resultChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "someotherstuff": targetDNS, + }, }, }, }, - }, - }, { - Name: "new subscription: adds status, all targets resolved, subscribers modified", - InitialState: []runtime.Object{ - Subscription(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), - }, - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - }, + }, { + Name: "Valid channel and subscriber, result does not exist", + InitialState: []runtime.Object{ + Subscription(), }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": targetDNS, + WantPresent: []runtime.Object{ + Subscription().UnknownConditions().PhysicalSubscriber(targetDNS), + }, + WantErrMsg: `channels.eventing.knative.dev "resultchannel" not found`, + WantEvent: []corev1.Event{ + events[resultResolveFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, }, }, }, - }, - // Reply channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": resultChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": sinkableDNS, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": targetDNS, + }, }, }, }, }, - }, - }, { - Name: "new subscription: adds status, all targets resolved, subscribers modified -- nil reply", - InitialState: []runtime.Object{ - Subscription().NilReply(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().NilReply().ReferencesResolved().PhysicalSubscriber(targetDNS), - }, - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - }, + }, { + Name: "valid channel, subscriber, result is not addressable", + InitialState: []runtime.Object{ + Subscription(), }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": targetDNS, + WantErrMsg: "status does not contain address", + WantPresent: []runtime.Object{ + // TODO: Again this works on gke cluster, but I need to set + // something else up here. later... + // Subscription().ReferencesResolved(), + Subscription().UnknownConditions().PhysicalSubscriber(targetDNS), + }, + WantEvent: []corev1.Event{ + events[resultResolveFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, }, }, }, - }, - }, - }, { - Name: "new subscription: adds status, all targets resolved, subscribers modified -- empty but non-nil reply", - InitialState: []runtime.Object{ - Subscription().EmptyNonNilReply(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).EmptyNonNilReply(), - }, - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": targetDNS, + }, + }, }, }, - }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": targetDNS, + // Reply channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": resultChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, }, }, }, }, - }, - }, { - Name: "new subscription: adds status, target points to the legacy targetable interface", - InitialState: []runtime.Object{ - Subscription().EmptyNonNilReply(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).EmptyNonNilReply(), - }, - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - }, + }, { + Name: "new subscription: adds status, all targets resolved, subscribers modified", + InitialState: []runtime.Object{ + Subscription(), }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "domainInternal": targetDNS, - }, - }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), }, - }, - }, { - Name: "old subscription: updates status, removing the no longer present Subscriber", - InitialState: []runtime.Object{ - // This will have no Subscriber in the spec, but will have one in the status. - Subscription().NilSubscriber().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().NilSubscriber().ReferencesResolved().Reply(), - }, - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - }, + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], }, - // Reply channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": resultChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": sinkableDNS, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, }, }, }, - }, - }, - }, { - Name: "old subscription: updates status, removing the no longer present reply", - InitialState: []runtime.Object{ - // This will have no Reply in the spec, but will have one in the status. - Subscription().NilReply().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().NilReply().ReferencesResolved().PhysicalSubscriber(targetDNS), - }, - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": targetDNS, + }, + }, }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, + // Reply channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": resultChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": sinkableDNS, + }, + }, }, }, }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, + }, { + Name: "new subscription: adds status, all targets resolved, subscribers modified -- nil reply", + InitialState: []runtime.Object{ + Subscription().NilReply(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().NilReply().ReferencesResolved().PhysicalSubscriber(targetDNS), + }, + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, - "status": map[string]interface{}{ - "domainInternal": targetDNS, + }, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": targetDNS, + }, + }, }, }, }, - }, - }, { - Name: "new subscription: adds status, all targets resolved, subscribers modified -- nil subscriber", - InitialState: []runtime.Object{ - Subscription().NilSubscriber(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().NilSubscriber().ReferencesResolved().Reply(), - }, - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, + }, { + Name: "new subscription: adds status, all targets resolved, subscribers modified -- empty but non-nil reply", + InitialState: []runtime.Object{ + Subscription().EmptyNonNilReply(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).EmptyNonNilReply(), + }, + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": targetDNS, + }, + }, }, }, }, - // Reply channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": resultChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, { + Name: "new subscription: adds status, target points to the legacy targetable interface", + InitialState: []runtime.Object{ + Subscription().EmptyNonNilReply(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).EmptyNonNilReply(), + }, + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": sinkableDNS, + }, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "domainInternal": targetDNS, }, }, }, }, - }, - }, { - Name: "new subscription: adds status, all targets resolved, subscribers modified -- empty but non-nil subscriber", - InitialState: []runtime.Object{ - Subscription().EmptyNonNilSubscriber(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().EmptyNonNilSubscriber().ReferencesResolved().Reply(), - }, - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, + }, { + Name: "old subscription: updates status, removing the no longer present Subscriber", + InitialState: []runtime.Object{ + // This will have no Subscriber in the spec, but will have one in the status. + Subscription().NilSubscriber().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().NilSubscriber().ReferencesResolved().Reply(), + }, + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, + // Reply channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": resultChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": sinkableDNS, + }, + }, }, }, }, - // Reply channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": resultChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, { + Name: "old subscription: updates status, removing the no longer present reply", + InitialState: []runtime.Object{ + // This will have no Reply in the spec, but will have one in the status. + Subscription().NilReply().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().NilReply().ReferencesResolved().PhysicalSubscriber(targetDNS), + }, + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": sinkableDNS, + }, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "domainInternal": targetDNS, }, }, }, }, - }, - }, { - Name: "new subscription to non-existent K8s Service: fails with no service found", - InitialState: []runtime.Object{ - Subscription().ToK8sService(), - }, - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().ToK8sService().UnknownConditions(), - }, - WantErrMsg: "services \"testk8sservice\" not found", - WantEvent: []corev1.Event{ - events[subscriberResolveFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, + }, { + Name: "new subscription: adds status, all targets resolved, subscribers modified -- nil subscriber", + InitialState: []runtime.Object{ + Subscription().NilSubscriber(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().NilSubscriber().ReferencesResolved().Reply(), + }, + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, + // Reply channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": resultChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": sinkableDNS, + }, + }, }, }, }, - }, - }, { - Name: "new subscription to K8s Service: adds status, all targets resolved, subscribers modified", - InitialState: []runtime.Object{ - Subscription().ToK8sService(), - getK8sService(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantPresent: []runtime.Object{ - Subscription().ToK8sService().ReferencesResolved().PhysicalSubscriber(k8sServiceDNS).Reply(), - }, - WantErrMsg: "invalid JSON document", - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, { + Name: "new subscription: adds status, all targets resolved, subscribers modified -- empty but non-nil subscriber", + InitialState: []runtime.Object{ + Subscription().EmptyNonNilSubscriber(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().EmptyNonNilSubscriber().ReferencesResolved().Reply(), + }, + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, }, - }, - // Subscriber (using K8s Service) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "v1", - "kind": "Service", - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": k8sServiceName, + // Reply channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": resultChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": sinkableDNS, + }, + }, }, }, }, - // Reply channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": resultChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": sinkableDNS, + }, { + Name: "new subscription to non-existent K8s Service: fails with no service found", + InitialState: []runtime.Object{ + Subscription().ToK8sService(), + }, + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().ToK8sService().UnknownConditions(), + }, + WantErrMsg: "services \"testk8sservice\" not found", + WantEvent: []corev1.Event{ + events[subscriberResolveFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, }, }, }, }, - }, - }, { - Name: "new subscription with from channel: adds status, all targets resolved, subscribers modified", - InitialState: []runtime.Object{ - Subscription(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantErrMsg: "invalid JSON document", - WantPresent: []runtime.Object{ - Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), - }, - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source with a reference to the From Channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": sourceKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": sourceName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, { + Name: "new subscription to K8s Service: adds status, all targets resolved, subscribers modified", + InitialState: []runtime.Object{ + Subscription().ToK8sService(), + getK8sService(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + Subscription().ToK8sService().ReferencesResolved().PhysicalSubscriber(k8sServiceDNS).Reply(), + }, + WantErrMsg: "invalid JSON document", + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, }, - }, - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, + // Subscriber (using K8s Service) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Service", + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": k8sServiceName, + }, }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, + // Reply channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": resultChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": sinkableDNS, + }, + }, }, }, }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": targetDNS, + }, { + Name: "new subscription with from channel: adds status, all targets resolved, subscribers modified", + InitialState: []runtime.Object{ + Subscription(), + }, + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantErrMsg: "invalid JSON document", + WantPresent: []runtime.Object{ + Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), + }, + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source with a reference to the From Channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": sourceKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": sourceName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, }, }, }, - }, - // Reply channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": resultChannelName, + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": targetDNS, + }, + }, }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": sinkableDNS, + }, + // Reply channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": resultChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": sinkableDNS, + }, }, }, }, }, }, - }, - { - Name: "sync multiple Subscriptions to one channel", - InitialState: []runtime.Object{ - // The first two Subscriptions both have the same physical From, so we should see that - // Channel updated with both Subscriptions. - Subscription(), - Subscription().Renamed().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), - // This subscription has a different physical From, so we should not see it in the same - // Channel as the first two. - Subscription().DifferentChannel(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantErrMsg: "invalid JSON document", - WantPresent: []runtime.Object{ + { + Name: "sync multiple Subscriptions to one channel", + InitialState: []runtime.Object{ + // The first two Subscriptions both have the same physical From, so we should see that + // Channel updated with both Subscriptions. + Subscription(), + Subscription().Renamed().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), + // This subscription has a different physical From, so we should not see it in the same + // Channel as the first two. + Subscription().DifferentChannel(), + }, // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. The entire test is really to - // verify the following, but can't be done because the call to Patch fails (it assumes - // a Strategic Merge Patch, whereas we are doing a JSON Patch). so for now, comment it - // out. - //getChannelWithMultipleSubscriptions(), - Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), - // Unaltered because this Subscription was not reconciled. - Subscription().Renamed().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), - Subscription().DifferentChannel(), - }, - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Scheme: scheme.Scheme, - Objects: []runtime.Object{ - // Source with a reference to the From Channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": sourceKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": sourceName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - }, + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantErrMsg: "invalid JSON document", + WantPresent: []runtime.Object{ + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. The entire test is really to + // verify the following, but can't be done because the call to Patch fails (it assumes + // a Strategic Merge Patch, whereas we are doing a JSON Patch). so for now, comment it + // out. + //getChannelWithMultipleSubscriptions(), + Subscription().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), + // Unaltered because this Subscription was not reconciled. + Subscription().Renamed().ReferencesResolved().PhysicalSubscriber(targetDNS).Reply(), + Subscription().DifferentChannel(), }, - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, - }, - }, + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], }, - // Subscriber (using knative route) - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "serving.knative.dev/v1alpha1", - "kind": routeKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": routeName, - }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": targetDNS, + Objects: []runtime.Object{ + // Source with a reference to the From Channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": sourceKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": sourceName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, }, }, }, - }, - // Reply channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": resultChannelName, + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, }, - "spec": map[string]interface{}{ - "subscribable": map[string]interface{}{}, + }, + // Subscriber (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": targetDNS, + }, + }, }, - "status": map[string]interface{}{ - "address": map[string]interface{}{ - "hostname": sinkableDNS, + }, + // Reply channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": resultChannelName, + }, + "spec": map[string]interface{}{ + "subscribable": map[string]interface{}{}, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": sinkableDNS, + }, }, }, }, }, }, - }, - { - Name: "delete subscription with from channel: subscribers modified", - InitialState: []runtime.Object{ - Subscription().Deleted().ChannelReady(), - }, - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific - // failure for now, until upstream is fixed. - WantResult: reconcile.Result{}, - WantErrMsg: "invalid JSON document", - WantAbsent: []runtime.Object{ - // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. The entire test is really to - // verify the following, but can't be done because the call to Patch fails (it assumes - // a Strategic Merge Patch, whereas we are doing a JSON Patch). so for now, comment it - // out. - //getNewDeletedSubscriptionWithChannelReady(), - }, - WantPresent: []runtime.Object{ + { + Name: "delete subscription with from channel: subscribers modified", + InitialState: []runtime.Object{ + Subscription().Deleted().ChannelReady(), + }, // TODO: JSON patch is not working on the fake, see - // https://github.com/kubernetes/client-go/issues/478. The entire test is really to - // verify the following, but can't be done because the call to Patch fails (it assumes - // a Strategic Merge Patch, whereas we are doing a JSON Patch). so for now, comment it - // out. - //getChannelWithOtherSubscription(), - }, - WantEvent: []corev1.Event{ - events[physicalChannelSyncFailed], - }, - Objects: []runtime.Object{ - // Source channel - &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - "kind": channelKind, - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": fromChannelName, - }, - "spec": map[string]interface{}{ - "channelable": map[string]interface{}{ - "subscribers": []interface{}{ - map[string]interface{}{ - "subscriberURI": targetDNS, - "replyURI": sinkableDNS, - }, - map[string]interface{}{ - "replyURI": otherAddressableDNS, + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. + WantResult: reconcile.Result{}, + WantErrMsg: "invalid JSON document", + WantAbsent: []runtime.Object{ + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. The entire test is really to + // verify the following, but can't be done because the call to Patch fails (it assumes + // a Strategic Merge Patch, whereas we are doing a JSON Patch). so for now, comment it + // out. + //getNewDeletedSubscriptionWithChannelReady(), + }, + WantPresent: []runtime.Object{ + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. The entire test is really to + // verify the following, but can't be done because the call to Patch fails (it assumes + // a Strategic Merge Patch, whereas we are doing a JSON Patch). so for now, comment it + // out. + //getChannelWithOtherSubscription(), + }, + WantEvent: []corev1.Event{ + events[physicalChannelSyncFailed], + }, + Objects: []runtime.Object{ + // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": channelKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": fromChannelName, + }, + "spec": map[string]interface{}{ + "channelable": map[string]interface{}{ + "subscribers": []interface{}{ + map[string]interface{}{ + "subscriberURI": targetDNS, + "replyURI": sinkableDNS, + }, + map[string]interface{}{ + "replyURI": otherAddressableDNS, + }, }, }, }, @@ -1086,13 +1072,10 @@ var testCases = []controllertesting.TestCase{ }, }, }, - Scheme: scheme.Scheme, - }, -} - -func TestAllCases(t *testing.T) { + } for _, tc := range testCases { + tc.Scheme = scheme.Scheme c := tc.GetClient() dc := tc.GetDynamicClient() recorder := tc.GetEventRecorder() @@ -1111,7 +1094,7 @@ func TestAllCases(t *testing.T) { } func TestFinalizers(t *testing.T) { - var testcases = []struct { + testCases := []struct { name string original sets.String add bool @@ -1150,7 +1133,7 @@ func TestFinalizers(t *testing.T) { }, } - for _, tc := range testcases { + for _, tc := range testCases { original := &eventingv1alpha1.Subscription{} original.Finalizers = tc.original.List() if tc.add { @@ -1280,7 +1263,7 @@ func (s *SubscriptionBuilder) UnknownConditions() *SubscriptionBuilder { } func (s *SubscriptionBuilder) PhysicalSubscriber(dns string) *SubscriptionBuilder { - s.Status.PhysicalSubscription.SubscriberURI = domainToURL(dns) + s.Status.PhysicalSubscription.SubscriberURI = resolve.DomainToURL(dns) return s } @@ -1291,7 +1274,7 @@ func (s *SubscriptionBuilder) ReferencesResolved() *SubscriptionBuilder { } func (s *SubscriptionBuilder) Reply() *SubscriptionBuilder { - s.Status.PhysicalSubscription.ReplyURI = domainToURL(sinkableDNS) + s.Status.PhysicalSubscription.ReplyURI = resolve.DomainToURL(sinkableDNS) return s } diff --git a/pkg/utils/resolve/subscriber.go b/pkg/utils/resolve/subscriber.go new file mode 100644 index 00000000000..85400bfaed4 --- /dev/null +++ b/pkg/utils/resolve/subscriber.go @@ -0,0 +1,114 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resolve + +import ( + "context" + "fmt" + "net/url" + + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/logging" + "github.com/knative/eventing/pkg/reconciler/names" + duckapis "github.com/knative/pkg/apis" + "github.com/knative/pkg/apis/duck" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/dynamic" +) + +// DomainToURL converts a domain into an HTTP URL. +func DomainToURL(domain string) string { + u := url.URL{ + Scheme: "http", + Host: domain, + Path: "/", + } + return u.String() +} + +// ResourceInterface creates a resource interface for the given ObjectReference. +func ResourceInterface(dynamicClient dynamic.Interface, namespace string, ref *corev1.ObjectReference) (dynamic.ResourceInterface, error) { + rc := dynamicClient.Resource(duckapis.KindToResource(ref.GroupVersionKind())) + + if rc == nil { + return nil, fmt.Errorf("failed to create dynamic client resource") + } + return rc.Namespace(namespace), nil +} + +// ObjectReference resolves an object based on an ObjectReference. +func ObjectReference(ctx context.Context, dynamicClient dynamic.Interface, namespace string, ref *corev1.ObjectReference) (duck.Marshalable, error) { + resourceClient, err := ResourceInterface(dynamicClient, namespace, ref) + if err != nil { + logging.FromContext(ctx).Warn("Failed to create dynamic resource client", zap.Error(err)) + return nil, err + } + + return resourceClient.Get(ref.Name, metav1.GetOptions{}) +} + +// ResolveSubscriberSpec resolves the Spec.Call object. If it's an +// ObjectReference will resolve the object and treat it as an Addressable. If +// it's DNSName then it's used as is. +// TODO: Once Service Routes, etc. support Callable, use that. +func SubscriberSpec(ctx context.Context, dynamicClient dynamic.Interface, namespace string, s *v1alpha1.SubscriberSpec) (string, error) { + if isNilOrEmptySubscriber(s) { + return "", nil + } + if s.DNSName != nil && *s.DNSName != "" { + return *s.DNSName, nil + } + + obj, err := ObjectReference(ctx, dynamicClient, namespace, s.Ref) + if err != nil { + logging.FromContext(ctx).Warn("Failed to fetch SubscriberSpec target", + zap.Error(err), + zap.Any("subscriberSpec.Ref", s.Ref)) + return "", err + } + + // K8s services are special cased. They can be called, even though they do not satisfy the + // Callable interface. + if s.Ref != nil && s.Ref.APIVersion == "v1" && s.Ref.Kind == "Service" { + // This Service must exist because ObjectReference did not return an error. + return DomainToURL(names.ServiceHostName(s.Ref.Name, namespace)), nil + } + + t := duckv1alpha1.AddressableType{} + if err = duck.FromUnstructured(obj, &t); err == nil { + if t.Status.Address != nil { + return DomainToURL(t.Status.Address.Hostname), nil + } + } + + legacy := duckv1alpha1.LegacyTarget{} + if err = duck.FromUnstructured(obj, &legacy); err == nil { + if legacy.Status.DomainInternal != "" { + return DomainToURL(legacy.Status.DomainInternal), nil + } + } + + return "", fmt.Errorf("status does not contain address") +} + +func isNilOrEmptySubscriber(sub *v1alpha1.SubscriberSpec) bool { + return sub == nil || equality.Semantic.DeepEqual(sub, &v1alpha1.SubscriberSpec{}) +} diff --git a/pkg/utils/resolve/subscriber_test.go b/pkg/utils/resolve/subscriber_test.go new file mode 100644 index 00000000000..4b9ee9c6912 --- /dev/null +++ b/pkg/utils/resolve/subscriber_test.go @@ -0,0 +1,254 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resolve + +import ( + "context" + "fmt" + "testing" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/kubernetes/scheme" +) + +const ( + testNS = "test-namespace" +) + +var ( + dnsName = "dns-name" + + channelAddress = "test-channel.hostname" + channelURL = fmt.Sprintf("http://%s/", channelAddress) + + legacyCallableAddress = "legacy-callable.domain-internal" + legacyCallableURL = fmt.Sprintf("http://%s/", legacyCallableAddress) +) + +func init() { + // Add types to scheme + _ = eventingv1alpha1.AddToScheme(scheme.Scheme) + _ = duckv1alpha1.AddToScheme(scheme.Scheme) +} + +func TestDomainToURL(t *testing.T) { + d := "default-broker.default.svc.cluster.local" + e := fmt.Sprintf("http://%s/", d) + if actual := DomainToURL(d); e != actual { + t.Fatalf("Unexpected domain. Expected '%v', actually '%v'", e, actual) + } +} + +func TestResourceInterface_BadDynamicInterface(t *testing.T) { + actual, err := ResourceInterface(&badDynamicInterface{}, testNS, &corev1.ObjectReference{}) + if err.Error() != "failed to create dynamic client resource" { + t.Fatalf("Unexpected error '%v'", err) + } + if actual != nil { + t.Fatalf("Unexpected actual. Expected nil. Actual '%v'", actual) + } +} + +type badDynamicInterface struct{} + +var _ dynamic.Interface = &badDynamicInterface{} + +func (badDynamicInterface) Resource(_ schema.GroupVersionResource) dynamic.NamespaceableResourceInterface { + return nil +} + +func TestObjectReference_BadDynamicInterface(t *testing.T) { + actual, err := ObjectReference(context.TODO(), &badDynamicInterface{}, testNS, &corev1.ObjectReference{}) + if err.Error() != "failed to create dynamic client resource" { + t.Fatalf("Unexpected error '%v'", err) + } + if actual != nil { + t.Fatalf("Unexpected actual. Expected nil. Actual '%v'", actual) + } +} + +func TestSubscriberSpec(t *testing.T) { + testCases := map[string]struct { + Sub *v1alpha1.SubscriberSpec + Objects []runtime.Object + Expected string + ExpectedErr string + }{ + "nil": { + Sub: nil, + Expected: "", + }, + "empty": { + Sub: &v1alpha1.SubscriberSpec{}, + Expected: "", + }, + "DNS Name": { + Sub: &v1alpha1.SubscriberSpec{ + DNSName: &dnsName, + }, + Expected: dnsName, + }, + "Doesn't exist": { + Sub: &v1alpha1.SubscriberSpec{ + Ref: &corev1.ObjectReference{ + APIVersion: "v1", + Kind: "Service", + Name: "doesnt-exist", + }, + }, + ExpectedErr: "services \"doesnt-exist\" not found", + }, + "K8s Service": { + Sub: &v1alpha1.SubscriberSpec{ + Ref: &corev1.ObjectReference{ + APIVersion: "v1", + Kind: "Service", + Name: "does-exist", + }, + }, + Objects: []runtime.Object{ + k8sService("does-exist"), + }, + Expected: fmt.Sprintf("http://does-exist.%s.svc.cluster.local/", testNS), + }, + "Addressable": { + Sub: &v1alpha1.SubscriberSpec{ + Ref: &corev1.ObjectReference{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Channel", + Name: "does-exist", + }, + }, + Objects: []runtime.Object{ + channel("does-exist"), + }, + Expected: channelURL, + }, + "Legacy Callable": { + Sub: &v1alpha1.SubscriberSpec{ + Ref: &corev1.ObjectReference{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "LegacyCallable", + Name: "does-exist", + }, + }, + Objects: []runtime.Object{ + legacyCallable("does-exist"), + }, + Expected: legacyCallableURL, + }, + "Non-Addressable": { + Sub: &v1alpha1.SubscriberSpec{ + Ref: &corev1.ObjectReference{ + APIVersion: "v1", + Kind: "ConfigMap", + Name: "does-exist", + }, + }, + Objects: []runtime.Object{ + configMap("does-exist"), + }, + ExpectedErr: "status does not contain address", + }, + } + + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + dc := fake.NewSimpleDynamicClient(scheme.Scheme, tc.Objects...) + + actual, err := SubscriberSpec(context.TODO(), dc, testNS, tc.Sub) + if err != nil { + if tc.ExpectedErr == "" || tc.ExpectedErr != err.Error() { + t.Fatalf("Unexpected error. Expected '%s'. Actual '%s'.", tc.ExpectedErr, err.Error()) + } + } + if tc.Expected != actual { + t.Fatalf("Unexpected URL. Expected '%s'. Actual '%s'", tc.Expected, actual) + } + }) + } +} + +func k8sService(name string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Service", + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": name, + }, + }, + } +} + +func channel(name string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "eventing.knative.dev/v1alpha1", + "kind": "Channel", + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": name, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "hostname": channelAddress, + }, + }, + }, + } +} + +func legacyCallable(name string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "eventing.knative.dev/v1alpha1", + "kind": "LegacyCallable", + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": name, + }, + "status": map[string]interface{}{ + "domainInternal": legacyCallableAddress, + }, + }, + } +} + +func configMap(name string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": name, + }, + }, + } +}