Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 28 additions & 17 deletions test/e2e/helpers/channel_chain_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (
"fmt"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"

"knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/cloudevents"
"knative.dev/eventing/test/lib/resources"
)

Expand All @@ -33,14 +33,15 @@ func ChannelChainTestHelper(t *testing.T,
channelTestRunner lib.ChannelTestRunner,
options ...lib.SetupClientOption) {
const (
senderName = "e2e-channelchain-sender"
loggerPodName = "e2e-channelchain-logger-pod"
senderName = "e2e-channelchain-sender"
recordEventsPodName = "e2e-channelchain-recordevents-pod"
)
channelNames := []string{"e2e-channelchain1", "e2e-channelchain2"}
// subscriptionNames1 corresponds to Subscriptions on channelNames[0]
subscriptionNames1 := []string{"e2e-channelchain-subs11", "e2e-channelchain-subs12"}
// subscriptionNames2 corresponds to Subscriptions on channelNames[1]
subscriptionNames2 := []string{"e2e-channelchain-subs21"}
eventSource := fmt.Sprintf("http://%s.svc/", senderName)

channelTestRunner.RunTests(t, lib.FeatureBasic, func(st *testing.T, channel metav1.TypeMeta) {
client := lib.Setup(st, true, options...)
Expand All @@ -51,8 +52,13 @@ func ChannelChainTestHelper(t *testing.T,
client.WaitForResourcesReadyOrFail(&channel)

// create loggerPod and expose it as a service
pod := resources.EventLoggerPod(loggerPodName)
client.CreatePodOrFail(pod, lib.WithService(loggerPodName))
recordEventsPod := resources.EventRecordPod(recordEventsPodName)
client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName))
eventTracker, err := client.NewEventInfoStore(recordEventsPodName, t.Logf)
if err != nil {
t.Fatalf("Pod tracker failed: %v", err)
}
defer eventTracker.Cleanup()

// create subscriptions that subscribe the first channel, and reply events directly to the second channel
client.CreateSubscriptionsOrFail(
Expand All @@ -66,24 +72,29 @@ func ChannelChainTestHelper(t *testing.T,
subscriptionNames2,
channelNames[1],
&channel,
resources.WithSubscriberForSubscription(loggerPodName),
resources.WithSubscriberForSubscription(recordEventsPodName),
)

// wait for all test resources to be ready, so that we can start sending events
client.WaitForAllTestResourcesReadyOrFail()

// send fake CloudEvent to the first channel
body := fmt.Sprintf("TestChannelChainEvent %s", uuid.NewUUID())
event := cloudevents.New(
fmt.Sprintf(`{"msg":%q}`, body),
cloudevents.WithSource(senderName),
)
client.SendFakeEventToAddressableOrFail(senderName, channelNames[0], &channel, event)
// send CloudEvent to the first channel
event := cloudevents.NewEvent()
event.SetID("dummy")
event.SetSource(eventSource)
event.SetType(lib.DefaultEventType)

body := fmt.Sprintf(`{"msg":"TestSingleEvent %s"}`, uuid.New().String())
if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil {
st.Fatalf("Cannot set the payload of the event: %s", err.Error())
}

client.SendEventToAddressable(senderName, channelNames[0], &channel, event)

// check if the logging service receives the correct number of event messages
expectedContentCount := len(subscriptionNames1) * len(subscriptionNames2)
if err := client.CheckLog(loggerPodName, lib.CheckerContainsCount(body, expectedContentCount)); err != nil {
st.Fatalf("String %q does not appear %d times in logs of logger pod %q: %v", body, expectedContentCount, loggerPodName, err)
}

// verify the logger service receives the event
eventTracker.AssertWaitMatchSourceData(t, recordEventsPodName, eventSource, body, expectedContentCount, expectedContentCount)
})
}
48 changes: 24 additions & 24 deletions test/test_images/transformevents/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ import (
"flag"
"log"

cloudevents "github.com/cloudevents/sdk-go"
cloudevents "github.com/cloudevents/sdk-go/v2"
"go.uber.org/zap"

"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/tracing"
)

Expand All @@ -40,37 +39,28 @@ func init() {
flag.StringVar(&eventData, "event-data", "", "Cloudevent data body.")
}

func gotEvent(event cloudevents.Event, resp *cloudevents.EventResponse) error {
ctx := event.Context.AsV1()

dataBytes, err := event.DataBytes()
if err != nil {
log.Printf("Got Data Error: %s\n", err.Error())
return err
}
func gotEvent(event cloudevents.Event) (*cloudevents.Event, error) {
log.Println("Received a new event: ")
log.Printf("[%s] %s %s: %s", ctx.Time.String(), ctx.GetSource(), ctx.GetType(), dataBytes)
log.Printf("[%s] %s %s: %s", event.Time().String(), event.Source(), event.Type(), string(event.Data()))

outputEvent := event.Clone()

if eventSource != "" {
ctx.SetSource(eventSource)
outputEvent.SetSource(eventSource)
}
if eventType != "" {
ctx.SetType(eventType)
outputEvent.SetType(eventType)
}
if eventData != "" {
dataBytes = []byte(eventData)
}
r := cloudevents.Event{
Context: ctx,
Data: string(dataBytes),
if err := outputEvent.SetData(cloudevents.ApplicationJSON, []byte(eventData)); err != nil {
return nil, err
}
}
r.SetDataContentType(cloudevents.ApplicationJSON)

log.Println("Transform the event to: ")
log.Printf("[%s] %s %s: %s", ctx.Time.String(), ctx.GetSource(), ctx.GetType(), dataBytes)
log.Printf("[%s] %s %s: %s", outputEvent.Time().String(), outputEvent.Source(), outputEvent.Type(), string(outputEvent.Data()))

resp.RespondWith(200, &r)
return nil
return &outputEvent, nil
}

func main() {
Expand All @@ -82,11 +72,21 @@ func main() {
log.Fatalf("Unable to setup trace publishing: %v", err)
}

c, err := kncloudevents.NewDefaultClient()
t, err := cloudevents.NewHTTP(cloudevents.WithPort(8080))
if err != nil {
log.Fatalf("failed to create transport, %v", err)
}

c, err := cloudevents.NewClient(t,
cloudevents.WithTimeNow(),
cloudevents.WithUUIDs(),
)
if err != nil {
log.Fatalf("failed to create client, %v", err)
}

log.Printf("listening on 8080")
log.Fatalf("failed to start receiver: %s", c.StartReceiver(context.Background(), gotEvent))
if err := c.StartReceiver(context.Background(), gotEvent); err != nil {
log.Fatalf("failed to start receiver: %s", err)
}
}