From f6a7db9f541920a51cb5a3c12b55804aa97cfa18 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 14 Oct 2020 10:38:08 +0200 Subject: [PATCH] Using recordevents as sequencestepper Signed-off-by: Francesco Guardiani --- test/e2e/helpers/sequence_test_helper.go | 24 +++++++++++++--------- test/lib/recordevents/observer/observer.go | 6 +++++- test/lib/recordevents/observer/reply.go | 22 +++++++++++++++++++- test/lib/recordevents/resources.go | 18 ++++++++++++++++ 4 files changed, 58 insertions(+), 12 deletions(-) diff --git a/test/e2e/helpers/sequence_test_helper.go b/test/e2e/helpers/sequence_test_helper.go index 807384498ee..5f0c44dc0fb 100644 --- a/test/e2e/helpers/sequence_test_helper.go +++ b/test/e2e/helpers/sequence_test_helper.go @@ -76,9 +76,11 @@ func SequenceTestHelper( // create a stepper Pod with Service podName := config.podName msgAppender := config.msgAppender - stepperPod := resources.SequenceStepperPod(podName, msgAppender) + recordevents.DeployEventRecordOrFail( + ctx, client, podName, + recordevents.ReplyWithAppendedData(msgAppender), + ) - client.CreatePodOrFail(stepperPod, testlib.WithService(podName)) // create a new step step := v1beta1.SequenceStep{ Destination: duckv1.Destination{ @@ -131,8 +133,7 @@ func SequenceTestHelper( event.SetSource(eventSource) event.SetType(testlib.DefaultEventType) msg := fmt.Sprintf("TestSequence %s", uuid.New().String()) - body := fmt.Sprintf(`{"msg":"%s"}`, msg) - if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil { + if err := event.SetData(cloudevents.TextPlain, msg); err != nil { st.Fatalf("Cannot set the payload of the event: %s", err.Error()) } client.SendEventToAddressable( @@ -149,7 +150,8 @@ func SequenceTestHelper( } eventTracker.AssertAtLeast(1, recordevents.MatchEvent( cetest.HasSource(eventSource), - cetest.DataContains(expectedMsg), + cetest.HasDataContentType(cloudevents.TextPlain), + cetest.HasData([]byte(expectedMsg)), )) }) } @@ -192,9 +194,11 @@ func SequenceV1TestHelper( // create a stepper Pod with Service podName := config.podName msgAppender := config.msgAppender - stepperPod := resources.SequenceStepperPod(podName, msgAppender) + recordevents.DeployEventRecordOrFail( + ctx, client, podName, + recordevents.ReplyWithAppendedData(msgAppender), + ) - client.CreatePodOrFail(stepperPod, testlib.WithService(podName)) // create a new step step := flowsv1.SequenceStep{ Destination: duckv1.Destination{ @@ -247,8 +251,7 @@ func SequenceV1TestHelper( event.SetSource(eventSource) event.SetType(testlib.DefaultEventType) msg := fmt.Sprintf("TestSequence %s", uuid.New().String()) - body := fmt.Sprintf(`{"msg":"%s"}`, msg) - if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil { + if err := event.SetData(cloudevents.TextPlain, msg); err != nil { st.Fatalf("Cannot set the payload of the event: %s", err.Error()) } client.SendEventToAddressable( @@ -265,7 +268,8 @@ func SequenceV1TestHelper( } eventTracker.AssertAtLeast(1, recordevents.MatchEvent( cetest.HasSource(eventSource), - cetest.DataContains(expectedMsg), + cetest.HasDataContentType(cloudevents.TextPlain), + cetest.HasData([]byte(expectedMsg)), )) }) } diff --git a/test/lib/recordevents/observer/observer.go b/test/lib/recordevents/observer/observer.go index badc5048802..b57dac9c718 100644 --- a/test/lib/recordevents/observer/observer.go +++ b/test/lib/recordevents/observer/observer.go @@ -58,6 +58,10 @@ type envConfig struct { // The event data to use in the reply, if enabled ReplyEventData string `envconfig:"REPLY_EVENT_DATA" default:"" required:"false"` + + // This string to append in the data field in the reply, if enabled. + // This will threat the data as text/plain field + ReplyAppendData string `envconfig:"REPLY_APPEND_DATA" default:"" required:"false"` } func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observer { @@ -71,7 +75,7 @@ func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observ var replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo) if env.Reply { logging.FromContext(ctx).Info("Observer will reply with an event") - replyFunc = ReplyTransformerFunc(env.ReplyEventType, env.ReplyEventSource, env.ReplyEventData) + replyFunc = ReplyTransformerFunc(env.ReplyEventType, env.ReplyEventSource, env.ReplyEventData, env.ReplyAppendData) } else { logging.FromContext(ctx).Info("Observer won't reply with an event") replyFunc = NoOpReply diff --git a/test/lib/recordevents/observer/reply.go b/test/lib/recordevents/observer/reply.go index 96456977f0c..f60945d1b00 100644 --- a/test/lib/recordevents/observer/reply.go +++ b/test/lib/recordevents/observer/reply.go @@ -23,6 +23,7 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/binding" cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" + "go.uber.org/zap" "knative.dev/pkg/logging" "knative.dev/eventing/test/lib/recordevents" @@ -32,7 +33,7 @@ func NoOpReply(_ context.Context, writer http.ResponseWriter, _ recordevents.Eve writer.WriteHeader(http.StatusAccepted) } -func ReplyTransformerFunc(replyEventType string, replyEventSource string, replyEventData string) func(context.Context, http.ResponseWriter, recordevents.EventInfo) { +func ReplyTransformerFunc(replyEventType string, replyEventSource string, replyEventData string, replyAppendData string) func(context.Context, http.ResponseWriter, recordevents.EventInfo) { return func(ctx context.Context, writer http.ResponseWriter, info recordevents.EventInfo) { if info.Error != "" { writer.WriteHeader(http.StatusBadRequest) @@ -51,17 +52,36 @@ func ReplyTransformerFunc(replyEventType string, replyEventSource string, replyE outputEvent := info.Event.Clone() if replyEventSource != "" { + logging.FromContext(ctx).Infof("Setting reply event source '%s'", replyEventSource) outputEvent.SetSource(replyEventSource) } if replyEventType != "" { + logging.FromContext(ctx).Infof("Setting reply event type '%s'", replyEventType) outputEvent.SetType(replyEventType) } if replyEventData != "" { + logging.FromContext(ctx).Infof("Setting reply event data '%s'", replyAppendData) if err := outputEvent.SetData(cloudevents.ApplicationJSON, []byte(replyEventData)); err != nil { logging.FromContext(ctx).Warn("Cannot set the event data") } } + if replyAppendData != "" { + var d string + if outputEvent.Data() == nil { + d = replyAppendData + } else { + if err := info.Event.DataAs(&d); err != nil { + logging.FromContext(ctx).Warn("Cannot read the event data as text/plain") + } + d = d + replyAppendData + } + logging.FromContext(ctx).Infof("Setting appended event data '%s'", d) + if err := outputEvent.SetData(cloudevents.TextPlain, d); err != nil { + logging.FromContext(ctx).Warn("Cannot set the event data") + } + } + logging.FromContext(ctx).Infow("Replying with", zap.Stringer("event", outputEvent)) err := cehttp.WriteResponseWriter(ctx, binding.ToMessage(&outputEvent), 200, writer) if err != nil { logging.FromContext(ctx).Warn("Error while writing the event as response", err) diff --git a/test/lib/recordevents/resources.go b/test/lib/recordevents/resources.go index 7b018818c63..36307c7957b 100644 --- a/test/lib/recordevents/resources.go +++ b/test/lib/recordevents/resources.go @@ -73,6 +73,24 @@ func ReplyWithTransformedEvent(replyEventType string, replyEventSource string, r } } +// ReplyWithAppendedData is an option to let the recordevents reply with the transformed event with appended data +func ReplyWithAppendedData(appendData string) EventRecordOption { + return func(pod *corev1.Pod, client *testlib.Client) error { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY", Value: "true"}, + ) + if appendData != "" { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY_APPEND_DATA", Value: appendData}, + ) + } + + return nil + } +} + // DeployEventRecordOrFail deploys the recordevents image with necessary sa, roles, rb to execute the image func DeployEventRecordOrFail(ctx context.Context, client *testlib.Client, name string, options ...EventRecordOption) *corev1.Pod { client.CreateServiceAccountOrFail(name)