Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 26 additions & 25 deletions test/e2e/helpers/sequence_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@ limitations under the License.
package helpers

import (
"encoding/json"
"fmt"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"

"knative.dev/eventing/pkg/apis/flows/v1beta1"
messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1"
eventingtesting "knative.dev/eventing/pkg/reconciler/testing/v1beta1"
"knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/cloudevents"
"knative.dev/eventing/test/lib/resources"
duckv1 "knative.dev/pkg/apis/duck/v1"
)
Expand All @@ -39,9 +38,9 @@ func SequenceTestHelper(t *testing.T,
sequenceName = "e2e-sequence"
senderPodName = "e2e-sequence-sender-pod"

channelName = "e2e-sequence-channel"
subscriptionName = "e2e-sequence-subscription"
loggerPodName = "e2e-sequence-logger-pod"
channelName = "e2e-sequence-channel"
subscriptionName = "e2e-sequence-subscription"
recordEventsPodName = "e2e-sequence-recordevents-pod"
)

stepSubscriberConfigs := []struct {
Expand Down Expand Up @@ -90,15 +89,20 @@ func SequenceTestHelper(t *testing.T,
// must be Addressable. In the future if we use Knative Serving in the tests, we can
// make the logger service as a Knative service, and remove the channel and subscription.
client.CreateChannelOrFail(channelName, &channel)
// create logger service as the subscriber
loggerPod := resources.EventLoggerPod(loggerPodName)
client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName))
// create event logger pod and service as the subscriber
recordEventsPod := resources.EventRecordPod(recordEventsPodName)
client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName))
eventTracker, err := client.NewEventInfoStore(recordEventsPodName, t.Logf)
if err != nil {
t.Fatalf("Pod tracker failed: %v", err)
}
defer eventTracker.Cleanup()
// create subscription to subscribe the channel, and forward the received events to the logger service
client.CreateSubscriptionOrFail(
subscriptionName,
channelName,
&channel,
resources.WithSubscriberForSubscription(loggerPodName),
resources.WithSubscriberForSubscription(recordEventsPodName),
)
replyRef := &duckv1.KReference{Kind: channel.Kind, APIVersion: channel.APIVersion, Name: channelName, Namespace: client.Namespace}

Expand All @@ -117,19 +121,18 @@ func SequenceTestHelper(t *testing.T,
// wait for all test resources to be ready, so that we can start sending events
client.WaitForAllTestResourcesReadyOrFail()

// send fake CloudEvent to the Sequence
msg := fmt.Sprintf("TestSequence %s", uuid.NewUUID())
// NOTE: the eventData format must be BaseData, as it needs to be correctly parsed in the stepper service.
eventData := cloudevents.BaseData{Message: msg}
eventDataBytes, err := json.Marshal(eventData)
if err != nil {
st.Fatalf("Failed to convert %v to json: %v", eventData, err)
// send CloudEvent to the Sequence
event := cloudevents.NewEvent()
event.SetID("dummy")
eventSource := fmt.Sprintf("http://%s.svc/", senderPodName)
event.SetSource(eventSource)
event.SetType(lib.DefaultEventType)
msg := fmt.Sprintf("TestSequence %s", uuid.New().String())
body := fmt.Sprintf(`{"msg":"%s"}`, msg)
if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil {
st.Fatalf("Cannot set the payload of the event: %s", err.Error())
}
event := cloudevents.New(
string(eventDataBytes),
cloudevents.WithSource(senderPodName),
)
client.SendFakeEventToAddressableOrFail(
client.SendEventToAddressable(
senderPodName,
sequenceName,
lib.FlowsSequenceTypeMeta,
Expand All @@ -140,8 +143,6 @@ func SequenceTestHelper(t *testing.T,
for _, config := range stepSubscriberConfigs {
expectedMsg += config.msgAppender
}
if err := client.CheckLog(loggerPodName, lib.CheckerContains(expectedMsg)); err != nil {
st.Fatalf("String %q not found in logs of logger pod %q: %v", expectedMsg, loggerPodName, err)
}
eventTracker.AssertWaitMatchSourceData(t, recordEventsPodName, eventSource, expectedMsg, 1, 1)
})
}
53 changes: 29 additions & 24 deletions test/test_images/sequencestepper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ package main

import (
"context"
"encoding/json"
"flag"
"log"

ce "github.com/cloudevents/sdk-go"

"knative.dev/eventing/test/lib/cloudevents"
cloudevents "github.com/cloudevents/sdk-go/v2"
)

var (
Expand All @@ -34,43 +33,49 @@ func init() {
flag.StringVar(&eventMsgAppender, "msg-appender", "", "a string we want to append on the event message")
}

func gotEvent(event ce.Event, resp *ce.EventResponse) error {
ctx := event.Context.AsV1()

data := &cloudevents.BaseData{}
if err := event.DataAs(data); err != nil {
log.Printf("Got Data Error: %s\n", err.Error())
return err
}

func gotEvent(event cloudevents.Event) (*cloudevents.Event, error) {
log.Println("Received a new event: ")
log.Printf("[%s] %s %s: %+v", ctx.Time.String(), ctx.GetSource(), ctx.GetType(), data)
log.Printf("[%s] %s %s: %s", event.Time().String(), event.Source(), event.Type(), string(event.Data()))

outputEvent := event.Clone()

// append eventMsgAppender to message of the data
data.Message = data.Message + eventMsgAppender
r := ce.Event{
Context: ctx,
Data: data,
var data map[string]interface{}
if err := json.Unmarshal(event.Data(), &data); err != nil {
return nil, err
}
data["msg"] = data["msg"].(string) + eventMsgAppender
if eventData, err := json.Marshal(&data); err != nil {
return nil, err
} else if err := outputEvent.SetData(cloudevents.ApplicationJSON, []byte(eventData)); err != nil {
return nil, err
}

r.SetDataContentType(ce.ApplicationJSON)

log.Println("Transform the event to: ")
log.Printf("[%s] %s %s: %+v", ctx.Time.String(), ctx.GetSource(), ctx.GetType(), data)
log.Printf("[%s] %s %s: %s", outputEvent.Time().String(), outputEvent.Source(), outputEvent.Type(), string(outputEvent.Data()))

resp.RespondWith(200, &r)
return nil
return &outputEvent, nil
}

func main() {
// parse the command line flags
flag.Parse()

c, err := ce.NewDefaultClient()
t, err := cloudevents.NewHTTP(cloudevents.WithPort(8080))
if err != nil {
log.Fatalf("failed to create transport, %v", err)
}

c, err := cloudevents.NewClient(t,
cloudevents.WithTimeNow(),
cloudevents.WithUUIDs(),
)
if err != nil {
log.Fatalf("failed to create client, %v", err)
}

log.Printf("listening on 8080")
log.Fatalf("failed to start receiver: %s", c.StartReceiver(context.Background(), gotEvent))
if err := c.StartReceiver(context.Background(), gotEvent); err != nil {
log.Fatalf("failed to start receiver: %s", err)
}
}