diff --git a/test/base/constants.go b/test/base/constants.go new file mode 100644 index 00000000000..fde6bbd37b2 --- /dev/null +++ b/test/base/constants.go @@ -0,0 +1,59 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package base + +const ( + // InMemoryProvisioner is the in-memory provisioner, which is also the default one. + InMemoryProvisioner = "in-memory" + // GCPPubSubProvisioner is the gcp-pubsub provisioner, which is under contrib/gcppubsub. + GCPPubSubProvisioner = "gcp-pubsub" + // KafkaProvisioner is the kafka provisioner, which is under contrib/kafka. + KafkaProvisioner = "kafka" + // NatssProvisioner is the natss provisioner, which is under contrib/natss + NatssProvisioner = "natss" +) + +// API versions for the resources. +const ( + EventingAPIVersion = "eventing.knative.dev/v1alpha1" + SourcesAPIVersion = "sources.eventing.knative.dev/v1alpha1" + MessagingAPIVersion = "messaging.knative.dev/v1alpha1" +) + +// Kind for eventing resources. +const ( + ChannelKind string = "Channel" + SubscriptionKind string = "Subscription" + ClusterChannelProvisionerKind string = "ClusterChannelProvisioner" + + BrokerKind string = "Broker" + TriggerKind string = "Trigger" +) + +// Kind for messaging resources. +const ( + InMemoryChannelKind string = "InMemoryChannel" + KafkaChannelKind string = "KafkaChannel" + NatssChannelKind string = "NatssChannel" +) + +// Kind for sources resources. +const ( + CronJobSourceKind string = "CronJobSource" + ContainerSourceKind string = "ContainerSource" + ApiServerSourceKind string = "ApiServerSource" +) diff --git a/test/base/generics.go b/test/base/generics.go index 93fe161fc4d..40dcf205521 100644 --- a/test/base/generics.go +++ b/test/base/generics.go @@ -21,8 +21,11 @@ import ( "github.com/knative/pkg/apis/duck" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/cache" ) // MetaResource includes necessary meta data to retrieve the generic Kubernetes resource. @@ -31,37 +34,66 @@ type MetaResource struct { metav1.ObjectMeta `json:"metadata,omitempty"` } -// MetaEventing returns a MetaResource that can represent a Knative Eventing resource. -func MetaEventing(name, namespace, kind string) *MetaResource { - return Meta(name, namespace, kind, eventingAPIVersion) +// MetaResourceList includes necessary meta data to retrieve the generic Kubernetes resource list. +type MetaResourceList struct { + metav1.TypeMeta `json:",inline"` + Namespace string } -// MetaSource returns a MetaResource that can represent a Knative Sources resource. -func MetaSource(name, namespace, kind string) *MetaResource { - return Meta(name, namespace, kind, sourcesAPIVersion) -} - -// Meta returns a MetaResource built from the given name, namespace and kind. -func Meta(name, namespace, kind, apiVersion string) *MetaResource { +// NewMetaResource returns a MetaResource built from the given name, namespace and typemeta. +func NewMetaResource(name, namespace string, typemeta *metav1.TypeMeta) *MetaResource { return &MetaResource{ + TypeMeta: *typemeta, ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, Name: name, }, - TypeMeta: metav1.TypeMeta{ - Kind: kind, - APIVersion: apiVersion, - }, + } +} + +// NewMetaResourceList returns a MetaResourceList built from the given namespace and typemeta. +func NewMetaResourceList(namespace string, typemeta *metav1.TypeMeta) *MetaResourceList { + return &MetaResourceList{ + TypeMeta: *typemeta, + Namespace: namespace, } } // GetGenericObject returns a generic object representing a Kubernetes resource. // Callers can cast this returned object to other objects that implement the corresponding duck-type. -func GetGenericObject(dynamicClient dynamic.Interface, obj *MetaResource, rtype apis.Listable) (runtime.Object, error) { - // get the resource's name, namespace and gvr - name := obj.Name - namespace := obj.Namespace - gvk := obj.GroupVersionKind() +func GetGenericObject( + dynamicClient dynamic.Interface, + obj *MetaResource, + rtype apis.Listable, +) (runtime.Object, error) { + lister, err := getGenericLister(dynamicClient, obj.GroupVersionKind(), obj.Namespace, rtype) + if err != nil { + return nil, err + } + return lister.Get(obj.Name) +} + +// GetGenericObjectList returns a generic object list representing a list of Kubernetes resource. +func GetGenericObjectList( + dynamicClient dynamic.Interface, + objList *MetaResourceList, + rtype apis.Listable, +) ([]runtime.Object, error) { + lister, err := getGenericLister(dynamicClient, objList.GroupVersionKind(), objList.Namespace, rtype) + if err != nil { + return nil, err + } + return lister.List(labels.Everything()) +} + +// getGenericLister returns a GenericNamespacedLister, which can be used to get resources in the namespace. +func getGenericLister( + dynamicClient dynamic.Interface, + gvk schema.GroupVersionKind, + namespace string, + rtype apis.Listable, +) (cache.GenericNamespaceLister, error) { + // get the resource's namespace and gvr gvr, _ := meta.UnsafeGuessKindToResource(gvk) stopChannel := make(chan struct{}) @@ -73,5 +105,5 @@ func GetGenericObject(dynamicClient dynamic.Interface, obj *MetaResource, rtype if err != nil { return nil, err } - return lister.ByNamespace(namespace).Get(name) + return lister.ByNamespace(namespace), nil } diff --git a/test/base/resource_checks.go b/test/base/resource_checks.go index a00376ee7ce..c5cd459fbbf 100644 --- a/test/base/resource_checks.go +++ b/test/base/resource_checks.go @@ -28,6 +28,7 @@ import ( duckv1beta1 "github.com/knative/pkg/apis/duck/v1beta1" "go.opencensus.io/trace" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" ) @@ -44,18 +45,35 @@ const ( // name the metric that is emitted to track how long it took for // the resource to get into the state checked by isResourceReady. func WaitForResourceReady(dynamicClient dynamic.Interface, obj *MetaResource) error { - metricName := fmt.Sprintf("WaitForResourceReady/%s", obj.Name) + metricName := fmt.Sprintf("WaitForResourceReady/%s/%s", obj.Namespace, obj.Name) _, span := trace.StartSpan(context.Background(), metricName) defer span.End() return wait.PollImmediate(interval, timeout, func() (bool, error) { - return isResourceReady(dynamicClient, obj) + untyped, err := GetGenericObject(dynamicClient, obj, &duckv1beta1.KResource{}) + return isResourceReady(untyped, err) + }) +} + +// WaitForResourcesReady waits until all the specified resources in the given namespace are ready. +func WaitForResourcesReady(dynamicClient dynamic.Interface, objList *MetaResourceList) error { + metricName := fmt.Sprintf("WaitForResourcesReady/%s", objList.Namespace) + _, span := trace.StartSpan(context.Background(), metricName) + defer span.End() + + return wait.PollImmediate(interval, timeout, func() (bool, error) { + untypeds, err := GetGenericObjectList(dynamicClient, objList, &duckv1beta1.KResource{}) + for _, untyped := range untypeds { + if isReady, err := isResourceReady(untyped, err); !isReady { + return isReady, err + } + } + return true, nil }) } // isResourceReady leverage duck-type to check if the given MetaResource is in ready state -func isResourceReady(dynamicClient dynamic.Interface, obj *MetaResource) (bool, error) { - untyped, err := GetGenericObject(dynamicClient, obj, &duckv1beta1.KResource{}) +func isResourceReady(obj runtime.Object, err error) (bool, error) { if k8serrors.IsNotFound(err) { // Return false as we are not done yet. // We swallow the error to keep on polling. @@ -66,6 +84,6 @@ func isResourceReady(dynamicClient dynamic.Interface, obj *MetaResource) (bool, return false, err } - kr := untyped.(*duckv1beta1.KResource) + kr := obj.(*duckv1beta1.KResource) return kr.Status.GetCondition(apis.ConditionReady).IsTrue(), nil } diff --git a/test/base/resources.go b/test/base/resources.go index eafaee2f3bf..ae60aac9528 100644 --- a/test/base/resources.go +++ b/test/base/resources.go @@ -17,10 +17,12 @@ limitations under the License. package base // resources contains functions that construct Eventing CRs and other Kubernetes resources. +// TODO(Fredy-Z): break this file into multiple files when it grows too large. import ( "fmt" + kafkamessagingv1alpha1 "github.com/knative/eventing/contrib/kafka/pkg/apis/messaging/v1alpha1" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" sourcesv1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1" pkgTest "github.com/knative/pkg/test" @@ -31,20 +33,29 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" ) -const eventingAPIVersion = "eventing.knative.dev/v1alpha1" -const sourcesAPIVersion = "sources.eventing.knative.dev/v1alpha1" +// TriggerOption enables further configuration of a Trigger. +type TriggerOption func(*eventingv1alpha1.Trigger) + +// SubscriptionOption enables further configuration of a Subscription. +type SubscriptionOption func(*eventingv1alpha1.Subscription) + +// CronJobSourceOption enables further configuration of a CronJobSource. +type CronJobSourceOption func(*sourcesv1alpha1.CronJobSource) + +// ContainerSourceOption enables further configuration of a ContainerSource. +type ContainerSourceOption func(*sourcesv1alpha1.ContainerSource) // clusterChannelProvisioner returns a ClusterChannelProvisioner for a given name. func clusterChannelProvisioner(name string) *corev1.ObjectReference { if name == "" { return nil } - return pkgTest.CoreV1ObjectReference("ClusterChannelProvisioner", eventingAPIVersion, name) + return pkgTest.CoreV1ObjectReference(ClusterChannelProvisionerKind, EventingAPIVersion, name) } // channelRef returns an ObjectReference for a given Channel Name. -func channelRef(name string) *corev1.ObjectReference { - return pkgTest.CoreV1ObjectReference("Channel", eventingAPIVersion, name) +func channelRef(name string, typemeta *metav1.TypeMeta) *corev1.ObjectReference { + return pkgTest.CoreV1ObjectReference(typemeta.Kind, typemeta.APIVersion, name) } // Channel returns a Channel with the specified provisioner. @@ -59,8 +70,17 @@ func Channel(name, provisioner string) *eventingv1alpha1.Channel { } } +// KafkaChannel returns a KafkaChannel. +func KafkaChannel(name string) *kafkamessagingv1alpha1.KafkaChannel { + return &kafkamessagingv1alpha1.KafkaChannel{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } +} + // WithSubscriberForSubscription returns an option that adds a Subscriber for the given Subscription. -func WithSubscriberForSubscription(name string) func(*eventingv1alpha1.Subscription) { +func WithSubscriberForSubscription(name string) SubscriptionOption { return func(s *eventingv1alpha1.Subscription) { if name != "" { s.Spec.Subscriber = &eventingv1alpha1.SubscriberSpec{ @@ -71,11 +91,11 @@ func WithSubscriberForSubscription(name string) func(*eventingv1alpha1.Subscript } // WithReply returns an options that adds a ReplyStrategy for the given Subscription. -func WithReply(name string) func(*eventingv1alpha1.Subscription) { +func WithReply(name string, typemeta *metav1.TypeMeta) SubscriptionOption { return func(s *eventingv1alpha1.Subscription) { if name != "" { s.Spec.Reply = &eventingv1alpha1.ReplyStrategy{ - Channel: pkgTest.CoreV1ObjectReference("Channel", eventingAPIVersion, name), + Channel: pkgTest.CoreV1ObjectReference(typemeta.Kind, typemeta.APIVersion, name), } } } @@ -83,16 +103,16 @@ func WithReply(name string) func(*eventingv1alpha1.Subscription) { // Subscription returns a Subscription. func Subscription( - name, - channelName string, - options ...func(*eventingv1alpha1.Subscription), + name, channelName string, + channelTypeMeta *metav1.TypeMeta, + options ...SubscriptionOption, ) *eventingv1alpha1.Subscription { subscription := &eventingv1alpha1.Subscription{ ObjectMeta: metav1.ObjectMeta{ Name: name, }, Spec: eventingv1alpha1.SubscriptionSpec{ - Channel: *channelRef(channelName), + Channel: *channelRef(channelName, channelTypeMeta), }, } for _, option := range options { @@ -116,7 +136,7 @@ func Broker(name, provisioner string) *eventingv1alpha1.Broker { } // WithTriggerFilter returns an option that adds a TriggerFilter for the given Trigger. -func WithTriggerFilter(eventSource, eventType string) func(*eventingv1alpha1.Trigger) { +func WithTriggerFilter(eventSource, eventType string) TriggerOption { return func(t *eventingv1alpha1.Trigger) { triggerFilter := &eventingv1alpha1.TriggerFilter{ SourceAndType: &eventingv1alpha1.TriggerFilterSourceAndType{ @@ -129,14 +149,14 @@ func WithTriggerFilter(eventSource, eventType string) func(*eventingv1alpha1.Tri } // WithBroker returns an option that adds a Broker for the given Trigger. -func WithBroker(brokerName string) func(*eventingv1alpha1.Trigger) { +func WithBroker(brokerName string) TriggerOption { return func(t *eventingv1alpha1.Trigger) { t.Spec.Broker = brokerName } } // WithSubscriberRefForTrigger returns an option that adds a Subscriber Ref for the given Trigger. -func WithSubscriberRefForTrigger(name string) func(*eventingv1alpha1.Trigger) { +func WithSubscriberRefForTrigger(name string) TriggerOption { return func(t *eventingv1alpha1.Trigger) { if name != "" { t.Spec.Subscriber = &eventingv1alpha1.SubscriberSpec{ @@ -147,7 +167,7 @@ func WithSubscriberRefForTrigger(name string) func(*eventingv1alpha1.Trigger) { } // WithSubscriberURIForTrigger returns an option that adds a Subscriber URI for the given Trigger. -func WithSubscriberURIForTrigger(uri string) func(*eventingv1alpha1.Trigger) { +func WithSubscriberURIForTrigger(uri string) TriggerOption { return func(t *eventingv1alpha1.Trigger) { t.Spec.Subscriber = &eventingv1alpha1.SubscriberSpec{ URI: &uri, @@ -156,7 +176,7 @@ func WithSubscriberURIForTrigger(uri string) func(*eventingv1alpha1.Trigger) { } // Trigger returns a Trigger. -func Trigger(name string, options ...func(*eventingv1alpha1.Trigger)) *eventingv1alpha1.Trigger { +func Trigger(name string, options ...TriggerOption) *eventingv1alpha1.Trigger { trigger := &eventingv1alpha1.Trigger{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -169,14 +189,14 @@ func Trigger(name string, options ...func(*eventingv1alpha1.Trigger)) *eventingv } // WithSinkServiceForCronJobSource returns an option that adds a Kubernetes Service sink for the given CronJobSource. -func WithSinkServiceForCronJobSource(name string) func(*sourcesv1alpha1.CronJobSource) { +func WithSinkServiceForCronJobSource(name string) CronJobSourceOption { return func(cjs *sourcesv1alpha1.CronJobSource) { cjs.Spec.Sink = pkgTest.CoreV1ObjectReference("Service", "v1", name) } } // WithServiceAccountForCronJobSource returns an option that adds a ServiceAccount for the given CronJobSource. -func WithServiceAccountForCronJobSource(saName string) func(*sourcesv1alpha1.CronJobSource) { +func WithServiceAccountForCronJobSource(saName string) CronJobSourceOption { return func(cjs *sourcesv1alpha1.CronJobSource) { cjs.Spec.ServiceAccountName = saName } @@ -187,7 +207,7 @@ func CronJobSource( name, schedule, data string, - options ...func(*sourcesv1alpha1.CronJobSource), + options ...CronJobSourceOption, ) *sourcesv1alpha1.CronJobSource { cronJobSource := &sourcesv1alpha1.CronJobSource{ ObjectMeta: metav1.ObjectMeta{ @@ -205,14 +225,14 @@ func CronJobSource( } // WithTemplateForContainerSource returns an option that adds a template for the given ContainerSource. -func WithTemplateForContainerSource(template *corev1.PodTemplateSpec) func(*sourcesv1alpha1.ContainerSource) { +func WithTemplateForContainerSource(template *corev1.PodTemplateSpec) ContainerSourceOption { return func(cs *sourcesv1alpha1.ContainerSource) { cs.Spec.Template = template } } // WithSinkServiceForContainerSource returns an option that adds a Kubernetes Service sink for the given ContainerSource. -func WithSinkServiceForContainerSource(name string) func(*sourcesv1alpha1.ContainerSource) { +func WithSinkServiceForContainerSource(name string) ContainerSourceOption { return func(cs *sourcesv1alpha1.ContainerSource) { cs.Spec.Sink = pkgTest.CoreV1ObjectReference("Service", "v1", name) } @@ -221,7 +241,7 @@ func WithSinkServiceForContainerSource(name string) func(*sourcesv1alpha1.Contai // ContainerSource returns a Container EventSource. func ContainerSource( name string, - options ...func(*sourcesv1alpha1.ContainerSource), + options ...ContainerSourceOption, ) *sourcesv1alpha1.ContainerSource { containerSource := &sourcesv1alpha1.ContainerSource{ ObjectMeta: metav1.ObjectMeta{ diff --git a/test/common/client.go b/test/common/client.go index 53e8a612442..aac19e4abd7 100644 --- a/test/common/client.go +++ b/test/common/client.go @@ -22,6 +22,7 @@ package common import ( "testing" + kafkachannel "github.com/knative/eventing/contrib/kafka/pkg/client/clientset/versioned" eventing "github.com/knative/eventing/pkg/client/clientset/versioned" "github.com/knative/pkg/test" "k8s.io/client-go/dynamic" @@ -32,6 +33,7 @@ type Client struct { Kube *test.KubeClient Eventing *eventing.Clientset Dynamic dynamic.Interface + Kafka *kafkachannel.Clientset Namespace string T *testing.T @@ -61,6 +63,11 @@ func NewClient(configPath string, clusterName string, namespace string, t *testi return nil, err } + client.Kafka, err = kafkachannel.NewForConfig(cfg) + if err != nil { + return nil, err + } + client.Namespace = namespace client.T = t client.Cleaner = NewCleaner(t.Logf, client.Dynamic) diff --git a/test/common/config.go b/test/common/config.go index 43d31ff8de4..773e218600f 100644 --- a/test/common/config.go +++ b/test/common/config.go @@ -16,32 +16,40 @@ limitations under the License. package common -import "github.com/knative/eventing/pkg/reconciler/namespace/resources" +import ( + "github.com/knative/eventing/test/base" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// DefaultClusterChannelProvisioner is the default ClusterChannelProvisioner we will run tests against. +const DefaultClusterChannelProvisioner = base.InMemoryProvisioner // ValidProvisionersMap saves the provisioner-features mapping. // Each pair means the provisioner support the list of features. -var ValidProvisionersMap = map[string][]Feature{ - InMemoryProvisioner: {FeatureBasic}, - GCPPubSubProvisioner: {FeatureBasic, FeatureRedelivery, FeaturePersistence}, - KafkaProvisioner: {FeatureBasic, FeatureRedelivery, FeaturePersistence}, - NatssProvisioner: {FeatureBasic, FeatureRedelivery, FeaturePersistence}, +var ValidProvisionersMap = map[string]ChannelConfig{ + base.InMemoryProvisioner: ChannelConfig{ + Features: []Feature{FeatureBasic}, + }, + base.GCPPubSubProvisioner: ChannelConfig{ + Features: []Feature{FeatureBasic, FeatureRedelivery, FeaturePersistence}, + }, + base.KafkaProvisioner: ChannelConfig{ + Features: []Feature{FeatureBasic, FeatureRedelivery, FeaturePersistence}, + CRDSupported: true, + }, + base.NatssProvisioner: ChannelConfig{ + Features: []Feature{FeatureBasic, FeatureRedelivery, FeaturePersistence}, + }, } -const ( - // DefaultBrokerName is the name of the Broker that is automatically created after the current namespace is labeled. - DefaultBrokerName = resources.DefaultBrokerName - // DefaultClusterChannelProvisioner is the default ClusterChannelProvisioner we will run tests against. - DefaultClusterChannelProvisioner = InMemoryProvisioner - - // InMemoryProvisioner is the in-memory provisioner, which is also the default one. - InMemoryProvisioner = "in-memory" - // GCPPubSubProvisioner is the gcp-pubsub provisioner, which is under contrib/gcppubsub. - GCPPubSubProvisioner = "gcp-pubsub" - // KafkaProvisioner is the kafka provisioner, which is under contrib/kafka. - KafkaProvisioner = "kafka" - // NatssProvisioner is the natss provisioner, which is under contrib/natss - NatssProvisioner = "natss" -) +type ChannelConfig struct { + Features []Feature + CRDSupported bool +} + +var OperatorChannelMap = map[string]*metav1.TypeMeta{ + base.KafkaProvisioner: KafkaChannelTypeMeta, +} // Feature is the feature supported by the Channel provisioner. type Feature string diff --git a/test/common/creation.go b/test/common/creation.go index c96872408cd..ca27eb37401 100644 --- a/test/common/creation.go +++ b/test/common/creation.go @@ -17,47 +17,59 @@ limitations under the License. package common import ( - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - sourcesv1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1" "github.com/knative/eventing/test/base" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// TODO(Fredy-Z): break this file into multiple files when it grows too large. + var coreAPIGroup = corev1.SchemeGroupVersion.Group var coreAPIVersion = corev1.SchemeGroupVersion.Version var rbacAPIGroup = rbacv1.SchemeGroupVersion.Group var rbacAPIVersion = rbacv1.SchemeGroupVersion.Version -// CreateChannelOrFail will create a Channel or fail the test if there is an error. -func (client *Client) CreateChannelOrFail(name, provisonerName string) { +// CreateChannelOrFail will create a Channel Resource in Eventing. +// TODO(Fredy-Z): This is a workaround when there are both provisioner and Channel CRDs in this repo. +// isCRD needs to be deleted when the provisioner implementation is removed. +func (client *Client) CreateChannelOrFail(name string, channelTypeMeta *metav1.TypeMeta, provisionerName string) { namespace := client.Namespace - channel := base.Channel(name, provisonerName) - - channels := client.Eventing.EventingV1alpha1().Channels(namespace) - // update channel with the new reference - channel, err := channels.Create(channel) - if err != nil { - client.T.Fatalf("Failed to create channel %q: %v", name, err) + switch channelTypeMeta.Kind { + case base.ChannelKind: + channel := base.Channel(name, provisionerName) + channels := client.Eventing.EventingV1alpha1().Channels(namespace) + channel, err := channels.Create(channel) + if err != nil { + client.T.Fatalf("Failed to create channel %q: %v", name, err) + } + client.Cleaner.AddObj(channel) + case base.KafkaChannelKind: + channel := base.KafkaChannel(name) + channels := client.Kafka.MessagingV1alpha1().KafkaChannels(namespace) + channel, err := channels.Create(channel) + if err != nil { + client.T.Fatalf("Failed to create %q %q: %v", channelTypeMeta.Kind, name, err) + } + client.Cleaner.AddObj(channel) } - client.Cleaner.AddObj(channel) } // CreateChannelsOrFail will create a list of Channel Resources in Eventing. -func (client *Client) CreateChannelsOrFail(names []string, provisionerName string) { +func (client *Client) CreateChannelsOrFail(names []string, channelTypeMeta *metav1.TypeMeta, provisionerName string) { for _, name := range names { - client.CreateChannelOrFail(name, provisionerName) + client.CreateChannelOrFail(name, channelTypeMeta, provisionerName) } } // CreateSubscriptionOrFail will create a Subscription or fail the test if there is an error. func (client *Client) CreateSubscriptionOrFail( - name, - channelName string, - options ...func(*eventingv1alpha1.Subscription), + name, channelName string, + channelTypeMeta *metav1.TypeMeta, + options ...base.SubscriptionOption, ) { namespace := client.Namespace - subscription := base.Subscription(name, channelName, options...) + subscription := base.Subscription(name, channelName, channelTypeMeta, options...) subscriptions := client.Eventing.EventingV1alpha1().Subscriptions(namespace) // update subscription with the new reference @@ -72,10 +84,11 @@ func (client *Client) CreateSubscriptionOrFail( func (client *Client) CreateSubscriptionsOrFail( names []string, channelName string, - options ...func(*eventingv1alpha1.Subscription), + channelTypeMeta *metav1.TypeMeta, + options ...base.SubscriptionOption, ) { for _, name := range names { - client.CreateSubscriptionOrFail(name, channelName, options...) + client.CreateSubscriptionOrFail(name, channelName, channelTypeMeta, options...) } } @@ -101,7 +114,7 @@ func (client *Client) CreateBrokersOrFail(names []string, provisionerName string } // CreateTriggerOrFail will create a Trigger or fail the test if there is an error. -func (client *Client) CreateTriggerOrFail(name string, options ...func(*eventingv1alpha1.Trigger)) { +func (client *Client) CreateTriggerOrFail(name string, options ...base.TriggerOption) { namespace := client.Namespace trigger := base.Trigger(name, options...) @@ -119,7 +132,7 @@ func (client *Client) CreateCronJobSourceOrFail( name, schedule, data string, - options ...func(*sourcesv1alpha1.CronJobSource), + options ...base.CronJobSourceOption, ) { namespace := client.Namespace cronJobSource := base.CronJobSource(name, schedule, data, options...) @@ -136,7 +149,7 @@ func (client *Client) CreateCronJobSourceOrFail( // CreateContainerSourceOrFail will create a ContainerSource. func (client *Client) CreateContainerSourceOrFail( name string, - options ...func(*sourcesv1alpha1.ContainerSource), + options ...base.ContainerSourceOption, ) { namespace := client.Namespace containerSource := base.ContainerSource(name, options...) diff --git a/test/common/operation.go b/test/common/operation.go index ac0670dd01d..574a475d13b 100644 --- a/test/common/operation.go +++ b/test/common/operation.go @@ -41,47 +41,36 @@ func (client *Client) LabelNamespace(labels map[string]string) error { return err } -// SendFakeEventToChannel will send the given event to the given channel. -func (client *Client) SendFakeEventToChannel(senderName, channelName string, event *base.CloudEvent) error { - url, err := client.GetChannelURL(channelName) - if err != nil { - return err - } - return client.sendFakeEventToAddress(senderName, url, event) -} - -// GetChannelURL will return the url for the given channel. -func (client *Client) GetChannelURL(name string) (string, error) { - namespace := client.Namespace - channelMeta := base.MetaEventing(name, namespace, "Channel") - return base.GetAddressableURI(client.Dynamic, channelMeta) -} - -// SendFakeEventToBroker will send the given event to the given broker. -func (client *Client) SendFakeEventToBroker(senderName, brokerName string, event *base.CloudEvent) error { - url, err := client.GetBrokerURL(brokerName) +// SendFakeEventToAddressable will send the given event to the given Addressable. +func (client *Client) SendFakeEventToAddressable( + senderName, + addressableName string, + typemeta *metav1.TypeMeta, + event *base.CloudEvent, +) error { + uri, err := client.GetAddressableURI(addressableName, typemeta) if err != nil { return err } - return client.sendFakeEventToAddress(senderName, url, event) + return client.sendFakeEventToAddress(senderName, uri, event) } -// GetBrokerURL will return the url for the given broker. -func (client *Client) GetBrokerURL(name string) (string, error) { +// GetAddressableURI returns the URI of the addressable resource. +// To use this function, the given resource must have implemented the Addressable duck-type. +func (client *Client) GetAddressableURI(addressableName string, typemeta *metav1.TypeMeta) (string, error) { namespace := client.Namespace - brokerMeta := base.MetaEventing(name, namespace, "Broker") - return base.GetAddressableURI(client.Dynamic, brokerMeta) + metaAddressable := base.NewMetaResource(addressableName, namespace, typemeta) + return base.GetAddressableURI(client.Dynamic, metaAddressable) } // sendFakeEventToAddress will create a sender pod, which will send the given event to the given url. func (client *Client) sendFakeEventToAddress( senderName string, - url string, + uri string, event *base.CloudEvent, ) error { namespace := client.Namespace - client.T.Logf("Sending fake CloudEvent") - pod := base.EventSenderPod(senderName, url, event) + pod := base.EventSenderPod(senderName, uri, event) client.CreatePodOrFail(pod) if err := pkgTest.WaitForPodRunning(client.Kube, senderName, namespace); err != nil { return err @@ -89,177 +78,45 @@ func (client *Client) sendFakeEventToAddress( return nil } -// WaitForBrokerReady waits until the broker is Ready. -func (client *Client) WaitForBrokerReady(name string) error { - namespace := client.Namespace - brokerMeta := base.MetaEventing(name, namespace, "Broker") - if err := base.WaitForResourceReady(client.Dynamic, brokerMeta); err != nil { - return err - } - return nil -} - -// WaitForBrokersReady waits until all brokers in the namespace are Ready. -func (client *Client) WaitForBrokersReady() error { - namespace := client.Namespace - brokers, err := client.Eventing.EventingV1alpha1().Brokers(namespace).List(metav1.ListOptions{}) - if err != nil { - return err - } - for _, broker := range brokers.Items { - if err := client.WaitForBrokerReady(broker.Name); err != nil { - return err - } - } - return nil -} - -// WaitForTriggerReady waits until the trigger is Ready. -func (client *Client) WaitForTriggerReady(name string) error { - namespace := client.Namespace - triggerMeta := base.MetaEventing(name, namespace, "Trigger") - if err := base.WaitForResourceReady(client.Dynamic, triggerMeta); err != nil { - return err - } - return nil -} - -// WaitForTriggersReady waits until all triggers in the namespace are Ready. -func (client *Client) WaitForTriggersReady() error { +// WaitForResourceReady waits for the resource to become ready. +// To use this function, the given resource must have implemented the Status duck-type. +func (client *Client) WaitForResourceReady(name string, typemeta *metav1.TypeMeta) error { namespace := client.Namespace - triggers, err := client.Eventing.EventingV1alpha1().Triggers(namespace).List(metav1.ListOptions{}) - if err != nil { + metaResource := base.NewMetaResource(name, namespace, typemeta) + if err := base.WaitForResourceReady(client.Dynamic, metaResource); err != nil { return err } - for _, trigger := range triggers.Items { - if err := client.WaitForTriggerReady(trigger.Name); err != nil { - return err - } - } return nil } -// WaitForChannelReady waits until the channel is Ready. -func (client *Client) WaitForChannelReady(name string) error { +// WaitForResourcesReady waits for resources of the given type in the namespace to become ready. +// To use this function, the given resource must have implemented the Status duck-type. +func (client *Client) WaitForResourcesReady(typemeta *metav1.TypeMeta) error { namespace := client.Namespace - channelMeta := base.MetaEventing(name, namespace, "Channel") - if err := base.WaitForResourceReady(client.Dynamic, channelMeta); err != nil { + metaResourceList := base.NewMetaResourceList(namespace, typemeta) + if err := base.WaitForResourcesReady(client.Dynamic, metaResourceList); err != nil { return err } return nil } -// WaitForChannelsReady waits until all channels in the namespace are Ready. -func (client *Client) WaitForChannelsReady() error { - namespace := client.Namespace - channels, err := client.Eventing.EventingV1alpha1().Channels(namespace).List(metav1.ListOptions{}) - if err != nil { - return err - } - for _, channel := range channels.Items { - if err := client.WaitForChannelReady(channel.Name); err != nil { - return err - } - } - return nil -} - -// WaitForSubscriptionReady waits until the subscription is Ready. -func (client *Client) WaitForSubscriptionReady(name string) error { - namespace := client.Namespace - subscriptionMeta := base.MetaEventing(name, namespace, "Subscription") - if err := base.WaitForResourceReady(client.Dynamic, subscriptionMeta); err != nil { - return err - } - return nil -} - -// WaitForSubscriptionsReady waits until all subscriptions in the namespace are Ready. -func (client *Client) WaitForSubscriptionsReady() error { - namespace := client.Namespace - subscriptions, err := client.Eventing.EventingV1alpha1().Subscriptions(namespace).List(metav1.ListOptions{}) - if err != nil { - return err - } - for _, subscription := range subscriptions.Items { - if err := client.WaitForSubscriptionReady(subscription.Name); err != nil { - return err - } - } - return nil -} - -// WaitForCronJobSourceReady waits until the cronjobsource is Ready. -func (client *Client) WaitForCronJobSourceReady(name string) error { - namespace := client.Namespace - cronJobSourceMeta := base.MetaSource(name, namespace, "CronJobSource") - if err := base.WaitForResourceReady(client.Dynamic, cronJobSourceMeta); err != nil { - return err - } - return nil -} - -// WaitForCronJobSourcesReady waits until all cronjobsources in the namespace are Ready. -func (client *Client) WaitForCronJobSourcesReady() error { - namespace := client.Namespace - cronJobSources, err := client.Eventing.SourcesV1alpha1().CronJobSources(namespace).List(metav1.ListOptions{}) - if err != nil { - return err - } - for _, cronJobSource := range cronJobSources.Items { - if err := client.WaitForCronJobSourceReady(cronJobSource.Name); err != nil { - return err - } - } - return nil -} - -// WaitForContainerSourceReady waits until the containersource is Ready. -func (client *Client) WaitForContainerSourceReady(name string) error { - namespace := client.Namespace - containerSourceMeta := base.MetaSource(name, namespace, "ContainerSource") - if err := base.WaitForResourceReady(client.Dynamic, containerSourceMeta); err != nil { - return err - } - return nil -} - -// WaitForContainerSourcesReady waits until all containersources in the namespace are Ready. -func (client *Client) WaitForContainerSourcesReady() error { - namespace := client.Namespace - containerSources, err := client.Eventing.SourcesV1alpha1().ContainerSources(namespace).List(metav1.ListOptions{}) - if err != nil { - return err - } - for _, containerSource := range containerSources.Items { - if err := client.WaitForContainerSourceReady(containerSource.Name); err != nil { - return err - } - } - return nil -} - // WaitForAllTestResourcesReady waits until all test resources in the namespace are Ready. -// Currently the test resources include Pod, Channel, Subscription, Broker and Trigger. -// If there are new resources, this function needs to be changed. +// If there is a new resource, its TypeMeta needs to be added into the list. +// TODO(Fredy-Z): make this function more generic by only checking existed resources in the current namespace. func (client *Client) WaitForAllTestResourcesReady() error { - if err := client.WaitForChannelsReady(); err != nil { - return err - } - if err := client.WaitForSubscriptionsReady(); err != nil { - return err - } - if err := client.WaitForBrokersReady(); err != nil { - return err - } - if err := client.WaitForTriggersReady(); err != nil { - return err - } - if err := client.WaitForCronJobSourcesReady(); err != nil { - return err - } - if err := client.WaitForContainerSourcesReady(); err != nil { - return err + typemetas := []*metav1.TypeMeta{ + ChannelTypeMeta, + SubscriptionTypeMeta, + BrokerTypeMeta, + TriggerTypeMeta, + KafkaChannelTypeMeta, + CronJobSourceTypeMeta, + ContainerSourceTypeMeta, + } + for _, typemeta := range typemetas { + if err := client.WaitForResourcesReady(typemeta); err != nil { + return err + } } if err := pkgTest.WaitForAllPodsRunning(client.Kube, client.Namespace); err != nil { return err diff --git a/test/common/typemeta.go b/test/common/typemeta.go new file mode 100644 index 00000000000..db900b336bb --- /dev/null +++ b/test/common/typemeta.go @@ -0,0 +1,67 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package common + +import ( + "github.com/knative/eventing/test/base" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// ChannelTypeMeta is the TypeMeta ref for Channel. +var ChannelTypeMeta = EventingTypeMeta(base.ChannelKind) + +// SubscriptionTypeMeta is the TypeMeta ref for Subscription. +var SubscriptionTypeMeta = EventingTypeMeta(base.SubscriptionKind) + +// BrokerTypeMeta is the TypeMeta ref for Broker. +var BrokerTypeMeta = EventingTypeMeta(base.BrokerKind) + +// TriggerTypeMeta is the TypeMeta ref for Trigger. +var TriggerTypeMeta = EventingTypeMeta(base.TriggerKind) + +// EventingTypeMeta returns the TypeMeta ref for an eventing resource. +func EventingTypeMeta(kind string) *metav1.TypeMeta { + return &metav1.TypeMeta{ + Kind: kind, + APIVersion: base.EventingAPIVersion, + } +} + +// CronJobSourceTypeMeta is the TypeMeta ref for CronJobSource. +var CronJobSourceTypeMeta = SourcesTypeMeta(base.CronJobSourceKind) + +// ContainerSourceTypeMeta is the TypeMeta ref for ContainerSource. +var ContainerSourceTypeMeta = SourcesTypeMeta(base.ContainerSourceKind) + +// SourcesTypeMeta returns the TypeMeta ref for an eventing sources resource. +func SourcesTypeMeta(kind string) *metav1.TypeMeta { + return &metav1.TypeMeta{ + Kind: kind, + APIVersion: base.SourcesAPIVersion, + } +} + +// KafkaChannelTypeMeta is the TypeMeta ref for KafkaChannel. +var KafkaChannelTypeMeta = MessagingTypeMeta(base.KafkaChannelKind) + +// MessagingTypeMeta returns the TypeMeta ref for an eventing messaing resource. +func MessagingTypeMeta(kind string) *metav1.TypeMeta { + return &metav1.TypeMeta{ + Kind: kind, + APIVersion: base.MessagingAPIVersion, + } +} diff --git a/test/e2e-tests.sh b/test/e2e-tests.sh index 1425f0931f7..6f0034a3d07 100755 --- a/test/e2e-tests.sh +++ b/test/e2e-tests.sh @@ -60,6 +60,12 @@ readonly KAFKA_CONFIG_TEMPLATE="contrib/kafka/config/provisioner/kafka.yaml" readonly KAFKA_CONFIG="$(mktemp)" # Kafka cluster URL for our installation readonly KAFKA_CLUSTER_URL="my-cluster-kafka-bootstrap.kafka:9092" +# Kafka channel CRD config template directory. +readonly KAFKA_CRD_CONFIG_TEMPLATE_DIR="contrib/kafka/config" +# Kafka channel CRD config template file. It needs to be modified to be the real config file. +readonly KAFKA_CRD_CONFIG_TEMPLATE="400-kafka-config.yaml" +# Real Kafka channel CRD config , generated from the template directory and modified template file. +readonly KAFKA_CRD_CONFIG_DIR="$(mktemp -d)" # Setup the Knative environment for running tests. function knative_setup() { @@ -101,6 +107,12 @@ function test_setup() { ko apply -f ${KAFKA_CONFIG} || return 1 wait_until_pods_running knative-eventing || fail_test "Failed to install the Kafka ClusterChannelProvisioner" + echo "Installing Kafka Channel CRD" + cp ${KAFKA_CRD_CONFIG_TEMPLATE_DIR}/*yaml ${KAFKA_CRD_CONFIG_DIR} + sed -i "s/REPLACE_WITH_CLUSTER_URL/${KAFKA_CLUSTER_URL}/" ${KAFKA_CRD_CONFIG_DIR}/${KAFKA_CRD_CONFIG_TEMPLATE} + ko apply -f ${KAFKA_CRD_CONFIG_DIR} || return 1 + wait_until_pods_running knative-eventing || fail_test "Failed to install the Kafka Channel CRD" + # Publish test images. echo ">> Publishing test images" $(dirname $0)/upload-test-images.sh e2e || fail_test "Error uploading test images" @@ -124,6 +136,9 @@ function test_teardown() { kafka_teardown ko delete --ignore-not-found=true --now --timeout 60s -f ${KAFKA_CONFIG} + echo "Uninstalling Kafka Channel CRD" + ko delete --ignore-not-found=true --now --timeout 60s -f ${KAFKA_CRD_CONFIG_DIR} + wait_until_object_does_not_exist namespaces knative-eventing } @@ -211,6 +226,6 @@ function dump_extra_cluster_state() { initialize $@ --skip-istio-addon -go_test_e2e -timeout=20m ./test/e2e -clusterChannelProvisioners=in-memory,natss,kafka || fail_test +go_test_e2e -timeout=20m -parallel=12 ./test/e2e -clusterChannelProvisioners=in-memory,natss,kafka || fail_test success diff --git a/test/e2e/broker_channel_flow_test.go b/test/e2e/broker_channel_flow_test.go index f4807d59407..ea37e538095 100644 --- a/test/e2e/broker_channel_flow_test.go +++ b/test/e2e/broker_channel_flow_test.go @@ -51,7 +51,7 @@ func TestBrokerChannelFlow(t *testing.T) { RunTests(t, common.FeatureBasic, testBrokerChannelFlow) } -func testBrokerChannelFlow(t *testing.T, provisioner string) { +func testBrokerChannelFlow(t *testing.T, provisioner string, isCRD bool) { const ( senderName = "e2e-brokerchannel-sender" brokerName = "e2e-brokerchannel-broker" @@ -90,7 +90,7 @@ func testBrokerChannelFlow(t *testing.T, provisioner string) { // create a new broker client.CreateBrokerOrFail(brokerName, provisioner) - client.WaitForBrokerReady(brokerName) + client.WaitForResourceReady(brokerName, common.BrokerTypeMeta) // create the event we want to transform to transformedEventBody := fmt.Sprintf("%s %s", eventBody, string(uuid.NewUUID())) @@ -126,11 +126,12 @@ func testBrokerChannelFlow(t *testing.T, provisioner string) { ) // create channel for trigger3 - client.CreateChannelOrFail(channelName, provisioner) - client.WaitForChannelReady(channelName) + channelTypeMeta := getChannelTypeMeta(provisioner, isCRD) + client.CreateChannelOrFail(channelName, channelTypeMeta, provisioner) + client.WaitForResourceReady(channelName, channelTypeMeta) // create trigger3 to receive the transformed event, and send it to the channel - channelURL, err := client.GetChannelURL(channelName) + channelURL, err := client.GetAddressableURI(channelName, channelTypeMeta) if err != nil { t.Fatalf("Failed to get the url for the channel %q: %v", channelName, err) } @@ -149,6 +150,7 @@ func testBrokerChannelFlow(t *testing.T, provisioner string) { client.CreateSubscriptionOrFail( subscriptionName, channelName, + channelTypeMeta, base.WithSubscriberForSubscription(loggerPodName2), ) @@ -164,7 +166,7 @@ func testBrokerChannelFlow(t *testing.T, provisioner string) { Data: fmt.Sprintf(`{"msg":%q}`, eventBody), Encoding: base.CloudEventDefaultEncoding, } - if err := client.SendFakeEventToBroker(senderName, brokerName, eventToSend); err != nil { + if err := client.SendFakeEventToAddressable(senderName, brokerName, common.BrokerTypeMeta, eventToSend); err != nil { t.Fatalf("Failed to send fake CloudEvent to the broker %q", brokerName) } diff --git a/test/e2e/broker_default_test.go b/test/e2e/broker_default_test.go index f70b4ba5412..ecd6b9938db 100644 --- a/test/e2e/broker_default_test.go +++ b/test/e2e/broker_default_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/reconciler/namespace/resources" "github.com/knative/eventing/test/base" "github.com/knative/eventing/test/common" @@ -36,11 +37,12 @@ const ( waitForFilterPodRunning = 30 * time.Second selectorKey = "end2end-test-broker-trigger" - any = v1alpha1.TriggerAnyFilter - eventType1 = "type1" - eventType2 = "type2" - eventSource1 = "source1" - eventSource2 = "source2" + defaultBrokerName = resources.DefaultBrokerName + any = v1alpha1.TriggerAnyFilter + eventType1 = "type1" + eventType2 = "type2" + eventSource1 = "source1" + eventSource2 = "source2" ) // Helper struct to tie the type and sources of the events we expect to receive @@ -64,7 +66,7 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { } // Wait for default broker ready. - if err := client.WaitForBrokerReady(common.DefaultBrokerName); err != nil { + if err := client.WaitForResourceReady(defaultBrokerName, common.BrokerTypeMeta); err != nil { t.Fatalf("Error waiting for default broker to become ready: %v", err) } @@ -121,7 +123,7 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { } // Create sender pod. senderPodName := name("sender", eventToSend.Type, eventToSend.Source) - if err := client.SendFakeEventToBroker(senderPodName, common.DefaultBrokerName, cloudEvent); err != nil { + if err := client.SendFakeEventToAddressable(senderPodName, defaultBrokerName, common.BrokerTypeMeta, cloudEvent); err != nil { t.Fatalf("Error send cloud event to broker: %v", err) } diff --git a/test/e2e/broker_event_transformation_test.go b/test/e2e/broker_event_transformation_test.go index d9ec7a52c90..3bb2b4d028d 100644 --- a/test/e2e/broker_event_transformation_test.go +++ b/test/e2e/broker_event_transformation_test.go @@ -45,7 +45,7 @@ func TestEventTransformationForTrigger(t *testing.T) { RunTests(t, common.FeatureBasic, testEventTransformationForTrigger) } -func testEventTransformationForTrigger(t *testing.T, provisioner string) { +func testEventTransformationForTrigger(t *testing.T, provisioner string, isCRD bool) { const ( senderName = "e2e-eventtransformation-sender" brokerName = "e2e-eventtransformation-broker" @@ -79,7 +79,7 @@ func testEventTransformationForTrigger(t *testing.T, provisioner string) { // create a new broker client.CreateBrokerOrFail(brokerName, provisioner) - client.WaitForBrokerReady(brokerName) + client.WaitForResourceReady(brokerName, common.BrokerTypeMeta) // create the event we want to transform to transformedEventBody := fmt.Sprintf("%s %s", eventBody, string(uuid.NewUUID())) @@ -126,7 +126,7 @@ func testEventTransformationForTrigger(t *testing.T, provisioner string) { Data: fmt.Sprintf(`{"msg":%q}`, eventBody), Encoding: base.CloudEventDefaultEncoding, } - if err := client.SendFakeEventToBroker(senderName, brokerName, eventToSend); err != nil { + if err := client.SendFakeEventToAddressable(senderName, brokerName, common.BrokerTypeMeta, eventToSend); err != nil { t.Fatalf("Failed to send fake CloudEvent to the broker %q", brokerName) } diff --git a/test/e2e/channel_chain_test.go b/test/e2e/channel_chain_test.go index 2446ba68b29..1483a377076 100644 --- a/test/e2e/channel_chain_test.go +++ b/test/e2e/channel_chain_test.go @@ -37,7 +37,7 @@ func TestChannelChain(t *testing.T) { RunTests(t, common.FeatureBasic, testChannelChain) } -func testChannelChain(t *testing.T, provisioner string) { +func testChannelChain(t *testing.T, provisioner string, isCRD bool) { const ( senderName = "e2e-channelchain-sender" loggerPodName = "e2e-channelchain-logger-pod" @@ -52,17 +52,28 @@ func testChannelChain(t *testing.T, provisioner string) { defer TearDown(client) // create channels - client.CreateChannelsOrFail(channelNames, provisioner) - client.WaitForChannelsReady() + channelTypeMeta := getChannelTypeMeta(provisioner, isCRD) + client.CreateChannelsOrFail(channelNames, channelTypeMeta, provisioner) + client.WaitForResourcesReady(channelTypeMeta) // create loggerPod and expose it as a service pod := base.EventLoggerPod(loggerPodName) client.CreatePodOrFail(pod, common.WithService(loggerPodName)) // create subscriptions that subscribe the first channel, and reply events directly to the second channel - client.CreateSubscriptionsOrFail(subscriptionNames1, channelNames[0], base.WithReply(channelNames[1])) + client.CreateSubscriptionsOrFail( + subscriptionNames1, + channelNames[0], + channelTypeMeta, + base.WithReply(channelNames[1], channelTypeMeta), + ) // create subscriptions that subscribe the second channel, and call the logging service - client.CreateSubscriptionsOrFail(subscriptionNames2, channelNames[1], base.WithSubscriberForSubscription(loggerPodName)) + client.CreateSubscriptionsOrFail( + subscriptionNames2, + channelNames[1], + channelTypeMeta, + base.WithSubscriberForSubscription(loggerPodName), + ) // wait for all test resources to be ready, so that we can start sending events if err := client.WaitForAllTestResourcesReady(); err != nil { @@ -77,7 +88,7 @@ func testChannelChain(t *testing.T, provisioner string) { Data: fmt.Sprintf(`{"msg":%q}`, body), Encoding: base.CloudEventDefaultEncoding, } - if err := client.SendFakeEventToChannel(senderName, channelNames[0], event); err != nil { + if err := client.SendFakeEventToAddressable(senderName, channelNames[0], channelTypeMeta, event); err != nil { t.Fatalf("Failed to send fake CloudEvent to the channel %q", channelNames[0]) } diff --git a/test/e2e/channel_event_transformation_test.go b/test/e2e/channel_event_transformation_test.go index e405f9dbba6..bca1811a7e7 100644 --- a/test/e2e/channel_event_transformation_test.go +++ b/test/e2e/channel_event_transformation_test.go @@ -48,13 +48,14 @@ func TestEventTransformationForSubscription(t *testing.T) { transformationPodName := "e2e-eventtransformation-transformation-pod" loggerPodName := "e2e-eventtransformation-logger-pod" - RunTests(t, common.FeatureBasic, func(st *testing.T, provisioner string) { + RunTests(t, common.FeatureBasic, func(st *testing.T, provisioner string, isCRD bool) { client := Setup(st, true) defer TearDown(client) // create channels - client.CreateChannelsOrFail(channelNames, provisioner) - client.WaitForChannelsReady() + channelTypeMeta := getChannelTypeMeta(provisioner, isCRD) + client.CreateChannelsOrFail(channelNames, channelTypeMeta, provisioner) + client.WaitForResourcesReady(channelTypeMeta) // create transformation pod and service transformedEventBody := fmt.Sprintf("eventBody %s", uuid.NewUUID()) @@ -75,13 +76,15 @@ func TestEventTransformationForSubscription(t *testing.T) { client.CreateSubscriptionsOrFail( subscriptionNames1, channelNames[0], + channelTypeMeta, base.WithSubscriberForSubscription(transformationPodName), - base.WithReply(channelNames[1]), + base.WithReply(channelNames[1], channelTypeMeta), ) // create subscriptions that subscribe the second channel, and forward the received events to the logger service client.CreateSubscriptionsOrFail( subscriptionNames2, channelNames[1], + channelTypeMeta, base.WithSubscriberForSubscription(loggerPodName), ) @@ -98,7 +101,7 @@ func TestEventTransformationForSubscription(t *testing.T) { Data: fmt.Sprintf(`{"msg":%q}`, eventBody), Encoding: base.CloudEventDefaultEncoding, } - if err := client.SendFakeEventToChannel(senderName, channelNames[0], eventToSend); err != nil { + if err := client.SendFakeEventToAddressable(senderName, channelNames[0], channelTypeMeta, eventToSend); err != nil { st.Fatalf("Failed to send fake CloudEvent to the channel %q", channelNames[0]) } diff --git a/test/e2e/channel_single_event_test.go b/test/e2e/channel_single_event_test.go index 138a804683b..cfa0827e647 100644 --- a/test/e2e/channel_single_event_test.go +++ b/test/e2e/channel_single_event_test.go @@ -47,13 +47,14 @@ func singleEvent(t *testing.T, encoding string) { subscriptionName := "e2e-singleevent-subscription-" + encoding loggerPodName := "e2e-singleevent-logger-pod-" + encoding - RunTests(t, common.FeatureBasic, func(st *testing.T, provisioner string) { + RunTests(t, common.FeatureBasic, func(st *testing.T, provisioner string, isCRD bool) { st.Logf("Run test with provisioner %q", provisioner) client := Setup(st, true) defer TearDown(client) // create channel - client.CreateChannelOrFail(channelName, provisioner) + channelTypeMeta := getChannelTypeMeta(provisioner, isCRD) + client.CreateChannelOrFail(channelName, channelTypeMeta, provisioner) // create logger service as the subscriber pod := base.EventLoggerPod(loggerPodName) @@ -63,6 +64,7 @@ func singleEvent(t *testing.T, encoding string) { client.CreateSubscriptionOrFail( subscriptionName, channelName, + channelTypeMeta, base.WithSubscriberForSubscription(loggerPodName), ) @@ -79,7 +81,8 @@ func singleEvent(t *testing.T, encoding string) { Data: fmt.Sprintf(`{"msg":%q}`, body), Encoding: encoding, } - if err := client.SendFakeEventToChannel(senderName, channelName, event); err != nil { + + if err := client.SendFakeEventToAddressable(senderName, channelName, channelTypeMeta, event); err != nil { st.Fatalf("Failed to send fake CloudEvent to the channel %q", channelName) } diff --git a/test/e2e/test_runner.go b/test/e2e/test_runner.go index b13e16b336a..c9995ef3161 100644 --- a/test/e2e/test_runner.go +++ b/test/e2e/test_runner.go @@ -42,20 +42,26 @@ import ( // RunTests will use all provisioners that support the given feature, to run // a test for the testFunc. -func RunTests(t *testing.T, feature common.Feature, testFunc func(st *testing.T, provisioner string)) { +func RunTests(t *testing.T, feature common.Feature, testFunc func(st *testing.T, provisioner string, isCRD bool)) { t.Parallel() for _, provisioner := range test.EventingFlags.Provisioners { - supportedFeatures := common.ValidProvisionersMap[provisioner] - if contains(supportedFeatures, feature) { - t.Run(t.Name()+"-"+provisioner, func(st *testing.T) { - testFunc(st, provisioner) + channelConfig := common.ValidProvisionersMap[provisioner] + if contains(channelConfig.Features, feature) { + t.Run(fmt.Sprintf("%s-%s", t.Name(), provisioner), func(st *testing.T) { + testFunc(st, provisioner, false) }) + + if channelConfig.CRDSupported { + t.Run(fmt.Sprintf("%s-crd-%s", t.Name(), provisioner), func(st *testing.T) { + testFunc(st, provisioner, true) + }) + } } } } // Setup creates the client objects needed in the e2e tests, -// and does other setups, like creating namespaces, run the test case in parallel, etc. +// and does other setups, like creating namespaces, set the test case to run in parallel, etc. func Setup(t *testing.T, runInParallel bool) *common.Client { // Create a new namespace to run this test case. baseFuncName := getBaseFuncName(t.Name()) @@ -107,6 +113,17 @@ func contains(features []common.Feature, feature common.Feature) bool { return false } +// Get the actual typemeta of the Channel type. +// TODO(Fredy-Z): This function is a workaround when there are both provisioner and Channel CRD in this repo. +// It needs to be removed when the provisioner implementation is removed. +func getChannelTypeMeta(provisioner string, isCRD bool) *metav1.TypeMeta { + channelTypeMeta := common.ChannelTypeMeta + if isCRD { + channelTypeMeta = common.OperatorChannelMap[provisioner] + } + return channelTypeMeta +} + // CreateNamespaceIfNeeded creates a new namespace if it does not exist. func CreateNamespaceIfNeeded(t *testing.T, client *common.Client, namespace string) { nsSpec, err := client.Kube.Kube.CoreV1().Namespaces().Get(namespace, metav1.GetOptions{})