diff --git a/Gopkg.lock b/Gopkg.lock index 6d0c8a933cd..53f2f847c47 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1420,7 +1420,9 @@ "github.com/Shopify/sarama", "github.com/bsm/sarama-cluster", "github.com/cloudevents/sdk-go", + "github.com/cloudevents/sdk-go/pkg/cloudevents", "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http", + "github.com/cloudevents/sdk-go/pkg/cloudevents/types", "github.com/google/go-cmp/cmp", "github.com/google/go-cmp/cmp/cmpopts", "github.com/google/uuid", diff --git a/test/base/constants.go b/test/base/constants.go new file mode 100644 index 00000000000..1bc9207bdac --- /dev/null +++ b/test/base/constants.go @@ -0,0 +1,75 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package base + +import "github.com/knative/eventing/pkg/reconciler/namespace/resources" + +const ( + // DefaultBrokerName is the name of the Broker that is automatically created after the current namespace is labeled. + DefaultBrokerName = resources.DefaultBrokerName + + // InMemoryProvisioner is the in-memory provisioner, which is also the default one. + InMemoryProvisioner = "in-memory" + // GCPPubSubProvisioner is the gcp-pubsub provisioner, which is under contrib/gcppubsub. + GCPPubSubProvisioner = "gcp-pubsub" + // KafkaProvisioner is the kafka provisioner, which is under contrib/kafka. + KafkaProvisioner = "kafka" + // NatssProvisioner is the natss provisioner, which is under contrib/natss + NatssProvisioner = "natss" +) + +// API versions for the resources. +const ( + EventingAPIVersion = "eventing.knative.dev/v1alpha1" + SourcesAPIVersion = "sources.eventing.knative.dev/v1alpha1" + MessagingAPIVersion = "messaging.knative.dev/v1alpha1" +) + +// kind for eventing resources. +const ( + // ChannelKind is the kind for Channel. + ChannelKind string = "Channel" + // SubscriptionKind is the kind for Subscription. + SubscriptionKind string = "Subscription" + // ClusterChannelProvisionerKind is the kind for ClusterChannelProvisioner. + ClusterChannelProvisionerKind string = "ClusterChannelProvisioner" + + // BrokerKind is the kind for Broker. + BrokerKind string = "Broker" + // TriggerKind is the kind for Trigger. + TriggerKind string = "Trigger" +) + +// kind for messaging resources. +const ( + // InMemoryChannelKind is the kind for InMemoryChannel. + InMemoryChannelKind string = "InMemoryChannel" + // KafkaChannelKind is the kind for KafkaChannel. + KafkaChannelKind string = "KafkaChannel" + // NatssChannelKind string = "NatssChannel" + // GCPPubSubChannelKind string = "GCPPubSubChannel" +) + +// kind for sources resources. +const ( + // CronJobSourceKind is the kind for CronJobSource. + CronJobSourceKind string = "CronJobSource" + // ContainerSourceKind is the kind for ContainerSource. + ContainerSourceKind string = "ContainerSource" + // ApiServerSourceKind is the kind for ApiServerSource. + ApiServerSourceKind string = "ApiServerSource" +) diff --git a/test/base/generics.go b/test/base/generics.go index ede044bf141..40dcf205521 100644 --- a/test/base/generics.go +++ b/test/base/generics.go @@ -21,8 +21,11 @@ import ( "github.com/knative/pkg/apis/duck" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/cache" ) // MetaResource includes necessary meta data to retrieve the generic Kubernetes resource. @@ -31,27 +34,66 @@ type MetaResource struct { metav1.ObjectMeta `json:"metadata,omitempty"` } -// Meta returns a MetaResource built from the given name, namespace and kind. -func Meta(name, namespace, kind string) *MetaResource { +// MetaResourceList includes necessary meta data to retrieve the generic Kubernetes resource list. +type MetaResourceList struct { + metav1.TypeMeta `json:",inline"` + Namespace string +} + +// NewMetaResource returns a MetaResource built from the given name, namespace and typemeta. +func NewMetaResource(name, namespace string, typemeta *metav1.TypeMeta) *MetaResource { return &MetaResource{ + TypeMeta: *typemeta, ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, Name: name, }, - TypeMeta: metav1.TypeMeta{ - Kind: kind, - APIVersion: EventingAPIVersion, - }, + } +} + +// NewMetaResourceList returns a MetaResourceList built from the given namespace and typemeta. +func NewMetaResourceList(namespace string, typemeta *metav1.TypeMeta) *MetaResourceList { + return &MetaResourceList{ + TypeMeta: *typemeta, + Namespace: namespace, } } // GetGenericObject returns a generic object representing a Kubernetes resource. // Callers can cast this returned object to other objects that implement the corresponding duck-type. -func GetGenericObject(dynamicClient dynamic.Interface, obj *MetaResource, rtype apis.Listable) (runtime.Object, error) { - // get the resource's name, namespace and gvr - name := obj.Name - namespace := obj.Namespace - gvk := obj.GroupVersionKind() +func GetGenericObject( + dynamicClient dynamic.Interface, + obj *MetaResource, + rtype apis.Listable, +) (runtime.Object, error) { + lister, err := getGenericLister(dynamicClient, obj.GroupVersionKind(), obj.Namespace, rtype) + if err != nil { + return nil, err + } + return lister.Get(obj.Name) +} + +// GetGenericObjectList returns a generic object list representing a list of Kubernetes resource. +func GetGenericObjectList( + dynamicClient dynamic.Interface, + objList *MetaResourceList, + rtype apis.Listable, +) ([]runtime.Object, error) { + lister, err := getGenericLister(dynamicClient, objList.GroupVersionKind(), objList.Namespace, rtype) + if err != nil { + return nil, err + } + return lister.List(labels.Everything()) +} + +// getGenericLister returns a GenericNamespacedLister, which can be used to get resources in the namespace. +func getGenericLister( + dynamicClient dynamic.Interface, + gvk schema.GroupVersionKind, + namespace string, + rtype apis.Listable, +) (cache.GenericNamespaceLister, error) { + // get the resource's namespace and gvr gvr, _ := meta.UnsafeGuessKindToResource(gvk) stopChannel := make(chan struct{}) @@ -63,5 +105,5 @@ func GetGenericObject(dynamicClient dynamic.Interface, obj *MetaResource, rtype if err != nil { return nil, err } - return lister.ByNamespace(namespace).Get(name) + return lister.ByNamespace(namespace), nil } diff --git a/test/base/resource_checks.go b/test/base/resource_checks.go index a00376ee7ce..c5cd459fbbf 100644 --- a/test/base/resource_checks.go +++ b/test/base/resource_checks.go @@ -28,6 +28,7 @@ import ( duckv1beta1 "github.com/knative/pkg/apis/duck/v1beta1" "go.opencensus.io/trace" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" ) @@ -44,18 +45,35 @@ const ( // name the metric that is emitted to track how long it took for // the resource to get into the state checked by isResourceReady. func WaitForResourceReady(dynamicClient dynamic.Interface, obj *MetaResource) error { - metricName := fmt.Sprintf("WaitForResourceReady/%s", obj.Name) + metricName := fmt.Sprintf("WaitForResourceReady/%s/%s", obj.Namespace, obj.Name) _, span := trace.StartSpan(context.Background(), metricName) defer span.End() return wait.PollImmediate(interval, timeout, func() (bool, error) { - return isResourceReady(dynamicClient, obj) + untyped, err := GetGenericObject(dynamicClient, obj, &duckv1beta1.KResource{}) + return isResourceReady(untyped, err) + }) +} + +// WaitForResourcesReady waits until all the specified resources in the given namespace are ready. +func WaitForResourcesReady(dynamicClient dynamic.Interface, objList *MetaResourceList) error { + metricName := fmt.Sprintf("WaitForResourcesReady/%s", objList.Namespace) + _, span := trace.StartSpan(context.Background(), metricName) + defer span.End() + + return wait.PollImmediate(interval, timeout, func() (bool, error) { + untypeds, err := GetGenericObjectList(dynamicClient, objList, &duckv1beta1.KResource{}) + for _, untyped := range untypeds { + if isReady, err := isResourceReady(untyped, err); !isReady { + return isReady, err + } + } + return true, nil }) } // isResourceReady leverage duck-type to check if the given MetaResource is in ready state -func isResourceReady(dynamicClient dynamic.Interface, obj *MetaResource) (bool, error) { - untyped, err := GetGenericObject(dynamicClient, obj, &duckv1beta1.KResource{}) +func isResourceReady(obj runtime.Object, err error) (bool, error) { if k8serrors.IsNotFound(err) { // Return false as we are not done yet. // We swallow the error to keep on polling. @@ -66,6 +84,6 @@ func isResourceReady(dynamicClient dynamic.Interface, obj *MetaResource) (bool, return false, err } - kr := untyped.(*duckv1beta1.KResource) + kr := obj.(*duckv1beta1.KResource) return kr.Status.GetCondition(apis.ConditionReady).IsTrue(), nil } diff --git a/test/base/resources.go b/test/base/resources.go index 5732d7b0cad..8e0da6fb89a 100644 --- a/test/base/resources.go +++ b/test/base/resources.go @@ -17,11 +17,13 @@ limitations under the License. package base // resources contains functions that construct Eventing CRs and other Kubernetes resources. +// TODO(Fredy-Z): break this file into multiple files when it grows too large. import ( "fmt" - "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + sourcesv1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1" pkgTest "github.com/knative/pkg/test" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -30,38 +32,48 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" ) -const EventingAPIVersion = "eventing.knative.dev/v1alpha1" +// TriggerOption enables further configuration of a Trigger. +type TriggerOption func(*eventingv1alpha1.Trigger) + +// SubscriptionOption enables further configuration of a Subscription. +type SubscriptionOption func(*eventingv1alpha1.Subscription) + +// CronJobSourceOption enables further configuration of a CronJobSource. +type CronJobSourceOption func(*sourcesv1alpha1.CronJobSource) + +// ContainerSourceOption enables further configuration of a ContainerSource. +type ContainerSourceOption func(*sourcesv1alpha1.ContainerSource) // clusterChannelProvisioner returns a ClusterChannelProvisioner for a given name. func clusterChannelProvisioner(name string) *corev1.ObjectReference { if name == "" { return nil } - return pkgTest.CoreV1ObjectReference("ClusterChannelProvisioner", EventingAPIVersion, name) + return pkgTest.CoreV1ObjectReference(ClusterChannelProvisionerKind, EventingAPIVersion, name) } // channelRef returns an ObjectReference for a given Channel Name. func channelRef(name string) *corev1.ObjectReference { - return pkgTest.CoreV1ObjectReference("Channel", EventingAPIVersion, name) + return pkgTest.CoreV1ObjectReference(ChannelKind, EventingAPIVersion, name) } // Channel returns a Channel with the specified provisioner. -func Channel(name, provisioner string) *v1alpha1.Channel { - return &v1alpha1.Channel{ +func Channel(name, provisioner string) *eventingv1alpha1.Channel { + return &eventingv1alpha1.Channel{ ObjectMeta: metav1.ObjectMeta{ Name: name, }, - Spec: v1alpha1.ChannelSpec{ + Spec: eventingv1alpha1.ChannelSpec{ Provisioner: clusterChannelProvisioner(provisioner), }, } } // WithSubscriberForSubscription returns an option that adds a Subscriber for the given Subscription. -func WithSubscriberForSubscription(name string) func(*v1alpha1.Subscription) { - return func(s *v1alpha1.Subscription) { +func WithSubscriberForSubscription(name string) SubscriptionOption { + return func(s *eventingv1alpha1.Subscription) { if name != "" { - s.Spec.Subscriber = &v1alpha1.SubscriberSpec{ + s.Spec.Subscriber = &eventingv1alpha1.SubscriberSpec{ Ref: pkgTest.CoreV1ObjectReference("Service", "v1", name), } } @@ -69,23 +81,23 @@ func WithSubscriberForSubscription(name string) func(*v1alpha1.Subscription) { } // WithReply returns an options that adds a ReplyStrategy for the given Subscription. -func WithReply(name string) func(*v1alpha1.Subscription) { - return func(s *v1alpha1.Subscription) { +func WithReply(name string) SubscriptionOption { + return func(s *eventingv1alpha1.Subscription) { if name != "" { - s.Spec.Reply = &v1alpha1.ReplyStrategy{ - Channel: pkgTest.CoreV1ObjectReference("Channel", EventingAPIVersion, name), + s.Spec.Reply = &eventingv1alpha1.ReplyStrategy{ + Channel: pkgTest.CoreV1ObjectReference(ChannelKind, EventingAPIVersion, name), } } } } // Subscription returns a Subscription. -func Subscription(name, channelName string, options ...func(*v1alpha1.Subscription)) *v1alpha1.Subscription { - subscription := &v1alpha1.Subscription{ +func Subscription(name, channelName string, options ...SubscriptionOption) *eventingv1alpha1.Subscription { + subscription := &eventingv1alpha1.Subscription{ ObjectMeta: metav1.ObjectMeta{ Name: name, }, - Spec: v1alpha1.SubscriptionSpec{ + Spec: eventingv1alpha1.SubscriptionSpec{ Channel: *channelRef(channelName), }, } @@ -96,13 +108,13 @@ func Subscription(name, channelName string, options ...func(*v1alpha1.Subscripti } // Broker returns a Broker. -func Broker(name, provisioner string) *v1alpha1.Broker { - return &v1alpha1.Broker{ +func Broker(name, provisioner string) *eventingv1alpha1.Broker { + return &eventingv1alpha1.Broker{ ObjectMeta: metav1.ObjectMeta{ Name: name, }, - Spec: v1alpha1.BrokerSpec{ - ChannelTemplate: &v1alpha1.ChannelSpec{ + Spec: eventingv1alpha1.BrokerSpec{ + ChannelTemplate: &eventingv1alpha1.ChannelSpec{ Provisioner: clusterChannelProvisioner(provisioner), }, }, @@ -110,10 +122,10 @@ func Broker(name, provisioner string) *v1alpha1.Broker { } // WithTriggerFilter returns an option that adds a TriggerFilter for the given Trigger. -func WithTriggerFilter(eventSource, eventType string) func(*v1alpha1.Trigger) { - return func(t *v1alpha1.Trigger) { - triggerFilter := &v1alpha1.TriggerFilter{ - SourceAndType: &v1alpha1.TriggerFilterSourceAndType{ +func WithTriggerFilter(eventSource, eventType string) TriggerOption { + return func(t *eventingv1alpha1.Trigger) { + triggerFilter := &eventingv1alpha1.TriggerFilter{ + SourceAndType: &eventingv1alpha1.TriggerFilterSourceAndType{ Type: eventType, Source: eventSource, }, @@ -123,17 +135,17 @@ func WithTriggerFilter(eventSource, eventType string) func(*v1alpha1.Trigger) { } // WithBroker returns an option that adds a Broker for the given Trigger. -func WithBroker(brokerName string) func(*v1alpha1.Trigger) { - return func(t *v1alpha1.Trigger) { +func WithBroker(brokerName string) TriggerOption { + return func(t *eventingv1alpha1.Trigger) { t.Spec.Broker = brokerName } } // WithSubscriberRefForTrigger returns an option that adds a Subscriber Ref for the given Trigger. -func WithSubscriberRefForTrigger(name string) func(*v1alpha1.Trigger) { - return func(t *v1alpha1.Trigger) { +func WithSubscriberRefForTrigger(name string) TriggerOption { + return func(t *eventingv1alpha1.Trigger) { if name != "" { - t.Spec.Subscriber = &v1alpha1.SubscriberSpec{ + t.Spec.Subscriber = &eventingv1alpha1.SubscriberSpec{ Ref: pkgTest.CoreV1ObjectReference("Service", "v1", name), } } @@ -141,17 +153,17 @@ func WithSubscriberRefForTrigger(name string) func(*v1alpha1.Trigger) { } // WithSubscriberURIForTrigger returns an option that adds a Subscriber URI for the given Trigger. -func WithSubscriberURIForTrigger(uri string) func(*v1alpha1.Trigger) { - return func(t *v1alpha1.Trigger) { - t.Spec.Subscriber = &v1alpha1.SubscriberSpec{ +func WithSubscriberURIForTrigger(uri string) TriggerOption { + return func(t *eventingv1alpha1.Trigger) { + t.Spec.Subscriber = &eventingv1alpha1.SubscriberSpec{ URI: &uri, } } } // Trigger returns a Trigger. -func Trigger(name string, options ...func(*v1alpha1.Trigger)) *v1alpha1.Trigger { - trigger := &v1alpha1.Trigger{ +func Trigger(name string, options ...TriggerOption) *eventingv1alpha1.Trigger { + trigger := &eventingv1alpha1.Trigger{ ObjectMeta: metav1.ObjectMeta{ Name: name, }, @@ -162,6 +174,90 @@ func Trigger(name string, options ...func(*v1alpha1.Trigger)) *v1alpha1.Trigger return trigger } +// WithSinkServiceForCronJobSource returns an option that adds a Kubernetes Service sink for the given CronJobSource. +func WithSinkServiceForCronJobSource(name string) CronJobSourceOption { + return func(cjs *sourcesv1alpha1.CronJobSource) { + cjs.Spec.Sink = pkgTest.CoreV1ObjectReference("Service", "v1", name) + } +} + +// WithServiceAccountForCronJobSource returns an option that adds a ServiceAccount for the given CronJobSource. +func WithServiceAccountForCronJobSource(saName string) CronJobSourceOption { + return func(cjs *sourcesv1alpha1.CronJobSource) { + cjs.Spec.ServiceAccountName = saName + } +} + +// CronJobSource returns a CronJob EventSource. +func CronJobSource( + name, + schedule, + data string, + options ...CronJobSourceOption, +) *sourcesv1alpha1.CronJobSource { + cronJobSource := &sourcesv1alpha1.CronJobSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: sourcesv1alpha1.CronJobSourceSpec{ + Schedule: schedule, + Data: data, + }, + } + for _, option := range options { + option(cronJobSource) + } + return cronJobSource +} + +// WithArgsForContainerSource returns an option that adds args for the given ContainerSource. +func WithArgsForContainerSource(args []string) ContainerSourceOption { + return func(cs *sourcesv1alpha1.ContainerSource) { + cs.Spec.Args = args + } +} + +// WithEnvVarsForContainerSource returns an option that adds environment vars for the given ContainerSource. +func WithEnvVarsForContainerSource(envVars []corev1.EnvVar) ContainerSourceOption { + return func(cs *sourcesv1alpha1.ContainerSource) { + cs.Spec.Env = envVars + } +} + +// WithServiceAccountForContainerSource returns an option that adds a ServiceAccount for the given ContainerSource. +func WithServiceAccountForContainerSource(saName string) ContainerSourceOption { + return func(cs *sourcesv1alpha1.ContainerSource) { + cs.Spec.ServiceAccountName = saName + } +} + +// WithSinkServiceForContainerSource returns an option that adds a Kubernetes Service sink for the given ContainerSource. +func WithSinkServiceForContainerSource(name string) ContainerSourceOption { + return func(cs *sourcesv1alpha1.ContainerSource) { + cs.Spec.Sink = pkgTest.CoreV1ObjectReference("Service", "v1", name) + } +} + +// ContainerSource returns a Container EventSource. +func ContainerSource( + name, + imageName string, + options ...ContainerSourceOption, +) *sourcesv1alpha1.ContainerSource { + containerSource := &sourcesv1alpha1.ContainerSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: sourcesv1alpha1.ContainerSourceSpec{ + Image: pkgTest.ImagePath(imageName), + }, + } + for _, option := range options { + option(containerSource) + } + return containerSource +} + // CloudEvent specifies the arguments for a CloudEvent sent by the sendevent // binary. type CloudEvent struct { diff --git a/test/common/config.go b/test/common/config.go index 43d31ff8de4..fb7193f4003 100644 --- a/test/common/config.go +++ b/test/common/config.go @@ -16,33 +16,20 @@ limitations under the License. package common -import "github.com/knative/eventing/pkg/reconciler/namespace/resources" +import "github.com/knative/eventing/test/base" + +// DefaultClusterChannelProvisioner is the default ClusterChannelProvisioner we will run tests against. +const DefaultClusterChannelProvisioner = base.InMemoryProvisioner // ValidProvisionersMap saves the provisioner-features mapping. // Each pair means the provisioner support the list of features. var ValidProvisionersMap = map[string][]Feature{ - InMemoryProvisioner: {FeatureBasic}, - GCPPubSubProvisioner: {FeatureBasic, FeatureRedelivery, FeaturePersistence}, - KafkaProvisioner: {FeatureBasic, FeatureRedelivery, FeaturePersistence}, - NatssProvisioner: {FeatureBasic, FeatureRedelivery, FeaturePersistence}, + base.InMemoryProvisioner: {FeatureBasic}, + base.GCPPubSubProvisioner: {FeatureBasic, FeatureRedelivery, FeaturePersistence}, + base.KafkaProvisioner: {FeatureBasic, FeatureRedelivery, FeaturePersistence}, + base.NatssProvisioner: {FeatureBasic, FeatureRedelivery, FeaturePersistence}, } -const ( - // DefaultBrokerName is the name of the Broker that is automatically created after the current namespace is labeled. - DefaultBrokerName = resources.DefaultBrokerName - // DefaultClusterChannelProvisioner is the default ClusterChannelProvisioner we will run tests against. - DefaultClusterChannelProvisioner = InMemoryProvisioner - - // InMemoryProvisioner is the in-memory provisioner, which is also the default one. - InMemoryProvisioner = "in-memory" - // GCPPubSubProvisioner is the gcp-pubsub provisioner, which is under contrib/gcppubsub. - GCPPubSubProvisioner = "gcp-pubsub" - // KafkaProvisioner is the kafka provisioner, which is under contrib/kafka. - KafkaProvisioner = "kafka" - // NatssProvisioner is the natss provisioner, which is under contrib/natss - NatssProvisioner = "natss" -) - // Feature is the feature supported by the Channel provisioner. type Feature string diff --git a/test/common/creation.go b/test/common/creation.go index 4eca1a06dc5..7b5c056551b 100644 --- a/test/common/creation.go +++ b/test/common/creation.go @@ -17,12 +17,13 @@ limitations under the License. package common import ( - "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/test/base" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" ) +// TODO(Fredy-Z): break this file into multiple files when it grows too large. + var coreAPIGroup = corev1.SchemeGroupVersion.Group var coreAPIVersion = corev1.SchemeGroupVersion.Version var rbacAPIGroup = rbacv1.SchemeGroupVersion.Group @@ -50,7 +51,7 @@ func (client *Client) CreateChannelsOrFail(names []string, provisionerName strin } // CreateSubscriptionOrFail will create a Subscription. -func (client *Client) CreateSubscriptionOrFail(name, channelName string, options ...func(*v1alpha1.Subscription)) { +func (client *Client) CreateSubscriptionOrFail(name, channelName string, options ...base.SubscriptionOption) { namespace := client.Namespace subscription := base.Subscription(name, channelName, options...) @@ -64,7 +65,7 @@ func (client *Client) CreateSubscriptionOrFail(name, channelName string, options } // CreateSubscriptionsOrFail will create a list of Subscriptions with the same configuration except the name. -func (client *Client) CreateSubscriptionsOrFail(names []string, channelName string, options ...func(*v1alpha1.Subscription)) { +func (client *Client) CreateSubscriptionsOrFail(names []string, channelName string, options ...base.SubscriptionOption) { for _, name := range names { client.CreateSubscriptionOrFail(name, channelName, options...) } @@ -92,7 +93,7 @@ func (client *Client) CreateBrokersOrFail(names []string, provisionerName string } // CreateTriggerOrFail will create a Trigger. -func (client *Client) CreateTriggerOrFail(name string, options ...func(*v1alpha1.Trigger)) { +func (client *Client) CreateTriggerOrFail(name string, options ...base.TriggerOption) { namespace := client.Namespace trigger := base.Trigger(name, options...) @@ -105,6 +106,43 @@ func (client *Client) CreateTriggerOrFail(name string, options ...func(*v1alpha1 client.Cleaner.AddObj(trigger) } +// CreateCronJobSourceOrFail will create a CronJobSource. +func (client *Client) CreateCronJobSourceOrFail( + name, + schedule, + data string, + options ...base.CronJobSourceOption, +) { + namespace := client.Namespace + cronJobSource := base.CronJobSource(name, schedule, data, options...) + + cronJobSources := client.Eventing.SourcesV1alpha1().CronJobSources(namespace) + // update cronJobSource with the new reference + cronJobSource, err := cronJobSources.Create(cronJobSource) + if err != nil { + client.T.Fatalf("Failed to create cronjobsource %q: %v", name, err) + } + client.Cleaner.AddObj(cronJobSource) +} + +// CreateContainerSourceOrFail will create a ContainerSource. +func (client *Client) CreateContainerSourceOrFail( + name, + imageName string, + options ...base.ContainerSourceOption, +) { + namespace := client.Namespace + containerSource := base.ContainerSource(name, imageName, options...) + + containerSources := client.Eventing.SourcesV1alpha1().ContainerSources(namespace) + // update containerSource with the new reference + containerSource, err := containerSources.Create(containerSource) + if err != nil { + client.T.Fatalf("Failed to create containersource %q: %v", name, err) + } + client.Cleaner.AddObj(containerSource) +} + // WithService returns an option that creates a Service binded with the given pod. func WithService(name string) func(*corev1.Pod, *Client) error { return func(pod *corev1.Pod, client *Client) error { diff --git a/test/common/operation.go b/test/common/operation.go index f8eb07a5bd1..1d778e1bdb0 100644 --- a/test/common/operation.go +++ b/test/common/operation.go @@ -41,47 +41,36 @@ func (client *Client) LabelNamespace(labels map[string]string) error { return err } -// SendFakeEventToChannel will send the given event to the given channel. -func (client *Client) SendFakeEventToChannel(senderName, channelName string, event *base.CloudEvent) error { - url, err := client.GetChannelURL(channelName) - if err != nil { - return err - } - return client.sendFakeEventToAddress(senderName, url, event) -} - -// GetChannelURL will return the url for the given channel. -func (client *Client) GetChannelURL(name string) (string, error) { - namespace := client.Namespace - channelMeta := base.Meta(name, namespace, "Channel") - return base.GetAddressableURI(client.Dynamic, channelMeta) -} - -// SendFakeEventToBroker will send the given event to the given broker. -func (client *Client) SendFakeEventToBroker(senderName, brokerName string, event *base.CloudEvent) error { - url, err := client.GetBrokerURL(brokerName) +// SendFakeEventToAddressable will send the given event to the given Addressable. +func (client *Client) SendFakeEventToAddressable( + senderName, + addressableName string, + typemeta *metav1.TypeMeta, + event *base.CloudEvent, +) error { + uri, err := client.GetAddressableURI(addressableName, typemeta) if err != nil { return err } - return client.sendFakeEventToAddress(senderName, url, event) + return client.sendFakeEventToAddress(senderName, uri, event) } -// GetBrokerURL will return the url for the given broker. -func (client *Client) GetBrokerURL(name string) (string, error) { +// GetAddressableURI returns the URI of the addressable resource. +// To use this function, the given resource must have implemented the Addressable duck-type. +func (client *Client) GetAddressableURI(addressableName string, typemeta *metav1.TypeMeta) (string, error) { namespace := client.Namespace - brokerMeta := base.Meta(name, namespace, "Broker") - return base.GetAddressableURI(client.Dynamic, brokerMeta) + metaAddressable := base.NewMetaResource(addressableName, namespace, typemeta) + return base.GetAddressableURI(client.Dynamic, metaAddressable) } // sendFakeEventToAddress will create a sender pod, which will send the given event to the given url. func (client *Client) sendFakeEventToAddress( senderName string, - url string, + uri string, event *base.CloudEvent, ) error { namespace := client.Namespace - client.T.Logf("Sending fake CloudEvent") - pod := base.EventSenderPod(senderName, url, event) + pod := base.EventSenderPod(senderName, uri, event) client.CreatePodOrFail(pod) if err := pkgTest.WaitForPodRunning(client.Kube, senderName, namespace); err != nil { return err @@ -89,121 +78,44 @@ func (client *Client) sendFakeEventToAddress( return nil } -// WaitForBrokerReady waits until the broker is Ready. -func (client *Client) WaitForBrokerReady(name string) error { - namespace := client.Namespace - brokerMeta := base.Meta(name, namespace, "Broker") - if err := base.WaitForResourceReady(client.Dynamic, brokerMeta); err != nil { - return err - } - return nil -} - -// WaitForBrokersReady waits until all brokers in the namespace are Ready. -func (client *Client) WaitForBrokersReady() error { - namespace := client.Namespace - brokers, err := client.Eventing.EventingV1alpha1().Brokers(namespace).List(metav1.ListOptions{}) - if err != nil { - return err - } - for _, broker := range brokers.Items { - if err := client.WaitForBrokerReady(broker.Name); err != nil { - return err - } - } - return nil -} - -// WaitForTriggerReady waits until the trigger is Ready. -func (client *Client) WaitForTriggerReady(name string) error { - namespace := client.Namespace - triggerMeta := base.Meta(name, namespace, "Trigger") - if err := base.WaitForResourceReady(client.Dynamic, triggerMeta); err != nil { - return err - } - return nil -} - -// WaitForTriggersReady waits until all triggers in the namespace are Ready. -func (client *Client) WaitForTriggersReady() error { +// WaitForResourceReady waits for the resource to become ready. +// To use this function, the given resource must have implemented the Status duck-type. +func (client *Client) WaitForResourceReady(name string, typemeta *metav1.TypeMeta) error { namespace := client.Namespace - triggers, err := client.Eventing.EventingV1alpha1().Triggers(namespace).List(metav1.ListOptions{}) - if err != nil { + metaResource := base.NewMetaResource(name, namespace, typemeta) + if err := base.WaitForResourceReady(client.Dynamic, metaResource); err != nil { return err } - for _, trigger := range triggers.Items { - if err := client.WaitForTriggerReady(trigger.Name); err != nil { - return err - } - } return nil } -// WaitForChannelReady waits until the channel is Ready. -func (client *Client) WaitForChannelReady(name string) error { +// WaitForResourcesReady waits for resources of the given type in the namespace to become ready. +// To use this function, the given resource must have implemented the Status duck-type. +func (client *Client) WaitForResourcesReady(typemeta *metav1.TypeMeta) error { namespace := client.Namespace - channelMeta := base.Meta(name, namespace, "Channel") - if err := base.WaitForResourceReady(client.Dynamic, channelMeta); err != nil { + metaResourceList := base.NewMetaResourceList(namespace, typemeta) + if err := base.WaitForResourcesReady(client.Dynamic, metaResourceList); err != nil { return err } return nil } -// WaitForChannelsReady waits until all channels in the namespace are Ready. -func (client *Client) WaitForChannelsReady() error { - namespace := client.Namespace - channels, err := client.Eventing.EventingV1alpha1().Channels(namespace).List(metav1.ListOptions{}) - if err != nil { - return err - } - for _, channel := range channels.Items { - if err := client.WaitForChannelReady(channel.Name); err != nil { - return err - } - } - return nil -} - -// WaitForSubscriptionReady waits until the subscription is Ready. -func (client *Client) WaitForSubscriptionReady(name string) error { - namespace := client.Namespace - subscriptionMeta := base.Meta(name, namespace, "Subscription") - if err := base.WaitForResourceReady(client.Dynamic, subscriptionMeta); err != nil { - return err - } - return nil -} - -// WaitForSubscriptionsReady waits until all subscriptions in the namespace are Ready. -func (client *Client) WaitForSubscriptionsReady() error { - namespace := client.Namespace - subscriptions, err := client.Eventing.EventingV1alpha1().Subscriptions(namespace).List(metav1.ListOptions{}) - if err != nil { - return err - } - for _, subscription := range subscriptions.Items { - if err := client.WaitForSubscriptionReady(subscription.Name); err != nil { - return err - } - } - return nil -} - // WaitForAllTestResourcesReady waits until all test resources in the namespace are Ready. -// Currently the test resources include Pod, Channel, Subscription, Broker and Trigger. -// If there are new resources, this function needs to be changed. +// If there is a new resource, its TypeMeta needs to be added into the list. +// TODO(Fredy-Z): make this function more generic by only checking existed resources in the current namespace. func (client *Client) WaitForAllTestResourcesReady() error { - if err := client.WaitForChannelsReady(); err != nil { - return err - } - if err := client.WaitForSubscriptionsReady(); err != nil { - return err - } - if err := client.WaitForBrokersReady(); err != nil { - return err - } - if err := client.WaitForTriggersReady(); err != nil { - return err + typemetas := []*metav1.TypeMeta{ + ChannelTypeMeta, + SubscriptionTypeMeta, + BrokerTypeMeta, + TriggerTypeMeta, + CronJobSourceTypeMeta, + ContainerSourceTypeMeta, + } + for _, typemeta := range typemetas { + if err := client.WaitForResourcesReady(typemeta); err != nil { + return err + } } if err := pkgTest.WaitForAllPodsRunning(client.Kube, client.Namespace); err != nil { return err diff --git a/test/common/typemeta.go b/test/common/typemeta.go new file mode 100644 index 00000000000..db900b336bb --- /dev/null +++ b/test/common/typemeta.go @@ -0,0 +1,67 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package common + +import ( + "github.com/knative/eventing/test/base" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// ChannelTypeMeta is the TypeMeta ref for Channel. +var ChannelTypeMeta = EventingTypeMeta(base.ChannelKind) + +// SubscriptionTypeMeta is the TypeMeta ref for Subscription. +var SubscriptionTypeMeta = EventingTypeMeta(base.SubscriptionKind) + +// BrokerTypeMeta is the TypeMeta ref for Broker. +var BrokerTypeMeta = EventingTypeMeta(base.BrokerKind) + +// TriggerTypeMeta is the TypeMeta ref for Trigger. +var TriggerTypeMeta = EventingTypeMeta(base.TriggerKind) + +// EventingTypeMeta returns the TypeMeta ref for an eventing resource. +func EventingTypeMeta(kind string) *metav1.TypeMeta { + return &metav1.TypeMeta{ + Kind: kind, + APIVersion: base.EventingAPIVersion, + } +} + +// CronJobSourceTypeMeta is the TypeMeta ref for CronJobSource. +var CronJobSourceTypeMeta = SourcesTypeMeta(base.CronJobSourceKind) + +// ContainerSourceTypeMeta is the TypeMeta ref for ContainerSource. +var ContainerSourceTypeMeta = SourcesTypeMeta(base.ContainerSourceKind) + +// SourcesTypeMeta returns the TypeMeta ref for an eventing sources resource. +func SourcesTypeMeta(kind string) *metav1.TypeMeta { + return &metav1.TypeMeta{ + Kind: kind, + APIVersion: base.SourcesAPIVersion, + } +} + +// KafkaChannelTypeMeta is the TypeMeta ref for KafkaChannel. +var KafkaChannelTypeMeta = MessagingTypeMeta(base.KafkaChannelKind) + +// MessagingTypeMeta returns the TypeMeta ref for an eventing messaing resource. +func MessagingTypeMeta(kind string) *metav1.TypeMeta { + return &metav1.TypeMeta{ + Kind: kind, + APIVersion: base.MessagingAPIVersion, + } +} diff --git a/test/common/validation.go b/test/common/validation.go index 4d2cbaca62c..f375c86fc4d 100644 --- a/test/common/validation.go +++ b/test/common/validation.go @@ -27,7 +27,7 @@ import ( const ( // The interval and timeout used for polling pod logs. interval = 1 * time.Second - timeout = 2 * time.Minute + timeout = 4 * time.Minute ) // CheckLog waits until logs for the logger Pod satisfy the checker. @@ -66,13 +66,20 @@ func CheckerContainsAll(contents []string) func(string) bool { } } -// CheckerContainsCount returns a checker functions to check if the log contains the count number of given content. +// CheckerContainsCount returns a checker function to check if the log contains the count number of given content. func CheckerContainsCount(content string, count int) func(string) bool { return func(log string) bool { return strings.Count(log, content) == count } } +// CheckerContainsAtLeast returns a checker function to check if the log contains at least the count number of given content. +func CheckerContainsAtLeast(content string, count int) func(string) bool { + return func(log string) bool { + return strings.Count(log, content) >= count + } +} + // FindAnyLogContents attempts to find logs for given Pod/Container that has 'any' of the given contents. // It returns an error if it couldn't retrieve the logs. In case 'any' of the contents are there, it returns true. func (client *Client) FindAnyLogContents(podName string, contents []string) (bool, error) { diff --git a/test/e2e/broker_channel_flow_test.go b/test/e2e/broker_channel_flow_test.go index f4807d59407..b38c33dee88 100644 --- a/test/e2e/broker_channel_flow_test.go +++ b/test/e2e/broker_channel_flow_test.go @@ -90,7 +90,7 @@ func testBrokerChannelFlow(t *testing.T, provisioner string) { // create a new broker client.CreateBrokerOrFail(brokerName, provisioner) - client.WaitForBrokerReady(brokerName) + client.WaitForResourceReady(brokerName, common.BrokerTypeMeta) // create the event we want to transform to transformedEventBody := fmt.Sprintf("%s %s", eventBody, string(uuid.NewUUID())) @@ -127,10 +127,10 @@ func testBrokerChannelFlow(t *testing.T, provisioner string) { // create channel for trigger3 client.CreateChannelOrFail(channelName, provisioner) - client.WaitForChannelReady(channelName) + client.WaitForResourceReady(channelName, common.ChannelTypeMeta) // create trigger3 to receive the transformed event, and send it to the channel - channelURL, err := client.GetChannelURL(channelName) + channelURL, err := client.GetAddressableURI(channelName, common.ChannelTypeMeta) if err != nil { t.Fatalf("Failed to get the url for the channel %q: %v", channelName, err) } @@ -164,7 +164,7 @@ func testBrokerChannelFlow(t *testing.T, provisioner string) { Data: fmt.Sprintf(`{"msg":%q}`, eventBody), Encoding: base.CloudEventDefaultEncoding, } - if err := client.SendFakeEventToBroker(senderName, brokerName, eventToSend); err != nil { + if err := client.SendFakeEventToAddressable(senderName, brokerName, common.BrokerTypeMeta, eventToSend); err != nil { t.Fatalf("Failed to send fake CloudEvent to the broker %q", brokerName) } diff --git a/test/e2e/broker_default_test.go b/test/e2e/broker_default_test.go index f70b4ba5412..fb037220877 100644 --- a/test/e2e/broker_default_test.go +++ b/test/e2e/broker_default_test.go @@ -64,7 +64,7 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { } // Wait for default broker ready. - if err := client.WaitForBrokerReady(common.DefaultBrokerName); err != nil { + if err := client.WaitForResourceReady(base.DefaultBrokerName, common.BrokerTypeMeta); err != nil { t.Fatalf("Error waiting for default broker to become ready: %v", err) } @@ -121,7 +121,7 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { } // Create sender pod. senderPodName := name("sender", eventToSend.Type, eventToSend.Source) - if err := client.SendFakeEventToBroker(senderPodName, common.DefaultBrokerName, cloudEvent); err != nil { + if err := client.SendFakeEventToAddressable(senderPodName, base.DefaultBrokerName, common.BrokerTypeMeta, cloudEvent); err != nil { t.Fatalf("Error send cloud event to broker: %v", err) } diff --git a/test/e2e/broker_event_transformation_test.go b/test/e2e/broker_event_transformation_test.go index d9ec7a52c90..72a8b7a0565 100644 --- a/test/e2e/broker_event_transformation_test.go +++ b/test/e2e/broker_event_transformation_test.go @@ -79,7 +79,7 @@ func testEventTransformationForTrigger(t *testing.T, provisioner string) { // create a new broker client.CreateBrokerOrFail(brokerName, provisioner) - client.WaitForBrokerReady(brokerName) + client.WaitForResourceReady(brokerName, common.BrokerTypeMeta) // create the event we want to transform to transformedEventBody := fmt.Sprintf("%s %s", eventBody, string(uuid.NewUUID())) @@ -126,7 +126,7 @@ func testEventTransformationForTrigger(t *testing.T, provisioner string) { Data: fmt.Sprintf(`{"msg":%q}`, eventBody), Encoding: base.CloudEventDefaultEncoding, } - if err := client.SendFakeEventToBroker(senderName, brokerName, eventToSend); err != nil { + if err := client.SendFakeEventToAddressable(senderName, brokerName, common.BrokerTypeMeta, eventToSend); err != nil { t.Fatalf("Failed to send fake CloudEvent to the broker %q", brokerName) } diff --git a/test/e2e/channel_chain_test.go b/test/e2e/channel_chain_test.go index 2446ba68b29..06c4dc3ee3d 100644 --- a/test/e2e/channel_chain_test.go +++ b/test/e2e/channel_chain_test.go @@ -53,7 +53,7 @@ func testChannelChain(t *testing.T, provisioner string) { // create channels client.CreateChannelsOrFail(channelNames, provisioner) - client.WaitForChannelsReady() + client.WaitForResourcesReady(common.ChannelTypeMeta) // create loggerPod and expose it as a service pod := base.EventLoggerPod(loggerPodName) @@ -77,7 +77,7 @@ func testChannelChain(t *testing.T, provisioner string) { Data: fmt.Sprintf(`{"msg":%q}`, body), Encoding: base.CloudEventDefaultEncoding, } - if err := client.SendFakeEventToChannel(senderName, channelNames[0], event); err != nil { + if err := client.SendFakeEventToAddressable(senderName, channelNames[0], common.ChannelTypeMeta, event); err != nil { t.Fatalf("Failed to send fake CloudEvent to the channel %q", channelNames[0]) } diff --git a/test/e2e/channel_event_transformation_test.go b/test/e2e/channel_event_transformation_test.go index e405f9dbba6..8bc4be0a8b9 100644 --- a/test/e2e/channel_event_transformation_test.go +++ b/test/e2e/channel_event_transformation_test.go @@ -54,7 +54,7 @@ func TestEventTransformationForSubscription(t *testing.T) { // create channels client.CreateChannelsOrFail(channelNames, provisioner) - client.WaitForChannelsReady() + client.WaitForResourcesReady(common.ChannelTypeMeta) // create transformation pod and service transformedEventBody := fmt.Sprintf("eventBody %s", uuid.NewUUID()) @@ -98,7 +98,7 @@ func TestEventTransformationForSubscription(t *testing.T) { Data: fmt.Sprintf(`{"msg":%q}`, eventBody), Encoding: base.CloudEventDefaultEncoding, } - if err := client.SendFakeEventToChannel(senderName, channelNames[0], eventToSend); err != nil { + if err := client.SendFakeEventToAddressable(senderName, channelNames[0], common.ChannelTypeMeta, eventToSend); err != nil { st.Fatalf("Failed to send fake CloudEvent to the channel %q", channelNames[0]) } diff --git a/test/e2e/channel_single_event_test.go b/test/e2e/channel_single_event_test.go index 138a804683b..d2c05612f30 100644 --- a/test/e2e/channel_single_event_test.go +++ b/test/e2e/channel_single_event_test.go @@ -79,7 +79,7 @@ func singleEvent(t *testing.T, encoding string) { Data: fmt.Sprintf(`{"msg":%q}`, body), Encoding: encoding, } - if err := client.SendFakeEventToChannel(senderName, channelName, event); err != nil { + if err := client.SendFakeEventToAddressable(senderName, channelName, common.ChannelTypeMeta, event); err != nil { st.Fatalf("Failed to send fake CloudEvent to the channel %q", channelName) } diff --git a/test/e2e/source_container_test.go b/test/e2e/source_container_test.go new file mode 100644 index 00000000000..c42fc3b4281 --- /dev/null +++ b/test/e2e/source_container_test.go @@ -0,0 +1,76 @@ +// +build e2e + +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "testing" + + "github.com/knative/eventing/test/base" + "github.com/knative/eventing/test/common" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/uuid" +) + +func TestContainerSource(t *testing.T) { + const ( + containerSourceName = "e2e-container-source" + // the heartbeats image is built from test_images/heartbeats + imageName = "heartbeats" + + loggerPodName = "e2e-container-source-logger-pod" + ) + + client := Setup(t, true) + defer TearDown(client) + + // create event logger pod and service + loggerPod := base.EventLoggerPod(loggerPodName) + client.CreatePodOrFail(loggerPod, common.WithService(loggerPodName)) + + data := fmt.Sprintf("TestContainerSource%s", uuid.NewUUID()) + // args are the arguments passing to the container, msg is used in the heartbeats image + args := []string{"--msg=" + data} + // envVars are the environment variables passing to the container + envVars := []corev1.EnvVar{ + corev1.EnvVar{ + Name: "POD_NAME", + Value: "e2e-container-source-pod", + }, + corev1.EnvVar{ + Name: "POD_NAMESPACE", + Value: client.Namespace, + }, + } + // create container source + sinkOption := base.WithSinkServiceForContainerSource(loggerPodName) + argsOption := base.WithArgsForContainerSource(args) + envVarOption := base.WithEnvVarsForContainerSource(envVars) + client.CreateContainerSourceOrFail(containerSourceName, imageName, argsOption, sinkOption, envVarOption) + + // wait for all test resources to be ready + if err := client.WaitForAllTestResourcesReady(); err != nil { + t.Fatalf("Failed to get all test resources ready: %v", err) + } + + // verify the logger service receives the event + expectedCount := 2 + if err := client.CheckLog(loggerPodName, common.CheckerContainsAtLeast(data, expectedCount)); err != nil { + t.Fatalf("String %q does not appear at least %d times in logs of logger pod %q: %v", data, expectedCount, loggerPodName, err) + } +} diff --git a/test/e2e/source_cron_job_test.go b/test/e2e/source_cron_job_test.go new file mode 100644 index 00000000000..70936a93fa8 --- /dev/null +++ b/test/e2e/source_cron_job_test.go @@ -0,0 +1,59 @@ +// +build e2e + +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "testing" + + "github.com/knative/eventing/test/base" + "github.com/knative/eventing/test/common" + "k8s.io/apimachinery/pkg/util/uuid" +) + +func TestCronJobSource(t *testing.T) { + const ( + cronJobSourceName = "e2e-cron-job-source" + // Every 1 minute starting from now + schedule = "*/1 * * * *" + + loggerPodName = "e2e-cron-job-source-logger-pod" + ) + + client := Setup(t, true) + defer TearDown(client) + + // create event logger pod and service + loggerPod := base.EventLoggerPod(loggerPodName) + client.CreatePodOrFail(loggerPod, common.WithService(loggerPodName)) + + // create cron job source + data := fmt.Sprintf("TestCronJobSource %s", uuid.NewUUID()) + sinkOption := base.WithSinkServiceForCronJobSource(loggerPodName) + client.CreateCronJobSourceOrFail(cronJobSourceName, schedule, data, sinkOption) + + // wait for all test resources to be ready + if err := client.WaitForAllTestResourcesReady(); err != nil { + t.Fatalf("Failed to get all test resources ready: %v", err) + } + + // verify the logger service receives the event + if err := client.CheckLog(loggerPodName, common.CheckerContains(data)); err != nil { + t.Fatalf("String %q not found in logs of logger pod %q: %v", data, loggerPodName, err) + } +} diff --git a/test/test_images/heartbeats/main.go b/test/test_images/heartbeats/main.go new file mode 100644 index 00000000000..8c1b6d9abca --- /dev/null +++ b/test/test_images/heartbeats/main.go @@ -0,0 +1,120 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "flag" + "fmt" + "log" + "os" + "strconv" + "time" + + "github.com/knative/eventing/pkg/kncloudevents" + + "github.com/cloudevents/sdk-go/pkg/cloudevents" + "github.com/cloudevents/sdk-go/pkg/cloudevents/types" + "github.com/kelseyhightower/envconfig" +) + +type Heartbeat struct { + Sequence int `json:"id"` + Msg string `json:"msg"` +} + +var ( + sink string + msg string + periodStr string +) + +func init() { + flag.StringVar(&sink, "sink", "", "the host url to heartbeat to") + flag.StringVar(&msg, "msg", "", "the data message") + flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats") +} + +type envConfig struct { + // Sink URL where to send heartbeat cloudevents + Sink string `envconfig:"SINK"` + + // Name of this pod. + Name string `envconfig:"POD_NAME" required:"true"` + + // Namespace this pod exists in. + Namespace string `envconfig:"POD_NAMESPACE" required:"true"` +} + +func main() { + flag.Parse() + + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Printf("[ERROR] Failed to process env var: %s", err) + os.Exit(1) + } + + if env.Sink != "" { + sink = env.Sink + } + + c, err := kncloudevents.NewDefaultClient(sink) + if err != nil { + log.Fatalf("failed to create client: %s", err.Error()) + } + + var period time.Duration + if p, err := strconv.Atoi(periodStr); err != nil { + period = time.Duration(5) * time.Second + } else { + period = time.Duration(p) * time.Second + } + + source := types.ParseURLRef( + fmt.Sprintf("https://github.com/knative/eventing/test/heartbeats/#%s/%s", env.Namespace, env.Name)) + log.Printf("Heartbeats Source: %s", source) + + hb := &Heartbeat{ + Sequence: 0, + Msg: msg, + } + ticker := time.NewTicker(period) + for { + hb.Sequence++ + + event := cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.eventing.samples.heartbeat", + Source: *source, + Extensions: map[string]interface{}{ + "the": 42, + "heart": "yes", + "beats": true, + }, + }.AsV02(), + Data: hb, + } + + log.Printf("sending cloudevent to %s", sink) + if _, err := c.Send(context.Background(), event); err != nil { + log.Printf("failed to send cloudevent: %s", err.Error()) + } + // Wait for next tick + <-ticker.C + } +}