diff --git a/test/conformance/helpers/channel_tracing_test_helper.go b/test/conformance/helpers/channel_tracing_test_helper.go index 49df1a98320..9d9c3380be2 100644 --- a/test/conformance/helpers/channel_tracing_test_helper.go +++ b/test/conformance/helpers/channel_tracing_test_helper.go @@ -132,18 +132,13 @@ func tracingTest( // assertEventMatch verifies that recorder pod contains at least one event that // matches mustMatch. It is used to show that the expected event was sent to // the logger Pod. It returns a list of the matching events. -func assertEventMatch(t *testing.T, client *lib.Client, recorderPodName string, - mustMatch cetest.EventMatcher) []recordevents.EventInfo { +func assertEventMatch(t *testing.T, client *lib.Client, recorderPodName string, mustMatch cetest.EventMatcher) []recordevents.EventInfo { targetTracker, err := recordevents.NewEventInfoStore(client, recorderPodName) if err != nil { t.Fatalf("Pod tracker failed: %v", err) } defer targetTracker.Cleanup() - matches, err := targetTracker.WaitAtLeastNMatch(recordevents.MatchEvent(mustMatch), 1) - if err != nil { - t.Fatalf("Expected messages not found: %v", err) - } - return matches + return targetTracker.AssertAtLeast(1, recordevents.MatchEvent(mustMatch)) } // getTraceIDHeader gets the TraceID from the passed in events. It returns the header from the diff --git a/test/e2e/helpers/channel_chain_test_helper.go b/test/e2e/helpers/channel_chain_test_helper.go index 27bb3693372..7553dbdda87 100644 --- a/test/e2e/helpers/channel_chain_test_helper.go +++ b/test/e2e/helpers/channel_chain_test_helper.go @@ -21,6 +21,7 @@ import ( "testing" cloudevents "github.com/cloudevents/sdk-go/v2" + . "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -53,12 +54,7 @@ func ChannelChainTestHelper(t *testing.T, client.WaitForResourcesReadyOrFail(&channel) // create loggerPod and expose it as a service - recordEventsPod := resources.EventRecordPod(recordEventsPodName) - client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName)) - eventTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } + eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventsPodName) defer eventTracker.Cleanup() // create subscriptions that subscribe the first channel, and reply events directly to the second channel @@ -92,10 +88,10 @@ func ChannelChainTestHelper(t *testing.T, client.SendEventToAddressable(senderName, channelNames[0], &channel, event) - // check if the logging service receives the correct number of event messages - expectedContentCount := len(subscriptionNames1) * len(subscriptionNames2) - // verify the logger service receives the event - eventTracker.AssertWaitMatchSourceData(t, eventSource, body, expectedContentCount, expectedContentCount) + eventTracker.AssertAtLeast(len(subscriptionNames1)*len(subscriptionNames2), recordevents.MatchEvent( + HasSource(eventSource), + HasData([]byte(body)), + )) }) } diff --git a/test/e2e/helpers/channel_defaulter_test_helper.go b/test/e2e/helpers/channel_defaulter_test_helper.go index 6d203470a48..a2c1baace4a 100644 --- a/test/e2e/helpers/channel_defaulter_test_helper.go +++ b/test/e2e/helpers/channel_defaulter_test_helper.go @@ -22,6 +22,7 @@ import ( "time" cloudevents "github.com/cloudevents/sdk-go/v2" + . "github.com/cloudevents/sdk-go/v2/test" "github.com/ghodss/yaml" "github.com/google/uuid" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -96,12 +97,7 @@ func defaultChannelTestHelper(t *testing.T, client *lib.Client, expectedChannel client.CreateChannelWithDefaultOrFail(eventingtesting.NewChannel(channelName, client.Namespace)) // create event logger pod and service as the subscriber - recordEventsPod := resources.EventRecordPod(recordEventsPodName) - client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName)) - eventTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } + eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventsPodName) defer eventTracker.Cleanup() // create subscription to subscribe the channel, and forward the received events to the logger service @@ -154,7 +150,10 @@ func defaultChannelTestHelper(t *testing.T, client *lib.Client, expectedChannel client.SendEventToAddressable(senderName, channelName, lib.ChannelTypeMeta, event) // verify the logger service receives the event - eventTracker.AssertWaitMatchSourceData(t, eventSource, body, 1, 1) + eventTracker.AssertAtLeast(1, recordevents.MatchEvent( + HasSource(eventSource), + HasData([]byte(body)), + )) } // updateDefaultChannelCM will update the default channel configmap diff --git a/test/e2e/helpers/channel_dls_test_helper.go b/test/e2e/helpers/channel_dls_test_helper.go index 00b21fae134..8bab0d67f60 100644 --- a/test/e2e/helpers/channel_dls_test_helper.go +++ b/test/e2e/helpers/channel_dls_test_helper.go @@ -21,6 +21,7 @@ import ( "testing" cloudevents "github.com/cloudevents/sdk-go/v2" + . "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -50,12 +51,7 @@ func ChannelDeadLetterSinkTestHelper(t *testing.T, client.WaitForResourcesReadyOrFail(&channel) // create event logger pod and service as the subscriber - recordEventsPod := resources.EventRecordPod(recordEventsPodName) - client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName)) - eventTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } + eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventsPodName) defer eventTracker.Cleanup() // create subscriptions that subscribe to a service that does not exist @@ -83,7 +79,9 @@ func ChannelDeadLetterSinkTestHelper(t *testing.T, client.SendEventToAddressable(senderName, channelNames[0], &channel, event) // check if the logging service receives the correct number of event messages - expectedContentCount := len(subscriptionNames) - eventTracker.AssertWaitMatchSourceData(t, eventSource, body, expectedContentCount, expectedContentCount) + eventTracker.AssertAtLeast(len(subscriptionNames), recordevents.MatchEvent( + HasSource(eventSource), + HasData([]byte(body)), + )) }) } diff --git a/test/e2e/helpers/channel_event_tranformation_test_helper.go b/test/e2e/helpers/channel_event_tranformation_test_helper.go index b1382437fe8..7bdacccc650 100644 --- a/test/e2e/helpers/channel_event_tranformation_test_helper.go +++ b/test/e2e/helpers/channel_event_tranformation_test_helper.go @@ -21,6 +21,7 @@ import ( "testing" cloudevents "github.com/cloudevents/sdk-go/v2" + . "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -69,12 +70,7 @@ func EventTransformationForSubscriptionTestHelper(t *testing.T, client.CreatePodOrFail(transformationPod, lib.WithService(transformationPodName)) // create event logger pod and service as the subscriber - recordEventsPod := resources.EventRecordPod(recordEventsPodName) - client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName)) - eventTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } + eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventsPodName) defer eventTracker.Cleanup() // create subscriptions that subscribe the first channel, use the transformation service to transform the events and then forward the transformed events to the second channel @@ -108,13 +104,9 @@ func EventTransformationForSubscriptionTestHelper(t *testing.T, client.SendEventToAddressable(senderName, channelNames[0], &channel, eventToSend) // check if the logging service receives the correct number of event messages - expectedContentCount := len(subscriptionNames1) * len(subscriptionNames2) - eventTracker.AssertWaitMatchSourceData( - t, - eventSource, - transformedEventBody, - expectedContentCount, - expectedContentCount, - ) + eventTracker.AssertAtLeast(len(subscriptionNames1)*len(subscriptionNames2), recordevents.MatchEvent( + HasSource(eventSource), + HasData([]byte(transformedEventBody)), + )) }) } diff --git a/test/e2e/helpers/channel_single_event_helper.go b/test/e2e/helpers/channel_single_event_helper.go index 524ef9afe51..f82b9ec8473 100644 --- a/test/e2e/helpers/channel_single_event_helper.go +++ b/test/e2e/helpers/channel_single_event_helper.go @@ -20,6 +20,7 @@ import ( "fmt" "testing" + . "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -61,12 +62,7 @@ func SingleEventForChannelTestHelper(t *testing.T, encoding cloudevents.Encoding client.CreateChannelOrFail(channelName, &channel) // create event logger pod and service - eventRecordPod := resources.EventRecordPod(eventRecorder) - client.CreatePodOrFail(eventRecordPod, lib.WithService(eventRecorder)) - eventTracker, err := recordevents.NewEventInfoStore(client, eventRecorder) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } + eventTracker, _ := recordevents.StartEventRecordOrFail(client, eventRecorder) defer eventTracker.Cleanup() // If the caller specified a different version, override it here. @@ -113,6 +109,9 @@ func SingleEventForChannelTestHelper(t *testing.T, encoding cloudevents.Encoding ) // verify the logger service receives the event - eventTracker.AssertWaitMatchSourceData(t, eventSource, body, 1, 1) + eventTracker.AssertAtLeast(1, recordevents.MatchEvent( + HasData([]byte(body)), + HasSource(eventSource), + )) }) } diff --git a/test/e2e/helpers/parallel_test_helper.go b/test/e2e/helpers/parallel_test_helper.go index 1cb33b7f18b..8b8e881ffcc 100644 --- a/test/e2e/helpers/parallel_test_helper.go +++ b/test/e2e/helpers/parallel_test_helper.go @@ -19,6 +19,7 @@ import ( "fmt" "testing" + . "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -89,13 +90,7 @@ func ParallelTestHelper(t *testing.T, // create event logger pod and service eventRecorder := fmt.Sprintf("%s-event-record-pod", tc.name) - eventRecordPod := resources.EventRecordPod(eventRecorder) - client.CreatePodOrFail(eventRecordPod, lib.WithService(eventRecorder)) - eventTracker, err := recordevents.NewEventInfoStore(client, eventRecorder) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } - defer eventTracker.Cleanup() + eventTracker, _ := recordevents.StartEventRecordOrFail(client, eventRecorder) // create channel as reply of the Parallel // TODO(chizhg): now we'll have to use a channel plus its subscription here, as reply of the Subscription @@ -135,10 +130,16 @@ func ParallelTestHelper(t *testing.T, senderPodName, tc.name, lib.FlowsParallelTypeMeta, - event) + event, + ) // verify the logger service receives the correct transformed event - eventTracker.AssertWaitMatchSourceData(t, eventSource, tc.expected, 1, 1) + eventTracker.AssertExact(1, recordevents.MatchEvent( + HasSource(eventSource), + recordevents.DataContains(tc.expected), + )) + + eventTracker.Cleanup() } }) } diff --git a/test/e2e/helpers/sequence_test_helper.go b/test/e2e/helpers/sequence_test_helper.go index 13c167d8fd9..40103604636 100644 --- a/test/e2e/helpers/sequence_test_helper.go +++ b/test/e2e/helpers/sequence_test_helper.go @@ -20,6 +20,7 @@ import ( "testing" cloudevents "github.com/cloudevents/sdk-go/v2" + cetest "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -91,12 +92,7 @@ func SequenceTestHelper(t *testing.T, // make the logger service as a Knative service, and remove the channel and subscription. client.CreateChannelOrFail(channelName, &channel) // create event logger pod and service as the subscriber - recordEventsPod := resources.EventRecordPod(recordEventsPodName) - client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName)) - eventTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } + eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventsPodName) defer eventTracker.Cleanup() // create subscription to subscribe the channel, and forward the received events to the logger service client.CreateSubscriptionOrFail( @@ -144,6 +140,9 @@ func SequenceTestHelper(t *testing.T, for _, config := range stepSubscriberConfigs { expectedMsg += config.msgAppender } - eventTracker.AssertWaitMatchSourceData(t, eventSource, expectedMsg, 1, 1) + eventTracker.AssertAtLeast(1, recordevents.MatchEvent( + cetest.HasSource(eventSource), + recordevents.DataContains(expectedMsg), + )) }) } diff --git a/test/e2e/source_api_server_test.go b/test/e2e/source_api_server_test.go index a0552c0d6c2..ae0540a9145 100644 --- a/test/e2e/source_api_server_test.go +++ b/test/e2e/source_api_server_test.go @@ -22,7 +22,6 @@ import ( "testing" "time" - "github.com/cloudevents/sdk-go/v2/event" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -79,8 +78,7 @@ func TestApiServerSource(t *testing.T) { EventMode: mode, ServiceAccountName: serviceAccountName, }, - pod: func(name string) *corev1.Pod { return resources.HelloWorldPod(name) }, - expected: "", + pod: func(name string) *corev1.Pod { return resources.HelloWorldPod(name) }, }, { name: "event-ref-match-label", @@ -119,76 +117,65 @@ func TestApiServerSource(t *testing.T) { }, } - client := setup(t, true) - defer tearDown(client) - - // creates ServiceAccount and RoleBinding with a role for reading pods and events - r := resources.Role(roleName, - resources.WithRuleForRole(&rbacv1.PolicyRule{ - APIGroups: []string{""}, - Resources: []string{"events", "pods"}, - Verbs: []string{"get", "list", "watch"}})) - client.CreateServiceAccountOrFail(serviceAccountName) - client.CreateRoleOrFail(r) - client.CreateRoleBindingOrFail( - serviceAccountName, - lib.RoleKind, - roleName, - fmt.Sprintf("%s-%s", serviceAccountName, roleName), - client.Namespace, - ) - for _, tc := range table { - - // create event logger pod and service - loggerPodName := fmt.Sprintf("%s-%s", baseLoggerPodName, tc.name) - tc.spec.Sink = duckv1.Destination{Ref: resources.ServiceKRef(loggerPodName)} - - loggerPod := resources.EventRecordPod(loggerPodName) - client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName)) - targetTracker, err := recordevents.NewEventInfoStore(client, loggerPodName) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } - defer targetTracker.Cleanup() - - apiServerSource := eventingtesting.NewApiServerSource( - fmt.Sprintf("%s-%s", baseApiServerSourceName, tc.name), - client.Namespace, - eventingtesting.WithApiServerSourceSpec(tc.spec), - ) - - client.CreateApiServerSourceOrFail(apiServerSource) - - // wait for all test resources to be ready - client.WaitForAllTestResourcesReadyOrFail() - - helloworldPod := tc.pod(fmt.Sprintf("%s-%s", baseHelloworldPodName, tc.name)) - client.CreatePodOrFail(helloworldPod) - - // verify the logger service receives the event(s) - // TODO(chizhg): right now it's only doing a very basic check by looking for the tc.data word, - // we can add a json matcher to improve it in the future. - - if tc.expected == "" { - time.Sleep(10 * time.Second) - ev, _, err := targetTracker.Find(recordevents.MatchEvent(func(have event.Event) error { - //TODO This really needs to be no-op? - return nil - })) - if err != nil { - t.Fatalf("Saw error looking for events: %v", err) + tc := tc // capture range variable + t.Run(tc.name, func(t *testing.T) { + // Setup client + client := setup(t, true) + defer tearDown(client) + + // creates ServiceAccount and RoleBinding with a role for reading pods and events + r := resources.Role(roleName, + resources.WithRuleForRole(&rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"events", "pods"}, + Verbs: []string{"get", "list", "watch"}})) + client.CreateServiceAccountOrFail(serviceAccountName) + client.CreateRoleOrFail(r) + client.CreateRoleBindingOrFail( + serviceAccountName, + lib.RoleKind, + roleName, + fmt.Sprintf("%s-%s", serviceAccountName, roleName), + client.Namespace, + ) + + // create event record + recordEventPodName := fmt.Sprintf("%s-%s", baseLoggerPodName, tc.name) + eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventPodName) + defer eventTracker.Cleanup() + + spec := tc.spec + spec.Sink = duckv1.Destination{Ref: resources.ServiceKRef(recordEventPodName)} + + apiServerSource := eventingtesting.NewApiServerSource( + fmt.Sprintf("%s-%s", baseApiServerSourceName, tc.name), + client.Namespace, + eventingtesting.WithApiServerSourceSpec(spec), + ) + + client.CreateApiServerSourceOrFail(apiServerSource) + + // wait for all test resources to be ready + client.WaitForAllTestResourcesReadyOrFail() + + helloworldPod := tc.pod(fmt.Sprintf("%s-%s", baseHelloworldPodName, tc.name)) + client.CreatePodOrFail(helloworldPod) + + // verify the logger service receives the event(s) + // TODO(chizhg): right now it's only doing a very basic check by looking for the tc.data word, + // we can add a json matcher to improve it in the future. + + // Run asserts + if tc.expected == "" { + time.Sleep(10 * time.Second) + eventTracker.AssertNot(recordevents.Any()) + } else { + eventTracker.AssertAtLeast(1, recordevents.MatchEvent( + recordevents.DataContains(tc.expected), + )) } - if len(ev) != 0 { - t.Fatalf("Log is not empty in logger pod %q: %d events seen", loggerPodName, len(ev)) - } - - } else { - err = targetTracker.WaitMatchSourceData("", tc.expected, 1, -1) - if err != nil { - t.Fatalf("Error watching for data %s event in pod %s: %v", tc.expected, loggerPodName, err) - } - } + }) } } diff --git a/test/e2e/source_container_test.go b/test/e2e/source_container_test.go index eb196afb3f4..b81ccbd7981 100644 --- a/test/e2e/source_container_test.go +++ b/test/e2e/source_container_test.go @@ -21,6 +21,7 @@ import ( "fmt" "testing" + . "github.com/cloudevents/sdk-go/v2/test" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" @@ -28,7 +29,6 @@ import ( duckv1 "knative.dev/pkg/apis/duck/v1" pkgTest "knative.dev/pkg/test" - "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" @@ -43,25 +43,20 @@ func TestContainerSource(t *testing.T) { // the heartbeats image is built from test_images/heartbeats imageName = "heartbeats" - loggerPodName = "e2e-container-source-logger-pod" + recordEventPodName = "e2e-container-source-logger-pod" ) client := setup(t, true) defer tearDown(client) - // create event logger pod and service - loggerPod := resources.EventRecordPod(loggerPodName) - client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName)) - targetTracker, err := recordevents.NewEventInfoStore(client, loggerPodName) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } - defer targetTracker.Cleanup() + // create event record pod + eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventPodName) + defer eventTracker.Cleanup() // create container source - data := fmt.Sprintf("TestContainerSource%s", uuid.NewUUID()) + message := fmt.Sprintf("TestContainerSource%s", uuid.NewUUID()) // args are the arguments passing to the container, msg is used in the heartbeats image - args := []string{"--msg=" + data} + args := []string{"--msg=" + message} // envVars are the environment variables of the container envVars := []corev1.EnvVar{{ Name: "POD_NAME", @@ -89,7 +84,7 @@ func TestContainerSource(t *testing.T) { }, }, SourceSpec: duckv1.SourceSpec{ - Sink: duckv1.Destination{Ref: resources.KnativeRefForService(loggerPodName, client.Namespace)}, + Sink: duckv1.Destination{Ref: resources.KnativeRefForService(recordEventPodName, client.Namespace)}, }, }), ) @@ -99,9 +94,8 @@ func TestContainerSource(t *testing.T) { client.WaitForAllTestResourcesReadyOrFail() // verify the logger service receives the event - expectedCount := 2 - expectedSource := fmt.Sprintf("https://knative.dev/eventing/test/heartbeats/#%s/%s", client.Namespace, templateName) - if err := targetTracker.WaitMatchSourceData(expectedSource, data, expectedCount, -1); err != nil { - t.Fatalf("String %q does not appear at least %d times in logs of logger pod %q: %v", data, expectedCount, loggerPodName, err) - } + eventTracker.AssertAtLeast(2, recordevents.MatchEvent( + recordevents.MatchHeartBeatsImageMessage(message), + HasSource(fmt.Sprintf("https://knative.dev/eventing/test/heartbeats/#%s/%s", client.Namespace, templateName)), + )) } diff --git a/test/e2e/source_ping_test.go b/test/e2e/source_ping_test.go index 3d875fcbf18..d746600abfb 100644 --- a/test/e2e/source_ping_test.go +++ b/test/e2e/source_ping_test.go @@ -25,6 +25,7 @@ import ( pkgResources "knative.dev/eventing/pkg/reconciler/mtnamespace/resources" "knative.dev/eventing/test/lib/recordevents" + . "github.com/cloudevents/sdk-go/v2/test" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" @@ -42,30 +43,25 @@ func TestPingSourceV1Alpha1(t *testing.T) { // Every 1 minute starting from now schedule = "*/1 * * * *" - loggerPodName = "e2e-ping-source-logger-pod-v1alpha1" + recordEventPodName = "e2e-ping-source-logger-pod-v1alpha1" ) client := setup(t, true) defer tearDown(client) // create event logger pod and service - loggerPod := resources.EventRecordPod(loggerPodName) - client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName)) - targetTracker, err := recordevents.NewEventInfoStore(client, loggerPodName) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } - defer targetTracker.Cleanup() + eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventPodName) + defer eventTracker.Cleanup() // create cron job source - data := fmt.Sprintf("TestPingSource %s", uuid.NewUUID()) + data := fmt.Sprintf(`{"msg":"TestPingSource %s"}`, uuid.NewUUID()) source := eventingtesting.NewPingSourceV1Alpha1( sourceName, client.Namespace, eventingtesting.WithPingSourceSpec(sourcesv1alpha1.PingSourceSpec{ Schedule: schedule, Data: data, - Sink: &duckv1.Destination{Ref: resources.KnativeRefForService(loggerPodName, client.Namespace)}, + Sink: &duckv1.Destination{Ref: resources.KnativeRefForService(recordEventPodName, client.Namespace)}, }), ) client.CreatePingSourceV1Alpha1OrFail(source) @@ -73,11 +69,10 @@ func TestPingSourceV1Alpha1(t *testing.T) { // wait for all test resources to be ready client.WaitForAllTestResourcesReadyOrFail() - // verify the logger service receives the event and only once - err = targetTracker.WaitMatchSourceData(sourcesv1alpha1.PingSourceSource(client.Namespace, sourceName), data, 1, 1) - if err != nil { - t.Fatalf("Error watching for data %s event in pod %s: %v", data, loggerPodName, err) - } + eventTracker.AssertExact(1, recordevents.MatchEvent( + HasSource(sourcesv1alpha1.PingSourceSource(client.Namespace, sourceName)), + HasData([]byte(data)), + )) } func TestPingSourceV1Alpha2(t *testing.T) { @@ -85,23 +80,18 @@ func TestPingSourceV1Alpha2(t *testing.T) { sourceName = "e2e-ping-source" // Every 1 minute starting from now - loggerPodName = "e2e-ping-source-logger-pod-v1alpha2" + recordEventPodName = "e2e-ping-source-logger-pod-v1alpha2" ) client := setup(t, true) defer tearDown(client) // create event logger pod and service - loggerPod := resources.EventRecordPod(loggerPodName) - client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName)) - targetTracker, err := recordevents.NewEventInfoStore(client, loggerPodName) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } - defer targetTracker.Cleanup() + eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventPodName) + defer eventTracker.Cleanup() // create cron job source - data := fmt.Sprintf("TestPingSource %s", uuid.NewUUID()) + data := fmt.Sprintf(`{"msg":"TestPingSource %s"}`, uuid.NewUUID()) source := eventingtesting.NewPingSourceV1Alpha2( sourceName, client.Namespace, @@ -109,7 +99,7 @@ func TestPingSourceV1Alpha2(t *testing.T) { JsonData: data, SourceSpec: duckv1.SourceSpec{ Sink: duckv1.Destination{ - Ref: resources.KnativeRefForService(loggerPodName, client.Namespace), + Ref: resources.KnativeRefForService(recordEventPodName, client.Namespace), }, }, }), @@ -120,10 +110,10 @@ func TestPingSourceV1Alpha2(t *testing.T) { client.WaitForAllTestResourcesReadyOrFail() // verify the logger service receives the event and only once - err = targetTracker.WaitMatchSourceData(sourcesv1alpha2.PingSourceSource(client.Namespace, sourceName), data, 1, 1) - if err != nil { - t.Fatalf("Error watching for data %s event in pod %s: %v", data, loggerPodName, err) - } + eventTracker.AssertExact(1, recordevents.MatchEvent( + HasSource(sourcesv1alpha2.PingSourceSource(client.Namespace, sourceName)), + HasData([]byte(data)), + )) } func TestPingSourceV1Alpha2ResourceScope(t *testing.T) { @@ -131,23 +121,18 @@ func TestPingSourceV1Alpha2ResourceScope(t *testing.T) { sourceName = "e2e-ping-source" // Every 1 minute starting from now - loggerPodName = "e2e-ping-source-logger-pod-v1alpha2rs" + recordEventPodName = "e2e-ping-source-logger-pod-v1alpha2rs" ) client := setup(t, true) defer tearDown(client) // create event logger pod and service - loggerPod := resources.EventRecordPod(loggerPodName) - client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName)) - targetTracker, err := recordevents.NewEventInfoStore(client, loggerPodName) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } - defer targetTracker.Cleanup() + eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventPodName) + defer eventTracker.Cleanup() // create cron job source - data := fmt.Sprintf("TestPingSource %s", uuid.NewUUID()) + data := fmt.Sprintf(`{"msg":"TestPingSource %s"}`, uuid.NewUUID()) source := eventingtesting.NewPingSourceV1Alpha2( sourceName, client.Namespace, @@ -156,7 +141,7 @@ func TestPingSourceV1Alpha2ResourceScope(t *testing.T) { JsonData: data, SourceSpec: duckv1.SourceSpec{ Sink: duckv1.Destination{ - Ref: resources.KnativeRefForService(loggerPodName, client.Namespace), + Ref: resources.KnativeRefForService(recordEventPodName, client.Namespace), }, }, }), @@ -167,10 +152,10 @@ func TestPingSourceV1Alpha2ResourceScope(t *testing.T) { client.WaitForAllTestResourcesReadyOrFail() // verify the logger service receives the event and only once - err = targetTracker.WaitMatchSourceData(sourcesv1alpha2.PingSourceSource(client.Namespace, sourceName), data, 1, 1) - if err != nil { - t.Fatalf("Error watching for data %s event in pod %s: %v", data, loggerPodName, err) - } + eventTracker.AssertExact(1, recordevents.MatchEvent( + HasSource(sourcesv1alpha2.PingSourceSource(client.Namespace, sourceName)), + HasData([]byte(data)), + )) } func TestPingSourceV1Alpha2EventTypes(t *testing.T) { @@ -190,12 +175,11 @@ func TestPingSourceV1Alpha2EventTypes(t *testing.T) { client.WaitForResourceReadyOrFail(pkgResources.DefaultBrokerName, lib.BrokerTypeMeta) // Create ping source - data := fmt.Sprintf("TestPingSource %s", uuid.NewUUID()) source := eventingtesting.NewPingSourceV1Alpha2( sourceName, client.Namespace, eventingtesting.WithPingSourceV1A2Spec(sourcesv1alpha2.PingSourceSpec{ - JsonData: data, + JsonData: fmt.Sprintf(`{"msg":"TestPingSource %s"}`, uuid.NewUUID()), SourceSpec: duckv1.SourceSpec{ Sink: duckv1.Destination{ // TODO change sink to be a non-Broker one once we revisit EventType https://github.com/knative/eventing/issues/2750 diff --git a/test/e2e/source_sinkbinding_v1alpha1_test.go b/test/e2e/source_sinkbinding_v1alpha1_test.go index 6bb9f613520..df3ef331d82 100644 --- a/test/e2e/source_sinkbinding_v1alpha1_test.go +++ b/test/e2e/source_sinkbinding_v1alpha1_test.go @@ -19,14 +19,13 @@ package e2e import ( "fmt" - "strings" "testing" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" - ce "github.com/cloudevents/sdk-go/v2" + . "github.com/cloudevents/sdk-go/v2/test" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" batchv1beta1 "k8s.io/api/batch/v1beta1" @@ -34,7 +33,6 @@ import ( pkgTest "knative.dev/pkg/test" "knative.dev/pkg/tracker" - "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" @@ -48,20 +46,15 @@ func TestSinkBindingDeployment(t *testing.T) { // the heartbeats image is built from test_images/heartbeats imageName = "heartbeats" - loggerPodName = "e2e-sink-binding-logger-pod" + recordEventPodName = "e2e-sink-binding-recordevent-pod" ) client := setup(t, true) defer tearDown(client) - // create event logger pod and service - loggerPod := resources.EventRecordPod(loggerPodName) - client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName)) - targetTracker, err := recordevents.NewEventInfoStore(client, loggerPodName) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } - defer targetTracker.Cleanup() + // create event record pod + eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventPodName) + defer eventTracker.Cleanup() extensionSecret := string(uuid.NewUUID()) @@ -69,7 +62,7 @@ func TestSinkBindingDeployment(t *testing.T) { sinkBinding := eventingtesting.NewSinkBindingV1Alpha1( sinkBindingName, client.Namespace, - eventingtesting.WithSinkV1A1(duckv1.Destination{Ref: resources.KnativeRefForService(loggerPodName, client.Namespace)}), + eventingtesting.WithSinkV1A1(duckv1.Destination{Ref: resources.KnativeRefForService(recordEventPodName, client.Namespace)}), eventingtesting.WithSubjectV1A1(tracker.Reference{ APIVersion: "apps/v1", Kind: "Deployment", @@ -82,7 +75,7 @@ func TestSinkBindingDeployment(t *testing.T) { ) client.CreateSinkBindingV1Alpha1OrFail(sinkBinding) - data := fmt.Sprintf("TestSinkBindingDeployment%s", uuid.NewUUID()) + message := fmt.Sprintf("TestSinkBindingDeployment%s", uuid.NewUUID()) client.CreateDeploymentOrFail(&appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Namespace: client.Namespace, @@ -105,7 +98,7 @@ func TestSinkBindingDeployment(t *testing.T) { Name: imageName, Image: pkgTest.ImagePath(imageName), ImagePullPolicy: corev1.PullAlways, - Args: []string{"--msg=" + data}, + Args: []string{"--msg=" + message}, Env: []corev1.EnvVar{{ Name: "POD_NAME", Value: deploymentName, @@ -122,33 +115,11 @@ func TestSinkBindingDeployment(t *testing.T) { // wait for all test resources to be ready client.WaitForAllTestResourcesReadyOrFail() - // Look for events with expected data, and sinkbinding extension - expectedCount := 2 - expectedSource := fmt.Sprintf("https://knative.dev/eventing/test/heartbeats/#%s/%s", client.Namespace, deploymentName) - matchFunc := func(ev ce.Event) error { - if expectedSource != ev.Source() { - return fmt.Errorf("expected source %s, saw %s", expectedSource, ev.Source()) - } - ext := ev.Extensions() - value, found := ext["sinkbinding"] - if !found { - return fmt.Errorf("didn't find extension sinkbinding") - } - if value != extensionSecret { - return fmt.Errorf("expension sinkbinding didn't match %s, saw %s", extensionSecret, value) - } - db := ev.Data() - if !strings.Contains(string(db), data) { - return fmt.Errorf("expected substring %s in %s", data, string(db)) - } - return nil - } - - _, err = targetTracker.WaitAtLeastNMatch(recordevents.MatchEvent(matchFunc), expectedCount) - if err != nil { - t.Fatalf("Data %s, extension %q does not appear at least %d times in events of logger pod %q: %v", data, extensionSecret, expectedCount, loggerPodName, err) - - } + eventTracker.AssertAtLeast(2, recordevents.MatchEvent( + recordevents.MatchHeartBeatsImageMessage(message), + HasSource(fmt.Sprintf("https://knative.dev/eventing/test/heartbeats/#%s/%s", client.Namespace, deploymentName)), + HasExtension("sinkbinding", extensionSecret), + )) } func TestSinkBindingCronJob(t *testing.T) { @@ -158,26 +129,21 @@ func TestSinkBindingCronJob(t *testing.T) { // the heartbeats image is built from test_images/heartbeats imageName = "heartbeats" - loggerPodName = "e2e-sink-binding-logger-pod" + recordEventPod = "e2e-sink-binding-recordevent-pod" ) client := setup(t, true) defer tearDown(client) // create event logger pod and service - loggerPod := resources.EventRecordPod(loggerPodName) - client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName)) - targetTracker, err := recordevents.NewEventInfoStore(client, loggerPodName) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } - defer targetTracker.Cleanup() + eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventPod) + defer eventTracker.Cleanup() // create sink binding sinkBinding := eventingtesting.NewSinkBindingV1Alpha1( sinkBindingName, client.Namespace, - eventingtesting.WithSinkV1A1(duckv1.Destination{Ref: resources.KnativeRefForService(loggerPodName, client.Namespace)}), + eventingtesting.WithSinkV1A1(duckv1.Destination{Ref: resources.KnativeRefForService(recordEventPod, client.Namespace)}), eventingtesting.WithSubjectV1A1(tracker.Reference{ APIVersion: "batch/v1", Kind: "Job", @@ -191,7 +157,7 @@ func TestSinkBindingCronJob(t *testing.T) { ) client.CreateSinkBindingV1Alpha1OrFail(sinkBinding) - data := fmt.Sprintf("TestSinkBindingCronJob%s", uuid.NewUUID()) + message := fmt.Sprintf("TestSinkBindingCronJob%s", uuid.NewUUID()) client.CreateCronJobOrFail(&batchv1beta1.CronJob{ ObjectMeta: metav1.ObjectMeta{ Namespace: client.Namespace, @@ -213,7 +179,7 @@ func TestSinkBindingCronJob(t *testing.T) { Name: imageName, Image: pkgTest.ImagePath(imageName), ImagePullPolicy: corev1.PullAlways, - Args: []string{"--msg=" + data}, + Args: []string{"--msg=" + message}, Env: []corev1.EnvVar{{ Name: "ONE_SHOT", Value: "true", @@ -236,9 +202,8 @@ func TestSinkBindingCronJob(t *testing.T) { client.WaitForAllTestResourcesReadyOrFail() // verify the logger service receives the event - expectedCount := 2 - expectedSource := fmt.Sprintf("https://knative.dev/eventing/test/heartbeats/#%s/%s", client.Namespace, deploymentName) - if err := targetTracker.WaitMatchSourceData(expectedSource, data, expectedCount, -1); err != nil { - t.Fatalf("String %q does not appear at least %d times in logs of logger pod %q: %v", data, expectedCount, loggerPodName, err) - } + eventTracker.AssertAtLeast(2, recordevents.MatchEvent( + recordevents.MatchHeartBeatsImageMessage(message), + HasSource(fmt.Sprintf("https://knative.dev/eventing/test/heartbeats/#%s/%s", client.Namespace, deploymentName)), + )) } diff --git a/test/e2e/source_sinkbinding_v1alpha2_test.go b/test/e2e/source_sinkbinding_v1alpha2_test.go index 95b682da2d0..2705bc185a0 100644 --- a/test/e2e/source_sinkbinding_v1alpha2_test.go +++ b/test/e2e/source_sinkbinding_v1alpha2_test.go @@ -19,14 +19,13 @@ package e2e import ( "fmt" - "strings" "testing" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" - ce "github.com/cloudevents/sdk-go/v2" + . "github.com/cloudevents/sdk-go/v2/test" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" batchv1beta1 "k8s.io/api/batch/v1beta1" @@ -34,7 +33,6 @@ import ( pkgTest "knative.dev/pkg/test" "knative.dev/pkg/tracker" - "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" @@ -48,20 +46,15 @@ func TestSinkBindingV1Alpha2Deployment(t *testing.T) { // the heartbeats image is built from test_images/heartbeats imageName = "heartbeats" - loggerPodName = "e2e-sink-binding-logger-pod-v1alpha2dep" + recordEventPodName = "e2e-sink-binding-recordevent-pod-v1alpha2dep" ) client := setup(t, true) defer tearDown(client) // create event logger pod and service - loggerPod := resources.EventRecordPod(loggerPodName) - client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName)) - targetTracker, err := recordevents.NewEventInfoStore(client, loggerPodName) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } - defer targetTracker.Cleanup() + eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventPodName) + defer eventTracker.Cleanup() extensionSecret := string(uuid.NewUUID()) @@ -69,7 +62,7 @@ func TestSinkBindingV1Alpha2Deployment(t *testing.T) { sinkBinding := eventingtesting.NewSinkBindingV1Alpha2( sinkBindingName, client.Namespace, - eventingtesting.WithSinkV1A2(duckv1.Destination{Ref: resources.KnativeRefForService(loggerPodName, client.Namespace)}), + eventingtesting.WithSinkV1A2(duckv1.Destination{Ref: resources.KnativeRefForService(recordEventPodName, client.Namespace)}), eventingtesting.WithSubjectV1A2(tracker.Reference{ APIVersion: "apps/v1", Kind: "Deployment", @@ -82,7 +75,7 @@ func TestSinkBindingV1Alpha2Deployment(t *testing.T) { ) client.CreateSinkBindingV1Alpha2OrFail(sinkBinding) - data := fmt.Sprintf("TestSinkBindingDeployment%s", uuid.NewUUID()) + message := fmt.Sprintf("TestSinkBindingDeployment%s", uuid.NewUUID()) client.CreateDeploymentOrFail(&appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Namespace: client.Namespace, @@ -105,7 +98,7 @@ func TestSinkBindingV1Alpha2Deployment(t *testing.T) { Name: imageName, Image: pkgTest.ImagePath(imageName), ImagePullPolicy: corev1.PullAlways, - Args: []string{"--msg=" + data}, + Args: []string{"--msg=" + message}, Env: []corev1.EnvVar{{ Name: "POD_NAME", Value: deploymentName, @@ -123,32 +116,11 @@ func TestSinkBindingV1Alpha2Deployment(t *testing.T) { client.WaitForAllTestResourcesReadyOrFail() // Look for events with expected data, and sinkbinding extension - expectedCount := 2 - expectedSource := fmt.Sprintf("https://knative.dev/eventing/test/heartbeats/#%s/%s", client.Namespace, deploymentName) - matchFunc := func(ev ce.Event) error { - if expectedSource != ev.Source() { - return fmt.Errorf("expected source %s, saw %s", expectedSource, ev.Source()) - } - ext := ev.Extensions() - value, found := ext["sinkbinding"] - if !found { - return fmt.Errorf("didn't find extension sinkbinding") - } - if value != extensionSecret { - return fmt.Errorf("expension sinkbinding didn't match %s, saw %s", extensionSecret, value) - } - db := ev.Data() - if !strings.Contains(string(db), data) { - return fmt.Errorf("expected substring %s in %s", data, string(db)) - } - return nil - } - - _, err = targetTracker.WaitAtLeastNMatch(recordevents.MatchEvent(matchFunc), expectedCount) - if err != nil { - t.Fatalf("Data %s, extension %q does not appear at least %d times in events of logger pod %q: %v", data, extensionSecret, expectedCount, loggerPodName, err) - - } + eventTracker.AssertAtLeast(2, recordevents.MatchEvent( + recordevents.MatchHeartBeatsImageMessage(message), + HasSource(fmt.Sprintf("https://knative.dev/eventing/test/heartbeats/#%s/%s", client.Namespace, deploymentName)), + HasExtension("sinkbinding", extensionSecret), + )) } func TestSinkBindingV1Alpha2CronJob(t *testing.T) { @@ -158,26 +130,21 @@ func TestSinkBindingV1Alpha2CronJob(t *testing.T) { // the heartbeats image is built from test_images/heartbeats imageName = "heartbeats" - loggerPodName = "e2e-sink-binding-logger-pod-v1alpha2c" + recordEventPodName = "e2e-sink-binding-recordevent-pod-v1alpha2c" ) client := setup(t, true) defer tearDown(client) // create event logger pod and service - loggerPod := resources.EventRecordPod(loggerPodName) - client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName)) - targetTracker, err := recordevents.NewEventInfoStore(client, loggerPodName) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } - defer targetTracker.Cleanup() + eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventPodName) + defer eventTracker.Cleanup() // create sink binding sinkBinding := eventingtesting.NewSinkBindingV1Alpha2( sinkBindingName, client.Namespace, - eventingtesting.WithSinkV1A2(duckv1.Destination{Ref: resources.KnativeRefForService(loggerPodName, client.Namespace)}), + eventingtesting.WithSinkV1A2(duckv1.Destination{Ref: resources.KnativeRefForService(recordEventPodName, client.Namespace)}), eventingtesting.WithSubjectV1A2(tracker.Reference{ APIVersion: "batch/v1", Kind: "Job", @@ -191,7 +158,7 @@ func TestSinkBindingV1Alpha2CronJob(t *testing.T) { ) client.CreateSinkBindingV1Alpha2OrFail(sinkBinding) - data := fmt.Sprintf("TestSinkBindingCronJob%s", uuid.NewUUID()) + message := fmt.Sprintf("TestSinkBindingCronJob%s", uuid.NewUUID()) client.CreateCronJobOrFail(&batchv1beta1.CronJob{ ObjectMeta: metav1.ObjectMeta{ Namespace: client.Namespace, @@ -213,7 +180,7 @@ func TestSinkBindingV1Alpha2CronJob(t *testing.T) { Name: imageName, Image: pkgTest.ImagePath(imageName), ImagePullPolicy: corev1.PullAlways, - Args: []string{"--msg=" + data}, + Args: []string{"--msg=" + message}, Env: []corev1.EnvVar{{ Name: "ONE_SHOT", Value: "true", @@ -236,10 +203,9 @@ func TestSinkBindingV1Alpha2CronJob(t *testing.T) { client.WaitForAllTestResourcesReadyOrFail() // verify the logger service receives the event - expectedCount := 2 - expectedSource := fmt.Sprintf("https://knative.dev/eventing/test/heartbeats/#%s/%s", client.Namespace, deploymentName) - if err := targetTracker.WaitMatchSourceData(expectedSource, data, expectedCount, -1); err != nil { - t.Fatalf("String %q does not appear at least %d times in logs of logger pod %q: %v", data, expectedCount, loggerPodName, err) - } + eventTracker.AssertAtLeast(2, recordevents.MatchEvent( + recordevents.MatchHeartBeatsImageMessage(message), + HasSource(fmt.Sprintf("https://knative.dev/eventing/test/heartbeats/#%s/%s", client.Namespace, deploymentName)), + )) } diff --git a/test/e2e/trigger_dependency_annotation_test.go b/test/e2e/trigger_dependency_annotation_test.go index 367434897fe..596931bea63 100644 --- a/test/e2e/trigger_dependency_annotation_test.go +++ b/test/e2e/trigger_dependency_annotation_test.go @@ -21,6 +21,7 @@ import ( "fmt" "testing" + . "github.com/cloudevents/sdk-go/v2/test" "k8s.io/apimachinery/pkg/util/uuid" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -59,13 +60,8 @@ func TestTriggerDependencyAnnotation(t *testing.T) { client.WaitForResourceReadyOrFail(defaultBrokerName, lib.BrokerTypeMeta) // Create subscribers. - loggerPod := resources.EventRecordPod(subscriberName) - client.CreatePodOrFail(loggerPod, lib.WithService(subscriberName)) - targetTracker, err := recordevents.NewEventInfoStore(client, subscriberName) - if err != nil { - t.Fatalf("Pod tracker failed: %v", err) - } - defer targetTracker.Cleanup() + eventTracker, _ := recordevents.StartEventRecordOrFail(client, subscriberName) + defer eventTracker.Cleanup() // Wait for subscriber to become ready client.WaitForAllTestResourcesReadyOrFail() @@ -76,7 +72,7 @@ func TestTriggerDependencyAnnotation(t *testing.T) { resources.WithDependencyAnnotationTriggerV1Beta1(dependencyAnnotation), ) - jsonData := fmt.Sprintf("Test trigger-annotation %s", uuid.NewUUID()) + jsonData := fmt.Sprintf(`{"msg":"Test trigger-annotation %s"}`, uuid.NewUUID()) pingSource := eventingtesting.NewPingSourceV1Alpha2( pingSourceName, client.Namespace, @@ -101,8 +97,8 @@ func TestTriggerDependencyAnnotation(t *testing.T) { // Trigger should become ready after pingSource was created client.WaitForResourceReadyOrFail(triggerName, lib.TriggerTypeMeta) - err = targetTracker.WaitMatchSourceData(sourcesv1alpha2.PingSourceSource(client.Namespace, pingSourceName), jsonData, 1, -1) - if err != nil { - t.Fatalf("Error watching for data %s event in pod %s: %v", jsonData, subscriberName, err) - } + eventTracker.AssertAtLeast(1, recordevents.MatchEvent( + HasSource(sourcesv1alpha2.PingSourceSource(client.Namespace, pingSourceName)), + HasData([]byte(jsonData)), + )) } diff --git a/test/lib/recordevents/event_info.go b/test/lib/recordevents/event_info.go index 444b38b5819..3ddc51ae887 100644 --- a/test/lib/recordevents/event_info.go +++ b/test/lib/recordevents/event_info.go @@ -22,6 +22,7 @@ import ( "io/ioutil" "math/rand" "net/http" + "strings" "time" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -85,7 +86,13 @@ type SearchedInfo struct { // Pretty print the SearchedInfor for error messages func (s *SearchedInfo) String() string { - return fmt.Sprintf("%d events seen, last N = %s", s.TotalEvent, s.LastNEvent) + var sb strings.Builder + sb.WriteString(fmt.Sprintf("%d events seen, last %d events:", s.TotalEvent, len(s.LastNEvent))) + for _, ei := range s.LastNEvent { + sb.WriteString(ei.String()) + sb.WriteRune('\n') + } + return sb.String() } // Connection state for a REST connection to a pod diff --git a/test/lib/recordevents/event_info_matchers.go b/test/lib/recordevents/event_info_matchers.go index c5fa537cf9b..6d833bdffe0 100644 --- a/test/lib/recordevents/event_info_matchers.go +++ b/test/lib/recordevents/event_info_matchers.go @@ -18,13 +18,23 @@ package recordevents import ( "fmt" + "strings" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/event" cetest "github.com/cloudevents/sdk-go/v2/test" ) // Does the provided EventInfo match some criteria type EventInfoMatcher func(EventInfo) error +// Matcher that never fails +func Any() EventInfoMatcher { + return func(ei EventInfo) error { + return nil + } +} + // Convert a matcher that checks valid messages to a function // that checks EventInfo structures, returning an error for any that don't // contain valid events. @@ -37,3 +47,32 @@ func MatchEvent(evf ...cetest.EventMatcher) EventInfoMatcher { } } } + +// MatchHeartBeatsImageMessage matches that the data field of the event, in the format of the heartbeats image, contains the following msg field +func MatchHeartBeatsImageMessage(expectedMsg string) cetest.EventMatcher { + return cetest.AllOf( + cetest.HasDataContentType(cloudevents.ApplicationJSON), + func(have cloudevents.Event) error { + var m map[string]interface{} + err := have.DataAs(&m) + if err != nil { + return fmt.Errorf("cannot parse heartbeats message %s", err.Error()) + } + if m["msg"].(string) != expectedMsg { + return fmt.Errorf("heartbeats message don't match. Expected: '%s', Actual: '%s'", expectedMsg, m["msg"].(string)) + } + return nil + }, + ) +} + +// DataContains matches that the data field of the event, converted to a string, contains the provided string +func DataContains(expectedContainedString string) cetest.EventMatcher { + return func(have event.Event) error { + dataAsString := string(have.Data()) + if !strings.Contains(dataAsString, expectedContainedString) { + return fmt.Errorf("data '%s' doesn't contain '%s'", dataAsString, expectedContainedString) + } + return nil + } +} diff --git a/test/lib/recordevents/event_info_store.go b/test/lib/recordevents/event_info_store.go index 7167a5ad6c3..9b5a9f3858b 100644 --- a/test/lib/recordevents/event_info_store.go +++ b/test/lib/recordevents/event_info_store.go @@ -23,7 +23,6 @@ import ( "testing" "time" - cloudevents "github.com/cloudevents/sdk-go/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -190,19 +189,24 @@ func (ei *EventInfoStore) refreshData() ([]EventInfo, error) { // last 5 events seen and the total events matched. This SearchedInfo structure // is primarily to ease debugging in failure printouts. The provided function is // guaranteed to be called exactly once on each EventInfo from the pod. -func (ei *EventInfoStore) Find(f EventInfoMatcher) ([]EventInfo, SearchedInfo, error) { +// The error array contains the eventual match errors, while the last return error contains +// an eventual communication error while trying to get the events from the recordevents pod +func (ei *EventInfoStore) Find(f EventInfoMatcher) ([]EventInfo, SearchedInfo, []error, error) { const maxLastEvents = 5 allMatch := []EventInfo{} sInfo := SearchedInfo{} lastEvents := []EventInfo{} + var nonMatchingErrors []error allEvents, err := ei.refreshData() if err != nil { - return nil, sInfo, fmt.Errorf("error getting events %v", err) + return nil, sInfo, nonMatchingErrors, fmt.Errorf("error getting events %v", err) } for i := range allEvents { - if f(allEvents[i]) == nil { + if err := f(allEvents[i]); err == nil { allMatch = append(allMatch, allEvents[i]) + } else { + nonMatchingErrors = append(nonMatchingErrors, err) } lastEvents = append(lastEvents, allEvents[i]) if len(lastEvents) > maxLastEvents { @@ -213,85 +217,13 @@ func (ei *EventInfoStore) Find(f EventInfoMatcher) ([]EventInfo, SearchedInfo, e sInfo.LastNEvent = lastEvents sInfo.TotalEvent = len(allEvents) - return allMatch, sInfo, nil -} - -// Wait a long time (currently 4 minutes) until the provided function matches at least -// five events. The matching events are returned if we find at least n. If the -// function times out, an error is returned. -// If you need to perform assert on the result (aka you want to fail if error != nil), then use AssertAtLeast -func (ei *EventInfoStore) WaitAtLeastNMatch(f EventInfoMatcher, min int) ([]EventInfo, error) { - var matchRet []EventInfo - var internalErr error - - wait.PollImmediate(ei.retryInterval, ei.timeout, func() (bool, error) { - allMatch, sInfo, err := ei.Find(f) - if err != nil { - internalErr = fmt.Errorf("FAIL MATCHING: unexpected error during find: %v", err) - return false, nil - } - count := len(allMatch) - if count < min { - internalErr = fmt.Errorf("FAIL MATCHING: saw %d/%d matching events. recent events: (%s)", - count, min, &sInfo) - return false, nil - } - matchRet = allMatch - internalErr = nil - return true, nil - }) - return matchRet, internalErr -} - -// Deprecated: use AssertAtLeast -func (ei *EventInfoStore) MustWaitAtLeastNMatch(t testing.TB, f EventInfoMatcher, n int) []EventInfo { - events, err := ei.WaitAtLeastNMatch(f, n) - if err != nil { - t.Fatalf("Timeout waiting for %d matches. Error: %v", n, err) - } - - return events -} - -// Wait for at least minCount events with source exactly matching source and data contained within the event -// data field. If source is the empty string, don't check the source. If maxCount is >0, return an error -// if more than maxCount entries are seen. -// Deprecated: use AssertInRange -func (ei *EventInfoStore) WaitMatchSourceData(source string, data string, minCount int, maxCount int) error { - matchFunc := func(ev cloudevents.Event) error { - if source != "" && ev.Source() != source { - return fmt.Errorf("mismatched source: expected %s, saw %s", source, ev.Source()) - } - db := ev.Data() - body := string(db) - if strings.Contains(body, data) { - return nil - } else { - return fmt.Errorf("didn't find substring (%s) in data (%s)", data, body) - } - } - // verify the logger service receives the event and only once - match, err := ei.WaitAtLeastNMatch(MatchEvent(matchFunc), minCount) - if err != nil { - return fmt.Errorf("error waiting for event: %v", err) - } - if maxCount > 0 && len(match) > maxCount { - return fmt.Errorf("expected <= %d events, saw %d", maxCount, len(match)) - } - return nil -} - -// Deprecated: use AssertInRange -func (ei *EventInfoStore) AssertWaitMatchSourceData(tb testing.TB, source string, data string, minCount int, maxCount int) { - if err := ei.WaitMatchSourceData(source, data, minCount, maxCount); err != nil { - tb.Fatalf("Timeout waiting for source %q and data %q. It does not appear at least %d times in the event record pod %q: %v", source, data, minCount, ei.podName, err) - } + return allMatch, sInfo, nonMatchingErrors, nil } // Assert that there are at least min number of matches of f. // This method fails the test if the assert is not fulfilled. func (ei *EventInfoStore) AssertAtLeast(min int, f EventInfoMatcher) []EventInfo { - events, err := ei.WaitAtLeastNMatch(f, min) + events, err := ei.waitAtLeastNMatch(f, min) if err != nil { ei.tb.Fatalf("Timeout waiting for at least %d matches. Error: %v", min, err) } @@ -312,7 +244,7 @@ func (ei *EventInfoStore) AssertInRange(min int, max int, f EventInfoMatcher) [] // Assert that there aren't any matches of f. // This method fails the test if the assert is not fulfilled. func (ei *EventInfoStore) AssertNot(f EventInfoMatcher) []EventInfo { - res, recentEvents, err := ei.Find(f) + res, recentEvents, _, err := ei.Find(f) if err != nil { ei.tb.Fatalf("unexpected error during find on recordevents '%s': %v", ei.podName, err) } @@ -329,3 +261,44 @@ func (ei *EventInfoStore) AssertNot(f EventInfoMatcher) []EventInfo { func (ei *EventInfoStore) AssertExact(n int, f EventInfoMatcher) []EventInfo { return ei.AssertInRange(n, n, f) } + +// Wait a long time (currently 4 minutes) until the provided function matches at least +// five events. The matching events are returned if we find at least n. If the +// function times out, an error is returned. +// If you need to perform assert on the result (aka you want to fail if error != nil), then use AssertAtLeast +func (ei *EventInfoStore) waitAtLeastNMatch(f EventInfoMatcher, min int) ([]EventInfo, error) { + var matchRet []EventInfo + var internalErr error + + wait.PollImmediate(ei.retryInterval, ei.timeout, func() (bool, error) { + allMatch, sInfo, matchErrs, err := ei.Find(f) + if err != nil { + internalErr = fmt.Errorf("FAIL MATCHING: unexpected error during find: %v", err) + return false, nil + } + count := len(allMatch) + if count < min { + internalErr = fmt.Errorf( + "FAIL MATCHING: saw %d/%d matching events.\nRecent events: \n%s\nMatch errors: \n%s\n", + count, + min, + &sInfo, + formatErrors(matchErrs), + ) + return false, nil + } + matchRet = allMatch + internalErr = nil + return true, nil + }) + return matchRet, internalErr +} + +func formatErrors(errs []error) string { + var sb strings.Builder + for _, err := range errs { + sb.WriteString(err.Error()) + sb.WriteRune('\n') + } + return sb.String() +} diff --git a/test/lib/recordevents/event_info_store_test.go b/test/lib/recordevents/event_info_store_test.go index 7dd1e5f885c..47fcd3ece6a 100644 --- a/test/lib/recordevents/event_info_store_test.go +++ b/test/lib/recordevents/event_info_store_test.go @@ -139,7 +139,7 @@ func TestSequentialAndTrim(t *testing.T) { subEv := totalEv[:10] deg.setEv(1, subEv) ei := newTestableEventInfoStore(deg, -1, -1) - allData, _, err := ei.Find(func(EventInfo) error { return nil }) + allData, _, _, err := ei.Find(func(EventInfo) error { return nil }) if err != nil { t.Fatalf("Unexpected error from find: %v", err) } @@ -152,7 +152,7 @@ func TestSequentialAndTrim(t *testing.T) { subEv = totalEv[10:19] deg.setEv(11, subEv) - allData, _, err = ei.Find(func(EventInfo) error { return nil }) + allData, _, _, err = ei.Find(func(EventInfo) error { return nil }) if err != nil { t.Fatalf("Unexpected error from find: %v", err) } @@ -169,14 +169,14 @@ func TestOverlap(t *testing.T) { subEv := totalEv[:10] deg.setEv(1, subEv) ei := newTestableEventInfoStore(deg, -1, -1) - allData, _, err := ei.Find(func(EventInfo) error { return nil }) + allData, _, _, err := ei.Find(func(EventInfo) error { return nil }) if err != nil { t.Fatalf("Unexpected error from find: %v", err) } checkEvIDEqual(t, allData, expectedFull[:10]) subEv = totalEv[6:19] deg.setEv(7, subEv) - allData, _, err = ei.Find(func(EventInfo) error { return nil }) + allData, _, _, err = ei.Find(func(EventInfo) error { return nil }) if err != nil { t.Fatalf("Unexpected error from find: %v", err) } @@ -193,14 +193,14 @@ func TestGap(t *testing.T) { subEv := totalEv[:10] deg.setEv(1, subEv) ei := newTestableEventInfoStore(deg, -1, -1) - allData, _, err := ei.Find(func(EventInfo) error { return nil }) + allData, _, _, err := ei.Find(func(EventInfo) error { return nil }) if err != nil { t.Fatalf("Unexpected error from find: %v", err) } checkEvIDEqual(t, allData, expectedFull[:10]) subEv = totalEv[11:19] deg.setEv(12, subEv) - _, _, err = ei.Find(func(EventInfo) error { return nil }) + _, _, _, err = ei.Find(func(EventInfo) error { return nil }) if err == nil { t.Fatalf("Unexpected success from find") } @@ -216,14 +216,14 @@ func TestSequentialNoOp(t *testing.T) { subEv := totalEv[:10] deg.setEv(1, subEv) ei := newTestableEventInfoStore(deg, -1, -1) - allData, _, err := ei.Find(func(EventInfo) error { return nil }) + allData, _, _, err := ei.Find(func(EventInfo) error { return nil }) if err != nil { t.Fatalf("Unexpected error from find: %v", err) } checkEvIDEqual(t, allData, expectedFull[:10]) subEv = []EventInfo{} deg.setEv(11, subEv) - allData, _, err = ei.Find(func(EventInfo) error { return nil }) + allData, _, _, err = ei.Find(func(EventInfo) error { return nil }) if err != nil { t.Fatalf("Unexpected error from find: %v", err) } @@ -250,7 +250,7 @@ func TestWaitForN(t *testing.T) { } } - allMatch, waitErr = ei.WaitAtLeastNMatch(MatchEvent(matchFunc), 2) + allMatch, waitErr = ei.waitAtLeastNMatch(MatchEvent(matchFunc), 2) wg.Done() }() var tCalls int