diff --git a/Gopkg.lock b/Gopkg.lock index 3e4923fdb87..730d5821578 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1420,7 +1420,9 @@ "github.com/Shopify/sarama", "github.com/bsm/sarama-cluster", "github.com/cloudevents/sdk-go", + "github.com/cloudevents/sdk-go/pkg/cloudevents", "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http", + "github.com/cloudevents/sdk-go/pkg/cloudevents/types", "github.com/google/go-cmp/cmp", "github.com/google/go-cmp/cmp/cmpopts", "github.com/google/uuid", diff --git a/test/base/resources.go b/test/base/resources.go index 88591a805b4..eafaee2f3bf 100644 --- a/test/base/resources.go +++ b/test/base/resources.go @@ -204,6 +204,73 @@ func CronJobSource( return cronJobSource } +// WithTemplateForContainerSource returns an option that adds a template for the given ContainerSource. +func WithTemplateForContainerSource(template *corev1.PodTemplateSpec) func(*sourcesv1alpha1.ContainerSource) { + return func(cs *sourcesv1alpha1.ContainerSource) { + cs.Spec.Template = template + } +} + +// WithSinkServiceForContainerSource returns an option that adds a Kubernetes Service sink for the given ContainerSource. +func WithSinkServiceForContainerSource(name string) func(*sourcesv1alpha1.ContainerSource) { + return func(cs *sourcesv1alpha1.ContainerSource) { + cs.Spec.Sink = pkgTest.CoreV1ObjectReference("Service", "v1", name) + } +} + +// ContainerSource returns a Container EventSource. +func ContainerSource( + name string, + options ...func(*sourcesv1alpha1.ContainerSource), +) *sourcesv1alpha1.ContainerSource { + containerSource := &sourcesv1alpha1.ContainerSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + for _, option := range options { + option(containerSource) + } + return containerSource +} + +// ContainerSourceBasicTemplate returns a basic template that can be used in ContainerSource. +func ContainerSourceBasicTemplate( + name, + namespace, + imageName string, + args []string, +) *corev1.PodTemplateSpec { + envVars := []corev1.EnvVar{ + corev1.EnvVar{ + Name: "POD_NAME", + Value: name, + }, + corev1.EnvVar{ + Name: "POD_NAMESPACE", + Value: namespace, + }, + } + + podTemplateSpec := &corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + corev1.Container{ + Name: imageName, + Image: pkgTest.ImagePath(imageName), + ImagePullPolicy: corev1.PullAlways, + Args: args, + Env: envVars, + }, + }, + }, + } + return podTemplateSpec +} + // CloudEvent specifies the arguments for a CloudEvent sent by the sendevent // binary. type CloudEvent struct { diff --git a/test/common/creation.go b/test/common/creation.go index ea8bfed5c3c..c96872408cd 100644 --- a/test/common/creation.go +++ b/test/common/creation.go @@ -133,6 +133,23 @@ func (client *Client) CreateCronJobSourceOrFail( client.Cleaner.AddObj(cronJobSource) } +// CreateContainerSourceOrFail will create a ContainerSource. +func (client *Client) CreateContainerSourceOrFail( + name string, + options ...func(*sourcesv1alpha1.ContainerSource), +) { + namespace := client.Namespace + containerSource := base.ContainerSource(name, options...) + + containerSources := client.Eventing.SourcesV1alpha1().ContainerSources(namespace) + // update containerSource with the new reference + containerSource, err := containerSources.Create(containerSource) + if err != nil { + client.T.Fatalf("Failed to create containersource %q: %v", name, err) + } + client.Cleaner.AddObj(containerSource) +} + // WithService returns an option that creates a Service binded with the given pod. func WithService(name string) func(*corev1.Pod, *Client) error { return func(pod *corev1.Pod, client *Client) error { diff --git a/test/common/operation.go b/test/common/operation.go index 7e8fe951080..ac0670dd01d 100644 --- a/test/common/operation.go +++ b/test/common/operation.go @@ -214,6 +214,31 @@ func (client *Client) WaitForCronJobSourcesReady() error { return nil } +// WaitForContainerSourceReady waits until the containersource is Ready. +func (client *Client) WaitForContainerSourceReady(name string) error { + namespace := client.Namespace + containerSourceMeta := base.MetaSource(name, namespace, "ContainerSource") + if err := base.WaitForResourceReady(client.Dynamic, containerSourceMeta); err != nil { + return err + } + return nil +} + +// WaitForContainerSourcesReady waits until all containersources in the namespace are Ready. +func (client *Client) WaitForContainerSourcesReady() error { + namespace := client.Namespace + containerSources, err := client.Eventing.SourcesV1alpha1().ContainerSources(namespace).List(metav1.ListOptions{}) + if err != nil { + return err + } + for _, containerSource := range containerSources.Items { + if err := client.WaitForContainerSourceReady(containerSource.Name); err != nil { + return err + } + } + return nil +} + // WaitForAllTestResourcesReady waits until all test resources in the namespace are Ready. // Currently the test resources include Pod, Channel, Subscription, Broker and Trigger. // If there are new resources, this function needs to be changed. @@ -233,6 +258,9 @@ func (client *Client) WaitForAllTestResourcesReady() error { if err := client.WaitForCronJobSourcesReady(); err != nil { return err } + if err := client.WaitForContainerSourcesReady(); err != nil { + return err + } if err := pkgTest.WaitForAllPodsRunning(client.Kube, client.Namespace); err != nil { return err } diff --git a/test/common/validation.go b/test/common/validation.go index b46a636973b..f375c86fc4d 100644 --- a/test/common/validation.go +++ b/test/common/validation.go @@ -73,6 +73,13 @@ func CheckerContainsCount(content string, count int) func(string) bool { } } +// CheckerContainsAtLeast returns a checker function to check if the log contains at least the count number of given content. +func CheckerContainsAtLeast(content string, count int) func(string) bool { + return func(log string) bool { + return strings.Count(log, content) >= count + } +} + // FindAnyLogContents attempts to find logs for given Pod/Container that has 'any' of the given contents. // It returns an error if it couldn't retrieve the logs. In case 'any' of the contents are there, it returns true. func (client *Client) FindAnyLogContents(podName string, contents []string) (bool, error) { diff --git a/test/e2e/source_container_test.go b/test/e2e/source_container_test.go new file mode 100644 index 00000000000..a0c48d79fda --- /dev/null +++ b/test/e2e/source_container_test.go @@ -0,0 +1,66 @@ +// +build e2e + +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "testing" + + "github.com/knative/eventing/test/base" + "github.com/knative/eventing/test/common" + "k8s.io/apimachinery/pkg/util/uuid" +) + +func TestContainerSource(t *testing.T) { + const ( + containerSourceName = "e2e-container-source" + templateName = "e2e-container-source-template" + // the heartbeats image is built from test_images/heartbeats + imageName = "heartbeats" + + loggerPodName = "e2e-container-source-logger-pod" + ) + + client := Setup(t, true) + defer TearDown(client) + + // create event logger pod and service + loggerPod := base.EventLoggerPod(loggerPodName) + client.CreatePodOrFail(loggerPod, common.WithService(loggerPodName)) + + data := fmt.Sprintf("TestContainerSource%s", uuid.NewUUID()) + // args are the arguments passing to the container, msg is used in the heartbeats image + args := []string{"--msg=" + data} + + // create container source + template := base.ContainerSourceBasicTemplate(templateName, client.Namespace, imageName, args) + templateOption := base.WithTemplateForContainerSource(template) + sinkOption := base.WithSinkServiceForContainerSource(loggerPodName) + client.CreateContainerSourceOrFail(containerSourceName, templateOption, sinkOption) + + // wait for all test resources to be ready + if err := client.WaitForAllTestResourcesReady(); err != nil { + t.Fatalf("Failed to get all test resources ready: %v", err) + } + + // verify the logger service receives the event + expectedCount := 2 + if err := client.CheckLog(loggerPodName, common.CheckerContainsAtLeast(data, expectedCount)); err != nil { + t.Fatalf("String %q does not appear at least %d times in logs of logger pod %q: %v", data, expectedCount, loggerPodName, err) + } +} diff --git a/test/test_images/heartbeats/main.go b/test/test_images/heartbeats/main.go new file mode 100644 index 00000000000..8c1b6d9abca --- /dev/null +++ b/test/test_images/heartbeats/main.go @@ -0,0 +1,120 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "flag" + "fmt" + "log" + "os" + "strconv" + "time" + + "github.com/knative/eventing/pkg/kncloudevents" + + "github.com/cloudevents/sdk-go/pkg/cloudevents" + "github.com/cloudevents/sdk-go/pkg/cloudevents/types" + "github.com/kelseyhightower/envconfig" +) + +type Heartbeat struct { + Sequence int `json:"id"` + Msg string `json:"msg"` +} + +var ( + sink string + msg string + periodStr string +) + +func init() { + flag.StringVar(&sink, "sink", "", "the host url to heartbeat to") + flag.StringVar(&msg, "msg", "", "the data message") + flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats") +} + +type envConfig struct { + // Sink URL where to send heartbeat cloudevents + Sink string `envconfig:"SINK"` + + // Name of this pod. + Name string `envconfig:"POD_NAME" required:"true"` + + // Namespace this pod exists in. + Namespace string `envconfig:"POD_NAMESPACE" required:"true"` +} + +func main() { + flag.Parse() + + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Printf("[ERROR] Failed to process env var: %s", err) + os.Exit(1) + } + + if env.Sink != "" { + sink = env.Sink + } + + c, err := kncloudevents.NewDefaultClient(sink) + if err != nil { + log.Fatalf("failed to create client: %s", err.Error()) + } + + var period time.Duration + if p, err := strconv.Atoi(periodStr); err != nil { + period = time.Duration(5) * time.Second + } else { + period = time.Duration(p) * time.Second + } + + source := types.ParseURLRef( + fmt.Sprintf("https://github.com/knative/eventing/test/heartbeats/#%s/%s", env.Namespace, env.Name)) + log.Printf("Heartbeats Source: %s", source) + + hb := &Heartbeat{ + Sequence: 0, + Msg: msg, + } + ticker := time.NewTicker(period) + for { + hb.Sequence++ + + event := cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.eventing.samples.heartbeat", + Source: *source, + Extensions: map[string]interface{}{ + "the": 42, + "heart": "yes", + "beats": true, + }, + }.AsV02(), + Data: hb, + } + + log.Printf("sending cloudevent to %s", sink) + if _, err := c.Send(context.Background(), event); err != nil { + log.Printf("failed to send cloudevent: %s", err.Error()) + } + // Wait for next tick + <-ticker.C + } +}