Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions test/base/resources/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
NatssChannelKind string = "NatssChannel"

SequenceKind string = "Sequence"
ChoiceKind string = "Choice"
)

// Kind for sources resources.
Expand Down
23 changes: 23 additions & 0 deletions test/base/resources/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,29 @@ func SequenceStepperPod(name, eventMsgAppender string) *corev1.Pod {
}
}

// EventFilteringPod creates a Pod that either filter or send the received CloudEvent
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// EventFilteringPod creates a Pod that either filter or send the received CloudEvent
// EventFilteringPod creates a Pod that either filters or sends the received CloudEvent.

func EventFilteringPod(name string, filter bool) *corev1.Pod {
const imageName = "filterevents"
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{"e2etest": string(uuid.NewUUID())},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: imageName,
Image: pkgTest.ImagePath(imageName),
ImagePullPolicy: corev1.PullAlways,
}},
RestartPolicy: corev1.RestartPolicyAlways,
},
}
if filter {
pod.Spec.Containers[0].Args = []string{"-filter"}
}
return pod
}

// EventLatencyPod creates a Pod that measures events transfer latency.
func EventLatencyPod(name, sink string, eventCount int) *corev1.Pod {
const imageName = "latency"
Expand Down
12 changes: 12 additions & 0 deletions test/common/creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package common
import (
eventingduckv1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
messagingv1alpha1 "github.com/knative/eventing/pkg/apis/messaging/v1alpha1"
sourcesv1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1"
"github.com/knative/eventing/test/base/resources"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -155,6 +156,17 @@ func (client *Client) CreateSequenceOrFail(
client.Tracker.AddObj(sequence)
}

// CreateChoiceOrFail will create a Choice or fail the test if there is an error.
func (client *Client) CreateChoiceOrFail(choice *messagingv1alpha1.Choice) {
choices := client.Eventing.MessagingV1alpha1().Choices(client.Namespace)
// create choice with the new reference
created, err := choices.Create(choice)
if err != nil {
client.T.Fatalf("Failed to create choice %q: %v", choice.Name, err)
}
client.Tracker.AddObj(created)
}

// CreateCronJobSourceOrFail will create a CronJobSource or fail the test if there is an error.
func (client *Client) CreateCronJobSourceOrFail(
name,
Expand Down
3 changes: 3 additions & 0 deletions test/common/typemeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ var NatssChannelTypeMeta = MessagingTypeMeta(resources.NatssChannelKind)
// SequenceTypeMeta is the TypeMeta ref for Sequence.
var SequenceTypeMeta = MessagingTypeMeta(resources.SequenceKind)

// ChoiceTypeMeta is the TypeMeta ref for Choice.
var ChoiceTypeMeta = MessagingTypeMeta(resources.ChoiceKind)

// MessagingTypeMeta returns the TypeMeta ref for an eventing messaing resource.
func MessagingTypeMeta(kind string) *metav1.TypeMeta {
return &metav1.TypeMeta{
Expand Down
147 changes: 147 additions & 0 deletions test/e2e/choice_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// +build e2e

/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
"encoding/json"
"fmt"
"testing"

eventingduckv1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
v1alpha1 "github.com/knative/eventing/pkg/apis/messaging/v1alpha1"
eventingtesting "github.com/knative/eventing/pkg/reconciler/testing"
"github.com/knative/eventing/test/base/resources"
"github.com/knative/eventing/test/common"
"k8s.io/apimachinery/pkg/util/uuid"
pkgTest "knative.dev/pkg/test"
)

type caseConfig struct {
filter bool
}

func TestChoice(t *testing.T) {
const (
senderPodName = "e2e-choice"
)
table := []struct {
name string
casesConfig []caseConfig
expected string
}{
{
name: "two-cases-pass-first-case-only",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not make these two separate test cases? That way we can remove a for loop below:

	table := []struct {
		name        string
		casesConfig caseConfig
		expected    string
	}{
		{
			name: "don't filter",
			casesConfig: caseConfig{
				filter: false,
			},
		},
		{
			name: "filter",
			casesConfig: caseConfig{
				filter: true,
			},
			expected: "choice-filter",
		},
	}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One choice can have multiple cases. It's a bit confusing because there is only one test (more to come).

casesConfig: []caseConfig{
{filter: false},
{filter: true},
},
expected: "choice-two-cases-pass-first-case-only-case-0-sub",
},
}
channelTypeMeta := getChannelTypeMeta(common.DefaultChannel)

client := setup(t, true)
defer tearDown(client)

for _, tc := range table {
choiceCases := make([]v1alpha1.ChoiceCase, len(tc.casesConfig))
for caseNumber, cse := range tc.casesConfig {
// construct filter services
filterPodName := fmt.Sprintf("choice-%s-case-%d-filter", tc.name, caseNumber)
filterPod := resources.EventFilteringPod(filterPodName, cse.filter)
client.CreatePodOrFail(filterPod, common.WithService(filterPodName))

// construct case subscriber
subPodName := fmt.Sprintf("choice-%s-case-%d-sub", tc.name, caseNumber)
subPod := resources.SequenceStepperPod(subPodName, subPodName)
client.CreatePodOrFail(subPod, common.WithService(subPodName))

choiceCases[caseNumber] = v1alpha1.ChoiceCase{
Filter: &eventingv1alpha1.SubscriberSpec{
Ref: resources.ServiceRef(filterPodName),
},
Subscriber: eventingv1alpha1.SubscriberSpec{
Ref: resources.ServiceRef(subPodName),
},
}
}

channelTemplate := &eventingduckv1alpha1.ChannelTemplateSpec{
TypeMeta: *(channelTypeMeta),
}

// create logger service for global reply
loggerPodName := fmt.Sprintf("%s-logger", tc.name)
loggerPod := resources.EventLoggerPod(loggerPodName)
client.CreatePodOrFail(loggerPod, common.WithService(loggerPodName))

// create channel as reply of the Choice
// TODO(Fredy-Z): now we'll have to use a channel plus its subscription here, as reply of the Subscription
// must be Addressable.
replyChannelName := fmt.Sprintf("reply-%s", tc.name)
client.CreateChannelOrFail(replyChannelName, channelTypeMeta)
replySubscriptionName := fmt.Sprintf("reply-%s", tc.name)
client.CreateSubscriptionOrFail(
replySubscriptionName,
replyChannelName,
channelTypeMeta,
resources.WithSubscriberForSubscription(loggerPodName),
)

choice := eventingtesting.NewChoice(tc.name, client.Namespace,
eventingtesting.WithChoiceChannelTemplateSpec(channelTemplate),
eventingtesting.WithChoiceCases(choiceCases),
eventingtesting.WithChoiceReply(pkgTest.CoreV1ObjectReference(channelTypeMeta.Kind, channelTypeMeta.APIVersion, replyChannelName)))

client.CreateChoiceOrFail(choice)

if err := client.WaitForAllTestResourcesReady(); err != nil {
t.Fatalf("Failed to get all test resources ready: %v", err)
}

// send fake CloudEvent to the Choice
msg := fmt.Sprintf("TestChoice %s - ", uuid.NewUUID())
// NOTE: the eventData format must be CloudEventBaseData, as it needs to be correctly parsed in the stepper service.
eventData := resources.CloudEventBaseData{Message: msg}
eventDataBytes, err := json.Marshal(eventData)
if err != nil {
t.Fatalf("Failed to convert %v to json: %v", eventData, err)
}
event := &resources.CloudEvent{
Source: senderPodName,
Type: resources.CloudEventDefaultType,
Data: string(eventDataBytes),
Encoding: resources.CloudEventDefaultEncoding,
}
if err := client.SendFakeEventToAddressable(
senderPodName,
tc.name,
common.ChoiceTypeMeta,
event,
); err != nil {
t.Fatalf("Failed to send fake CloudEvent to the choice %q", tc.name)
}

// verify the logger service receives the correct transformed event
if err := client.CheckLog(loggerPodName, common.CheckerContains(tc.expected)); err != nil {
t.Fatalf("String %q not found in logs of logger pod %q: %v", tc.expected, loggerPodName, err)
}
}

}
67 changes: 67 additions & 0 deletions test/test_images/filterevents/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"flag"
"log"

cloudevents "github.com/cloudevents/sdk-go"
)

var (
filter bool
)

func init() {
flag.BoolVar(&filter, "filter", false, "Whether to filter the event")
}

func gotEvent(event cloudevents.Event, resp *cloudevents.EventResponse) error {
ctx := event.Context.AsV03()

dataBytes, err := event.DataBytes()
if err != nil {
log.Printf("Got Data Error: %s\n", err.Error())
return err
}
log.Println("Received a new event: ")
log.Printf("[%s] %s %s: %s", ctx.Time.String(), ctx.GetSource(), ctx.GetType(), dataBytes)

if filter {
log.Println("Filter event")
resp.Status = 200
} else {
log.Println("Reply with event")
resp.RespondWith(200, &event)
}
return nil
}

func main() {
// parse the command line flags
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// parse the command line flags
// Parse the command line flags.

flag.Parse()

c, err := cloudevents.NewDefaultClient()
if err != nil {
log.Fatalf("failed to create client, %v", err)
}

log.Printf("listening on 8080")
log.Fatalf("failed to start receiver: %s", c.StartReceiver(context.Background(), gotEvent))
}