Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions test/e2e/broker_event_transformation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ package e2e
import (
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"knative.dev/eventing/test/e2e/helpers"
"knative.dev/eventing/test/lib"
)

/*
Expand All @@ -38,14 +41,28 @@ EventSource ---> Broker ---> Trigger1 -------> Service(Transformation)
Note: the number denotes the sequence of the event that flows in this test case.
*/
func TestEventTransformationForTriggerV1BrokerV1(t *testing.T) {
helpers.EventTransformationForTriggerTestHelper(t, brokerClass, "v1", "v1", channelTestRunner)
runTest(t, "v1", "v1")
}

func TestEventTransformationForTriggerV1Beta1BrokerV1(t *testing.T) {
helpers.EventTransformationForTriggerTestHelper(t, brokerClass, "v1", "v1beta1", channelTestRunner)
runTest(t, "v1", "v1beta1")
}
func TestEventTransformationForTriggerV1Beta1BrokerV1Beta1(t *testing.T) {
helpers.EventTransformationForTriggerTestHelper(t, brokerClass, "v1beta1", "v1beta1", channelTestRunner)
runTest(t, "v1beta1", "v1beta1")
}
func TestEventTransformationForTriggerV1BrokerV1Beta1(t *testing.T) {
helpers.EventTransformationForTriggerTestHelper(t, brokerClass, "v1beta1", "v1", channelTestRunner)
runTest(t, "v1beta1", "v1")
}

func runTest(t *testing.T, brokerVersion string, triggerVersion string) {

channelTestRunner.RunTests(t, lib.FeatureBasic, func(t *testing.T, component metav1.TypeMeta) {
helpers.EventTransformationForTriggerTestHelper(
t,
brokerVersion,
triggerVersion,
helpers.ChannelBasedBrokerCreator(component, brokerClass),
)
})

}
2 changes: 1 addition & 1 deletion test/e2e/broker_with_many_triggers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

var unsupportedChannelVersions = []string{"v1alpha1"}

func DefaultBrokerCreator(_ *testlib.Client) string {
func DefaultBrokerCreator(_ *testlib.Client, _ string) string {
return sugarresources.DefaultBrokerName
}

Expand Down
169 changes: 78 additions & 91 deletions test/e2e/helpers/broker_event_transformation_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
. "github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/recordevents"
Expand All @@ -43,14 +42,12 @@ EventSource ---> Broker ---> Trigger1 -------> Service(Transformation)
Note: the number denotes the sequence of the event that flows in this test case.
*/
func EventTransformationForTriggerTestHelper(t *testing.T,
brokerClass string,
brokerVersion string,
triggerVersion string,
componentsTestRunner testlib.ComponentsTestRunner,
creator BrokerCreator,
options ...testlib.SetupClientOption) {
const (
senderName = "e2e-eventtransformation-sender"
brokerName = "e2e-eventtransformation-broker"

eventType = "type1"
transformedEventType = "type2"
Expand All @@ -66,92 +63,82 @@ func EventTransformationForTriggerTestHelper(t *testing.T,
recordEventsPodName = "recordevents-pod"
)

componentsTestRunner.RunTests(t, testlib.FeatureBasic, func(st *testing.T, channel metav1.TypeMeta) {
client := testlib.Setup(st, true, options...)
defer testlib.TearDown(client)

// Create a configmap used by the broker.
config := client.CreateBrokerConfigMapOrFail(brokerName, &channel)

// create a new broker
if brokerVersion == "v1" {
client.CreateBrokerV1OrFail(brokerName, resources.WithBrokerClassForBrokerV1(brokerClass), resources.WithConfigForBrokerV1(config))
} else {
client.CreateBrokerV1Beta1OrFail(brokerName, resources.WithBrokerClassForBrokerV1Beta1(brokerClass), resources.WithConfigForBrokerV1Beta1(config))
}
client.WaitForResourceReadyOrFail(brokerName, testlib.BrokerTypeMeta)

// create the transformation service
transformationPod := resources.EventTransformationPod(
transformationPodName,
transformedEventType,
transformedEventSource,
[]byte(transformedBody),
client := testlib.Setup(t, true, options...)
defer testlib.TearDown(client)

brokerName := creator(client, brokerVersion)
client.WaitForResourceReadyOrFail(brokerName, testlib.BrokerTypeMeta)

// create the transformation service
transformationPod := resources.EventTransformationPod(
transformationPodName,
transformedEventType,
transformedEventSource,
[]byte(transformedBody),
)
client.CreatePodOrFail(transformationPod, testlib.WithService(transformationPodName))

// create trigger1 for event transformation
if triggerVersion == "v1" {
client.CreateTriggerV1OrFail(
originalTriggerName,
resources.WithBrokerV1(brokerName),
resources.WithAttributesTriggerFilterV1(eventSource, eventType, nil),
resources.WithSubscriberServiceRefForTriggerV1(transformationPodName),
)
} else {
client.CreateTriggerOrFailV1Beta1(
originalTriggerName,
resources.WithBrokerV1Beta1(brokerName),
resources.WithAttributesTriggerFilterV1Beta1(eventSource, eventType, nil),
resources.WithSubscriberServiceRefForTriggerV1Beta1(transformationPodName),
)
}

// create logger pod and service
eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventsPodName)
defer eventTracker.Cleanup()

// create trigger2 for event receiving
if triggerVersion == "v1" {
client.CreateTriggerV1OrFail(
transformedTriggerName,
resources.WithBrokerV1(brokerName),
resources.WithAttributesTriggerFilterV1(transformedEventSource, transformedEventType, nil),
resources.WithSubscriberServiceRefForTriggerV1(recordEventsPodName),
)
} else {
client.CreateTriggerOrFailV1Beta1(
transformedTriggerName,
resources.WithBrokerV1Beta1(brokerName),
resources.WithAttributesTriggerFilterV1Beta1(transformedEventSource, transformedEventType, nil),
resources.WithSubscriberServiceRefForTriggerV1Beta1(recordEventsPodName),
)
client.CreatePodOrFail(transformationPod, testlib.WithService(transformationPodName))

// create trigger1 for event transformation
if triggerVersion == "v1" {
client.CreateTriggerV1OrFail(
originalTriggerName,
resources.WithBrokerV1(brokerName),
resources.WithAttributesTriggerFilterV1(eventSource, eventType, nil),
resources.WithSubscriberServiceRefForTriggerV1(transformationPodName),
)
} else {
client.CreateTriggerOrFailV1Beta1(
originalTriggerName,
resources.WithBrokerV1Beta1(brokerName),
resources.WithAttributesTriggerFilterV1Beta1(eventSource, eventType, nil),
resources.WithSubscriberServiceRefForTriggerV1Beta1(transformationPodName),
)
}

// create logger pod and service
eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventsPodName)
defer eventTracker.Cleanup()

// create trigger2 for event receiving
if triggerVersion == "v1" {
client.CreateTriggerV1OrFail(
transformedTriggerName,
resources.WithBrokerV1(brokerName),
resources.WithAttributesTriggerFilterV1(transformedEventSource, transformedEventType, nil),
resources.WithSubscriberServiceRefForTriggerV1(recordEventsPodName),
)
} else {
client.CreateTriggerOrFailV1Beta1(
transformedTriggerName,
resources.WithBrokerV1Beta1(brokerName),
resources.WithAttributesTriggerFilterV1Beta1(transformedEventSource, transformedEventType, nil),
resources.WithSubscriberServiceRefForTriggerV1Beta1(recordEventsPodName),
)
}

// wait for all test resources to be ready, so that we can start sending events
client.WaitForAllTestResourcesReadyOrFail()

// eventToSend is the event sent as input of the test
eventToSend := cloudevents.NewEvent()
eventToSend.SetID(uuid.New().String())
eventToSend.SetType(eventType)
eventToSend.SetSource(eventSource)
if err := eventToSend.SetData(cloudevents.ApplicationJSON, []byte(eventBody)); err != nil {
t.Fatalf("Cannot set the payload of the event: %s", err.Error())
}
client.SendEventToAddressable(senderName, brokerName, testlib.BrokerTypeMeta, eventToSend)

// check if the logging service receives the correct event
eventTracker.AssertAtLeast(1, recordevents.MatchEvent(
HasSource(transformedEventSource),
HasType(transformedEventType),
HasData([]byte(transformedBody)),
))

eventTracker.AssertNot(recordevents.MatchEvent(
HasSource(eventSource),
HasType(eventType),
HasData([]byte(eventBody)),
))
})
}

// wait for all test resources to be ready, so that we can start sending events
client.WaitForAllTestResourcesReadyOrFail()

// eventToSend is the event sent as input of the test
eventToSend := cloudevents.NewEvent()
eventToSend.SetID(uuid.New().String())
eventToSend.SetType(eventType)
eventToSend.SetSource(eventSource)
if err := eventToSend.SetData(cloudevents.ApplicationJSON, []byte(eventBody)); err != nil {
t.Fatalf("Cannot set the payload of the event: %s", err.Error())
}
client.SendEventToAddressable(senderName, brokerName, testlib.BrokerTypeMeta, eventToSend)

// check if the logging service receives the correct event
eventTracker.AssertAtLeast(1, recordevents.MatchEvent(
HasSource(transformedEventSource),
HasType(transformedEventType),
HasData([]byte(transformedBody)),
))

eventTracker.AssertNot(recordevents.MatchEvent(
HasSource(eventSource),
HasType(eventType),
HasData([]byte(eventBody)),
))
}
27 changes: 18 additions & 9 deletions test/e2e/helpers/broker_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,30 @@ func (tc eventTestCase) ToEventMatcher() cetest.EventMatcher {

// BrokerCreator creates a broker and returns its broker name.
// TestBrokerWithManyTriggers will wait for the broker to become ready.
type BrokerCreator func(client *testlib.Client) string
type BrokerCreator func(client *testlib.Client, version string) string

// ChannelBasedBrokerCreator creates a BrokerCreator that creates a broker based on the channel parameter.
func ChannelBasedBrokerCreator(channel metav1.TypeMeta, brokerClass string) BrokerCreator {
return func(client *testlib.Client) string {
return func(client *testlib.Client, version string) string {
brokerName := strings.ToLower(channel.Kind)

// create a ConfigMap used by the broker.
config := client.CreateBrokerConfigMapOrFail("config-"+brokerName, &channel)

// create a new broker.
client.CreateBrokerV1Beta1OrFail(brokerName,
resources.WithBrokerClassForBrokerV1Beta1(brokerClass),
resources.WithConfigForBrokerV1Beta1(config),
)
switch version {
case "v1":
client.CreateBrokerV1OrFail(brokerName,
resources.WithBrokerClassForBrokerV1(brokerClass),
resources.WithConfigForBrokerV1(config),
)
case "v1beta1":
client.CreateBrokerV1Beta1OrFail(brokerName,
resources.WithBrokerClassForBrokerV1Beta1(brokerClass),
resources.WithConfigForBrokerV1Beta1(config),
)
default:
panic("unknown version: " + version)
}

return brokerName
}
Expand Down Expand Up @@ -143,7 +152,7 @@ func TestBrokerWithManyTriggers(t *testing.T, brokerCreator BrokerCreator, shoul
// to set in the subscriber and services pod
// The attributes in these test cases will be used as assertions on the receivers
eventFilters []eventTestCase
//TriggerFilter with DeprecatedSourceAndType or not
// TriggerFilter with DeprecatedSourceAndType or not
deprecatedTriggerFilter bool
// Use v1beta1 trigger
v1beta1 bool
Expand Down Expand Up @@ -230,7 +239,7 @@ func TestBrokerWithManyTriggers(t *testing.T, brokerCreator BrokerCreator, shoul
}
}

brokerName := brokerCreator(client)
brokerName := brokerCreator(client, "v1beta1")

// Wait for broker ready.
client.WaitForResourceReadyOrFail(brokerName, testlib.BrokerTypeMeta)
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/helpers/trigger_no_broker_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestTriggerNoBroker(t *testing.T, channel string, brokerCreator BrokerCreat
}

// Then create the Broker and just make sure they both come ready.
if bn := brokerCreator(client); bn != brokerName {
if bn := brokerCreator(client, "v1beta1"); bn != brokerName {
t.Fatalf("Broker created with unexpected name, wanted %q got %q", brokerName, bn)
}

Expand Down