Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 67 additions & 0 deletions test/base/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,73 @@ func CronJobSource(
return cronJobSource
}

// WithTemplateForContainerSource returns an option that adds a template for the given ContainerSource.
func WithTemplateForContainerSource(template *corev1.PodTemplateSpec) func(*sourcesv1alpha1.ContainerSource) {
return func(cs *sourcesv1alpha1.ContainerSource) {
cs.Spec.Template = template
}
}

// WithSinkServiceForContainerSource returns an option that adds a Kubernetes Service sink for the given ContainerSource.
func WithSinkServiceForContainerSource(name string) func(*sourcesv1alpha1.ContainerSource) {
return func(cs *sourcesv1alpha1.ContainerSource) {
cs.Spec.Sink = pkgTest.CoreV1ObjectReference("Service", "v1", name)
}
}

// ContainerSource returns a Container EventSource.
func ContainerSource(
name string,
options ...func(*sourcesv1alpha1.ContainerSource),
) *sourcesv1alpha1.ContainerSource {
containerSource := &sourcesv1alpha1.ContainerSource{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
}
for _, option := range options {
option(containerSource)
}
return containerSource
}

// ContainerSourceBasicTemplate returns a basic template that can be used in ContainerSource.
func ContainerSourceBasicTemplate(
name,
namespace,
imageName string,
args []string,
) *corev1.PodTemplateSpec {
envVars := []corev1.EnvVar{
corev1.EnvVar{
Name: "POD_NAME",
Value: name,
},
corev1.EnvVar{
Name: "POD_NAMESPACE",
Value: namespace,
},
}

podTemplateSpec := &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
corev1.Container{
Name: imageName,
Image: pkgTest.ImagePath(imageName),
ImagePullPolicy: corev1.PullAlways,
Args: args,
Env: envVars,
},
},
},
}
return podTemplateSpec
}

// CloudEvent specifies the arguments for a CloudEvent sent by the sendevent
// binary.
type CloudEvent struct {
Expand Down
17 changes: 17 additions & 0 deletions test/common/creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,23 @@ func (client *Client) CreateCronJobSourceOrFail(
client.Cleaner.AddObj(cronJobSource)
}

// CreateContainerSourceOrFail will create a ContainerSource.
func (client *Client) CreateContainerSourceOrFail(
name string,
options ...func(*sourcesv1alpha1.ContainerSource),
) {
namespace := client.Namespace
containerSource := base.ContainerSource(name, options...)

containerSources := client.Eventing.SourcesV1alpha1().ContainerSources(namespace)
// update containerSource with the new reference
containerSource, err := containerSources.Create(containerSource)
if err != nil {
client.T.Fatalf("Failed to create containersource %q: %v", name, err)
}
client.Cleaner.AddObj(containerSource)
}

// WithService returns an option that creates a Service binded with the given pod.
func WithService(name string) func(*corev1.Pod, *Client) error {
return func(pod *corev1.Pod, client *Client) error {
Expand Down
28 changes: 28 additions & 0 deletions test/common/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,31 @@ func (client *Client) WaitForCronJobSourcesReady() error {
return nil
}

// WaitForContainerSourceReady waits until the containersource is Ready.
func (client *Client) WaitForContainerSourceReady(name string) error {
namespace := client.Namespace
containerSourceMeta := base.MetaSource(name, namespace, "ContainerSource")
if err := base.WaitForResourceReady(client.Dynamic, containerSourceMeta); err != nil {
return err
}
return nil
}

// WaitForContainerSourcesReady waits until all containersources in the namespace are Ready.
func (client *Client) WaitForContainerSourcesReady() error {
namespace := client.Namespace
containerSources, err := client.Eventing.SourcesV1alpha1().ContainerSources(namespace).List(metav1.ListOptions{})
if err != nil {
return err
}
for _, containerSource := range containerSources.Items {
if err := client.WaitForContainerSourceReady(containerSource.Name); err != nil {
return err
}
}
return nil
}

// WaitForAllTestResourcesReady waits until all test resources in the namespace are Ready.
// Currently the test resources include Pod, Channel, Subscription, Broker and Trigger.
// If there are new resources, this function needs to be changed.
Expand All @@ -233,6 +258,9 @@ func (client *Client) WaitForAllTestResourcesReady() error {
if err := client.WaitForCronJobSourcesReady(); err != nil {
return err
}
if err := client.WaitForContainerSourcesReady(); err != nil {
return err
}
if err := pkgTest.WaitForAllPodsRunning(client.Kube, client.Namespace); err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions test/common/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ func CheckerContainsCount(content string, count int) func(string) bool {
}
}

// CheckerContainsAtLeast returns a checker function to check if the log contains at least the count number of given content.
func CheckerContainsAtLeast(content string, count int) func(string) bool {
return func(log string) bool {
return strings.Count(log, content) >= count
}
}

// FindAnyLogContents attempts to find logs for given Pod/Container that has 'any' of the given contents.
// It returns an error if it couldn't retrieve the logs. In case 'any' of the contents are there, it returns true.
func (client *Client) FindAnyLogContents(podName string, contents []string) (bool, error) {
Expand Down
66 changes: 66 additions & 0 deletions test/e2e/source_container_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// +build e2e

/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
"fmt"
"testing"

"github.com/knative/eventing/test/base"
"github.com/knative/eventing/test/common"
"k8s.io/apimachinery/pkg/util/uuid"
)

func TestContainerSource(t *testing.T) {
const (
containerSourceName = "e2e-container-source"
templateName = "e2e-container-source-template"
// the heartbeats image is built from test_images/heartbeats
imageName = "heartbeats"

loggerPodName = "e2e-container-source-logger-pod"
)

client := Setup(t, true)
defer TearDown(client)

// create event logger pod and service
loggerPod := base.EventLoggerPod(loggerPodName)
client.CreatePodOrFail(loggerPod, common.WithService(loggerPodName))

data := fmt.Sprintf("TestContainerSource%s", uuid.NewUUID())
// args are the arguments passing to the container, msg is used in the heartbeats image
args := []string{"--msg=" + data}

// create container source
template := base.ContainerSourceBasicTemplate(templateName, client.Namespace, imageName, args)
templateOption := base.WithTemplateForContainerSource(template)
sinkOption := base.WithSinkServiceForContainerSource(loggerPodName)
client.CreateContainerSourceOrFail(containerSourceName, templateOption, sinkOption)

// wait for all test resources to be ready
if err := client.WaitForAllTestResourcesReady(); err != nil {
t.Fatalf("Failed to get all test resources ready: %v", err)
}

// verify the logger service receives the event
expectedCount := 2
if err := client.CheckLog(loggerPodName, common.CheckerContainsAtLeast(data, expectedCount)); err != nil {
t.Fatalf("String %q does not appear at least %d times in logs of logger pod %q: %v", data, expectedCount, loggerPodName, err)
}
}
120 changes: 120 additions & 0 deletions test/test_images/heartbeats/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"flag"
"fmt"
"log"
"os"
"strconv"
"time"

"github.com/knative/eventing/pkg/kncloudevents"

"github.com/cloudevents/sdk-go/pkg/cloudevents"
"github.com/cloudevents/sdk-go/pkg/cloudevents/types"
"github.com/kelseyhightower/envconfig"
)

type Heartbeat struct {
Sequence int `json:"id"`
Msg string `json:"msg"`
}

var (
sink string
msg string
periodStr string
)

func init() {
flag.StringVar(&sink, "sink", "", "the host url to heartbeat to")
flag.StringVar(&msg, "msg", "", "the data message")
flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats")
}

type envConfig struct {
// Sink URL where to send heartbeat cloudevents
Sink string `envconfig:"SINK"`

// Name of this pod.
Name string `envconfig:"POD_NAME" required:"true"`

// Namespace this pod exists in.
Namespace string `envconfig:"POD_NAMESPACE" required:"true"`
}

func main() {
flag.Parse()

var env envConfig
if err := envconfig.Process("", &env); err != nil {
log.Printf("[ERROR] Failed to process env var: %s", err)
os.Exit(1)
}

if env.Sink != "" {
sink = env.Sink
}

c, err := kncloudevents.NewDefaultClient(sink)
if err != nil {
log.Fatalf("failed to create client: %s", err.Error())
}

var period time.Duration
if p, err := strconv.Atoi(periodStr); err != nil {
period = time.Duration(5) * time.Second
} else {
period = time.Duration(p) * time.Second
}

source := types.ParseURLRef(
fmt.Sprintf("https://github.com/knative/eventing/test/heartbeats/#%s/%s", env.Namespace, env.Name))
log.Printf("Heartbeats Source: %s", source)

hb := &Heartbeat{
Sequence: 0,
Msg: msg,
}
ticker := time.NewTicker(period)
for {
hb.Sequence++

event := cloudevents.Event{
Context: cloudevents.EventContextV02{
Type: "dev.knative.eventing.samples.heartbeat",
Source: *source,
Extensions: map[string]interface{}{
"the": 42,
"heart": "yes",
"beats": true,
},
}.AsV02(),
Data: hb,
}

log.Printf("sending cloudevent to %s", sink)
if _, err := c.Send(context.Background(), event); err != nil {
log.Printf("failed to send cloudevent: %s", err.Error())
}
// Wait for next tick
<-ticker.C
}
}