Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 24 additions & 17 deletions test/e2e/helpers/channel_defaulter_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ import (
"testing"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/ghodss/yaml"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/uuid"

eventingduck "knative.dev/eventing/pkg/apis/duck/v1alpha1"
"knative.dev/eventing/pkg/apis/messaging/config"
eventingtesting "knative.dev/eventing/pkg/reconciler/testing"
"knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/cloudevents"
"knative.dev/eventing/test/lib/duck"
"knative.dev/eventing/test/lib/resources"
reconciler "knative.dev/pkg/reconciler"
Expand Down Expand Up @@ -89,21 +89,26 @@ func defaultChannelTestHelper(t *testing.T, client *lib.Client, expectedChannel
channelName := "e2e-defaulter-channel"
senderName := "e2e-defaulter-sender"
subscriptionName := "e2e-defaulter-subscription"
loggerPodName := "e2e-defaulter-logger-pod"
recordEventsPodName := "e2e-defaulter-recordevents-pod"

// create channel
client.CreateChannelWithDefaultOrFail(eventingtesting.NewChannel(channelName, client.Namespace))

// create logger service as the subscriber
pod := resources.EventLoggerPod(loggerPodName)
client.CreatePodOrFail(pod, lib.WithService(loggerPodName))
// create event logger pod and service as the subscriber
recordEventsPod := resources.EventRecordPod(recordEventsPodName)
client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName))
eventTracker, err := client.NewEventInfoStore(recordEventsPodName, t.Logf)
if err != nil {
t.Fatalf("Pod tracker failed: %v", err)
}
defer eventTracker.Cleanup()

// create subscription to subscribe the channel, and forward the received events to the logger service
client.CreateSubscriptionOrFail(
subscriptionName,
channelName,
lib.ChannelTypeMeta,
resources.WithSubscriberForSubscription(loggerPodName),
resources.WithSubscriberForSubscription(recordEventsPodName),
)

// wait for all test resources to be ready, so that we can start sending events
Expand Down Expand Up @@ -135,18 +140,20 @@ func defaultChannelTestHelper(t *testing.T, client *lib.Client, expectedChannel
t.Fatalf("The defaultchannel is expected to create 1 underlying channel, but got %d", len(filteredObjs))
}

// send fake CloudEvent to the channel
body := fmt.Sprintf("TestSingleEvent %s", uuid.NewUUID())
event := cloudevents.New(
fmt.Sprintf(`{"msg":%q}`, body),
cloudevents.WithSource(senderName),
)
client.SendFakeEventToAddressableOrFail(senderName, channelName, lib.ChannelTypeMeta, event)
// send CloudEvent to the channel
event := cloudevents.NewEvent()
event.SetID("dummy")
eventSource := fmt.Sprintf("http://%s.svc/", senderName)
event.SetSource(eventSource)
event.SetType(lib.DefaultEventType)
body := fmt.Sprintf(`{"msg":"TestSingleEvent %s"}`, uuid.New().String())
if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil {
t.Fatalf("Cannot set the payload of the event: %s", err.Error())
}
client.SendEventToAddressable(senderName, channelName, lib.ChannelTypeMeta, event)

// verify the logger service receives the event
if err := client.CheckLog(loggerPodName, lib.CheckerContains(body)); err != nil {
t.Fatalf("String %q not found in logs of logger pod %q: %v", body, loggerPodName, err)
}
eventTracker.AssertWaitMatchSourceData(t, recordEventsPodName, eventSource, body, 1, 1)
}

// updateDefaultChannelCM will update the default channel configmap
Expand Down