Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 25 additions & 18 deletions test/e2e/helpers/channel_dls_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (
"fmt"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"

"knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/cloudevents"
"knative.dev/eventing/test/lib/resources"
)

Expand All @@ -33,8 +33,8 @@ func ChannelDeadLetterSinkTestHelper(t *testing.T,
channelTestRunner lib.ChannelTestRunner,
options ...lib.SetupClientOption) {
const (
senderName = "e2e-channelchain-sender"
loggerPodName = "e2e-channel-dls-logger-pod"
senderName = "e2e-channelchain-sender"
recordEventsPodName = "e2e-channel-dls-recordevents-pod"
)
channelNames := []string{"e2e-channel-dls"}
// subscriptionNames corresponds to Subscriptions
Expand All @@ -48,34 +48,41 @@ func ChannelDeadLetterSinkTestHelper(t *testing.T,
client.CreateChannelsOrFail(channelNames, &channel)
client.WaitForResourcesReadyOrFail(&channel)

// create loggerPod and expose it as a service
pod := resources.EventLoggerPod(loggerPodName)
client.CreatePodOrFail(pod, lib.WithService(loggerPodName))
// create event logger pod and service as the subscriber
recordEventsPod := resources.EventRecordPod(recordEventsPodName)
client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName))
eventTracker, err := client.NewEventInfoStore(recordEventsPodName, t.Logf)
if err != nil {
t.Fatalf("Pod tracker failed: %v", err)
}
defer eventTracker.Cleanup()

// create subscriptions that subscribe to a service that does not exist
client.CreateSubscriptionsOrFail(
subscriptionNames,
channelNames[0],
&channel,
resources.WithSubscriberForSubscription("does-not-exist"),
resources.WithDeadLetterSinkForSubscription(loggerPodName),
resources.WithDeadLetterSinkForSubscription(recordEventsPodName),
)

// wait for all test resources to be ready, so that we can start sending events
client.WaitForAllTestResourcesReadyOrFail()

// send fake CloudEvent to the first channel
body := fmt.Sprintf("TestChannelDeadLetterSink %s", uuid.NewUUID())
event := cloudevents.New(
fmt.Sprintf(`{"msg":%q}`, body),
cloudevents.WithSource(senderName),
)
client.SendFakeEventToAddressableOrFail(senderName, channelNames[0], &channel, event)
// send CloudEvent to the first channel
event := cloudevents.NewEvent()
event.SetID("dummy")
eventSource := fmt.Sprintf("http://%s.svc/", senderName)
event.SetSource(eventSource)
event.SetType(lib.DefaultEventType)
body := fmt.Sprintf(`{"msg":"TestChannelDeadLetterSink %s"}`, uuid.New().String())
if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil {
t.Fatalf("Cannot set the payload of the event: %s", err.Error())
}
client.SendEventToAddressable(senderName, channelNames[0], &channel, event)

// check if the logging service receives the correct number of event messages
expectedContentCount := len(subscriptionNames)
if err := client.CheckLog(loggerPodName, lib.CheckerContainsCount(body, expectedContentCount)); err != nil {
st.Fatalf("String %q does not appear %d times in logs of logger pod %q: %v", body, expectedContentCount, loggerPodName, err)
}
eventTracker.AssertWaitMatchSourceData(t, recordEventsPodName, eventSource, body, expectedContentCount, expectedContentCount)
})
}