From 139284e9aa0edd0a23c328b29685fcc5fd3fc47f Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Tue, 23 Oct 2018 16:24:23 -0700 Subject: [PATCH 1/3] Change Channelable to match the definition in https://github.com/knative/eventing/pull/482. --- .../eventing/v1alpha1/channel_validation.go | 4 +- .../v1alpha1/channel_validation_test.go | 14 ++--- .../eventing/v1alpha1/subscription_types.go | 8 +-- .../inmemory/channel/reconcile_test.go | 22 ++++---- .../eventing/subscription/reconcile.go | 42 +++++++++------ .../eventing/subscription/reconcile_test.go | 28 +++++----- .../filesystem/filesystem_watcher_test.go | 47 ++++++++--------- pkg/sidecar/configmap/parse_test.go | 37 +++++++------- pkg/sidecar/configmap/watcher/watcher_test.go | 11 ++-- pkg/sidecar/fanout/fanout_handler.go | 2 +- pkg/sidecar/fanout/fanout_handler_test.go | 51 ++++++++++--------- .../multi_channel_fanout_handler_test.go | 31 +++++------ pkg/sidecar/multichannelfanout/parse_test.go | 41 ++++++++------- pkg/sidecar/swappable/swappable_test.go | 23 +++++---- .../apis/duck/v1alpha1/channelable_types.go | 4 +- 15 files changed, 192 insertions(+), 173 deletions(-) diff --git a/pkg/apis/eventing/v1alpha1/channel_validation.go b/pkg/apis/eventing/v1alpha1/channel_validation.go index 1a41996e3ae..0840dbec25c 100644 --- a/pkg/apis/eventing/v1alpha1/channel_validation.go +++ b/pkg/apis/eventing/v1alpha1/channel_validation.go @@ -36,8 +36,8 @@ func (cs *ChannelSpec) Validate() *apis.FieldError { if cs.Channelable != nil { for i, subscriber := range cs.Channelable.Subscribers { - if subscriber.SinkableDomain == "" && subscriber.CallableDomain == "" { - fe := apis.ErrMissingField("sinkableDomain", "callableDomain") + if subscriber.SinkableURI == "" && subscriber.CallableURI == "" { + fe := apis.ErrMissingField("sinkableURI", "callableURI") fe.Details = "expected at least one of, got none" errs = errs.Also(fe.ViaField(fmt.Sprintf("subscriber[%d]", i)).ViaField("channelable")) } diff --git a/pkg/apis/eventing/v1alpha1/channel_validation_test.go b/pkg/apis/eventing/v1alpha1/channel_validation_test.go index e72623cf818..c79ea80a82d 100644 --- a/pkg/apis/eventing/v1alpha1/channel_validation_test.go +++ b/pkg/apis/eventing/v1alpha1/channel_validation_test.go @@ -58,8 +58,8 @@ func TestChannelValidation(t *testing.T) { }, Channelable: &duckv1alpha1.Channelable{ Subscribers: []duckv1alpha1.ChannelSubscriberSpec{{ - CallableDomain: "callableendpoint", - SinkableDomain: "resultendpoint", + CallableURI: "callableendpoint", + SinkableURI: "resultendpoint", }}, }}, }, @@ -75,13 +75,13 @@ func TestChannelValidation(t *testing.T) { }, Channelable: &duckv1alpha1.Channelable{ Subscribers: []duckv1alpha1.ChannelSubscriberSpec{{ - CallableDomain: "callableendpoint", - SinkableDomain: "callableendpoint", + CallableURI: "callableendpoint", + SinkableURI: "callableendpoint", }, {}}, }}, }, want: func() *apis.FieldError { - fe := apis.ErrMissingField("spec.channelable.subscriber[1].callableDomain", "spec.channelable.subscriber[1].sinkableDomain") + fe := apis.ErrMissingField("spec.channelable.subscriber[1].callableURI", "spec.channelable.subscriber[1].sinkableURI") fe.Details = "expected at least one of, got none" return fe }(), @@ -101,10 +101,10 @@ func TestChannelValidation(t *testing.T) { }, want: func() *apis.FieldError { var errs *apis.FieldError - fe := apis.ErrMissingField("spec.channelable.subscriber[0].callableDomain", "spec.channelable.subscriber[0].sinkableDomain") + fe := apis.ErrMissingField("spec.channelable.subscriber[0].callableURI", "spec.channelable.subscriber[0].sinkableURI") fe.Details = "expected at least one of, got none" errs = errs.Also(fe) - fe = apis.ErrMissingField("spec.channelable.subscriber[1].callableDomain", "spec.channelable.subscriber[1].sinkableDomain") + fe = apis.ErrMissingField("spec.channelable.subscriber[1].callableURI", "spec.channelable.subscriber[1].sinkableURI") fe.Details = "expected at least one of, got none" errs = errs.Also(fe) return errs diff --git a/pkg/apis/eventing/v1alpha1/subscription_types.go b/pkg/apis/eventing/v1alpha1/subscription_types.go index 4af64819a17..a79e2d0d4c3 100644 --- a/pkg/apis/eventing/v1alpha1/subscription_types.go +++ b/pkg/apis/eventing/v1alpha1/subscription_types.go @@ -185,11 +185,11 @@ type SubscriptionStatusPhysicalSubscription struct { // From is the object pointed to in status.from's Subscribable contract. From corev1.ObjectReference `json:"from,omitEmpty"` - // CallDomain is the fully resolved domain for spec.callable. - CallDomain string `json:"callDomain,omitEmpty"` + // CallURI is the fully resolved URI for spec.callable. + CallURI string `json:"callURI,omitEmpty"` - // ResultDomain is the fully resolved domain for the spec.result. - ResultDomain string `json:"resultDomain,omitEmpty"` + // ResultURI is the fully resolved URI for the spec.result. + ResultURI string `json:"resultURI,omitEmpty"` } const ( diff --git a/pkg/controller/eventing/inmemory/channel/reconcile_test.go b/pkg/controller/eventing/inmemory/channel/reconcile_test.go index 13466bad4ae..74c0d63278b 100644 --- a/pkg/controller/eventing/inmemory/channel/reconcile_test.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile_test.go @@ -75,14 +75,14 @@ var ( FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: "foo", + CallableURI: "foo", }, { - SinkableDomain: "bar", + SinkableURI: "bar", }, { - CallableDomain: "baz", - SinkableDomain: "qux", + CallableURI: "baz", + SinkableURI: "qux", }, }, }, @@ -93,7 +93,7 @@ var ( FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: "steve", + CallableURI: "steve", }, }, }, @@ -119,14 +119,14 @@ var ( Channelable: &duckv1alpha1.Channelable{ Subscribers: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: "foo", + CallableURI: "foo", }, { - SinkableDomain: "bar", + SinkableURI: "bar", }, { - CallableDomain: "baz", - SinkableDomain: "qux", + CallableURI: "baz", + SinkableURI: "qux", }, }, }, @@ -149,7 +149,7 @@ var ( Channelable: &duckv1alpha1.Channelable{ Subscribers: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: "anything", + CallableURI: "anything", }, }, }, @@ -172,7 +172,7 @@ var ( Channelable: &duckv1alpha1.Channelable{ Subscribers: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: "steve", + CallableURI: "steve", }, }, }, diff --git a/pkg/controller/eventing/subscription/reconcile.go b/pkg/controller/eventing/subscription/reconcile.go index 36b307e0094..8c7775fd185 100644 --- a/pkg/controller/eventing/subscription/reconcile.go +++ b/pkg/controller/eventing/subscription/reconcile.go @@ -19,6 +19,7 @@ package subscription import ( "context" "fmt" + "net/url" "github.com/golang/glog" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" @@ -100,33 +101,33 @@ func (r *reconciler) reconcile(subscription *v1alpha1.Subscription) error { subscription.Status.PhysicalSubscription.From = from.Status.Subscribable.Channelable glog.Infof("Resolved from subscribable to: %+v", from.Status.Subscribable.Channelable) - callDomain := "" + callURI := "" if subscription.Spec.Call != nil { - callDomain, err = r.resolveEndpointSpec(subscription.Namespace, *subscription.Spec.Call) + callURI, err = r.resolveEndpointSpec(subscription.Namespace, *subscription.Spec.Call) if err != nil { glog.Warningf("Failed to resolve Call %+v : %s", *subscription.Spec.Call, err) return err } - if callDomain == "" { + if callURI == "" { return fmt.Errorf("could not get domain from call (is it not targetable?)") } - subscription.Status.PhysicalSubscription.CallDomain = callDomain - glog.Infof("Resolved call to: %q", callDomain) + subscription.Status.PhysicalSubscription.CallURI = callURI + glog.Infof("Resolved call to: %q", callURI) } - resultDomain := "" + resultURI := "" if subscription.Spec.Result != nil { - resultDomain, err = r.resolveResult(subscription.Namespace, *subscription.Spec.Result) + resultURI, err = r.resolveResult(subscription.Namespace, *subscription.Spec.Result) if err != nil { glog.Warningf("Failed to resolve Result %v : %v", subscription.Spec.Result, err) return err } - if resultDomain == "" { + if resultURI == "" { glog.Warningf("Failed to resolve result %v to actual domain", *subscription.Spec.Result) return err } - subscription.Status.PhysicalSubscription.ResultDomain = resultDomain - glog.Infof("Resolved result to: %q", resultDomain) + subscription.Status.PhysicalSubscription.ResultURI = resultURI + glog.Infof("Resolved result to: %q", resultURI) } // Everything that was supposed to be resolved was, so flip the status bit on that. @@ -186,7 +187,7 @@ func (r *reconciler) resolveEndpointSpec(namespace string, es v1alpha1.EndpointS glog.Warningf("Failed to fetch EndpointSpec target as a K8s Service %+v: %s", es.TargetRef, err) return "", err } - return controller.ServiceHostName(svc.Name, svc.Namespace), nil + return domainToURL(controller.ServiceHostName(svc.Name, svc.Namespace)), nil } obj, err := r.fetchObjectReference(namespace, es.TargetRef) @@ -202,7 +203,7 @@ func (r *reconciler) resolveEndpointSpec(namespace string, es v1alpha1.EndpointS } if t.Status.Targetable != nil { - return t.Status.Targetable.DomainInternal, nil + return domainToURL(t.Status.Targetable.DomainInternal), nil } return "", fmt.Errorf("status does not contain targetable") } @@ -221,7 +222,7 @@ func (r *reconciler) resolveResult(namespace string, resultStrategy v1alpha1.Res return "", err } if s.Status.Sinkable != nil { - return s.Status.Sinkable.DomainInternal, nil + return domainToURL(s.Status.Sinkable.DomainInternal), nil } return "", fmt.Errorf("status does not contain sinkable") } @@ -252,6 +253,15 @@ func (r *reconciler) fetchObjectReference(namespace string, ref *corev1.ObjectRe return resourceClient.Get(ref.Name, metav1.GetOptions{}) } +func domainToURL(domain string) string { + u := url.URL{ + Scheme: "http", + Host: domain, + Path: "/", + } + return u.String() +} + func (r *reconciler) syncPhysicalFromChannel(sub *v1alpha1.Subscription) error { glog.Infof("Reconciling Physical From Channel: %+v", sub) @@ -311,10 +321,10 @@ func (r *reconciler) listAllSubscriptionsWithPhysicalFrom(sub *v1alpha1.Subscrip func (r *reconciler) createChannelable(subs []v1alpha1.Subscription) *duckv1alpha1.Channelable { rv := &duckv1alpha1.Channelable{} for _, sub := range subs { - if sub.Status.PhysicalSubscription.CallDomain != "" || sub.Status.PhysicalSubscription.ResultDomain != "" { + if sub.Status.PhysicalSubscription.CallURI != "" || sub.Status.PhysicalSubscription.ResultURI != "" { rv.Subscribers = append(rv.Subscribers, duckv1alpha1.ChannelSubscriberSpec{ - CallableDomain: sub.Status.PhysicalSubscription.CallDomain, - SinkableDomain: sub.Status.PhysicalSubscription.ResultDomain, + CallableURI: sub.Status.PhysicalSubscription.CallURI, + SinkableURI: sub.Status.PhysicalSubscription.ResultURI, }) } } diff --git a/pkg/controller/eventing/subscription/reconcile_test.go b/pkg/controller/eventing/subscription/reconcile_test.go index 38042eab6d2..ad55b49d493 100644 --- a/pkg/controller/eventing/subscription/reconcile_test.go +++ b/pkg/controller/eventing/subscription/reconcile_test.go @@ -724,8 +724,8 @@ func getNewChannel(name string) *eventingv1alpha1.Channel { func rename(sub *eventingv1alpha1.Subscription) *eventingv1alpha1.Subscription { sub.Name = "renamed" sub.UID = "renamed-UID" - sub.Status.PhysicalSubscription.CallDomain = "" - sub.Status.PhysicalSubscription.ResultDomain = otherSinkableDNS + sub.Status.PhysicalSubscription.CallURI = "" + sub.Status.PhysicalSubscription.ResultURI = otherSinkableDNS return sub } @@ -822,15 +822,15 @@ func getNewSubscriptionWithUnknownConditionsAndPhysicalFrom() *eventingv1alpha1. func getNewSubscriptionWithUnknownConditionsAndPhysicalFromCall() *eventingv1alpha1.Subscription { s := getNewSubscriptionWithUnknownConditionsAndPhysicalFrom() - s.Status.PhysicalSubscription.CallDomain = targetDNS + s.Status.PhysicalSubscription.CallURI = domainToURL(targetDNS) return s } func getNewSubscriptionWithReferencesResolvedAndPhysicalFromCallResult() *eventingv1alpha1.Subscription { s := getNewSubscriptionWithUnknownConditionsAndPhysicalFrom() s.Status.MarkReferencesResolved() - s.Status.PhysicalSubscription.CallDomain = targetDNS - s.Status.PhysicalSubscription.ResultDomain = sinkableDNS + s.Status.PhysicalSubscription.CallURI = domainToURL(targetDNS) + s.Status.PhysicalSubscription.ResultURI = domainToURL(sinkableDNS) return s } @@ -839,9 +839,9 @@ func getNewSubscriptionToK8sServiceWithReferencesResolvedAndPhysicalFromCallResu s.Status.InitializeConditions() s.Status.MarkReferencesResolved() s.Status.PhysicalSubscription = eventingv1alpha1.SubscriptionStatusPhysicalSubscription{ - From: s.Spec.From, - CallDomain: k8sServiceDNS, - ResultDomain: sinkableDNS, + From: s.Spec.From, + CallURI: domainToURL(k8sServiceDNS), + ResultURI: domainToURL(sinkableDNS), } return s } @@ -856,8 +856,8 @@ func getNewSubscriptionWithSourceWithReferencesResolvedAndPhysicalFromCallResult Kind: channelKind, Name: fromChannelName, }, - CallDomain: targetDNS, - ResultDomain: sinkableDNS, + CallURI: domainToURL(targetDNS), + ResultURI: domainToURL(sinkableDNS), } return s } @@ -873,7 +873,7 @@ func getSubscriptionWithDifferentChannel() *eventingv1alpha1.Subscription { s.Name = "different-channel" s.UID = "different-channel-UID" s.Status.PhysicalSubscription.From.Name = "other-channel" - s.Status.PhysicalSubscription.CallDomain = "some-other-domain" + s.Status.PhysicalSubscription.CallURI = "some-other-domain" return s } @@ -915,11 +915,11 @@ func getChannelWithMultipleSubscriptions() *eventingv1alpha1.Channel { Channelable: &duckv1alpha1.Channelable{ Subscribers: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: targetDNS, - SinkableDomain: sinkableDNS, + CallableURI: targetDNS, + SinkableURI: sinkableDNS, }, { - SinkableDomain: otherSinkableDNS, + SinkableURI: otherSinkableDNS, }, }, }, diff --git a/pkg/sidecar/configmap/filesystem/filesystem_watcher_test.go b/pkg/sidecar/configmap/filesystem/filesystem_watcher_test.go index 5b3894b1bc6..54504e8d94f 100644 --- a/pkg/sidecar/configmap/filesystem/filesystem_watcher_test.go +++ b/pkg/sidecar/configmap/filesystem/filesystem_watcher_test.go @@ -19,19 +19,20 @@ package filesystem import ( "errors" "fmt" - "github.com/google/go-cmp/cmp" - "github.com/knative/eventing/pkg/sidecar/configmap" - "github.com/knative/eventing/pkg/sidecar/fanout" - "github.com/knative/eventing/pkg/sidecar/multichannelfanout" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" - "go.uber.org/zap" - "gopkg.in/yaml.v2" "io/ioutil" "os" "strings" "sync" "testing" "time" + + "github.com/google/go-cmp/cmp" + "github.com/knative/eventing/pkg/sidecar/configmap" + "github.com/knative/eventing/pkg/sidecar/fanout" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "go.uber.org/zap" + yaml "gopkg.in/yaml.v2" ) func TestReadConfigMap(t *testing.T) { @@ -82,20 +83,20 @@ func TestReadConfigMap(t *testing.T) { name: c1 fanoutConfig: subscriptions: - - callableDomain: event-changer.default.svc.cluster.local - sinkableDomain: message-dumper-bar.default.svc.cluster.local - - callableDomain: message-dumper-foo.default.svc.cluster.local - - sinkableDomain: message-dumper-bar.default.svc.cluster.local + - callableURI: event-changer.default.svc.cluster.local + sinkableURI: message-dumper-bar.default.svc.cluster.local + - callableURI: message-dumper-foo.default.svc.cluster.local + - sinkableURI: message-dumper-bar.default.svc.cluster.local - namespace: default name: c2 fanoutConfig: subscriptions: - - sinkableDomain: message-dumper-foo.default.svc.cluster.local + - sinkableURI: message-dumper-foo.default.svc.cluster.local - namespace: other name: c3 fanoutConfig: subscriptions: - - sinkableDomain: message-dumper-foo.default.svc.cluster.local + - sinkableURI: message-dumper-foo.default.svc.cluster.local `, expected: &multichannelfanout.Config{ ChannelConfigs: []multichannelfanout.ChannelConfig{ @@ -105,14 +106,14 @@ func TestReadConfigMap(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: "event-changer.default.svc.cluster.local", - SinkableDomain: "message-dumper-bar.default.svc.cluster.local", + CallableURI: "event-changer.default.svc.cluster.local", + SinkableURI: "message-dumper-bar.default.svc.cluster.local", }, { - CallableDomain: "message-dumper-foo.default.svc.cluster.local", + CallableURI: "message-dumper-foo.default.svc.cluster.local", }, { - SinkableDomain: "message-dumper-bar.default.svc.cluster.local", + SinkableURI: "message-dumper-bar.default.svc.cluster.local", }, }, }, @@ -123,7 +124,7 @@ func TestReadConfigMap(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - SinkableDomain: "message-dumper-foo.default.svc.cluster.local", + SinkableURI: "message-dumper-foo.default.svc.cluster.local", }, }, }, @@ -134,7 +135,7 @@ func TestReadConfigMap(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - SinkableDomain: "message-dumper-foo.default.svc.cluster.local", + SinkableURI: "message-dumper-foo.default.svc.cluster.local", }, }, }, @@ -187,7 +188,7 @@ func TestWatch(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - SinkableDomain: "foo.bar", + SinkableURI: "foo.bar", }, }, }, @@ -204,7 +205,7 @@ func TestWatch(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - SinkableDomain: "foo.bar", + SinkableURI: "foo.bar", }, }, }, @@ -222,7 +223,7 @@ func TestWatch(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - SinkableDomain: "foo.bar", + SinkableURI: "foo.bar", }, }, }, @@ -237,7 +238,7 @@ func TestWatch(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: "baz.qux", + CallableURI: "baz.qux", }, }, }, diff --git a/pkg/sidecar/configmap/parse_test.go b/pkg/sidecar/configmap/parse_test.go index 5cddb280b89..0afd25df0c0 100644 --- a/pkg/sidecar/configmap/parse_test.go +++ b/pkg/sidecar/configmap/parse_test.go @@ -17,13 +17,14 @@ limitations under the License. package configmap import ( + "strings" + "testing" + "github.com/google/go-cmp/cmp" "github.com/knative/eventing/pkg/sidecar/fanout" "github.com/knative/eventing/pkg/sidecar/multichannelfanout" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "go.uber.org/zap" - "strings" - "testing" ) func TestNewFanoutConfig(t *testing.T) { @@ -64,20 +65,20 @@ func TestNewFanoutConfig(t *testing.T) { name: c1 fanoutConfig: subscriptions: - - callableDomain: event-changer.default.svc.cluster.local - sinkableDomain: message-dumper-bar.default.svc.cluster.local - - callableDomain: message-dumper-foo.default.svc.cluster.local - - sinkableDomain: message-dumper-bar.default.svc.cluster.local + - callableURI: event-changer.default.svc.cluster.local + sinkableURI: message-dumper-bar.default.svc.cluster.local + - callableURI: message-dumper-foo.default.svc.cluster.local + - sinkableURI: message-dumper-bar.default.svc.cluster.local - namespace: default name: c2 fanoutConfig: subscriptions: - - sinkableDomain: message-dumper-foo.default.svc.cluster.local + - sinkableURI: message-dumper-foo.default.svc.cluster.local - namespace: other name: c3 fanoutConfig: subscriptions: - - sinkableDomain: message-dumper-foo.default.svc.cluster.local + - sinkableURI: message-dumper-foo.default.svc.cluster.local `, expected: &multichannelfanout.Config{ ChannelConfigs: []multichannelfanout.ChannelConfig{ @@ -87,14 +88,14 @@ func TestNewFanoutConfig(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: "event-changer.default.svc.cluster.local", - SinkableDomain: "message-dumper-bar.default.svc.cluster.local", + CallableURI: "event-changer.default.svc.cluster.local", + SinkableURI: "message-dumper-bar.default.svc.cluster.local", }, { - CallableDomain: "message-dumper-foo.default.svc.cluster.local", + CallableURI: "message-dumper-foo.default.svc.cluster.local", }, { - SinkableDomain: "message-dumper-bar.default.svc.cluster.local", + SinkableURI: "message-dumper-bar.default.svc.cluster.local", }, }, }, @@ -105,7 +106,7 @@ func TestNewFanoutConfig(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - SinkableDomain: "message-dumper-foo.default.svc.cluster.local", + SinkableURI: "message-dumper-foo.default.svc.cluster.local", }, }, }, @@ -116,7 +117,7 @@ func TestNewFanoutConfig(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - SinkableDomain: "message-dumper-foo.default.svc.cluster.local", + SinkableURI: "message-dumper-foo.default.svc.cluster.local", }, }, }, @@ -158,14 +159,14 @@ func TestSerializeConfig(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: "foo.example.com", - SinkableDomain: "bar.example.com", + CallableURI: "foo.example.com", + SinkableURI: "bar.example.com", }, { - SinkableDomain: "qux.example.com", + SinkableURI: "qux.example.com", }, { - CallableDomain: "baz.example.com", + CallableURI: "baz.example.com", }, {}, }, diff --git a/pkg/sidecar/configmap/watcher/watcher_test.go b/pkg/sidecar/configmap/watcher/watcher_test.go index e6296e5f201..2144da97240 100644 --- a/pkg/sidecar/configmap/watcher/watcher_test.go +++ b/pkg/sidecar/configmap/watcher/watcher_test.go @@ -18,6 +18,8 @@ package watcher import ( "errors" + "testing" + "github.com/google/go-cmp/cmp" sidecarconfigmap "github.com/knative/eventing/pkg/sidecar/configmap" "github.com/knative/eventing/pkg/sidecar/fanout" @@ -27,7 +29,6 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "testing" ) const ( @@ -66,8 +67,8 @@ func TestReconcile(t *testing.T) { namespace: bar fanoutConfig: subscriptions: - - callableDomain: callable - sinkableDomain: sinkable`, + - callableURI: callable + sinkableURI: sinkable`, }, expectedConfig: &multichannelfanout.Config{ ChannelConfigs: []multichannelfanout.ChannelConfig{ @@ -77,8 +78,8 @@ func TestReconcile(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: "callable", - SinkableDomain: "sinkable", + CallableURI: "callable", + SinkableURI: "sinkable", }, }, }, diff --git a/pkg/sidecar/fanout/fanout_handler.go b/pkg/sidecar/fanout/fanout_handler.go index 065a10d64f0..0324a4ae283 100644 --- a/pkg/sidecar/fanout/fanout_handler.go +++ b/pkg/sidecar/fanout/fanout_handler.go @@ -120,5 +120,5 @@ func (f *Handler) dispatch(msg *buses.Message) error { // makeFanoutRequest sends the request to exactly one subscription. It handles both the `call` and // the `sink` portions of the subscription. func (f *Handler) makeFanoutRequest(m buses.Message, sub duckv1alpha1.ChannelSubscriberSpec) error { - return f.dispatcher.DispatchMessage(&m, sub.CallableDomain, sub.SinkableDomain, buses.DispatchDefaults{}) + return f.dispatcher.DispatchMessage(&m, sub.CallableURI, sub.SinkableURI, buses.DispatchDefaults{}) } diff --git a/pkg/sidecar/fanout/fanout_handler_test.go b/pkg/sidecar/fanout/fanout_handler_test.go index 9bdf020e544..563f9726058 100644 --- a/pkg/sidecar/fanout/fanout_handler_test.go +++ b/pkg/sidecar/fanout/fanout_handler_test.go @@ -18,10 +18,6 @@ package fanout import ( "errors" - "github.com/knative/eventing/pkg/buses" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" - "go.uber.org/atomic" - "go.uber.org/zap" "io" "io/ioutil" "net/http" @@ -29,6 +25,11 @@ import ( "strings" "testing" "time" + + "github.com/knative/eventing/pkg/buses" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "go.uber.org/atomic" + "go.uber.org/zap" ) // Domains used in subscriptions, which will be replaced by the real domains of the started HTTP @@ -74,7 +75,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { timeout: time.Millisecond, subs: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: replaceCallable, + CallableURI: replaceCallable, }, }, callable: func(writer http.ResponseWriter, _ *http.Request) { @@ -96,7 +97,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { "sinkable fails": { subs: []duckv1alpha1.ChannelSubscriberSpec{ { - SinkableDomain: replaceSinkable, + SinkableURI: replaceSinkable, }, }, sinkable: func(writer http.ResponseWriter, _ *http.Request) { @@ -107,7 +108,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { "callable fails": { subs: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: replaceCallable, + CallableURI: replaceCallable, }, }, callable: func(writer http.ResponseWriter, _ *http.Request) { @@ -118,8 +119,8 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { "callable succeeds, sinkable fails": { subs: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: replaceCallable, - SinkableDomain: replaceSinkable, + CallableURI: replaceCallable, + SinkableURI: replaceSinkable, }, }, callable: callableSucceed, @@ -131,8 +132,8 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { "one sub succeeds": { subs: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: replaceCallable, - SinkableDomain: replaceSinkable, + CallableURI: replaceCallable, + SinkableURI: replaceSinkable, }, }, callable: callableSucceed, @@ -144,12 +145,12 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { "one sub succeeds, one sub fails": { subs: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: replaceCallable, - SinkableDomain: replaceSinkable, + CallableURI: replaceCallable, + SinkableURI: replaceSinkable, }, { - CallableDomain: replaceCallable, - SinkableDomain: replaceSinkable, + CallableURI: replaceCallable, + SinkableURI: replaceSinkable, }, }, callable: callableSucceed, @@ -159,16 +160,16 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { "all subs succeed": { subs: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: replaceCallable, - SinkableDomain: replaceSinkable, + CallableURI: replaceCallable, + SinkableURI: replaceSinkable, }, { - CallableDomain: replaceCallable, - SinkableDomain: replaceSinkable, + CallableURI: replaceCallable, + SinkableURI: replaceSinkable, }, { - CallableDomain: replaceCallable, - SinkableDomain: replaceSinkable, + CallableURI: replaceCallable, + SinkableURI: replaceSinkable, }, }, callable: callableSucceed, @@ -195,11 +196,11 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { // Rewrite the subs to use the servers we just started. subs := make([]duckv1alpha1.ChannelSubscriberSpec, 0) for _, sub := range tc.subs { - if sub.CallableDomain == replaceCallable { - sub.CallableDomain = callableServer.URL[7:] // strip the leading 'http://' + if sub.CallableURI == replaceCallable { + sub.CallableURI = callableServer.URL[7:] // strip the leading 'http://' } - if sub.SinkableDomain == replaceSinkable { - sub.SinkableDomain = sinkableServer.URL[7:] // strip the leading 'http://' + if sub.SinkableURI == replaceSinkable { + sub.SinkableURI = sinkableServer.URL[7:] // strip the leading 'http://' } subs = append(subs, sub) } diff --git a/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler_test.go b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler_test.go index 3bc5e6f0faf..e8d4c8418e7 100644 --- a/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler_test.go +++ b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler_test.go @@ -18,14 +18,15 @@ package multichannelfanout import ( "fmt" - "github.com/google/go-cmp/cmp" - "github.com/knative/eventing/pkg/sidecar/fanout" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" - "go.uber.org/zap" "net/http" "net/http/httptest" "strings" "testing" + + "github.com/google/go-cmp/cmp" + "github.com/knative/eventing/pkg/sidecar/fanout" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "go.uber.org/zap" ) const ( @@ -110,7 +111,7 @@ func TestCopyWithNewConfig(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: "callabledomain", + CallableURI: "callabledomain", }, }, }, @@ -125,7 +126,7 @@ func TestCopyWithNewConfig(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - SinkableDomain: "sinkabledomain", + SinkableURI: "sinkabledomain", }, }, }, @@ -163,7 +164,7 @@ func TestConfigDiff(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: "callabledomain", + CallableURI: "callabledomain", }, }, }, @@ -193,7 +194,7 @@ func TestConfigDiff(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: "different", + CallableURI: "different", }, }, }, @@ -240,7 +241,7 @@ func TestServeHTTP(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - SinkableDomain: replaceDomain, + SinkableURI: replaceDomain, }, }, }, @@ -260,7 +261,7 @@ func TestServeHTTP(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - SinkableDomain: "first-to-domain", + SinkableURI: "first-to-domain", }, }, }, @@ -271,7 +272,7 @@ func TestServeHTTP(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: replaceDomain, + CallableURI: replaceDomain, }, }, }, @@ -317,11 +318,11 @@ func TestServeHTTP(t *testing.T) { func replaceDomains(config Config, replacement string) { for i, cc := range config.ChannelConfigs { for j, sub := range cc.FanoutConfig.Subscriptions { - if sub.CallableDomain == replaceDomain { - sub.CallableDomain = replacement + if sub.CallableURI == replaceDomain { + sub.CallableURI = replacement } - if sub.SinkableDomain == replaceDomain { - sub.SinkableDomain = replacement + if sub.SinkableURI == replaceDomain { + sub.SinkableURI = replacement } cc.FanoutConfig.Subscriptions[j] = sub } diff --git a/pkg/sidecar/multichannelfanout/parse_test.go b/pkg/sidecar/multichannelfanout/parse_test.go index 568ac7d3391..d0aed807667 100644 --- a/pkg/sidecar/multichannelfanout/parse_test.go +++ b/pkg/sidecar/multichannelfanout/parse_test.go @@ -17,12 +17,13 @@ limitations under the License. package multichannelfanout import ( + "strings" + "testing" + "github.com/google/go-cmp/cmp" "github.com/knative/eventing/pkg/sidecar/fanout" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "go.uber.org/zap" - "strings" - "testing" ) func TestConfigMapData(t *testing.T) { @@ -33,11 +34,11 @@ func TestConfigMapData(t *testing.T) { expectedErr bool }{ { - name: "no data", - expected: &Config {}, + name: "no data", + expected: &Config{}, }, { - name: "invalid YAML", + name: "invalid YAML", config: ` key: - value @@ -56,27 +57,27 @@ func TestConfigMapData(t *testing.T) { expectedErr: true, }, { - name: "valid", + name: "valid", config: ` channelConfigs: - namespace: default name: c1 fanoutConfig: subscriptions: - - callableDomain: event-changer.default.svc.cluster.local - sinkableDomain: message-dumper-bar.default.svc.cluster.local - - callableDomain: message-dumper-foo.default.svc.cluster.local - - sinkableDomain: message-dumper-bar.default.svc.cluster.local + - callableURI: event-changer.default.svc.cluster.local + sinkableURI: message-dumper-bar.default.svc.cluster.local + - callableURI: message-dumper-foo.default.svc.cluster.local + - sinkableURI: message-dumper-bar.default.svc.cluster.local - namespace: default name: c2 fanoutConfig: subscriptions: - - sinkableDomain: message-dumper-foo.default.svc.cluster.local + - sinkableURI: message-dumper-foo.default.svc.cluster.local - namespace: other name: c3 fanoutConfig: subscriptions: - - sinkableDomain: message-dumper-foo.default.svc.cluster.local + - sinkableURI: message-dumper-foo.default.svc.cluster.local `, expected: &Config{ ChannelConfigs: []ChannelConfig{ @@ -86,14 +87,14 @@ func TestConfigMapData(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: "event-changer.default.svc.cluster.local", - SinkableDomain: "message-dumper-bar.default.svc.cluster.local", + CallableURI: "event-changer.default.svc.cluster.local", + SinkableURI: "message-dumper-bar.default.svc.cluster.local", }, { - CallableDomain: "message-dumper-foo.default.svc.cluster.local", + CallableURI: "message-dumper-foo.default.svc.cluster.local", }, { - SinkableDomain: "message-dumper-bar.default.svc.cluster.local", + SinkableURI: "message-dumper-bar.default.svc.cluster.local", }, }, }, @@ -104,7 +105,7 @@ func TestConfigMapData(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - SinkableDomain: "message-dumper-foo.default.svc.cluster.local", + SinkableURI: "message-dumper-foo.default.svc.cluster.local", }, }, }, @@ -115,7 +116,7 @@ func TestConfigMapData(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - SinkableDomain: "message-dumper-foo.default.svc.cluster.local", + SinkableURI: "message-dumper-foo.default.svc.cluster.local", }, }, }, @@ -126,7 +127,9 @@ func TestConfigMapData(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - if tc.name != "valid" { return } + if tc.name != "valid" { + return + } c, e := Parse(zap.NewNop(), formatData(tc.config)) if tc.expectedErr { if e == nil { diff --git a/pkg/sidecar/swappable/swappable_test.go b/pkg/sidecar/swappable/swappable_test.go index 2c663bf800b..92218d82336 100644 --- a/pkg/sidecar/swappable/swappable_test.go +++ b/pkg/sidecar/swappable/swappable_test.go @@ -18,14 +18,15 @@ package swappable import ( "fmt" - "github.com/knative/eventing/pkg/sidecar/fanout" - "github.com/knative/eventing/pkg/sidecar/multichannelfanout" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" - "go.uber.org/zap" "net/http" "net/http/httptest" "strings" "testing" + + "github.com/knative/eventing/pkg/sidecar/fanout" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "go.uber.org/zap" ) const ( @@ -48,7 +49,7 @@ func TestHandler(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: replaceDomain, + CallableURI: replaceDomain, }, }, }, @@ -63,7 +64,7 @@ func TestHandler(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - SinkableDomain: replaceDomain, + SinkableURI: replaceDomain, }, }, }, @@ -100,7 +101,7 @@ func TestHandler_InvalidConfigChange(t *testing.T) { FanoutConfig: fanout.Config{ Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ { - CallableDomain: replaceDomain, + CallableURI: replaceDomain, }, }, }, @@ -203,11 +204,11 @@ func makeRequest(namespace, name string) *http.Request { func replaceDomains(c multichannelfanout.Config, replacement string) multichannelfanout.Config { for i, cc := range c.ChannelConfigs { for j, sub := range cc.FanoutConfig.Subscriptions { - if sub.SinkableDomain == replaceDomain { - sub.SinkableDomain = replacement + if sub.SinkableURI == replaceDomain { + sub.SinkableURI = replacement } - if sub.CallableDomain == replaceDomain { - sub.CallableDomain = replacement + if sub.CallableURI == replaceDomain { + sub.CallableURI = replacement } cc.FanoutConfig.Subscriptions[j] = sub } diff --git a/vendor/github.com/knative/pkg/apis/duck/v1alpha1/channelable_types.go b/vendor/github.com/knative/pkg/apis/duck/v1alpha1/channelable_types.go index 7e7ffd9ada1..adbc7858e67 100644 --- a/vendor/github.com/knative/pkg/apis/duck/v1alpha1/channelable_types.go +++ b/vendor/github.com/knative/pkg/apis/duck/v1alpha1/channelable_types.go @@ -38,9 +38,9 @@ type Channelable struct { // One of them must be present type ChannelSubscriberSpec struct { // +optional - CallableDomain string `json:"callableDomain,omitempty"` + CallableURI string `json:"callableURI,omitempty"` // +optional - SinkableDomain string `json:"sinkableDomain,omitempty"` + SinkableURI string `json:"sinkableURI,omitempty"` } From 353f81832eeab17aa5ddeae94e4859ff3f822b0e Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Wed, 24 Oct 2018 10:17:48 -0700 Subject: [PATCH 2/3] No longer use the subscribable interface. --- pkg/apis/eventing/v1alpha1/channel_types.go | 33 +- .../eventing/v1alpha1/channel_types_test.go | 87 ------ pkg/apis/eventing/v1alpha1/implements_test.go | 2 - pkg/apis/eventing/v1alpha1/source_types.go | 6 - .../subscribable_channelable_validation.go | 4 +- .../eventing/v1alpha1/subscription_types.go | 9 - .../v1alpha1/subscription_validation.go | 4 +- .../v1alpha1/zz_generated.deepcopy.go | 4 - .../eventing/inmemory/channel/reconcile.go | 1 - .../inmemory/channel/reconcile_test.go | 20 +- .../eventing/subscription/reconcile.go | 34 +-- .../eventing/subscription/reconcile_test.go | 283 +++++++----------- 12 files changed, 126 insertions(+), 361 deletions(-) diff --git a/pkg/apis/eventing/v1alpha1/channel_types.go b/pkg/apis/eventing/v1alpha1/channel_types.go index d5122b08344..697dda0fb79 100644 --- a/pkg/apis/eventing/v1alpha1/channel_types.go +++ b/pkg/apis/eventing/v1alpha1/channel_types.go @@ -20,7 +20,6 @@ import ( "github.com/knative/pkg/apis" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "github.com/knative/pkg/webhook" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -29,8 +28,8 @@ import ( // +genclient:noStatus // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object -// Channel is an abstract resource that implements the Subscribable and Sinkable -// contracts. The Provisioner provisions infrastructure to accepts events and +// Channel is an abstract resource that implements the Sinkable contract. +// The Provisioner provisions infrastructure to accepts events and // deliver to Subscriptions. type Channel struct { metav1.TypeMeta `json:",inline"` @@ -76,7 +75,7 @@ type ChannelSpec struct { Channelable *duckv1alpha1.Channelable `json:"channelable,omitempty"` } -var chanCondSet = duckv1alpha1.NewLivingConditionSet(ChannelConditionProvisioned, ChannelConditionSinkable, ChannelConditionSubscribable) +var chanCondSet = duckv1alpha1.NewLivingConditionSet(ChannelConditionProvisioned, ChannelConditionSinkable) // ChannelStatus represents the current state of a Channel. type ChannelStatus struct { @@ -93,9 +92,6 @@ type ChannelStatus struct { // It generally has the form {channel}.{namespace}.svc.cluster.local Sinkable duckv1alpha1.Sinkable `json:"sinkable,omitempty"` - // Channel is Subscribable. It just points to itself - Subscribable duckv1alpha1.Subscribable `json:"subscribable,omitempty"` - // Represents the latest available observations of a channel's current state. // +optional // +patchMergeKey=type @@ -115,10 +111,6 @@ const ( // ChannelConditionSinkable has status true when this Channel meets the Sinkable contract and // has a non-empty domainInternal. ChannelConditionSinkable duckv1alpha1.ConditionType = "Sinkable" - - // ChannelConditionSubscribable has status true when this Channel meets the Subscribable - // contract and has a non-empty Channelable object reference. - ChannelConditionSubscribable duckv1alpha1.ConditionType = "Subscribable" ) // GetCondition returns the condition currently associated with the given type, or nil. @@ -141,25 +133,6 @@ func (cs *ChannelStatus) MarkProvisioned() { chanCondSet.Manage(cs).MarkTrue(ChannelConditionProvisioned) } -// SetSubscribable makes this Channel Subscribable, by having it point at itself. The 'name' and -// 'namespace' should be the name and namespace of the Channel this ChannelStatus is on. It also -// sets the ChannelConditionSubscribable to true. -func (cs *ChannelStatus) SetSubscribable(namespace, name string) { - if namespace != "" || name != "" { - cs.Subscribable.Channelable = corev1.ObjectReference{ - Kind: "Channel", - APIVersion: SchemeGroupVersion.String(), - Namespace: namespace, - Name: name, - } - chanCondSet.Manage(cs).MarkTrue(ChannelConditionSubscribable) - } else { - cs.Subscribable.Channelable = corev1.ObjectReference{} - chanCondSet.Manage(cs).MarkFalse(ChannelConditionSubscribable, "notSubscribable", "not Subscribable") - } - -} - // SetSinkable makes this Channel sinkable by setting the domainInternal. It also sets the // ChannelConditionSinkable to true. func (cs *ChannelStatus) SetSinkable(domainInternal string) { diff --git a/pkg/apis/eventing/v1alpha1/channel_types_test.go b/pkg/apis/eventing/v1alpha1/channel_types_test.go index e2e6cac4104..3cdee5c281d 100644 --- a/pkg/apis/eventing/v1alpha1/channel_types_test.go +++ b/pkg/apis/eventing/v1alpha1/channel_types_test.go @@ -102,9 +102,6 @@ func TestChannelInitializeConditions(t *testing.T) { }, { Type: ChannelConditionSinkable, Status: corev1.ConditionUnknown, - }, { - Type: ChannelConditionSubscribable, - Status: corev1.ConditionUnknown, }}, }, }, { @@ -125,9 +122,6 @@ func TestChannelInitializeConditions(t *testing.T) { }, { Type: ChannelConditionSinkable, Status: corev1.ConditionUnknown, - }, { - Type: ChannelConditionSubscribable, - Status: corev1.ConditionUnknown, }}, }, }, { @@ -148,9 +142,6 @@ func TestChannelInitializeConditions(t *testing.T) { }, { Type: ChannelConditionSinkable, Status: corev1.ConditionUnknown, - }, { - Type: ChannelConditionSubscribable, - Status: corev1.ConditionUnknown, }}}, }, } @@ -170,19 +161,16 @@ func TestChannelIsReady(t *testing.T) { tests := []struct { name string markProvisioned bool - setSubscribable bool setSinkable bool wantReady bool }{{ name: "all happy", markProvisioned: true, - setSubscribable: true, setSinkable: true, wantReady: true, }, { name: "one sad", markProvisioned: false, - setSubscribable: true, setSinkable: true, wantReady: false, }} @@ -192,9 +180,6 @@ func TestChannelIsReady(t *testing.T) { if test.markProvisioned { cs.MarkProvisioned() } - if test.setSubscribable { - cs.SetSubscribable("foo", "bar") - } if test.setSinkable { cs.SetSinkable("foo.bar") } @@ -206,78 +191,6 @@ func TestChannelIsReady(t *testing.T) { } } -func TestChannelStatus_SetSubscribable(t *testing.T) { - testCases := map[string]struct { - namespace string - name string - want *ChannelStatus - }{ - "empty namespace and name": { - want: &ChannelStatus{ - Conditions: []duckv1alpha1.Condition{ - // Note that Ready is here because when the condition is marked False, duck - // automatically sets Ready to false. - { - Type: ChannelConditionReady, - Status: corev1.ConditionFalse, - }, - { - Type: ChannelConditionSubscribable, - Status: corev1.ConditionFalse, - }, - }, - }, - }, - "empty namespace": { - name: "foobar", - want: &ChannelStatus{ - Subscribable: duckv1alpha1.Subscribable{ - Channelable: corev1.ObjectReference{ - APIVersion: SchemeGroupVersion.String(), - Kind: "Channel", - Name: "foobar", - }, - }, - Conditions: []duckv1alpha1.Condition{ - { - Type: ChannelConditionSubscribable, - Status: corev1.ConditionTrue, - }, - }, - }, - }, - "subscribable": { - namespace: "test-namespace", - name: "test-name", - want: &ChannelStatus{ - Subscribable: duckv1alpha1.Subscribable{ - Channelable: corev1.ObjectReference{ - APIVersion: SchemeGroupVersion.String(), - Kind: "Channel", - Namespace: "test-namespace", - Name: "test-name", - }, - }, - Conditions: []duckv1alpha1.Condition{ - { - Type: ChannelConditionSubscribable, - Status: corev1.ConditionTrue, - }, - }, - }, - }, - } - for n, tc := range testCases { - t.Run(n, func(t *testing.T) { - cs := &ChannelStatus{} - cs.SetSubscribable(tc.namespace, tc.name) - if diff := cmp.Diff(tc.want, cs, ignoreTransitionTimeMessageAndReason); diff != "" { - t.Errorf("unexpected conditions (-want, +got) = %v", diff) - } - }) - } -} - func TestChannelStatus_SetSinkable(t *testing.T) { testCases := map[string]struct { domainInternal string diff --git a/pkg/apis/eventing/v1alpha1/implements_test.go b/pkg/apis/eventing/v1alpha1/implements_test.go index 7b40b2d9af3..1e0198fe50c 100644 --- a/pkg/apis/eventing/v1alpha1/implements_test.go +++ b/pkg/apis/eventing/v1alpha1/implements_test.go @@ -28,14 +28,12 @@ func TestTypesImplements(t *testing.T) { // Channel {instance: &Channel{}, iface: &duckv1alpha1.Conditions{}}, {instance: &Channel{}, iface: &duckv1alpha1.Channelable{}}, - {instance: &Channel{}, iface: &duckv1alpha1.Subscribable{}}, {instance: &Channel{}, iface: &duckv1alpha1.Sinkable{}}, // ClusterProvisioner {instance: &ClusterProvisioner{}, iface: &duckv1alpha1.Conditions{}}, // Subscription {instance: &Subscription{}, iface: &duckv1alpha1.Conditions{}}, {instance: &Subscription{}, iface: &emptyGen}, - {instance: &Subscription{}, iface: &duckv1alpha1.Subscribable{}}, } for _, tc := range testCases { if err := duck.VerifyType(tc.instance, tc.iface); err != nil { diff --git a/pkg/apis/eventing/v1alpha1/source_types.go b/pkg/apis/eventing/v1alpha1/source_types.go index 5b58b416b95..2f7899c44d1 100644 --- a/pkg/apis/eventing/v1alpha1/source_types.go +++ b/pkg/apis/eventing/v1alpha1/source_types.go @@ -56,9 +56,6 @@ var _ = duck.VerifyType(&Source{}, &duckv1alpha1.Conditions{}) var emptyGenSource duckv1alpha1.Generation var _ = duck.VerifyType(&Source{}, &emptyGenSource) -// And it's Subscribable -var _ = duck.VerifyType(&Subscription{}, &duckv1alpha1.Subscribable{}) - // SourceSpec is the spec for a Source resource. type SourceSpec struct { // TODO: Generation does not work correctly with CRD. They are scrubbed @@ -115,9 +112,6 @@ type SourceStatus struct { // was last reconciled by the controller. // +optional ObservedGeneration int64 `json:"observedGeneration,omitempty"` - - // Source might be Subscribable. This points to the Channelable object. - Subscribable duckv1alpha1.Subscribable `json:"subscribable,omitempty"` } // GetCondition returns the condition currently associated with the given type, or nil. diff --git a/pkg/apis/eventing/v1alpha1/subscribable_channelable_validation.go b/pkg/apis/eventing/v1alpha1/subscribable_channelable_validation.go index 7918fc3d481..452ab0dcbad 100644 --- a/pkg/apis/eventing/v1alpha1/subscribable_channelable_validation.go +++ b/pkg/apis/eventing/v1alpha1/subscribable_channelable_validation.go @@ -25,7 +25,7 @@ import ( "k8s.io/apimachinery/pkg/api/equality" ) -func isSubscribableEmpty(f corev1.ObjectReference) bool { +func isChannelEmpty(f corev1.ObjectReference) bool { return equality.Semantic.DeepEqual(f, corev1.ObjectReference{}) } @@ -33,7 +33,7 @@ func isSubscribableEmpty(f corev1.ObjectReference) bool { // - Kind == 'Channel' // - APIVersion == 'eventing.knative.dev/v1alpha1' // - Name == not empty -func isValidSubscribable(f corev1.ObjectReference) *apis.FieldError { +func isValidChannel(f corev1.ObjectReference) *apis.FieldError { errs := isValidObjectReference(f) if f.Kind != "Channel" { diff --git a/pkg/apis/eventing/v1alpha1/subscription_types.go b/pkg/apis/eventing/v1alpha1/subscription_types.go index a79e2d0d4c3..226707d5f38 100644 --- a/pkg/apis/eventing/v1alpha1/subscription_types.go +++ b/pkg/apis/eventing/v1alpha1/subscription_types.go @@ -71,8 +71,6 @@ type SubscriptionSpec struct { // for receiving events. The object must have spec.subscriptions // list which will then be modified accordingly. // - // This object must fulfill the Subscribable contract. - // // You can specify only the following fields of the ObjectReference: // - Kind // - APIVersion @@ -171,10 +169,6 @@ type SubscriptionStatus struct { // +patchStrategy=merge Conditions duckv1alpha1.Conditions `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"` - // Subscription might be Subscribable. This depends if there's a Result channel - // In that case, this points to that resource. - Subscribable duckv1alpha1.Subscribable `json:"subscribable,omitempty"` - // PhysicalSubscription is the fully resolved values that this Subscription represents. PhysicalSubscription SubscriptionStatusPhysicalSubscription `json:"physicalSubscription,omitEmpty"` } @@ -182,9 +176,6 @@ type SubscriptionStatus struct { // SubscriptionStatusPhysicalSubscription represents the fully resolved values for this // Subscription. type SubscriptionStatusPhysicalSubscription struct { - // From is the object pointed to in status.from's Subscribable contract. - From corev1.ObjectReference `json:"from,omitEmpty"` - // CallURI is the fully resolved URI for spec.callable. CallURI string `json:"callURI,omitEmpty"` diff --git a/pkg/apis/eventing/v1alpha1/subscription_validation.go b/pkg/apis/eventing/v1alpha1/subscription_validation.go index ddbe9468f01..20d17797005 100644 --- a/pkg/apis/eventing/v1alpha1/subscription_validation.go +++ b/pkg/apis/eventing/v1alpha1/subscription_validation.go @@ -86,7 +86,7 @@ func isValidEndpointSpec(e EndpointSpec) *apis.FieldError { } func isFromEmpty(f corev1.ObjectReference) bool { - return isSubscribableEmpty(f) + return isChannelEmpty(f) } // Valid from only contains the following fields: @@ -94,7 +94,7 @@ func isFromEmpty(f corev1.ObjectReference) bool { // - APIVersion == 'eventing.knative.dev/v1alpha1' // - Name == not empty func isValidFrom(f corev1.ObjectReference) *apis.FieldError { - return isValidSubscribable(f) + return isValidChannel(f) } func isResultStrategyNilOrEmpty(r *ResultStrategy) bool { diff --git a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go index ddde0c1d91b..76d9565a456 100644 --- a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go @@ -134,7 +134,6 @@ func (in *ChannelSpec) DeepCopy() *ChannelSpec { func (in *ChannelStatus) DeepCopyInto(out *ChannelStatus) { *out = *in out.Sinkable = in.Sinkable - out.Subscribable = in.Subscribable if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions *out = make(duck_v1alpha1.Conditions, len(*in)) @@ -454,7 +453,6 @@ func (in *SourceStatus) DeepCopyInto(out *SourceStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } - out.Subscribable = in.Subscribable return } @@ -574,7 +572,6 @@ func (in *SubscriptionStatus) DeepCopyInto(out *SubscriptionStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } - out.Subscribable = in.Subscribable out.PhysicalSubscription = in.PhysicalSubscription return } @@ -592,7 +589,6 @@ func (in *SubscriptionStatus) DeepCopy() *SubscriptionStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SubscriptionStatusPhysicalSubscription) DeepCopyInto(out *SubscriptionStatusPhysicalSubscription) { *out = *in - out.From = in.From return } diff --git a/pkg/controller/eventing/inmemory/channel/reconcile.go b/pkg/controller/eventing/inmemory/channel/reconcile.go index 439361e8978..704f8c99330 100644 --- a/pkg/controller/eventing/inmemory/channel/reconcile.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile.go @@ -141,7 +141,6 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) } r.addFinalizer(c) - c.Status.SetSubscribable(c.Namespace, c.Name) if svc, err := r.createK8sService(ctx, c); err != nil { logger.Info("Error creating the Channel's K8s Service", zap.Error(err)) diff --git a/pkg/controller/eventing/inmemory/channel/reconcile_test.go b/pkg/controller/eventing/inmemory/channel/reconcile_test.go index 74c0d63278b..ab746136ef1 100644 --- a/pkg/controller/eventing/inmemory/channel/reconcile_test.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile_test.go @@ -314,7 +314,7 @@ func TestReconcile(t *testing.T) { MockGets: errorGettingK8sService(), }, WantPresent: []runtime.Object{ - makeChannelWithFinalizerAndSubscribable(), + makeChannelWithFinalizer(), }, WantErrMsg: testErrorMessage, }, @@ -329,7 +329,7 @@ func TestReconcile(t *testing.T) { }, WantPresent: []runtime.Object{ // TODO: This should have a useful error message saying that the K8s Service failed. - makeChannelWithFinalizerAndSubscribable(), + makeChannelWithFinalizer(), }, WantErrMsg: testErrorMessage, }, @@ -358,7 +358,7 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ // TODO: This should have a useful error message saying that the VirtualService // failed. - makeChannelWithFinalizerAndSubscribableAndSinkable(), + makeChannelWithFinalizerAndSinkable(), }, WantErrMsg: testErrorMessage, }, @@ -375,7 +375,7 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ // TODO: This should have a useful error message saying that the VirtualService // failed. - makeChannelWithFinalizerAndSubscribableAndSinkable(), + makeChannelWithFinalizerAndSinkable(), }, WantErrMsg: testErrorMessage, }, @@ -483,21 +483,15 @@ func makeChannel() *eventingv1alpha1.Channel { return c } -func makeChannelWithFinalizerAndSubscribable() *eventingv1alpha1.Channel { +func makeChannelWithFinalizerAndSinkable() *eventingv1alpha1.Channel { c := makeChannelWithFinalizer() - c.Status.SetSubscribable(c.Namespace, c.Name) - return c -} - -func makeChannelWithFinalizerAndSubscribableAndSinkable() *eventingv1alpha1.Channel { - c := makeChannelWithFinalizerAndSubscribable() c.Status.SetSinkable(fmt.Sprintf("%s-channel.%s.svc.cluster.local", c.Name, c.Namespace)) return c } func makeReadyChannel() *eventingv1alpha1.Channel { - // Ready channels have the finalizer and are Subscribable and Sinkable. - c := makeChannelWithFinalizerAndSubscribableAndSinkable() + // Ready channels have the finalizer and are Sinkable. + c := makeChannelWithFinalizerAndSinkable() c.Status.MarkProvisioned() return c } diff --git a/pkg/controller/eventing/subscription/reconcile.go b/pkg/controller/eventing/subscription/reconcile.go index 8c7775fd185..4d4f91f886b 100644 --- a/pkg/controller/eventing/subscription/reconcile.go +++ b/pkg/controller/eventing/subscription/reconcile.go @@ -87,19 +87,12 @@ func (r *reconciler) reconcile(subscription *v1alpha1.Subscription) error { deletionTimestamp := accessor.GetDeletionTimestamp() glog.Infof("DeletionTimestamp: %v", deletionTimestamp) - // Reconcile the subscription to the From channel that's consuming events that are either - // going to the call or if there's no call, directly to result. - from, err := r.resolveFromChannelable(subscription.Namespace, &subscription.Spec.From) + // Verify that `from` exists. + _, err = r.fetchObjectReference(subscription.Namespace, &subscription.Spec.From) if err != nil { - glog.Warningf("Failed to resolve From %+v : %s", subscription.Spec.From, err) + glog.Warningf("Failed to validate `from` exists: %+v, %v", subscription.Spec.From, err) return err } - if from.Status.Subscribable == nil { - return fmt.Errorf("from is not subscribable %s %s/%s", subscription.Spec.From.Kind, subscription.Namespace, subscription.Spec.From.Name) - } - - subscription.Status.PhysicalSubscription.From = from.Status.Subscribable.Channelable - glog.Infof("Resolved from subscribable to: %+v", from.Status.Subscribable.Channelable) callURI := "" if subscription.Spec.Call != nil { @@ -227,19 +220,16 @@ func (r *reconciler) resolveResult(namespace string, resultStrategy v1alpha1.Res return "", fmt.Errorf("status does not contain sinkable") } -// resolveFromChannelable fetches an object based on ObjectReference. It assumes that the -// fetched object then implements Subscribable interface and returns the ObjectReference -// representing the Channelable interface. -func (r *reconciler) resolveFromChannelable(namespace string, ref *corev1.ObjectReference) (*duckv1alpha1.Subscription, error) { - obj, err := r.fetchObjectReference(namespace, ref) +// validateFrom gets `from` and verifies that it is a Channelable (in fact it should always be a +// Channel). +func (r *reconciler) validateFrom(namespace string, from *corev1.ObjectReference) error { + obj, err := r.fetchObjectReference(namespace, from) if err != nil { - glog.Warningf("Failed to fetch From target %+v: %s", ref, err) - return nil, err + return err } - - c := duckv1alpha1.Subscription{} + c := duckv1alpha1.Channel{} err = duck.FromUnstructured(obj, &c) - return &c, err + return err } // fetchObjectReference fetches an object based on ObjectReference. @@ -273,7 +263,7 @@ func (r *reconciler) syncPhysicalFromChannel(sub *v1alpha1.Subscription) error { channelable := r.createChannelable(subs) - return r.patchPhysicalFrom(sub.Namespace, sub.Status.PhysicalSubscription.From, channelable) + return r.patchPhysicalFrom(sub.Namespace, sub.Spec.From, channelable) } func (r *reconciler) listAllSubscriptionsWithPhysicalFrom(sub *v1alpha1.Subscription) ([]v1alpha1.Subscription, error) { @@ -306,7 +296,7 @@ func (r *reconciler) listAllSubscriptionsWithPhysicalFrom(sub *v1alpha1.Subscrip // This is the sub that is being reconciled. It has already been added to the list. continue } - if equality.Semantic.DeepEqual(sub.Status.PhysicalSubscription.From, s.Status.PhysicalSubscription.From) { + if equality.Semantic.DeepEqual(sub.Spec.From, s.Spec.From) { subs = append(subs, s) } } diff --git a/pkg/controller/eventing/subscription/reconcile_test.go b/pkg/controller/eventing/subscription/reconcile_test.go index ad55b49d493..43793edc044 100644 --- a/pkg/controller/eventing/subscription/reconcile_test.go +++ b/pkg/controller/eventing/subscription/reconcile_test.go @@ -72,35 +72,64 @@ var testCases = []controllertesting.TestCase{ }, WantErrMsg: `channels.eventing.knative.dev "fromchannel" not found`, }, { - Name: "subscription, but From is not subscribable", + Name: "subscription, but From is not channelable", InitialState: []runtime.Object{ - getNewSubscription(), + getNewSourceSubscription(), }, - WantErrMsg: "from is not subscribable Channel testnamespace/fromchannel", + // TODO: JSON patch is not working on the fake, see + // https://github.com/kubernetes/client-go/issues/478. Marking this as expecting a specific + // failure for now, until upstream is fixed. It should actually fail saying that there is no + // Spec.Subscribers field. + WantErrMsg: "invalid JSON document", Scheme: scheme.Scheme, Objects: []runtime.Object{ // Source channel + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), + "kind": sourceKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": sourceName, + }, + "spec": map[string]interface{}{}, + }, + }, + // Call (using knative route) + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "serving.knative.dev/v1alpha1", + "kind": routeKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": routeName, + }, + "status": map[string]interface{}{ + "targetable": map[string]interface{}{ + "domainInternal": targetDNS, + }, + }, + }, + }, + // Result channel &unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), "kind": channelKind, "metadata": map[string]interface{}{ "namespace": testNS, - "name": fromChannelName, + "name": resultChannelName, }, "spec": map[string]interface{}{ "channelable": map[string]interface{}{}, }, "status": map[string]interface{}{ - "notsubscribable": map[string]interface{}{ - "channelable": map[string]interface{}{ - "kind": channelKind, - "name": fromChannelName, - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - }, + "sinkable": map[string]interface{}{ + "domainInternal": sinkableDNS, }, }, - }}, + }, + }, }, }, { Name: "Valid from, call does not exist", @@ -109,7 +138,7 @@ var testCases = []controllertesting.TestCase{ }, WantErrMsg: `routes.serving.knative.dev "callroute" not found`, WantPresent: []runtime.Object{ - getNewSubscriptionWithUnknownConditionsAndPhysicalFrom(), + getNewSubscriptionWithUnknownConditions(), }, Scheme: scheme.Scheme, Objects: []runtime.Object{ @@ -125,16 +154,8 @@ var testCases = []controllertesting.TestCase{ "spec": map[string]interface{}{ "channelable": map[string]interface{}{}, }, - "status": map[string]interface{}{ - "subscribable": map[string]interface{}{ - "channelable": map[string]interface{}{ - "kind": channelKind, - "name": fromChannelName, - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - }, - }, - }, - }}, + }, + }, }, }, { Name: "Valid from, call is not targetable", @@ -142,7 +163,7 @@ var testCases = []controllertesting.TestCase{ getNewSubscription(), }, WantPresent: []runtime.Object{ - getNewSubscriptionWithUnknownConditionsAndPhysicalFrom(), + getNewSubscriptionWithUnknownConditions(), }, WantErrMsg: "status does not contain targetable", Scheme: scheme.Scheme, @@ -159,16 +180,8 @@ var testCases = []controllertesting.TestCase{ "spec": map[string]interface{}{ "channelable": map[string]interface{}{}, }, - "status": map[string]interface{}{ - "subscribable": map[string]interface{}{ - "channelable": map[string]interface{}{ - "kind": channelKind, - "name": fromChannelName, - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - }, - }, - }, - }}, + }, + }, // Call (using knative route) &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -181,7 +194,8 @@ var testCases = []controllertesting.TestCase{ "status": map[string]interface{}{ "someotherstuff": targetDNS, }, - }}, + }, + }, }, }, { Name: "Valid from and call, result does not exist", @@ -189,7 +203,7 @@ var testCases = []controllertesting.TestCase{ getNewSubscription(), }, WantPresent: []runtime.Object{ - getNewSubscriptionWithUnknownConditionsAndPhysicalFromCall(), + getNewSubscriptionWithUnknownConditionsAndPhysicalCall(), }, WantErrMsg: `channels.eventing.knative.dev "resultchannel" not found`, Scheme: scheme.Scheme, @@ -206,16 +220,8 @@ var testCases = []controllertesting.TestCase{ "spec": map[string]interface{}{ "channelable": map[string]interface{}{}, }, - "status": map[string]interface{}{ - "subscribable": map[string]interface{}{ - "channelable": map[string]interface{}{ - "kind": channelKind, - "name": fromChannelName, - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - }, - }, - }, - }}, + }, + }, // Call (using knative route) &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -230,7 +236,8 @@ var testCases = []controllertesting.TestCase{ "domainInternal": targetDNS, }, }, - }}, + }, + }, }, }, { Name: "valid from, call, result is not sinkable", @@ -242,7 +249,7 @@ var testCases = []controllertesting.TestCase{ // TODO: Again this works on gke cluster, but I need to set // something else up here. later... // getNewSubscriptionWithReferencesResolvedStatus(), - getNewSubscriptionWithUnknownConditionsAndPhysicalFromCall(), + getNewSubscriptionWithUnknownConditionsAndPhysicalCall(), }, Scheme: scheme.Scheme, Objects: []runtime.Object{ @@ -258,16 +265,8 @@ var testCases = []controllertesting.TestCase{ "spec": map[string]interface{}{ "channelable": map[string]interface{}{}, }, - "status": map[string]interface{}{ - "subscribable": map[string]interface{}{ - "channelable": map[string]interface{}{ - "kind": channelKind, - "name": fromChannelName, - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - }, - }, - }, - }}, + }, + }, // Call (using knative route) &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -282,7 +281,8 @@ var testCases = []controllertesting.TestCase{ "domainInternal": targetDNS, }, }, - }}, + }, + }, // Result channel &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -295,16 +295,8 @@ var testCases = []controllertesting.TestCase{ "spec": map[string]interface{}{ "channelable": map[string]interface{}{}, }, - "status": map[string]interface{}{ - "subscribable": map[string]interface{}{ - "channelable": map[string]interface{}{ - "kind": channelKind, - "name": fromChannelName, - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - }, - }, - }, - }}, + }, + }, }, }, { Name: "new subscription: adds status, all targets resolved, subscribers modified", @@ -316,7 +308,7 @@ var testCases = []controllertesting.TestCase{ // failure for now, until upstream is fixed. WantResult: reconcile.Result{}, WantPresent: []runtime.Object{ - getNewSubscriptionWithReferencesResolvedAndPhysicalFromCallResult(), + getNewSubscriptionWithReferencesResolvedAndPhysicalCallResult(), }, WantErrMsg: "invalid JSON document", Scheme: scheme.Scheme, @@ -333,16 +325,8 @@ var testCases = []controllertesting.TestCase{ "spec": map[string]interface{}{ "channelable": map[string]interface{}{}, }, - "status": map[string]interface{}{ - "subscribable": map[string]interface{}{ - "channelable": map[string]interface{}{ - "kind": channelKind, - "name": fromChannelName, - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - }, - }, - }, - }}, + }, + }, // Call (using knative route) &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -357,7 +341,8 @@ var testCases = []controllertesting.TestCase{ "domainInternal": targetDNS, }, }, - }}, + }, + }, // Result channel &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -371,18 +356,12 @@ var testCases = []controllertesting.TestCase{ "channelable": map[string]interface{}{}, }, "status": map[string]interface{}{ - "subscribable": map[string]interface{}{ - "channelable": map[string]interface{}{ - "kind": channelKind, - "name": fromChannelName, - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - }, - }, "sinkable": map[string]interface{}{ "domainInternal": sinkableDNS, }, }, - }}, + }, + }, }, }, { Name: "new subscription to K8s Service: adds status, all targets resolved, subscribers modified", @@ -412,16 +391,8 @@ var testCases = []controllertesting.TestCase{ "spec": map[string]interface{}{ "channelable": map[string]interface{}{}, }, - "status": map[string]interface{}{ - "subscribable": map[string]interface{}{ - "channelable": map[string]interface{}{ - "kind": channelKind, - "name": fromChannelName, - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - }, - }, - }, - }}, + }, + }, // Call (using K8s Service) &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -431,7 +402,8 @@ var testCases = []controllertesting.TestCase{ "namespace": testNS, "name": k8sServiceName, }, - }}, + }, + }, // Result channel &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -445,18 +417,12 @@ var testCases = []controllertesting.TestCase{ "channelable": map[string]interface{}{}, }, "status": map[string]interface{}{ - "subscribable": map[string]interface{}{ - "channelable": map[string]interface{}{ - "kind": channelKind, - "name": fromChannelName, - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - }, - }, "sinkable": map[string]interface{}{ "domainInternal": sinkableDNS, }, }, - }}, + }, + }, }, }, { Name: "new subscription with from channel: adds status, all targets resolved, subscribers modified", @@ -485,16 +451,8 @@ var testCases = []controllertesting.TestCase{ "spec": map[string]interface{}{ "channelable": map[string]interface{}{}, }, - "status": map[string]interface{}{ - "subscribable": map[string]interface{}{ - "channelable": map[string]interface{}{ - "kind": channelKind, - "name": fromChannelName, - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - }, - }, - }, - }}, + }, + }, // Source channel &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -507,16 +465,8 @@ var testCases = []controllertesting.TestCase{ "spec": map[string]interface{}{ "channelable": map[string]interface{}{}, }, - "status": map[string]interface{}{ - "subscribable": map[string]interface{}{ - "channelable": map[string]interface{}{ - "kind": channelKind, - "name": fromChannelName, - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - }, - }, - }, - }}, + }, + }, // Call (using knative route) &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -531,7 +481,8 @@ var testCases = []controllertesting.TestCase{ "domainInternal": targetDNS, }, }, - }}, + }, + }, // Result channel &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -545,13 +496,6 @@ var testCases = []controllertesting.TestCase{ "channelable": map[string]interface{}{}, }, "status": map[string]interface{}{ - "subscribable": map[string]interface{}{ - "channelable": map[string]interface{}{ - "kind": channelKind, - "name": fromChannelName, - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - }, - }, "sinkable": map[string]interface{}{ "domainInternal": sinkableDNS, }, @@ -566,7 +510,7 @@ var testCases = []controllertesting.TestCase{ // The first two Subscriptions both have the same physical From, so we should see that // Channel updated with both Subscriptions. getNewSubscriptionWithFromChannel(), - rename(getNewSubscriptionWithReferencesResolvedAndPhysicalFromCallResult()), + rename(getNewSubscriptionWithReferencesResolvedAndPhysicalCallResult()), // This subscription has a different physical From, so we should not see it in the same // Channel as the first two. getSubscriptionWithDifferentChannel(), @@ -585,7 +529,7 @@ var testCases = []controllertesting.TestCase{ //getChannelWithMultipleSubscriptions(), getNewSubscriptionWithSourceWithReferencesResolvedAndPhysicalFromCallResult(), // Unaltered because this Subscription was not reconciled. - rename(getNewSubscriptionWithReferencesResolvedAndPhysicalFromCallResult()), + rename(getNewSubscriptionWithReferencesResolvedAndPhysicalCallResult()), getSubscriptionWithDifferentChannel(), }, Scheme: scheme.Scheme, @@ -602,16 +546,8 @@ var testCases = []controllertesting.TestCase{ "spec": map[string]interface{}{ "channelable": map[string]interface{}{}, }, - "status": map[string]interface{}{ - "subscribable": map[string]interface{}{ - "channelable": map[string]interface{}{ - "kind": channelKind, - "name": fromChannelName, - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - }, - }, - }, - }}, + }, + }, // Source channel &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -624,16 +560,8 @@ var testCases = []controllertesting.TestCase{ "spec": map[string]interface{}{ "channelable": map[string]interface{}{}, }, - "status": map[string]interface{}{ - "subscribable": map[string]interface{}{ - "channelable": map[string]interface{}{ - "kind": channelKind, - "name": fromChannelName, - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - }, - }, - }, - }}, + }, + }, // Call (using knative route) &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -648,7 +576,8 @@ var testCases = []controllertesting.TestCase{ "domainInternal": targetDNS, }, }, - }}, + }, + }, // Result channel &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -662,13 +591,6 @@ var testCases = []controllertesting.TestCase{ "channelable": map[string]interface{}{}, }, "status": map[string]interface{}{ - "subscribable": map[string]interface{}{ - "channelable": map[string]interface{}{ - "kind": channelKind, - "name": fromChannelName, - "apiVersion": eventingv1alpha1.SchemeGroupVersion.String(), - }, - }, "sinkable": map[string]interface{}{ "domainInternal": sinkableDNS, }, @@ -762,6 +684,16 @@ func getNewSubscription() *eventingv1alpha1.Subscription { return subscription } +func getNewSourceSubscription() *eventingv1alpha1.Subscription { + sub := getNewSubscription() + sub.Spec.From = corev1.ObjectReference{ + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Kind: sourceKind, + Name: sourceName, + } + return sub +} + func getNewSubscriptionToK8sService() *eventingv1alpha1.Subscription { sub := getNewSubscription() sub.Spec.Call = &eventingv1alpha1.EndpointSpec{ @@ -812,22 +744,14 @@ func getNewSubscriptionWithUnknownConditions() *eventingv1alpha1.Subscription { s.Status.InitializeConditions() return s } - -func getNewSubscriptionWithUnknownConditionsAndPhysicalFrom() *eventingv1alpha1.Subscription { +func getNewSubscriptionWithUnknownConditionsAndPhysicalCall() *eventingv1alpha1.Subscription { s := getNewSubscriptionWithUnknownConditions() - // The original From is a Channel, so points at itself. - s.Status.PhysicalSubscription.From = s.Spec.From - return s -} - -func getNewSubscriptionWithUnknownConditionsAndPhysicalFromCall() *eventingv1alpha1.Subscription { - s := getNewSubscriptionWithUnknownConditionsAndPhysicalFrom() s.Status.PhysicalSubscription.CallURI = domainToURL(targetDNS) return s } -func getNewSubscriptionWithReferencesResolvedAndPhysicalFromCallResult() *eventingv1alpha1.Subscription { - s := getNewSubscriptionWithUnknownConditionsAndPhysicalFrom() +func getNewSubscriptionWithReferencesResolvedAndPhysicalCallResult() *eventingv1alpha1.Subscription { + s := getNewSubscriptionWithUnknownConditions() s.Status.MarkReferencesResolved() s.Status.PhysicalSubscription.CallURI = domainToURL(targetDNS) s.Status.PhysicalSubscription.ResultURI = domainToURL(sinkableDNS) @@ -839,7 +763,6 @@ func getNewSubscriptionToK8sServiceWithReferencesResolvedAndPhysicalFromCallResu s.Status.InitializeConditions() s.Status.MarkReferencesResolved() s.Status.PhysicalSubscription = eventingv1alpha1.SubscriptionStatusPhysicalSubscription{ - From: s.Spec.From, CallURI: domainToURL(k8sServiceDNS), ResultURI: domainToURL(sinkableDNS), } @@ -851,11 +774,6 @@ func getNewSubscriptionWithSourceWithReferencesResolvedAndPhysicalFromCallResult s.Status.InitializeConditions() s.Status.MarkReferencesResolved() s.Status.PhysicalSubscription = eventingv1alpha1.SubscriptionStatusPhysicalSubscription{ - From: corev1.ObjectReference{ - APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), - Kind: channelKind, - Name: fromChannelName, - }, CallURI: domainToURL(targetDNS), ResultURI: domainToURL(sinkableDNS), } @@ -872,7 +790,6 @@ func getSubscriptionWithDifferentChannel() *eventingv1alpha1.Subscription { s := getNewSubscriptionWithSourceWithReferencesResolvedAndPhysicalFromCallResult() s.Name = "different-channel" s.UID = "different-channel-UID" - s.Status.PhysicalSubscription.From.Name = "other-channel" s.Status.PhysicalSubscription.CallURI = "some-other-domain" return s } From 8c2a4cb85b3300648719adee18d079d37d53693a Mon Sep 17 00:00:00 2001 From: Adam Harwayne Date: Wed, 24 Oct 2018 15:08:32 -0700 Subject: [PATCH 3/3] Move the Channelable duck type into the eventing repo. --- hack/update-codegen.sh | 6 + pkg/apis/duck/v1alpha1/channelable_types.go | 87 +++++++++++ pkg/apis/duck/v1alpha1/doc.go | 20 +++ .../duck/v1alpha1/zz_generated.deepcopy.go | 139 ++++++++++++++++++ pkg/apis/eventing/v1alpha1/channel_types.go | 3 +- .../v1alpha1/channel_validation_test.go | 14 +- pkg/apis/eventing/v1alpha1/implements_test.go | 3 +- .../v1alpha1/zz_generated.deepcopy.go | 11 +- .../inmemory/channel/reconcile_test.go | 18 +-- .../eventing/subscription/reconcile.go | 11 +- .../eventing/subscription/reconcile_test.go | 5 +- .../filesystem/filesystem_watcher_test.go | 16 +- pkg/sidecar/configmap/parse_test.go | 12 +- pkg/sidecar/configmap/watcher/watcher_test.go | 4 +- pkg/sidecar/fanout/fanout_handler.go | 8 +- pkg/sidecar/fanout/fanout_handler_test.go | 24 +-- .../multi_channel_fanout_handler_test.go | 16 +- pkg/sidecar/multichannelfanout/parse_test.go | 8 +- pkg/sidecar/swappable/swappable_test.go | 8 +- .../apis/duck/v1alpha1/channelable_types.go | 4 +- 20 files changed, 337 insertions(+), 80 deletions(-) create mode 100644 pkg/apis/duck/v1alpha1/channelable_types.go create mode 100644 pkg/apis/duck/v1alpha1/doc.go create mode 100644 pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go diff --git a/hack/update-codegen.sh b/hack/update-codegen.sh index 9301dc91d9d..6422e616532 100755 --- a/hack/update-codegen.sh +++ b/hack/update-codegen.sh @@ -31,5 +31,11 @@ ${CODEGEN_PKG}/generate-groups.sh "deepcopy,client,informer,lister" \ "channels:v1alpha1 feeds:v1alpha1 flows:v1alpha1 eventing:v1alpha1" \ --go-header-file ${REPO_ROOT_DIR}/hack/boilerplate/boilerplate.go.txt +# Only deepcopy the Duck types, as they are not real resources. +${CODEGEN_PKG}/generate-groups.sh "deepcopy" \ + github.com/knative/eventing/pkg/client github.com/knative/eventing/pkg/apis \ + "duck:v1alpha1" \ + --go-header-file ${REPO_ROOT_DIR}/hack/boilerplate/boilerplate.go.txt + # Make sure our dependencies are up-to-date ${REPO_ROOT_DIR}/hack/update-deps.sh diff --git a/pkg/apis/duck/v1alpha1/channelable_types.go b/pkg/apis/duck/v1alpha1/channelable_types.go new file mode 100644 index 00000000000..ab33f4db72b --- /dev/null +++ b/pkg/apis/duck/v1alpha1/channelable_types.go @@ -0,0 +1,87 @@ +/* + * Copyright 2018 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1alpha1 + +import ( + "github.com/knative/pkg/apis/duck" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +// Channelable is the schema for the channelable portion of the spec +// section of the resource. +type Channelable struct { + // TODO: What is actually required here for Channel spec. + // This is the list of subscriptions for this channel. + Subscribers []ChannelSubscriberSpec `json:"subscribers,omitempty"` +} + +// ChannelSubscriberSpec defines a single subscriber to a Channel. +// CallableURI is the endpoint for the call +// SinkableURI is the endpoint for the result +// At least one of them must be present +type ChannelSubscriberSpec struct { + // +optional + CallableURI string `json:"callableURI,omitempty"` + // +optional + SinkableURI string `json:"sinkableURI,omitempty"` +} + +// DuckChannel is a skeleton type wrapping Channelable in the manner we expect resource writers +// defining compatible resources to embed it. We will typically use this type to deserialize +// Channelable ObjectReferences and access the Channelable data. This is not a real resource. +type Channel struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + // ChannelSpec is the part where Channelable object is + // configured as to be compatible with Channelable contract. + Spec ChannelSpec `json:"spec"` +} + +// DuckChannelSpec shows how we expect folks to embed Channelable in their Spec field. +type ChannelSpec struct { + Channelable *Channelable `json:"channelable,omitempty"` +} + +// GetFullType implements duck.Implementable +func (_ *Channelable) GetFullType() duck.Populatable { + return &Channel{} +} + +// Populate implements duck.Populatable +func (t *Channel) Populate() { + t.Spec.Channelable = &Channelable{ + // Populate ALL fields + Subscribers: []ChannelSubscriberSpec{{"call1", "sink2"}, {"call2", "sink2"}}, + } +} + +// GetListType implements apis.Listable +func (r *Channel) GetListType() runtime.Object { + return &ChannelList{} +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// ChannelList is a list of Channel resources +type ChannelList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + + Items []Channel `json:"items"` +} diff --git a/pkg/apis/duck/v1alpha1/doc.go b/pkg/apis/duck/v1alpha1/doc.go new file mode 100644 index 00000000000..0dcbcff0129 --- /dev/null +++ b/pkg/apis/duck/v1alpha1/doc.go @@ -0,0 +1,20 @@ +/* +Copyright 2018 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Api versions allow the api contract for a resource to be changed while keeping +// backward compatibility by support multiple concurrent versions +// of the same resource + +// +k8s:deepcopy-gen=package +// +groupName=duck.knative.dev +package v1alpha1 diff --git a/pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go new file mode 100644 index 00000000000..c3707fddd3d --- /dev/null +++ b/pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go @@ -0,0 +1,139 @@ +// +build !ignore_autogenerated + +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Channel) DeepCopyInto(out *Channel) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Channel. +func (in *Channel) DeepCopy() *Channel { + if in == nil { + return nil + } + out := new(Channel) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ChannelList) DeepCopyInto(out *ChannelList) { + *out = *in + out.TypeMeta = in.TypeMeta + out.ListMeta = in.ListMeta + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Channel, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ChannelList. +func (in *ChannelList) DeepCopy() *ChannelList { + if in == nil { + return nil + } + out := new(ChannelList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ChannelList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ChannelSpec) DeepCopyInto(out *ChannelSpec) { + *out = *in + if in.Channelable != nil { + in, out := &in.Channelable, &out.Channelable + if *in == nil { + *out = nil + } else { + *out = new(Channelable) + (*in).DeepCopyInto(*out) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ChannelSpec. +func (in *ChannelSpec) DeepCopy() *ChannelSpec { + if in == nil { + return nil + } + out := new(ChannelSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ChannelSubscriberSpec) DeepCopyInto(out *ChannelSubscriberSpec) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ChannelSubscriberSpec. +func (in *ChannelSubscriberSpec) DeepCopy() *ChannelSubscriberSpec { + if in == nil { + return nil + } + out := new(ChannelSubscriberSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Channelable) DeepCopyInto(out *Channelable) { + *out = *in + if in.Subscribers != nil { + in, out := &in.Subscribers, &out.Subscribers + *out = make([]ChannelSubscriberSpec, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Channelable. +func (in *Channelable) DeepCopy() *Channelable { + if in == nil { + return nil + } + out := new(Channelable) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/apis/eventing/v1alpha1/channel_types.go b/pkg/apis/eventing/v1alpha1/channel_types.go index 697dda0fb79..56702f56d00 100644 --- a/pkg/apis/eventing/v1alpha1/channel_types.go +++ b/pkg/apis/eventing/v1alpha1/channel_types.go @@ -17,6 +17,7 @@ package v1alpha1 import ( + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/pkg/apis" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "github.com/knative/pkg/webhook" @@ -72,7 +73,7 @@ type ChannelSpec struct { Arguments *runtime.RawExtension `json:"arguments,omitempty"` // Channel conforms to Duck type Channelable. - Channelable *duckv1alpha1.Channelable `json:"channelable,omitempty"` + Channelable *eventingduck.Channelable `json:"channelable,omitempty"` } var chanCondSet = duckv1alpha1.NewLivingConditionSet(ChannelConditionProvisioned, ChannelConditionSinkable) diff --git a/pkg/apis/eventing/v1alpha1/channel_validation_test.go b/pkg/apis/eventing/v1alpha1/channel_validation_test.go index c79ea80a82d..6255252a45d 100644 --- a/pkg/apis/eventing/v1alpha1/channel_validation_test.go +++ b/pkg/apis/eventing/v1alpha1/channel_validation_test.go @@ -20,8 +20,8 @@ import ( "testing" "github.com/google/go-cmp/cmp" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/pkg/apis" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -56,8 +56,8 @@ func TestChannelValidation(t *testing.T) { Name: "foo", }, }, - Channelable: &duckv1alpha1.Channelable{ - Subscribers: []duckv1alpha1.ChannelSubscriberSpec{{ + Channelable: &eventingduck.Channelable{ + Subscribers: []eventingduck.ChannelSubscriberSpec{{ CallableURI: "callableendpoint", SinkableURI: "resultendpoint", }}, @@ -73,8 +73,8 @@ func TestChannelValidation(t *testing.T) { Name: "foo", }, }, - Channelable: &duckv1alpha1.Channelable{ - Subscribers: []duckv1alpha1.ChannelSubscriberSpec{{ + Channelable: &eventingduck.Channelable{ + Subscribers: []eventingduck.ChannelSubscriberSpec{{ CallableURI: "callableendpoint", SinkableURI: "callableendpoint", }, {}}, @@ -94,8 +94,8 @@ func TestChannelValidation(t *testing.T) { Name: "foo", }, }, - Channelable: &duckv1alpha1.Channelable{ - Subscribers: []duckv1alpha1.ChannelSubscriberSpec{{}, {}}, + Channelable: &eventingduck.Channelable{ + Subscribers: []eventingduck.ChannelSubscriberSpec{{}, {}}, }, }, }, diff --git a/pkg/apis/eventing/v1alpha1/implements_test.go b/pkg/apis/eventing/v1alpha1/implements_test.go index 1e0198fe50c..1443a89a455 100644 --- a/pkg/apis/eventing/v1alpha1/implements_test.go +++ b/pkg/apis/eventing/v1alpha1/implements_test.go @@ -15,6 +15,7 @@ package v1alpha1 import ( "testing" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/pkg/apis/duck" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" ) @@ -27,7 +28,7 @@ func TestTypesImplements(t *testing.T) { }{ // Channel {instance: &Channel{}, iface: &duckv1alpha1.Conditions{}}, - {instance: &Channel{}, iface: &duckv1alpha1.Channelable{}}, + {instance: &Channel{}, iface: &eventingduck.Channelable{}}, {instance: &Channel{}, iface: &duckv1alpha1.Sinkable{}}, // ClusterProvisioner {instance: &ClusterProvisioner{}, iface: &duckv1alpha1.Conditions{}}, diff --git a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go index 76d9565a456..4ecc2861b8d 100644 --- a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go @@ -21,7 +21,8 @@ limitations under the License. package v1alpha1 import ( - duck_v1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + duck_v1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + apis_duck_v1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" v1 "k8s.io/api/core/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -136,7 +137,7 @@ func (in *ChannelStatus) DeepCopyInto(out *ChannelStatus) { out.Sinkable = in.Sinkable if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions - *out = make(duck_v1alpha1.Conditions, len(*in)) + *out = make(apis_duck_v1alpha1.Conditions, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -237,7 +238,7 @@ func (in *ClusterProvisionerStatus) DeepCopyInto(out *ClusterProvisionerStatus) *out = *in if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions - *out = make(duck_v1alpha1.Conditions, len(*in)) + *out = make(apis_duck_v1alpha1.Conditions, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -448,7 +449,7 @@ func (in *SourceStatus) DeepCopyInto(out *SourceStatus) { *out = *in if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions - *out = make(duck_v1alpha1.Conditions, len(*in)) + *out = make(apis_duck_v1alpha1.Conditions, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -567,7 +568,7 @@ func (in *SubscriptionStatus) DeepCopyInto(out *SubscriptionStatus) { *out = *in if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions - *out = make(duck_v1alpha1.Conditions, len(*in)) + *out = make(apis_duck_v1alpha1.Conditions, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } diff --git a/pkg/controller/eventing/inmemory/channel/reconcile_test.go b/pkg/controller/eventing/inmemory/channel/reconcile_test.go index ab746136ef1..1d31a9d02a6 100644 --- a/pkg/controller/eventing/inmemory/channel/reconcile_test.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile_test.go @@ -24,12 +24,12 @@ import ( "testing" "github.com/google/go-cmp/cmp" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" controllertesting "github.com/knative/eventing/pkg/controller/testing" "github.com/knative/eventing/pkg/sidecar/configmap" "github.com/knative/eventing/pkg/sidecar/fanout" "github.com/knative/eventing/pkg/sidecar/multichannelfanout" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -73,7 +73,7 @@ var ( Namespace: cNamespace, Name: "c1", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { CallableURI: "foo", }, @@ -91,7 +91,7 @@ var ( Namespace: cNamespace, Name: "c3", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { CallableURI: "steve", }, @@ -116,8 +116,8 @@ var ( Name: cpName, }, }, - Channelable: &duckv1alpha1.Channelable{ - Subscribers: []duckv1alpha1.ChannelSubscriberSpec{ + Channelable: &eventingduck.Channelable{ + Subscribers: []eventingduck.ChannelSubscriberSpec{ { CallableURI: "foo", }, @@ -146,8 +146,8 @@ var ( Name: "some-other-provisioner", }, }, - Channelable: &duckv1alpha1.Channelable{ - Subscribers: []duckv1alpha1.ChannelSubscriberSpec{ + Channelable: &eventingduck.Channelable{ + Subscribers: []eventingduck.ChannelSubscriberSpec{ { CallableURI: "anything", }, @@ -169,8 +169,8 @@ var ( Name: cpName, }, }, - Channelable: &duckv1alpha1.Channelable{ - Subscribers: []duckv1alpha1.ChannelSubscriberSpec{ + Channelable: &eventingduck.Channelable{ + Subscribers: []eventingduck.ChannelSubscriberSpec{ { CallableURI: "steve", }, diff --git a/pkg/controller/eventing/subscription/reconcile.go b/pkg/controller/eventing/subscription/reconcile.go index 4d4f91f886b..5b008c34a1f 100644 --- a/pkg/controller/eventing/subscription/reconcile.go +++ b/pkg/controller/eventing/subscription/reconcile.go @@ -22,6 +22,7 @@ import ( "net/url" "github.com/golang/glog" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/controller" duckapis "github.com/knative/pkg/apis" @@ -308,11 +309,11 @@ func (r *reconciler) listAllSubscriptionsWithPhysicalFrom(sub *v1alpha1.Subscrip } } -func (r *reconciler) createChannelable(subs []v1alpha1.Subscription) *duckv1alpha1.Channelable { - rv := &duckv1alpha1.Channelable{} +func (r *reconciler) createChannelable(subs []v1alpha1.Subscription) *eventingduck.Channelable { + rv := &eventingduck.Channelable{} for _, sub := range subs { if sub.Status.PhysicalSubscription.CallURI != "" || sub.Status.PhysicalSubscription.ResultURI != "" { - rv.Subscribers = append(rv.Subscribers, duckv1alpha1.ChannelSubscriberSpec{ + rv.Subscribers = append(rv.Subscribers, eventingduck.ChannelSubscriberSpec{ CallableURI: sub.Status.PhysicalSubscription.CallURI, SinkableURI: sub.Status.PhysicalSubscription.ResultURI, }) @@ -321,13 +322,13 @@ func (r *reconciler) createChannelable(subs []v1alpha1.Subscription) *duckv1alph return rv } -func (r *reconciler) patchPhysicalFrom(namespace string, physicalFrom corev1.ObjectReference, subs *duckv1alpha1.Channelable) error { +func (r *reconciler) patchPhysicalFrom(namespace string, physicalFrom corev1.ObjectReference, subs *eventingduck.Channelable) error { // First get the original object and convert it to only the bits we care about s, err := r.fetchObjectReference(namespace, &physicalFrom) if err != nil { return err } - original := duckv1alpha1.Channel{} + original := eventingduck.Channel{} err = duck.FromUnstructured(s, &original) if err != nil { return err diff --git a/pkg/controller/eventing/subscription/reconcile_test.go b/pkg/controller/eventing/subscription/reconcile_test.go index 43793edc044..d8dec2b7f7c 100644 --- a/pkg/controller/eventing/subscription/reconcile_test.go +++ b/pkg/controller/eventing/subscription/reconcile_test.go @@ -20,6 +20,7 @@ import ( "fmt" "testing" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" controllertesting "github.com/knative/eventing/pkg/controller/testing" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" @@ -829,8 +830,8 @@ func getChannelWithMultipleSubscriptions() *eventingv1alpha1.Channel { }, ObjectMeta: om(testNS, fromChannelName), Spec: eventingv1alpha1.ChannelSpec{ - Channelable: &duckv1alpha1.Channelable{ - Subscribers: []duckv1alpha1.ChannelSubscriberSpec{ + Channelable: &eventingduck.Channelable{ + Subscribers: []eventingduck.ChannelSubscriberSpec{ { CallableURI: targetDNS, SinkableURI: sinkableDNS, diff --git a/pkg/sidecar/configmap/filesystem/filesystem_watcher_test.go b/pkg/sidecar/configmap/filesystem/filesystem_watcher_test.go index 54504e8d94f..b9dcb8ed476 100644 --- a/pkg/sidecar/configmap/filesystem/filesystem_watcher_test.go +++ b/pkg/sidecar/configmap/filesystem/filesystem_watcher_test.go @@ -27,10 +27,10 @@ import ( "time" "github.com/google/go-cmp/cmp" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/eventing/pkg/sidecar/configmap" "github.com/knative/eventing/pkg/sidecar/fanout" "github.com/knative/eventing/pkg/sidecar/multichannelfanout" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "go.uber.org/zap" yaml "gopkg.in/yaml.v2" ) @@ -104,7 +104,7 @@ func TestReadConfigMap(t *testing.T) { Namespace: "default", Name: "c1", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { CallableURI: "event-changer.default.svc.cluster.local", SinkableURI: "message-dumper-bar.default.svc.cluster.local", @@ -122,7 +122,7 @@ func TestReadConfigMap(t *testing.T) { Namespace: "default", Name: "c2", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { SinkableURI: "message-dumper-foo.default.svc.cluster.local", }, @@ -133,7 +133,7 @@ func TestReadConfigMap(t *testing.T) { Namespace: "other", Name: "c3", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { SinkableURI: "message-dumper-foo.default.svc.cluster.local", }, @@ -186,7 +186,7 @@ func TestWatch(t *testing.T) { Namespace: "default", Name: "c1", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { SinkableURI: "foo.bar", }, @@ -203,7 +203,7 @@ func TestWatch(t *testing.T) { Namespace: "default", Name: "c1", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { SinkableURI: "foo.bar", }, @@ -221,7 +221,7 @@ func TestWatch(t *testing.T) { Namespace: "default", Name: "c1", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { SinkableURI: "foo.bar", }, @@ -236,7 +236,7 @@ func TestWatch(t *testing.T) { Namespace: "default", Name: "new-channel", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { CallableURI: "baz.qux", }, diff --git a/pkg/sidecar/configmap/parse_test.go b/pkg/sidecar/configmap/parse_test.go index 0afd25df0c0..135f7620424 100644 --- a/pkg/sidecar/configmap/parse_test.go +++ b/pkg/sidecar/configmap/parse_test.go @@ -21,9 +21,9 @@ import ( "testing" "github.com/google/go-cmp/cmp" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/eventing/pkg/sidecar/fanout" "github.com/knative/eventing/pkg/sidecar/multichannelfanout" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "go.uber.org/zap" ) @@ -86,7 +86,7 @@ func TestNewFanoutConfig(t *testing.T) { Namespace: "default", Name: "c1", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { CallableURI: "event-changer.default.svc.cluster.local", SinkableURI: "message-dumper-bar.default.svc.cluster.local", @@ -104,7 +104,7 @@ func TestNewFanoutConfig(t *testing.T) { Namespace: "default", Name: "c2", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { SinkableURI: "message-dumper-foo.default.svc.cluster.local", }, @@ -115,7 +115,7 @@ func TestNewFanoutConfig(t *testing.T) { Namespace: "other", Name: "c3", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { SinkableURI: "message-dumper-foo.default.svc.cluster.local", }, @@ -157,7 +157,7 @@ func TestSerializeConfig(t *testing.T) { Namespace: "default", Name: "c1", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { CallableURI: "foo.example.com", SinkableURI: "bar.example.com", @@ -176,7 +176,7 @@ func TestSerializeConfig(t *testing.T) { Namespace: "other", Name: "no-subs", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{}, + Subscriptions: []eventingduck.ChannelSubscriberSpec{}, }, }, }, diff --git a/pkg/sidecar/configmap/watcher/watcher_test.go b/pkg/sidecar/configmap/watcher/watcher_test.go index 2144da97240..fd92d6e7ed1 100644 --- a/pkg/sidecar/configmap/watcher/watcher_test.go +++ b/pkg/sidecar/configmap/watcher/watcher_test.go @@ -21,10 +21,10 @@ import ( "testing" "github.com/google/go-cmp/cmp" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" sidecarconfigmap "github.com/knative/eventing/pkg/sidecar/configmap" "github.com/knative/eventing/pkg/sidecar/fanout" "github.com/knative/eventing/pkg/sidecar/multichannelfanout" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "github.com/knative/pkg/configmap" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -76,7 +76,7 @@ func TestReconcile(t *testing.T) { Name: "foo", Namespace: "bar", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { CallableURI: "callable", SinkableURI: "sinkable", diff --git a/pkg/sidecar/fanout/fanout_handler.go b/pkg/sidecar/fanout/fanout_handler.go index 0324a4ae283..48ff82a5988 100644 --- a/pkg/sidecar/fanout/fanout_handler.go +++ b/pkg/sidecar/fanout/fanout_handler.go @@ -26,8 +26,8 @@ import ( "net/http" "time" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/eventing/pkg/buses" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "go.uber.org/zap" ) @@ -39,7 +39,7 @@ const ( // Configuration for a fanout.Handler. type Config struct { - Subscriptions []duckv1alpha1.ChannelSubscriberSpec `json:"subscriptions"` + Subscriptions []eventingduck.ChannelSubscriberSpec `json:"subscriptions"` } // http.Handler that takes a single request in and fans it out to N other servers. @@ -96,7 +96,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (f *Handler) dispatch(msg *buses.Message) error { errorCh := make(chan error, len(f.config.Subscriptions)) for _, sub := range f.config.Subscriptions { - go func(s duckv1alpha1.ChannelSubscriberSpec) { + go func(s eventingduck.ChannelSubscriberSpec) { errorCh <- f.makeFanoutRequest(*msg, s) }(sub) } @@ -119,6 +119,6 @@ func (f *Handler) dispatch(msg *buses.Message) error { // makeFanoutRequest sends the request to exactly one subscription. It handles both the `call` and // the `sink` portions of the subscription. -func (f *Handler) makeFanoutRequest(m buses.Message, sub duckv1alpha1.ChannelSubscriberSpec) error { +func (f *Handler) makeFanoutRequest(m buses.Message, sub eventingduck.ChannelSubscriberSpec) error { return f.dispatcher.DispatchMessage(&m, sub.CallableURI, sub.SinkableURI, buses.DispatchDefaults{}) } diff --git a/pkg/sidecar/fanout/fanout_handler_test.go b/pkg/sidecar/fanout/fanout_handler_test.go index 563f9726058..b1d2376a408 100644 --- a/pkg/sidecar/fanout/fanout_handler_test.go +++ b/pkg/sidecar/fanout/fanout_handler_test.go @@ -26,8 +26,8 @@ import ( "testing" "time" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/eventing/pkg/buses" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "go.uber.org/atomic" "go.uber.org/zap" ) @@ -60,7 +60,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { testCases := map[string]struct { receiverFunc func(buses.ChannelReference, *buses.Message) error timeout time.Duration - subs []duckv1alpha1.ChannelSubscriberSpec + subs []eventingduck.ChannelSubscriberSpec callable func(http.ResponseWriter, *http.Request) sinkable func(http.ResponseWriter, *http.Request) expectedStatus int @@ -73,7 +73,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { }, "fanout times out": { timeout: time.Millisecond, - subs: []duckv1alpha1.ChannelSubscriberSpec{ + subs: []eventingduck.ChannelSubscriberSpec{ { CallableURI: replaceCallable, }, @@ -85,17 +85,17 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { expectedStatus: http.StatusInternalServerError, }, "zero subs succeed": { - subs: []duckv1alpha1.ChannelSubscriberSpec{}, + subs: []eventingduck.ChannelSubscriberSpec{}, expectedStatus: http.StatusAccepted, }, "empty sub succeeds": { - subs: []duckv1alpha1.ChannelSubscriberSpec{ + subs: []eventingduck.ChannelSubscriberSpec{ {}, }, expectedStatus: http.StatusAccepted, }, "sinkable fails": { - subs: []duckv1alpha1.ChannelSubscriberSpec{ + subs: []eventingduck.ChannelSubscriberSpec{ { SinkableURI: replaceSinkable, }, @@ -106,7 +106,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { expectedStatus: http.StatusInternalServerError, }, "callable fails": { - subs: []duckv1alpha1.ChannelSubscriberSpec{ + subs: []eventingduck.ChannelSubscriberSpec{ { CallableURI: replaceCallable, }, @@ -117,7 +117,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { expectedStatus: http.StatusInternalServerError, }, "callable succeeds, sinkable fails": { - subs: []duckv1alpha1.ChannelSubscriberSpec{ + subs: []eventingduck.ChannelSubscriberSpec{ { CallableURI: replaceCallable, SinkableURI: replaceSinkable, @@ -130,7 +130,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { expectedStatus: http.StatusInternalServerError, }, "one sub succeeds": { - subs: []duckv1alpha1.ChannelSubscriberSpec{ + subs: []eventingduck.ChannelSubscriberSpec{ { CallableURI: replaceCallable, SinkableURI: replaceSinkable, @@ -143,7 +143,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { expectedStatus: http.StatusAccepted, }, "one sub succeeds, one sub fails": { - subs: []duckv1alpha1.ChannelSubscriberSpec{ + subs: []eventingduck.ChannelSubscriberSpec{ { CallableURI: replaceCallable, SinkableURI: replaceSinkable, @@ -158,7 +158,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { expectedStatus: http.StatusInternalServerError, }, "all subs succeed": { - subs: []duckv1alpha1.ChannelSubscriberSpec{ + subs: []eventingduck.ChannelSubscriberSpec{ { CallableURI: replaceCallable, SinkableURI: replaceSinkable, @@ -194,7 +194,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { defer sinkableServer.Close() // Rewrite the subs to use the servers we just started. - subs := make([]duckv1alpha1.ChannelSubscriberSpec, 0) + subs := make([]eventingduck.ChannelSubscriberSpec, 0) for _, sub := range tc.subs { if sub.CallableURI == replaceCallable { sub.CallableURI = callableServer.URL[7:] // strip the leading 'http://' diff --git a/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler_test.go b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler_test.go index e8d4c8418e7..82693b0c3a6 100644 --- a/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler_test.go +++ b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler_test.go @@ -24,8 +24,8 @@ import ( "testing" "github.com/google/go-cmp/cmp" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/eventing/pkg/sidecar/fanout" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "go.uber.org/zap" ) @@ -109,7 +109,7 @@ func TestCopyWithNewConfig(t *testing.T) { Namespace: "default", Name: "c1", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { CallableURI: "callabledomain", }, @@ -124,7 +124,7 @@ func TestCopyWithNewConfig(t *testing.T) { Namespace: "default", Name: "somethingdifferent", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { SinkableURI: "sinkabledomain", }, @@ -162,7 +162,7 @@ func TestConfigDiff(t *testing.T) { Namespace: "default", Name: "c1", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { CallableURI: "callabledomain", }, @@ -192,7 +192,7 @@ func TestConfigDiff(t *testing.T) { Namespace: "default", Name: "c1", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { CallableURI: "different", }, @@ -239,7 +239,7 @@ func TestServeHTTP(t *testing.T) { Namespace: "default", Name: "first-channel", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { SinkableURI: replaceDomain, }, @@ -259,7 +259,7 @@ func TestServeHTTP(t *testing.T) { Namespace: "default", Name: "first-channel", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { SinkableURI: "first-to-domain", }, @@ -270,7 +270,7 @@ func TestServeHTTP(t *testing.T) { Namespace: "default", Name: "second-channel", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { CallableURI: replaceDomain, }, diff --git a/pkg/sidecar/multichannelfanout/parse_test.go b/pkg/sidecar/multichannelfanout/parse_test.go index d0aed807667..c510db6b890 100644 --- a/pkg/sidecar/multichannelfanout/parse_test.go +++ b/pkg/sidecar/multichannelfanout/parse_test.go @@ -21,8 +21,8 @@ import ( "testing" "github.com/google/go-cmp/cmp" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/eventing/pkg/sidecar/fanout" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "go.uber.org/zap" ) @@ -85,7 +85,7 @@ func TestConfigMapData(t *testing.T) { Namespace: "default", Name: "c1", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { CallableURI: "event-changer.default.svc.cluster.local", SinkableURI: "message-dumper-bar.default.svc.cluster.local", @@ -103,7 +103,7 @@ func TestConfigMapData(t *testing.T) { Namespace: "default", Name: "c2", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { SinkableURI: "message-dumper-foo.default.svc.cluster.local", }, @@ -114,7 +114,7 @@ func TestConfigMapData(t *testing.T) { Namespace: "other", Name: "c3", FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { SinkableURI: "message-dumper-foo.default.svc.cluster.local", }, diff --git a/pkg/sidecar/swappable/swappable_test.go b/pkg/sidecar/swappable/swappable_test.go index 92218d82336..13a8b7936f3 100644 --- a/pkg/sidecar/swappable/swappable_test.go +++ b/pkg/sidecar/swappable/swappable_test.go @@ -23,9 +23,9 @@ import ( "strings" "testing" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/eventing/pkg/sidecar/fanout" "github.com/knative/eventing/pkg/sidecar/multichannelfanout" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "go.uber.org/zap" ) @@ -47,7 +47,7 @@ func TestHandler(t *testing.T) { Namespace: namespace, Name: name, FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { CallableURI: replaceDomain, }, @@ -62,7 +62,7 @@ func TestHandler(t *testing.T) { Namespace: namespace, Name: name, FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { SinkableURI: replaceDomain, }, @@ -99,7 +99,7 @@ func TestHandler_InvalidConfigChange(t *testing.T) { Namespace: namespace, Name: name, FanoutConfig: fanout.Config{ - Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ { CallableURI: replaceDomain, }, diff --git a/vendor/github.com/knative/pkg/apis/duck/v1alpha1/channelable_types.go b/vendor/github.com/knative/pkg/apis/duck/v1alpha1/channelable_types.go index adbc7858e67..7e7ffd9ada1 100644 --- a/vendor/github.com/knative/pkg/apis/duck/v1alpha1/channelable_types.go +++ b/vendor/github.com/knative/pkg/apis/duck/v1alpha1/channelable_types.go @@ -38,9 +38,9 @@ type Channelable struct { // One of them must be present type ChannelSubscriberSpec struct { // +optional - CallableURI string `json:"callableURI,omitempty"` + CallableDomain string `json:"callableDomain,omitempty"` // +optional - SinkableURI string `json:"sinkableURI,omitempty"` + SinkableDomain string `json:"sinkableDomain,omitempty"` }