Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions test/e2e/helpers/sequence_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ func SequenceTestHelper(
// create a stepper Pod with Service
podName := config.podName
msgAppender := config.msgAppender
stepperPod := resources.SequenceStepperPod(podName, msgAppender)
recordevents.DeployEventRecordOrFail(
ctx, client, podName,
recordevents.ReplyWithAppendedData(msgAppender),
)

client.CreatePodOrFail(stepperPod, testlib.WithService(podName))
// create a new step
step := v1beta1.SequenceStep{
Destination: duckv1.Destination{
Expand Down Expand Up @@ -131,8 +133,7 @@ func SequenceTestHelper(
event.SetSource(eventSource)
event.SetType(testlib.DefaultEventType)
msg := fmt.Sprintf("TestSequence %s", uuid.New().String())
body := fmt.Sprintf(`{"msg":"%s"}`, msg)
if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil {
if err := event.SetData(cloudevents.TextPlain, msg); err != nil {
st.Fatalf("Cannot set the payload of the event: %s", err.Error())
}
client.SendEventToAddressable(
Expand All @@ -149,7 +150,8 @@ func SequenceTestHelper(
}
eventTracker.AssertAtLeast(1, recordevents.MatchEvent(
cetest.HasSource(eventSource),
cetest.DataContains(expectedMsg),
cetest.HasDataContentType(cloudevents.TextPlain),
cetest.HasData([]byte(expectedMsg)),
))
})
}
Expand Down Expand Up @@ -192,9 +194,11 @@ func SequenceV1TestHelper(
// create a stepper Pod with Service
podName := config.podName
msgAppender := config.msgAppender
stepperPod := resources.SequenceStepperPod(podName, msgAppender)
recordevents.DeployEventRecordOrFail(
ctx, client, podName,
recordevents.ReplyWithAppendedData(msgAppender),
)

client.CreatePodOrFail(stepperPod, testlib.WithService(podName))
// create a new step
step := flowsv1.SequenceStep{
Destination: duckv1.Destination{
Expand Down Expand Up @@ -247,8 +251,7 @@ func SequenceV1TestHelper(
event.SetSource(eventSource)
event.SetType(testlib.DefaultEventType)
msg := fmt.Sprintf("TestSequence %s", uuid.New().String())
body := fmt.Sprintf(`{"msg":"%s"}`, msg)
if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil {
if err := event.SetData(cloudevents.TextPlain, msg); err != nil {
st.Fatalf("Cannot set the payload of the event: %s", err.Error())
}
client.SendEventToAddressable(
Expand All @@ -265,7 +268,8 @@ func SequenceV1TestHelper(
}
eventTracker.AssertAtLeast(1, recordevents.MatchEvent(
cetest.HasSource(eventSource),
cetest.DataContains(expectedMsg),
cetest.HasDataContentType(cloudevents.TextPlain),
cetest.HasData([]byte(expectedMsg)),
))
})
}
6 changes: 5 additions & 1 deletion test/lib/recordevents/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ type envConfig struct {

// The event data to use in the reply, if enabled
ReplyEventData string `envconfig:"REPLY_EVENT_DATA" default:"" required:"false"`

// This string to append in the data field in the reply, if enabled.
// This will threat the data as text/plain field
ReplyAppendData string `envconfig:"REPLY_APPEND_DATA" default:"" required:"false"`
}

func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observer {
Expand All @@ -71,7 +75,7 @@ func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observ
var replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo)
if env.Reply {
logging.FromContext(ctx).Info("Observer will reply with an event")
replyFunc = ReplyTransformerFunc(env.ReplyEventType, env.ReplyEventSource, env.ReplyEventData)
replyFunc = ReplyTransformerFunc(env.ReplyEventType, env.ReplyEventSource, env.ReplyEventData, env.ReplyAppendData)
} else {
logging.FromContext(ctx).Info("Observer won't reply with an event")
replyFunc = NoOpReply
Expand Down
22 changes: 21 additions & 1 deletion test/lib/recordevents/observer/reply.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"go.uber.org/zap"
"knative.dev/pkg/logging"

"knative.dev/eventing/test/lib/recordevents"
Expand All @@ -32,7 +33,7 @@ func NoOpReply(_ context.Context, writer http.ResponseWriter, _ recordevents.Eve
writer.WriteHeader(http.StatusAccepted)
}

func ReplyTransformerFunc(replyEventType string, replyEventSource string, replyEventData string) func(context.Context, http.ResponseWriter, recordevents.EventInfo) {
func ReplyTransformerFunc(replyEventType string, replyEventSource string, replyEventData string, replyAppendData string) func(context.Context, http.ResponseWriter, recordevents.EventInfo) {
return func(ctx context.Context, writer http.ResponseWriter, info recordevents.EventInfo) {
if info.Error != "" {
writer.WriteHeader(http.StatusBadRequest)
Expand All @@ -51,17 +52,36 @@ func ReplyTransformerFunc(replyEventType string, replyEventSource string, replyE
outputEvent := info.Event.Clone()

if replyEventSource != "" {
logging.FromContext(ctx).Infof("Setting reply event source '%s'", replyEventSource)
outputEvent.SetSource(replyEventSource)
}
if replyEventType != "" {
logging.FromContext(ctx).Infof("Setting reply event type '%s'", replyEventType)
outputEvent.SetType(replyEventType)
}
if replyEventData != "" {
logging.FromContext(ctx).Infof("Setting reply event data '%s'", replyAppendData)
if err := outputEvent.SetData(cloudevents.ApplicationJSON, []byte(replyEventData)); err != nil {
logging.FromContext(ctx).Warn("Cannot set the event data")
}
}
if replyAppendData != "" {
var d string
if outputEvent.Data() == nil {
d = replyAppendData
} else {
if err := info.Event.DataAs(&d); err != nil {
logging.FromContext(ctx).Warn("Cannot read the event data as text/plain")
}
d = d + replyAppendData
}
logging.FromContext(ctx).Infof("Setting appended event data '%s'", d)
if err := outputEvent.SetData(cloudevents.TextPlain, d); err != nil {
logging.FromContext(ctx).Warn("Cannot set the event data")
}
}

logging.FromContext(ctx).Infow("Replying with", zap.Stringer("event", outputEvent))
err := cehttp.WriteResponseWriter(ctx, binding.ToMessage(&outputEvent), 200, writer)
if err != nil {
logging.FromContext(ctx).Warn("Error while writing the event as response", err)
Expand Down
18 changes: 18 additions & 0 deletions test/lib/recordevents/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,24 @@ func ReplyWithTransformedEvent(replyEventType string, replyEventSource string, r
}
}

// ReplyWithAppendedData is an option to let the recordevents reply with the transformed event with appended data
func ReplyWithAppendedData(appendData string) EventRecordOption {
return func(pod *corev1.Pod, client *testlib.Client) error {
pod.Spec.Containers[0].Env = append(
pod.Spec.Containers[0].Env,
corev1.EnvVar{Name: "REPLY", Value: "true"},
)
if appendData != "" {
pod.Spec.Containers[0].Env = append(
pod.Spec.Containers[0].Env,
corev1.EnvVar{Name: "REPLY_APPEND_DATA", Value: appendData},
)
}

return nil
}
}

// DeployEventRecordOrFail deploys the recordevents image with necessary sa, roles, rb to execute the image
func DeployEventRecordOrFail(ctx context.Context, client *testlib.Client, name string, options ...EventRecordOption) *corev1.Pod {
client.CreateServiceAccountOrFail(name)
Expand Down