diff --git a/test/e2e/channel_single_event_test.go b/test/e2e/channel_single_event_test.go index 5562496c656..ccf990f5db9 100644 --- a/test/e2e/channel_single_event_test.go +++ b/test/e2e/channel_single_event_test.go @@ -21,7 +21,7 @@ package e2e import ( "testing" - cloudevents "github.com/cloudevents/sdk-go" + cloudevents "github.com/cloudevents/sdk-go/v2" "knative.dev/eventing/test/e2e/helpers" ) @@ -36,7 +36,7 @@ EventSource ---> Channel ---> Subscription ---> Service(Logger) func TestSingleBinaryEventForChannel(t *testing.T) { helpers.SingleEventForChannelTestHelper( t, - cloudevents.Binary, + cloudevents.EncodingBinary, helpers.SubscriptionV1alpha1, "", channelTestRunner, @@ -46,7 +46,7 @@ func TestSingleBinaryEventForChannel(t *testing.T) { func TestSingleStructuredEventForChannel(t *testing.T) { helpers.SingleEventForChannelTestHelper( t, - cloudevents.Structured, + cloudevents.EncodingStructured, helpers.SubscriptionV1alpha1, "", channelTestRunner, @@ -56,7 +56,7 @@ func TestSingleStructuredEventForChannel(t *testing.T) { func TestSingleBinaryEventForChannelV1Beta1(t *testing.T) { helpers.SingleEventForChannelTestHelper( t, - cloudevents.Binary, + cloudevents.EncodingBinary, helpers.SubscriptionV1beta1, "", channelTestRunner, @@ -66,7 +66,7 @@ func TestSingleBinaryEventForChannelV1Beta1(t *testing.T) { func TestSingleBinaryEventForChannelV1Beta1SubscribeToV1Alpha1(t *testing.T) { helpers.SingleEventForChannelTestHelper( t, - cloudevents.Binary, + cloudevents.EncodingBinary, helpers.SubscriptionV1beta1, "messaging.knative.dev/v1alpha1", channelTestRunner, @@ -76,7 +76,7 @@ func TestSingleBinaryEventForChannelV1Beta1SubscribeToV1Alpha1(t *testing.T) { func TestSingleStructuredEventForChannelV1Beta1(t *testing.T) { helpers.SingleEventForChannelTestHelper( t, - cloudevents.Structured, + cloudevents.EncodingStructured, helpers.SubscriptionV1beta1, "", channelTestRunner, diff --git a/test/e2e/helpers/channel_single_event_helper.go b/test/e2e/helpers/channel_single_event_helper.go index 80883b34853..6a2e790bf75 100644 --- a/test/e2e/helpers/channel_single_event_helper.go +++ b/test/e2e/helpers/channel_single_event_helper.go @@ -20,12 +20,13 @@ import ( "fmt" "testing" + "github.com/google/uuid" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/uuid" + cloudevents "github.com/cloudevents/sdk-go/v2" "knative.dev/eventing/test/lib" - "knative.dev/eventing/test/lib/cloudevents" "knative.dev/eventing/test/lib/resources" + "knative.dev/eventing/test/lib/resources/sender" ) type SubscriptionVersion string @@ -41,15 +42,15 @@ const ( // a subscription to its v1alpha1 version by using channelVersion to override it. // channelVersion == "" means that the version of the channel subscribed to is not // modified. -func SingleEventForChannelTestHelper(t *testing.T, encoding string, +func SingleEventForChannelTestHelper(t *testing.T, encoding cloudevents.Encoding, subscriptionVersion SubscriptionVersion, channelVersion string, channelTestRunner lib.ChannelTestRunner, options ...lib.SetupClientOption) { - channelName := "e2e-singleevent-channel-" + encoding - senderName := "e2e-singleevent-sender-" + encoding - subscriptionName := "e2e-singleevent-subscription-" + encoding - loggerPodName := "e2e-singleevent-logger-pod-" + encoding + channelName := "e2e-singleevent-channel-" + encoding.String() + senderName := "e2e-singleevent-sender-" + encoding.String() + subscriptionName := "e2e-singleevent-subscription-" + encoding.String() + eventRecorder := "e2e-singleevent-event-record-pod-" + encoding.String() channelTestRunner.RunTests(t, lib.FeatureBasic, func(st *testing.T, channel metav1.TypeMeta) { st.Logf("Run test with channel %q", channel) @@ -59,9 +60,14 @@ func SingleEventForChannelTestHelper(t *testing.T, encoding string, // create channel client.CreateChannelOrFail(channelName, &channel) - // create logger service as the subscriber - pod := resources.EventLoggerPod(loggerPodName) - client.CreatePodOrFail(pod, lib.WithService(loggerPodName)) + // create event logger pod and service + eventRecordPod := resources.EventRecordPod(eventRecorder) + client.CreatePodOrFail(eventRecordPod, lib.WithService(eventRecorder)) + eventTracker, err := client.NewEventInfoStore(eventRecorder, t.Logf) + if err != nil { + t.Fatalf("Pod tracker failed: %v", err) + } + defer eventTracker.Cleanup() // If the caller specified a different version, override it here. if channelVersion != "" { @@ -75,32 +81,43 @@ func SingleEventForChannelTestHelper(t *testing.T, encoding string, subscriptionName, channelName, &channel, - resources.WithSubscriberForSubscription(loggerPodName), + resources.WithSubscriberForSubscription(eventRecorder), ) case SubscriptionV1beta1: client.CreateSubscriptionOrFailV1Beta1( subscriptionName, channelName, &channel, - resources.WithSubscriberForSubscriptionV1Beta1(loggerPodName), + resources.WithSubscriberForSubscriptionV1Beta1(eventRecorder), ) } // wait for all test resources to be ready, so that we can start sending events client.WaitForAllTestResourcesReadyOrFail() - // send fake CloudEvent to the channel - body := fmt.Sprintf("TestSingleEvent %s", uuid.NewUUID()) - event := cloudevents.New( - fmt.Sprintf(`{"msg":%q}`, body), - cloudevents.WithSource(senderName), - cloudevents.WithEncoding(encoding), + // send CloudEvent to the channel + event := cloudevents.NewEvent() + event.SetID("dummy") + + eventSource := fmt.Sprintf("http://%s.svc/", senderName) + event.SetSource(eventSource) + event.SetType(lib.DefaultEventType) + + body := fmt.Sprintf(`{"msg":"TestSingleEvent %s"}`, uuid.New().String()) + if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil { + st.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } + + client.SendEventToAddressable( + senderName, + channelName, + &channel, + event, + sender.WithEncoding(encoding), + sender.EnableIncrementalId(), ) - client.SendFakeEventToAddressableOrFail(senderName, channelName, &channel, event) // verify the logger service receives the event - if err := client.CheckLog(loggerPodName, lib.CheckerContains(body)); err != nil { - st.Fatalf("String %q not found in logs of logger pod %q: %v", body, loggerPodName, err) - } + eventTracker.AssertWaitMatchSourceData(t, eventRecorder, eventSource, body, 1, 1) }) } diff --git a/test/lib/checkevents.go b/test/lib/checkevents.go index 3c9478d71a6..675955a56cc 100644 --- a/test/lib/checkevents.go +++ b/test/lib/checkevents.go @@ -20,6 +20,7 @@ import ( "fmt" "strings" "sync" + "testing" "time" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -236,6 +237,15 @@ func (ei *EventInfoStore) WaitAtLeastNMatch(f EventInfoMatchFunc, n int) ([]Even return matchRet, internalErr } +func (ei *EventInfoStore) MustWaitAtLeastNMatch(t testing.TB, f EventInfoMatchFunc, n int) []EventInfo { + events, err := ei.WaitAtLeastNMatch(f, n) + if err != nil { + t.Fatalf("Timeout waiting for %d matches. Error: %v", n, err) + } + + return events +} + // Wait for at least minCount events with source exactly matching source and data contained within the event // data field. If source is the empty string, don't check the source. If maxCount is >0, return an error // if more than maxCount entries are seen. @@ -263,6 +273,12 @@ func (ei *EventInfoStore) WaitMatchSourceData(source string, data string, minCou return nil } +func (ei *EventInfoStore) AssertWaitMatchSourceData(tb testing.TB, eventRecord string, source string, data string, minCount int, maxCount int) { + if err := ei.WaitMatchSourceData(source, data, minCount, maxCount); err != nil { + tb.Fatalf("Timeout waiting for source %q and data %q. It does not appear at least %d times in the event record pod %q: %v", source, data, minCount, eventRecord, err) + } +} + // Does the provided EventInfo match some criteria type EventInfoMatchFunc func(EventInfo) error diff --git a/test/lib/cloudevents.go b/test/lib/cloudevents.go new file mode 100644 index 00000000000..2162ccef421 --- /dev/null +++ b/test/lib/cloudevents.go @@ -0,0 +1,21 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package lib + +const ( + DefaultEventSource = "http://knative.test" + DefaultEventType = "dev.knative.test.event" +) diff --git a/test/upgrade/smoke_test.go b/test/upgrade/smoke_test.go index 858d45373d5..da67fb6d9de 100644 --- a/test/upgrade/smoke_test.go +++ b/test/upgrade/smoke_test.go @@ -18,14 +18,14 @@ package upgrade import ( "testing" - cloudevents "github.com/cloudevents/sdk-go" + cloudevents "github.com/cloudevents/sdk-go/v2" "knative.dev/eventing/test/e2e/helpers" ) func runSmokeTest(t *testing.T) { helpers.SingleEventForChannelTestHelper( t, - cloudevents.Binary, + cloudevents.EncodingBinary, helpers.SubscriptionV1alpha1, "", channelTestRunner,