Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ require (
github.com/pelletier/go-toml v1.8.0
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pkg/errors v0.9.1
github.com/prometheus/common v0.9.1
github.com/rickb777/date v1.13.0
github.com/robfig/cron/v3 v3.0.1
github.com/rogpeppe/fastuuid v1.2.0
Expand Down
14 changes: 8 additions & 6 deletions test/conformance/helpers/broker_data_plane_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,14 +276,16 @@ func BrokerV1Beta1ConsumerDataPlaneTestHelper(
}

transformMsg := []byte(`{"msg":"Transformed!"}`)
transformPod := resources.EventTransformationPod(
recordevents.DeployEventRecordOrFail(
ctx,
client,
transformerName,
"reply-check-type",
"reply-check-source",
transformMsg,
recordevents.ReplyWithTransformedEvent(
"reply-check-type",
"reply-check-source",
string(transformMsg),
),
)
client.CreatePodOrFail(transformPod, testlib.WithService(transformerName))
client.WaitForAllTestResourcesReadyOrFail(ctx)

trigger := client.CreateTriggerOrFailV1Beta1(
triggerName,
Expand Down
15 changes: 9 additions & 6 deletions test/conformance/helpers/broker_tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,18 @@ func setupBrokerTracing(ctx context.Context, brokerClass string) SetupTracingTes
resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerPodName),
)

// Create a transformer (EventTransfrmer) Pod that replies with the same event as the input,
// Create a transformer Pod (recordevents with transform reply) that replies with the same event as the input,
// except the reply's event's type is changed to bar.
eventTransformerPod := resources.EventTransformationPod(
eventTransformerPod := recordevents.DeployEventRecordOrFail(
ctx,
client,
"transformer",
etLogger,
senderName,
[]byte(eventBody),
recordevents.ReplyWithTransformedEvent(
etLogger,
senderName,
eventBody,
),
)
client.CreatePodOrFail(eventTransformerPod, testlib.WithService(eventTransformerPod.Name))

// Create a Trigger that receives events (type=foo) and sends them to the transformer Pod.
transformerTrigger := client.CreateTriggerOrFailV1Beta1(
Expand Down
13 changes: 8 additions & 5 deletions test/conformance/helpers/channel_tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,16 @@ func setupChannelTracingWithReply(
recordEventsPod := recordevents.DeployEventRecordOrFail(ctx, client, recordEventsPodName)

// Create the subscriber, a Pod that mutates the event.
transformerPod := resources.EventTransformationPod(
transformerPod := recordevents.DeployEventRecordOrFail(
ctx,
client,
"transformer",
"mutated",
eventSource,
nil,
recordevents.ReplyWithTransformedEvent(
"mutated",
eventSource,
"",
),
)
client.CreatePodOrFail(transformerPod, testlib.WithService(transformerPod.Name))

// Create the Subscription linking the Channel to the mutator.
client.CreateSubscriptionOrFail(
Expand Down
13 changes: 8 additions & 5 deletions test/e2e/helpers/broker_channel_flow_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,16 @@ func BrokerChannelFlowWithTransformation(
}

// create the transformation service for trigger1
transformationPod := resources.EventTransformationPod(
recordevents.DeployEventRecordOrFail(
ctx,
client,
transformationPodName,
transformedEventType,
transformedEventSource,
[]byte(transformedBody),
recordevents.ReplyWithTransformedEvent(
transformedEventType,
transformedEventSource,
transformedBody,
),
)
client.CreatePodOrFail(transformationPod, testlib.WithService(transformationPodName))

// create trigger1 to receive the original event, and do event transformation
if triggerVersion == "v1" {
Expand Down
13 changes: 8 additions & 5 deletions test/e2e/helpers/broker_event_transformation_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,16 @@ func EventTransformationForTriggerTestHelper(
client.WaitForResourceReadyOrFail(brokerName, testlib.BrokerTypeMeta)

// create the transformation service
transformationPod := resources.EventTransformationPod(
recordevents.DeployEventRecordOrFail(
ctx,
client,
transformationPodName,
transformedEventType,
transformedEventSource,
[]byte(transformedBody),
recordevents.ReplyWithTransformedEvent(
transformedEventType,
transformedEventSource,
transformedBody,
),
)
client.CreatePodOrFail(transformationPod, testlib.WithService(transformationPodName))

// create trigger1 for event transformation
if triggerVersion == "v1" {
Expand Down
13 changes: 8 additions & 5 deletions test/e2e/helpers/channel_event_tranformation_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,16 @@ func EventTransformationForSubscriptionTestHelper(
if err := eventAfterTransformation.SetData(cloudevents.ApplicationJSON, []byte(transformedEventBody)); err != nil {
t.Fatalf("Cannot set the payload of the event: %s", err.Error())
}
transformationPod := resources.EventTransformationPod(
recordevents.DeployEventRecordOrFail(
ctx,
client,
transformationPodName,
eventAfterTransformation.Type(),
eventAfterTransformation.Source(),
eventAfterTransformation.Data(),
recordevents.ReplyWithTransformedEvent(
eventAfterTransformation.Type(),
eventAfterTransformation.Source(),
string(eventAfterTransformation.Data()),
),
)
client.CreatePodOrFail(transformationPod, testlib.WithService(transformationPodName))

// create event logger pod and service as the subscriber
eventTracker, _ := recordevents.StartEventRecordOrFail(ctx, client, recordEventsPodName)
Expand Down
30 changes: 22 additions & 8 deletions test/e2e/helpers/parallel_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,20 @@ func ParallelTestHelper(
for branchNumber, cse := range tc.branchesConfig {
// construct filter services
filterPodName := fmt.Sprintf("parallel-%s-branch-%d-filter", tc.name, branchNumber)
filterPod := resources.EventFilteringPod(filterPodName, cse.filter)
client.CreatePodOrFail(filterPod, testlib.WithService(filterPodName))
if cse.filter {
recordevents.DeployEventRecordOrFail(ctx, client, filterPodName)
} else {
recordevents.DeployEventRecordOrFail(ctx, client, filterPodName, recordevents.EchoEvent)
}

// construct branch subscriber
subPodName := fmt.Sprintf("parallel-%s-branch-%d-sub", tc.name, branchNumber)
subPod := resources.SequenceStepperPod(subPodName, subPodName)
client.CreatePodOrFail(subPod, testlib.WithService(subPodName))
recordevents.DeployEventRecordOrFail(
ctx,
client,
subPodName,
recordevents.ReplyWithAppendedData(subPodName),
)

parallelBranches[branchNumber] = v1beta1.ParallelBranch{
Filter: &duckv1.Destination{
Expand Down Expand Up @@ -183,13 +190,20 @@ func ParallelV1TestHelper(
for branchNumber, cse := range tc.branchesConfig {
// construct filter services
filterPodName := fmt.Sprintf("parallel-%s-branch-%d-filter", tc.name, branchNumber)
filterPod := resources.EventFilteringPod(filterPodName, cse.filter)
client.CreatePodOrFail(filterPod, testlib.WithService(filterPodName))
if cse.filter {
recordevents.DeployEventRecordOrFail(ctx, client, filterPodName)
} else {
recordevents.DeployEventRecordOrFail(ctx, client, filterPodName, recordevents.EchoEvent)
}

// construct branch subscriber
subPodName := fmt.Sprintf("parallel-%s-branch-%d-sub", tc.name, branchNumber)
subPod := resources.SequenceStepperPod(subPodName, subPodName)
client.CreatePodOrFail(subPod, testlib.WithService(subPodName))
recordevents.DeployEventRecordOrFail(
ctx,
client,
subPodName,
recordevents.ReplyWithAppendedData(subPodName),
)

parallelBranches[branchNumber] = flowsv1.ParallelBranch{
Filter: &duckv1.Destination{
Expand Down
24 changes: 14 additions & 10 deletions test/e2e/helpers/sequence_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ func SequenceTestHelper(
// create a stepper Pod with Service
podName := config.podName
msgAppender := config.msgAppender
stepperPod := resources.SequenceStepperPod(podName, msgAppender)
recordevents.DeployEventRecordOrFail(
ctx, client, podName,
recordevents.ReplyWithAppendedData(msgAppender),
)

client.CreatePodOrFail(stepperPod, testlib.WithService(podName))
// create a new step
step := v1beta1.SequenceStep{
Destination: duckv1.Destination{
Expand Down Expand Up @@ -131,8 +133,7 @@ func SequenceTestHelper(
event.SetSource(eventSource)
event.SetType(testlib.DefaultEventType)
msg := fmt.Sprintf("TestSequence %s", uuid.New().String())
body := fmt.Sprintf(`{"msg":"%s"}`, msg)
if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil {
if err := event.SetData(cloudevents.TextPlain, msg); err != nil {
st.Fatalf("Cannot set the payload of the event: %s", err.Error())
}
client.SendEventToAddressable(
Expand All @@ -149,7 +150,8 @@ func SequenceTestHelper(
}
eventTracker.AssertAtLeast(1, recordevents.MatchEvent(
cetest.HasSource(eventSource),
cetest.DataContains(expectedMsg),
cetest.HasDataContentType(cloudevents.TextPlain),
cetest.HasData([]byte(expectedMsg)),
))
})
}
Expand Down Expand Up @@ -192,9 +194,11 @@ func SequenceV1TestHelper(
// create a stepper Pod with Service
podName := config.podName
msgAppender := config.msgAppender
stepperPod := resources.SequenceStepperPod(podName, msgAppender)
recordevents.DeployEventRecordOrFail(
ctx, client, podName,
recordevents.ReplyWithAppendedData(msgAppender),
)

client.CreatePodOrFail(stepperPod, testlib.WithService(podName))
// create a new step
step := flowsv1.SequenceStep{
Destination: duckv1.Destination{
Expand Down Expand Up @@ -247,8 +251,7 @@ func SequenceV1TestHelper(
event.SetSource(eventSource)
event.SetType(testlib.DefaultEventType)
msg := fmt.Sprintf("TestSequence %s", uuid.New().String())
body := fmt.Sprintf(`{"msg":"%s"}`, msg)
if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil {
if err := event.SetData(cloudevents.TextPlain, msg); err != nil {
st.Fatalf("Cannot set the payload of the event: %s", err.Error())
}
client.SendEventToAddressable(
Expand All @@ -265,7 +268,8 @@ func SequenceV1TestHelper(
}
eventTracker.AssertAtLeast(1, recordevents.MatchEvent(
cetest.HasSource(eventSource),
cetest.DataContains(expectedMsg),
cetest.HasDataContentType(cloudevents.TextPlain),
cetest.HasData([]byte(expectedMsg)),
))
})
}
31 changes: 0 additions & 31 deletions test/lib/recordevents/event_info_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,9 @@ import (

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/util/wait"
pkgTest "knative.dev/pkg/test"

testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/resources"
)

const (
Expand All @@ -42,34 +39,6 @@ const (
retryTimeout = 4 * time.Minute
)

type EventRecordOption = func(*corev1.Pod, *testlib.Client) error

func DeployEventRecordOrFail(ctx context.Context, client *testlib.Client, name string, options ...EventRecordOption) *corev1.Pod {
client.CreateServiceAccountOrFail(name)
client.CreateRoleOrFail(resources.Role(name,
resources.WithRuleForRole(&rbacv1.PolicyRule{
APIGroups: []string{""},
Resources: []string{"pods"},
Verbs: []string{"get"},
}),
resources.WithRuleForRole(&rbacv1.PolicyRule{
APIGroups: []string{""},
Resources: []string{"events"},
Verbs: []string{rbacv1.VerbAll},
}),
))
client.CreateRoleBindingOrFail(name, "Role", name, name, client.Namespace)

eventRecordPod := EventRecordPod(name, name)
client.CreatePodOrFail(eventRecordPod, append(options, testlib.WithService(name))...)
err := pkgTest.WaitForPodRunning(ctx, client.Kube, name, client.Namespace)
if err != nil {
client.T.Fatalf("Failed to start the recordevent pod '%s': %v", name, errors.WithStack(err))
}
client.WaitForServiceEndpointsOrFail(ctx, name, 1)
return eventRecordPod
}

// Deploys a new recordevents pod and start the associated EventInfoStore
func StartEventRecordOrFail(ctx context.Context, client *testlib.Client, podName string, options ...EventRecordOption) (*EventInfoStore, *corev1.Pod) {
eventRecordPod := DeployEventRecordOrFail(ctx, client, podName, options...)
Expand Down
Loading