Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 0 additions & 93 deletions test/e2e/helpers/broker_dls_test_helper.go

This file was deleted.

108 changes: 59 additions & 49 deletions test/e2e/helpers/broker_with_config_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@ limitations under the License.
package helpers

import (
"fmt"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
. "github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"

"knative.dev/eventing/pkg/apis/eventing/v1beta1"
"knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/cloudevents"
"knative.dev/eventing/test/lib/recordevents"
"knative.dev/eventing/test/lib/resources"
)

Expand All @@ -37,20 +38,21 @@ func TestBrokerWithConfig(t *testing.T,
senderName = "e2e-brokerchannel-sender"
brokerName = "e2e-brokerchannel-broker"

any = v1beta1.TriggerAnyFilter
eventType1 = "type1"
eventType2 = "type2"
eventSource1 = "source1"
eventSource2 = "source2"
eventBody = "e2e-brokerchannel-body"
any = v1beta1.TriggerAnyFilter
eventType = "type1"
transformedEventType = "type2"
eventSource = "http://source1.com"
transformedEventSource = "http://source2.com"
eventBody = `{"msg":"e2e-brokerchannel-body"}`
transformedBody = `{"msg":"transformed body"}`

triggerName1 = "e2e-brokerchannel-trigger1"
triggerName2 = "e2e-brokerchannel-trigger2"
triggerName3 = "e2e-brokerchannel-trigger3"

transformationPodName = "e2e-brokerchannel-trans-pod"
loggerPodName1 = "e2e-brokerchannel-logger-pod1"
loggerPodName2 = "e2e-brokerchannel-logger-pod2"
transformationPodName = "e2e-brokerchannel-trans-pod"
allEventsRecorderPodName = "e2e-brokerchannel-logger-pod1"
transformedEventsRecorderPodName = "e2e-brokerchannel-logger-pod2"

channelName = "e2e-brokerchannel-channel"
subscriptionName = "e2e-brokerchannel-subscription"
Expand All @@ -67,36 +69,42 @@ func TestBrokerWithConfig(t *testing.T,
client.CreateBrokerV1Beta1OrFail(brokerName, resources.WithBrokerClassForBrokerV1Beta1(brokerClass), resources.WithConfigForBrokerV1Beta1(config))
client.WaitForResourceReadyOrFail(brokerName, lib.BrokerTypeMeta)

// create the event we want to transform to
transformedEventBody := fmt.Sprintf("%s %s", eventBody, string(uuid.NewUUID()))
eventAfterTransformation := cloudevents.New(
fmt.Sprintf(`{"msg":%q}`, transformedEventBody),
cloudevents.WithSource(eventSource2),
cloudevents.WithType(eventType2),
)
// eventToSend is the event sent as input of the test
eventToSend := cloudevents.NewEvent()
eventToSend.SetID(uuid.New().String())
eventToSend.SetType(eventType)
eventToSend.SetSource(eventSource)
if err := eventToSend.SetData(cloudevents.ApplicationJSON, []byte(eventBody)); err != nil {
t.Fatalf("Cannot set the payload of the event: %s", err.Error())
}

// create the transformation service for trigger1
transformationPod := resources.DeprecatedEventTransformationPod(transformationPodName, eventAfterTransformation)
transformationPod := resources.EventTransformationPod(
transformationPodName,
transformedEventType,
transformedEventSource,
[]byte(transformedBody),
)
client.CreatePodOrFail(transformationPod, lib.WithService(transformationPodName))

// create trigger1 to receive the original event, and do event transformation
client.CreateTriggerOrFailV1Beta1(
triggerName1,
resources.WithBrokerV1Beta1(brokerName),
resources.WithAttributesTriggerFilterV1Beta1(eventSource1, eventType1, nil),
resources.WithAttributesTriggerFilterV1Beta1(eventSource, eventType, nil),
resources.WithSubscriberServiceRefForTriggerV1Beta1(transformationPodName),
)

// create logger pod and service for trigger2
loggerPod1 := resources.EventLoggerPod(loggerPodName1)
client.CreatePodOrFail(loggerPod1, lib.WithService(loggerPodName1))
// create event tracker that should receive all sent events
allEventTracker, _ := recordevents.StartEventRecordOrFail(client, allEventsRecorderPodName)
defer allEventTracker.Cleanup()

// create trigger2 to receive all the events
// create trigger to receive all the events
client.CreateTriggerOrFailV1Beta1(
triggerName2,
resources.WithBrokerV1Beta1(brokerName),
resources.WithAttributesTriggerFilterV1Beta1(any, any, nil),
resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerPodName1),
resources.WithSubscriberServiceRefForTriggerV1Beta1(allEventsRecorderPodName),
)

// create channel for trigger3
Expand All @@ -111,42 +119,44 @@ func TestBrokerWithConfig(t *testing.T,
client.CreateTriggerOrFailV1Beta1(
triggerName3,
resources.WithBrokerV1Beta1(brokerName),
resources.WithAttributesTriggerFilterV1Beta1(eventSource2, eventType2, nil),
resources.WithAttributesTriggerFilterV1Beta1(transformedEventSource, transformedEventType, nil),
resources.WithSubscriberURIForTriggerV1Beta1(channelURL),
)

// create logger pod and service for subscription
loggerPod2 := resources.EventLoggerPod(loggerPodName2)
client.CreatePodOrFail(loggerPod2, lib.WithService(loggerPodName2))
// create event tracker that should receive only transformed events
transformedEventTracker, _ := recordevents.StartEventRecordOrFail(client, transformedEventsRecorderPodName)
defer transformedEventTracker.Cleanup()

// create subscription
client.CreateSubscriptionOrFail(
subscriptionName,
channelName,
&channel,
resources.WithSubscriberForSubscription(loggerPodName2),
resources.WithSubscriberForSubscription(transformedEventsRecorderPodName),
)

// wait for all test resources to be ready, so that we can start sending events
client.WaitForAllTestResourcesReadyOrFail()

// send fake CloudEvent to the broker
eventToSend := cloudevents.New(
fmt.Sprintf(`{"msg":%q}`, eventBody),
cloudevents.WithSource(eventSource1),
cloudevents.WithType(eventType1),
)
client.SendFakeEventToAddressableOrFail(senderName, brokerName, lib.BrokerTypeMeta, eventToSend)

// check if trigger2's logging service receives both events
eventBodies := []string{transformedEventBody, eventBody}
if err := client.CheckLog(loggerPodName1, lib.CheckerContainsAll([]string{transformedEventBody, eventBody})); err != nil {
st.Fatalf("Strings %v not found in logs of logger pod %q: %v", eventBodies, loggerPodName1, err)
}

// check if subscription's logging service receives the transformed event
if err := client.CheckLog(loggerPodName2, lib.CheckerContains(transformedEventBody)); err != nil {
st.Fatalf("Strings %q not found in logs of logger pod %q: %v", transformedEventBody, loggerPodName2, err)
}
// send CloudEvent to the broker
client.SendEventToAddressable(senderName, brokerName, lib.BrokerTypeMeta, eventToSend)

// Assert the results on the event trackers
originalEventMatcher := recordevents.MatchEvent(AllOf(
HasSource(eventSource),
HasType(eventType),
HasData([]byte(eventBody)),
))
transformedEventMatcher := recordevents.MatchEvent(AllOf(
HasSource(transformedEventSource),
HasType(transformedEventType),
HasData([]byte(transformedBody)),
))

allEventTracker.AssertAtLeast(1, originalEventMatcher)
allEventTracker.AssertAtLeast(1, transformedEventMatcher)

transformedEventTracker.AssertAtLeast(1, transformedEventMatcher)
transformedEventTracker.AssertNot(originalEventMatcher)
})
}
7 changes: 6 additions & 1 deletion test/e2e/helpers/channel_event_tranformation_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ func EventTransformationForSubscriptionTestHelper(t *testing.T,
if err := eventAfterTransformation.SetData(cloudevents.ApplicationJSON, []byte(transformedEventBody)); err != nil {
t.Fatalf("Cannot set the payload of the event: %s", err.Error())
}
transformationPod := resources.EventTransformationPod(transformationPodName, eventAfterTransformation)
transformationPod := resources.EventTransformationPod(
transformationPodName,
eventAfterTransformation.Type(),
eventAfterTransformation.Source(),
eventAfterTransformation.Data(),
)
client.CreatePodOrFail(transformationPod, lib.WithService(transformationPodName))

// create event logger pod and service as the subscriber
Expand Down
13 changes: 13 additions & 0 deletions test/lib/recordevents/event_info_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import (
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"

"knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/resources"
)

const (
Expand Down Expand Up @@ -92,6 +94,17 @@ func NewEventInfoStore(client *lib.Client, podName string) (*EventInfoStore, err
return ei, nil
}

// Deploys a new recordevents pod and start the associated EventInfoStore
func StartEventRecordOrFail(client *lib.Client, podName string) (*EventInfoStore, *corev1.Pod) {
eventRecordPod := resources.EventRecordPod(podName)
client.CreatePodOrFail(eventRecordPod, lib.WithService(podName))
eventTracker, err := NewEventInfoStore(client, podName)
if err != nil {
client.T.Fatalf("Failed to start the EventInfoStore associated to pod '%s': %v", podName, err)
}
return eventTracker, eventRecordPod
}

// Starts the single threaded background goroutine used to update local state
// from the remote REST API.
func (ei *EventInfoStore) start() {
Expand Down
16 changes: 10 additions & 6 deletions test/lib/resources/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
pkgTest "knative.dev/pkg/test"

cloudevents "github.com/cloudevents/sdk-go/v2"
cetest "knative.dev/eventing/test/lib/cloudevents"
)

Expand Down Expand Up @@ -101,11 +100,15 @@ func eventSenderPodImage(imageName string, name string, sink string, event *cete
}

// EventLoggerPod creates a Pod that logs events received.
// Deprecated: This test image is gonna be removed soon and you should use EventRecordPod.
// Look at recordevents.StartEventRecordOrFail for more info
func EventLoggerPod(name string) *corev1.Pod {
return eventLoggerPod("logevents", name)
}

// EventDetailsPod creates a Pod that validates events received and log details about events.
// Deprecated: This test image is gonna be removed soon and you should use EventRecordPod.
// Look at recordevents.StartEventRecordOrFail for more info
func EventDetailsPod(name string) *corev1.Pod {
return eventLoggerPod("eventdetails", name)
}
Expand Down Expand Up @@ -133,6 +136,8 @@ func eventLoggerPod(imageName string, name string) *corev1.Pod {
}

// DeprecatedEventTransformationPod creates a Pod that transforms events received.
// Deprecated: Use EventTransformationPod
// TODO(nlopezgi): remove once other tests that use sdk1 and depend on this method are migrated.
func DeprecatedEventTransformationPod(name string, event *cetest.CloudEvent) *corev1.Pod {
const imageName = "transformevents"
return &corev1.Pod{
Expand Down Expand Up @@ -160,8 +165,7 @@ func DeprecatedEventTransformationPod(name string, event *cetest.CloudEvent) *co
}

// EventTransformationPod creates a Pod that transforms events received receiving as arg a cloudevents sdk2 Event
// TODO(nlopezgi): remove DeprecatedEventTransformationPod above once other tests that use sdk1 and depend on this method are migrated.
func EventTransformationPod(name string, event cloudevents.Event) *corev1.Pod {
func EventTransformationPod(name string, newEventType string, newEventSource string, newEventData []byte) *corev1.Pod {
const imageName = "transformevents"
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -175,11 +179,11 @@ func EventTransformationPod(name string, event cloudevents.Event) *corev1.Pod {
ImagePullPolicy: corev1.PullAlways,
Args: []string{
"-event-type",
event.Type(),
newEventType,
"-event-source",
event.Source(),
newEventSource,
"-event-data",
string(event.Data()),
string(newEventData),
},
}},
RestartPolicy: corev1.RestartPolicyAlways,
Expand Down