Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions test/conformance/helpers/channel_tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,18 +132,13 @@ func tracingTest(
// assertEventMatch verifies that recorder pod contains at least one event that
// matches mustMatch. It is used to show that the expected event was sent to
// the logger Pod. It returns a list of the matching events.
func assertEventMatch(t *testing.T, client *lib.Client, recorderPodName string,
mustMatch cetest.EventMatcher) []recordevents.EventInfo {
func assertEventMatch(t *testing.T, client *lib.Client, recorderPodName string, mustMatch cetest.EventMatcher) []recordevents.EventInfo {
targetTracker, err := recordevents.NewEventInfoStore(client, recorderPodName)
if err != nil {
t.Fatalf("Pod tracker failed: %v", err)
}
defer targetTracker.Cleanup()
matches, err := targetTracker.WaitAtLeastNMatch(recordevents.MatchEvent(mustMatch), 1)
if err != nil {
t.Fatalf("Expected messages not found: %v", err)
}
return matches
return targetTracker.AssertAtLeast(1, recordevents.MatchEvent(mustMatch))
}

// getTraceIDHeader gets the TraceID from the passed in events. It returns the header from the
Expand Down
16 changes: 6 additions & 10 deletions test/e2e/helpers/channel_chain_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
. "github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -53,12 +54,7 @@ func ChannelChainTestHelper(t *testing.T,
client.WaitForResourcesReadyOrFail(&channel)

// create loggerPod and expose it as a service
recordEventsPod := resources.EventRecordPod(recordEventsPodName)
client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName))
eventTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName)
if err != nil {
t.Fatalf("Pod tracker failed: %v", err)
}
eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventsPodName)
defer eventTracker.Cleanup()

// create subscriptions that subscribe the first channel, and reply events directly to the second channel
Expand Down Expand Up @@ -92,10 +88,10 @@ func ChannelChainTestHelper(t *testing.T,

client.SendEventToAddressable(senderName, channelNames[0], &channel, event)

// check if the logging service receives the correct number of event messages
expectedContentCount := len(subscriptionNames1) * len(subscriptionNames2)

// verify the logger service receives the event
eventTracker.AssertWaitMatchSourceData(t, eventSource, body, expectedContentCount, expectedContentCount)
eventTracker.AssertAtLeast(len(subscriptionNames1)*len(subscriptionNames2), recordevents.MatchEvent(
HasSource(eventSource),
HasData([]byte(body)),
))
})
}
13 changes: 6 additions & 7 deletions test/e2e/helpers/channel_defaulter_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
. "github.com/cloudevents/sdk-go/v2/test"
"github.com/ghodss/yaml"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -96,12 +97,7 @@ func defaultChannelTestHelper(t *testing.T, client *lib.Client, expectedChannel
client.CreateChannelWithDefaultOrFail(eventingtesting.NewChannel(channelName, client.Namespace))

// create event logger pod and service as the subscriber
recordEventsPod := resources.EventRecordPod(recordEventsPodName)
client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName))
eventTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName)
if err != nil {
t.Fatalf("Pod tracker failed: %v", err)
}
eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventsPodName)
defer eventTracker.Cleanup()

// create subscription to subscribe the channel, and forward the received events to the logger service
Expand Down Expand Up @@ -154,7 +150,10 @@ func defaultChannelTestHelper(t *testing.T, client *lib.Client, expectedChannel
client.SendEventToAddressable(senderName, channelName, lib.ChannelTypeMeta, event)

// verify the logger service receives the event
eventTracker.AssertWaitMatchSourceData(t, eventSource, body, 1, 1)
eventTracker.AssertAtLeast(1, recordevents.MatchEvent(
HasSource(eventSource),
HasData([]byte(body)),
))
}

// updateDefaultChannelCM will update the default channel configmap
Expand Down
14 changes: 6 additions & 8 deletions test/e2e/helpers/channel_dls_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
. "github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -50,12 +51,7 @@ func ChannelDeadLetterSinkTestHelper(t *testing.T,
client.WaitForResourcesReadyOrFail(&channel)

// create event logger pod and service as the subscriber
recordEventsPod := resources.EventRecordPod(recordEventsPodName)
client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName))
eventTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName)
if err != nil {
t.Fatalf("Pod tracker failed: %v", err)
}
eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventsPodName)
defer eventTracker.Cleanup()

// create subscriptions that subscribe to a service that does not exist
Expand Down Expand Up @@ -83,7 +79,9 @@ func ChannelDeadLetterSinkTestHelper(t *testing.T,
client.SendEventToAddressable(senderName, channelNames[0], &channel, event)

// check if the logging service receives the correct number of event messages
expectedContentCount := len(subscriptionNames)
eventTracker.AssertWaitMatchSourceData(t, eventSource, body, expectedContentCount, expectedContentCount)
eventTracker.AssertAtLeast(len(subscriptionNames), recordevents.MatchEvent(
HasSource(eventSource),
HasData([]byte(body)),
))
})
}
20 changes: 6 additions & 14 deletions test/e2e/helpers/channel_event_tranformation_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
. "github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -69,12 +70,7 @@ func EventTransformationForSubscriptionTestHelper(t *testing.T,
client.CreatePodOrFail(transformationPod, lib.WithService(transformationPodName))

// create event logger pod and service as the subscriber
recordEventsPod := resources.EventRecordPod(recordEventsPodName)
client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName))
eventTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName)
if err != nil {
t.Fatalf("Pod tracker failed: %v", err)
}
eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventsPodName)
defer eventTracker.Cleanup()

// create subscriptions that subscribe the first channel, use the transformation service to transform the events and then forward the transformed events to the second channel
Expand Down Expand Up @@ -108,13 +104,9 @@ func EventTransformationForSubscriptionTestHelper(t *testing.T,
client.SendEventToAddressable(senderName, channelNames[0], &channel, eventToSend)

// check if the logging service receives the correct number of event messages
expectedContentCount := len(subscriptionNames1) * len(subscriptionNames2)
eventTracker.AssertWaitMatchSourceData(
t,
eventSource,
transformedEventBody,
expectedContentCount,
expectedContentCount,
)
eventTracker.AssertAtLeast(len(subscriptionNames1)*len(subscriptionNames2), recordevents.MatchEvent(
HasSource(eventSource),
HasData([]byte(transformedEventBody)),
))
})
}
13 changes: 6 additions & 7 deletions test/e2e/helpers/channel_single_event_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"testing"

. "github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -61,12 +62,7 @@ func SingleEventForChannelTestHelper(t *testing.T, encoding cloudevents.Encoding
client.CreateChannelOrFail(channelName, &channel)

// create event logger pod and service
eventRecordPod := resources.EventRecordPod(eventRecorder)
client.CreatePodOrFail(eventRecordPod, lib.WithService(eventRecorder))
eventTracker, err := recordevents.NewEventInfoStore(client, eventRecorder)
if err != nil {
t.Fatalf("Pod tracker failed: %v", err)
}
eventTracker, _ := recordevents.StartEventRecordOrFail(client, eventRecorder)
defer eventTracker.Cleanup()

// If the caller specified a different version, override it here.
Expand Down Expand Up @@ -113,6 +109,9 @@ func SingleEventForChannelTestHelper(t *testing.T, encoding cloudevents.Encoding
)

// verify the logger service receives the event
eventTracker.AssertWaitMatchSourceData(t, eventSource, body, 1, 1)
eventTracker.AssertAtLeast(1, recordevents.MatchEvent(
HasData([]byte(body)),
HasSource(eventSource),
))
})
}
19 changes: 10 additions & 9 deletions test/e2e/helpers/parallel_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"testing"

. "github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -89,13 +90,7 @@ func ParallelTestHelper(t *testing.T,

// create event logger pod and service
eventRecorder := fmt.Sprintf("%s-event-record-pod", tc.name)
eventRecordPod := resources.EventRecordPod(eventRecorder)
client.CreatePodOrFail(eventRecordPod, lib.WithService(eventRecorder))
eventTracker, err := recordevents.NewEventInfoStore(client, eventRecorder)
if err != nil {
t.Fatalf("Pod tracker failed: %v", err)
}
defer eventTracker.Cleanup()
eventTracker, _ := recordevents.StartEventRecordOrFail(client, eventRecorder)

// create channel as reply of the Parallel
// TODO(chizhg): now we'll have to use a channel plus its subscription here, as reply of the Subscription
Expand Down Expand Up @@ -135,10 +130,16 @@ func ParallelTestHelper(t *testing.T,
senderPodName,
tc.name,
lib.FlowsParallelTypeMeta,
event)
event,
)

// verify the logger service receives the correct transformed event
eventTracker.AssertWaitMatchSourceData(t, eventSource, tc.expected, 1, 1)
eventTracker.AssertExact(1, recordevents.MatchEvent(
HasSource(eventSource),
recordevents.DataContains(tc.expected),
))

eventTracker.Cleanup()
}
})
}
13 changes: 6 additions & 7 deletions test/e2e/helpers/sequence_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
cetest "github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -91,12 +92,7 @@ func SequenceTestHelper(t *testing.T,
// make the logger service as a Knative service, and remove the channel and subscription.
client.CreateChannelOrFail(channelName, &channel)
// create event logger pod and service as the subscriber
recordEventsPod := resources.EventRecordPod(recordEventsPodName)
client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName))
eventTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName)
if err != nil {
t.Fatalf("Pod tracker failed: %v", err)
}
eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventsPodName)
defer eventTracker.Cleanup()
// create subscription to subscribe the channel, and forward the received events to the logger service
client.CreateSubscriptionOrFail(
Expand Down Expand Up @@ -144,6 +140,9 @@ func SequenceTestHelper(t *testing.T,
for _, config := range stepSubscriberConfigs {
expectedMsg += config.msgAppender
}
eventTracker.AssertWaitMatchSourceData(t, eventSource, expectedMsg, 1, 1)
eventTracker.AssertAtLeast(1, recordevents.MatchEvent(
cetest.HasSource(eventSource),
recordevents.DataContains(expectedMsg),
))
})
}
Loading