Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions test/e2e/channel_single_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package e2e
import (
"testing"

cloudevents "github.com/cloudevents/sdk-go"
cloudevents "github.com/cloudevents/sdk-go/v2"

"knative.dev/eventing/test/e2e/helpers"
)
Expand All @@ -36,7 +36,7 @@ EventSource ---> Channel ---> Subscription ---> Service(Logger)
func TestSingleBinaryEventForChannel(t *testing.T) {
helpers.SingleEventForChannelTestHelper(
t,
cloudevents.Binary,
cloudevents.EncodingBinary,
helpers.SubscriptionV1alpha1,
"",
channelTestRunner,
Expand All @@ -46,7 +46,7 @@ func TestSingleBinaryEventForChannel(t *testing.T) {
func TestSingleStructuredEventForChannel(t *testing.T) {
helpers.SingleEventForChannelTestHelper(
t,
cloudevents.Structured,
cloudevents.EncodingStructured,
helpers.SubscriptionV1alpha1,
"",
channelTestRunner,
Expand All @@ -56,7 +56,7 @@ func TestSingleStructuredEventForChannel(t *testing.T) {
func TestSingleBinaryEventForChannelV1Beta1(t *testing.T) {
helpers.SingleEventForChannelTestHelper(
t,
cloudevents.Binary,
cloudevents.EncodingBinary,
helpers.SubscriptionV1beta1,
"",
channelTestRunner,
Expand All @@ -66,7 +66,7 @@ func TestSingleBinaryEventForChannelV1Beta1(t *testing.T) {
func TestSingleBinaryEventForChannelV1Beta1SubscribeToV1Alpha1(t *testing.T) {
helpers.SingleEventForChannelTestHelper(
t,
cloudevents.Binary,
cloudevents.EncodingBinary,
helpers.SubscriptionV1beta1,
"messaging.knative.dev/v1alpha1",
channelTestRunner,
Expand All @@ -76,7 +76,7 @@ func TestSingleBinaryEventForChannelV1Beta1SubscribeToV1Alpha1(t *testing.T) {
func TestSingleStructuredEventForChannelV1Beta1(t *testing.T) {
helpers.SingleEventForChannelTestHelper(
t,
cloudevents.Structured,
cloudevents.EncodingStructured,
helpers.SubscriptionV1beta1,
"",
channelTestRunner,
Expand Down
61 changes: 39 additions & 22 deletions test/e2e/helpers/channel_single_event_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import (
"fmt"
"testing"

"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"

cloudevents "github.com/cloudevents/sdk-go/v2"
Comment thread
slinkydeveloper marked this conversation as resolved.
"knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/cloudevents"
"knative.dev/eventing/test/lib/resources"
"knative.dev/eventing/test/lib/resources/sender"
)

type SubscriptionVersion string
Expand All @@ -41,15 +42,15 @@ const (
// a subscription to its v1alpha1 version by using channelVersion to override it.
// channelVersion == "" means that the version of the channel subscribed to is not
// modified.
func SingleEventForChannelTestHelper(t *testing.T, encoding string,
func SingleEventForChannelTestHelper(t *testing.T, encoding cloudevents.Encoding,
subscriptionVersion SubscriptionVersion,
channelVersion string,
channelTestRunner lib.ChannelTestRunner,
options ...lib.SetupClientOption) {
channelName := "e2e-singleevent-channel-" + encoding
senderName := "e2e-singleevent-sender-" + encoding
subscriptionName := "e2e-singleevent-subscription-" + encoding
loggerPodName := "e2e-singleevent-logger-pod-" + encoding
channelName := "e2e-singleevent-channel-" + encoding.String()
senderName := "e2e-singleevent-sender-" + encoding.String()
subscriptionName := "e2e-singleevent-subscription-" + encoding.String()
eventRecorder := "e2e-singleevent-event-record-pod-" + encoding.String()

channelTestRunner.RunTests(t, lib.FeatureBasic, func(st *testing.T, channel metav1.TypeMeta) {
st.Logf("Run test with channel %q", channel)
Expand All @@ -59,9 +60,14 @@ func SingleEventForChannelTestHelper(t *testing.T, encoding string,
// create channel
client.CreateChannelOrFail(channelName, &channel)

// create logger service as the subscriber
pod := resources.EventLoggerPod(loggerPodName)
client.CreatePodOrFail(pod, lib.WithService(loggerPodName))
// create event logger pod and service
eventRecordPod := resources.EventRecordPod(eventRecorder)
client.CreatePodOrFail(eventRecordPod, lib.WithService(eventRecorder))
eventTracker, err := client.NewEventInfoStore(eventRecorder, t.Logf)
if err != nil {
t.Fatalf("Pod tracker failed: %v", err)
}
defer eventTracker.Cleanup()

// If the caller specified a different version, override it here.
if channelVersion != "" {
Expand All @@ -75,32 +81,43 @@ func SingleEventForChannelTestHelper(t *testing.T, encoding string,
subscriptionName,
channelName,
&channel,
resources.WithSubscriberForSubscription(loggerPodName),
resources.WithSubscriberForSubscription(eventRecorder),
)
case SubscriptionV1beta1:
client.CreateSubscriptionOrFailV1Beta1(
subscriptionName,
channelName,
&channel,
resources.WithSubscriberForSubscriptionV1Beta1(loggerPodName),
resources.WithSubscriberForSubscriptionV1Beta1(eventRecorder),
)
}

// wait for all test resources to be ready, so that we can start sending events
client.WaitForAllTestResourcesReadyOrFail()

// send fake CloudEvent to the channel
body := fmt.Sprintf("TestSingleEvent %s", uuid.NewUUID())
event := cloudevents.New(
fmt.Sprintf(`{"msg":%q}`, body),
cloudevents.WithSource(senderName),
cloudevents.WithEncoding(encoding),
// send CloudEvent to the channel
event := cloudevents.NewEvent()
event.SetID("dummy")

eventSource := fmt.Sprintf("http://%s.svc/", senderName)
event.SetSource(eventSource)
event.SetType(lib.DefaultEventType)

body := fmt.Sprintf(`{"msg":"TestSingleEvent %s"}`, uuid.New().String())
if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil {
st.Fatalf("Cannot set the payload of the event: %s", err.Error())
}

client.SendEventToAddressable(
senderName,
channelName,
&channel,
event,
sender.WithEncoding(encoding),
sender.EnableIncrementalId(),
)
client.SendFakeEventToAddressableOrFail(senderName, channelName, &channel, event)

// verify the logger service receives the event
if err := client.CheckLog(loggerPodName, lib.CheckerContains(body)); err != nil {
st.Fatalf("String %q not found in logs of logger pod %q: %v", body, loggerPodName, err)
}
eventTracker.AssertWaitMatchSourceData(t, eventRecorder, eventSource, body, 1, 1)
})
}
16 changes: 16 additions & 0 deletions test/lib/checkevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strings"
"sync"
"testing"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
Expand Down Expand Up @@ -236,6 +237,15 @@ func (ei *EventInfoStore) WaitAtLeastNMatch(f EventInfoMatchFunc, n int) ([]Even
return matchRet, internalErr
}

func (ei *EventInfoStore) MustWaitAtLeastNMatch(t testing.TB, f EventInfoMatchFunc, n int) []EventInfo {
events, err := ei.WaitAtLeastNMatch(f, n)
if err != nil {
t.Fatalf("Timeout waiting for %d matches. Error: %v", n, err)
}

return events
}

// Wait for at least minCount events with source exactly matching source and data contained within the event
// data field. If source is the empty string, don't check the source. If maxCount is >0, return an error
// if more than maxCount entries are seen.
Expand Down Expand Up @@ -263,6 +273,12 @@ func (ei *EventInfoStore) WaitMatchSourceData(source string, data string, minCou
return nil
}

func (ei *EventInfoStore) AssertWaitMatchSourceData(tb testing.TB, eventRecord string, source string, data string, minCount int, maxCount int) {
if err := ei.WaitMatchSourceData(source, data, minCount, maxCount); err != nil {
tb.Fatalf("Timeout waiting for source %q and data %q. It does not appear at least %d times in the event record pod %q: %v", source, data, minCount, eventRecord, err)
}
}

// Does the provided EventInfo match some criteria
type EventInfoMatchFunc func(EventInfo) error

Expand Down
21 changes: 21 additions & 0 deletions test/lib/cloudevents.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package lib

const (
DefaultEventSource = "http://knative.test"
DefaultEventType = "dev.knative.test.event"
)
4 changes: 2 additions & 2 deletions test/upgrade/smoke_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ package upgrade
import (
"testing"

cloudevents "github.com/cloudevents/sdk-go"
cloudevents "github.com/cloudevents/sdk-go/v2"
"knative.dev/eventing/test/e2e/helpers"
)

func runSmokeTest(t *testing.T) {
helpers.SingleEventForChannelTestHelper(
t,
cloudevents.Binary,
cloudevents.EncodingBinary,
helpers.SubscriptionV1alpha1,
"",
channelTestRunner,
Expand Down