diff --git a/test/base/resources/cloud_event.go b/test/base/resources/cloud_event.go index d370be3be87..a14af298fbf 100644 --- a/test/base/resources/cloud_event.go +++ b/test/base/resources/cloud_event.go @@ -18,11 +18,12 @@ package resources // CloudEvent specifies the arguments for a CloudEvent used by the sendevents or transformevents image. type CloudEvent struct { - ID string - Type string - Source string - Data string // must be in json format - Encoding string // binary or structured + ID string + Type string + Source string + Extensions map[string]interface{} + Data string // must be in json format + Encoding string // binary or structured } // CloudEventBaseData defines a simple struct that can be used as data of a CloudEvent. diff --git a/test/base/resources/eventing.go b/test/base/resources/eventing.go index db2b4cdc053..55f5bdedbe4 100644 --- a/test/base/resources/eventing.go +++ b/test/base/resources/eventing.go @@ -19,6 +19,7 @@ package resources // This file contains functions that construct Eventing resources. import ( + "fmt" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" @@ -105,8 +106,8 @@ func Broker(name string, options ...BrokerOption) *eventingv1alpha1.Broker { return broker } -// WithTriggerFilter returns an option that adds a TriggerFilter for the given Trigger. -func WithTriggerFilter(eventSource, eventType string) TriggerOption { +// WithDeprecatedSourceAndTypeTriggerFilter returns an option that adds a TriggerFilter with DeprecatedSourceAndType for the given Trigger. +func WithDeprecatedSourceAndTypeTriggerFilter(eventSource, eventType string) TriggerOption { return func(t *eventingv1alpha1.Trigger) { triggerFilter := &eventingv1alpha1.TriggerFilter{ DeprecatedSourceAndType: &eventingv1alpha1.TriggerFilterSourceAndType{ @@ -118,6 +119,22 @@ func WithTriggerFilter(eventSource, eventType string) TriggerOption { } } +// WithAttributesTriggerFilter returns an option that adds a TriggerFilter with Attributes for the given Trigger. +func WithAttributesTriggerFilter(eventSource, eventType string, extensions map[string]interface{}) TriggerOption { + attrs := make(map[string]string) + attrs["type"] = eventType + attrs["source"] = eventSource + for k, v := range extensions { + attrs[k] = fmt.Sprintf("%v", v) + } + triggerFilterAttributes := eventingv1alpha1.TriggerFilterAttributes(attrs) + return func(t *eventingv1alpha1.Trigger) { + t.Spec.Filter = &eventingv1alpha1.TriggerFilter{ + Attributes: &triggerFilterAttributes, + } + } +} + // WithBroker returns an option that adds a Broker for the given Trigger. func WithBroker(brokerName string) TriggerOption { return func(t *eventingv1alpha1.Trigger) { diff --git a/test/base/resources/kube.go b/test/base/resources/kube.go index 24ba3c31929..d60053809a9 100644 --- a/test/base/resources/kube.go +++ b/test/base/resources/kube.go @@ -19,6 +19,8 @@ package resources // This file contains functions that construct common Kubernetes resources. import ( + "encoding/json" + "fmt" "strconv" corev1 "k8s.io/api/core/v1" @@ -30,11 +32,17 @@ import ( ) // EventSenderPod creates a Pod that sends a single event to the given address. -func EventSenderPod(name string, sink string, event *CloudEvent) *corev1.Pod { +func EventSenderPod(name string, sink string, event *CloudEvent) (*corev1.Pod, error) { const imageName = "sendevents" if event.Encoding == "" { event.Encoding = CloudEventEncodingBinary } + eventExtensionsBytes, error := json.Marshal(event.Extensions) + eventExtensions := string(eventExtensionsBytes) + if error != nil { + return nil, fmt.Errorf("encountered error when we marshall cloud event extensions %v", error) + } + return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -51,6 +59,8 @@ func EventSenderPod(name string, sink string, event *CloudEvent) *corev1.Pod { event.Type, "-event-source", event.Source, + "-event-extensions", + eventExtensions, "-event-data", event.Data, "-event-encoding", @@ -62,7 +72,7 @@ func EventSenderPod(name string, sink string, event *CloudEvent) *corev1.Pod { //TODO restart on failure? RestartPolicy: corev1.RestartPolicyNever, }, - } + }, nil } // EventLoggerPod creates a Pod that logs events received. diff --git a/test/common/operation.go b/test/common/operation.go index 22ff6d46f54..06f74f4cb24 100644 --- a/test/common/operation.go +++ b/test/common/operation.go @@ -71,7 +71,10 @@ func (client *Client) sendFakeEventToAddress( event *resources.CloudEvent, ) error { namespace := client.Namespace - pod := resources.EventSenderPod(senderName, uri, event) + pod, err := resources.EventSenderPod(senderName, uri, event) + if err != nil { + return err + } client.CreatePodOrFail(pod) if err := pkgTest.WaitForPodRunning(client.Kube, senderName, namespace); err != nil { return err diff --git a/test/common/validation.go b/test/common/validation.go index f375c86fc4d..ad41cc7591c 100644 --- a/test/common/validation.go +++ b/test/common/validation.go @@ -17,10 +17,13 @@ limitations under the License. package common import ( + "encoding/json" + "regexp" "strings" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" ) @@ -92,14 +95,40 @@ func (client *Client) FindAnyLogContents(podName string, contents []string) (boo if err != nil { return false, err } + eventContentsSet, err := parseEventContentsFromPodLogs(string(logs)) + if err != nil { + return false, err + } for _, content := range contents { - if strings.Contains(string(logs), content) { + if eventContentsSet.Has(content) { return true, nil } } return false, nil } +// parseEventContentsFromPodLogs extracts the contents of events from a Pod logs +// Example log entry: 2019/08/21 22:46:38 {"msg":"Body-type1-source1--extname1-extval1-extname2-extvalue2","sequence":"1"} +// Use regex to get the event content with json format: {"msg":"Body-type1-source1--extname1-extval1-extname2-extvalue2","sequence":"1"} +// Get the eventContent with key "msg" +// Returns a set with all unique event contents +func parseEventContentsFromPodLogs(logs string) (sets.String, error) { + re := regexp.MustCompile(`{.+}`) + matches := re.FindAllString(logs, -1) + eventContentsSet := sets.String{} + for _, match := range matches { + var matchedLogs map[string]string + err := json.Unmarshal([]byte(match), &matchedLogs) + if err != nil { + return nil, err + } else { + eventContent := matchedLogs["msg"] + eventContentsSet.Insert(eventContent) + } + } + return eventContentsSet, nil +} + // getContainerName gets name of the first container of the given pod. // Now our logger pod only contains one single container, and is only used for receiving events and validation. func (client *Client) getContainerName(podName, namespace string) (string, error) { diff --git a/test/e2e/broker_default_test.go b/test/e2e/broker_default_test.go index db6002dc789..d8eccc13b83 100644 --- a/test/e2e/broker_default_test.go +++ b/test/e2e/broker_default_test.go @@ -20,16 +20,16 @@ package e2e import ( "fmt" + "sort" "strings" "testing" "time" + "k8s.io/apimachinery/pkg/util/uuid" "knative.dev/eventing/pkg/apis/eventing/v1alpha1" pkgResources "knative.dev/eventing/pkg/reconciler/namespace/resources" "knative.dev/eventing/test/base/resources" "knative.dev/eventing/test/common" - - "k8s.io/apimachinery/pkg/util/uuid" "knative.dev/pkg/test/logging" ) @@ -43,19 +43,25 @@ const ( eventType2 = "type2" eventSource1 = "source1" eventSource2 = "source2" + // Be careful with the length of extension name and values, + // we use extension name and value as a part of the name of resources like subscriber and trigger, the maximum characters allowed of resource name is 63 + extensionName1 = "extname1" + extensionValue1 = "extval1" + extensionName2 = "extname2" + extensionValue2 = "extvalue2" ) -// eventTypeAndSource specifies the type and source of an Event. -type eventTypeAndSource struct { - Type string - Source string +type eventContext struct { + Type string + Source string + Extensions map[string]interface{} } // Helper struct to tie the type and sources of the events we expect to receive // in subscribers with the selectors we use when creating their pods. type eventReceiver struct { - typeAndSource eventTypeAndSource - selector map[string]string + context eventContext + selector map[string]string } // This test annotates the testing namespace so that a default broker is created. @@ -63,107 +69,172 @@ type eventReceiver struct { // and sends different events to the broker's address. Finally, it verifies that only // the appropriate events are routed to the subscribers. func TestDefaultBrokerWithManyTriggers(t *testing.T) { - client := setup(t, true) - defer tearDown(client) - - // Label namespace so that it creates the default broker. - if err := client.LabelNamespace(map[string]string{"knative-eventing-injection": "enabled"}); err != nil { - t.Fatalf("Error annotating namespace: %v", err) + tests := []struct { + name string + eventsToReceive []eventReceiver // These are the event context attributes and extension attributes that triggers will listen to, + // to set in the subscriber and services pod + eventsToSend []eventContext // These are the event context attributes and extension attributes that will be send. + deprecatedTriggerFilter bool //TriggerFilter with DeprecatedSourceAndType or not + }{ + { + name: "test default broker with many deprecated triggers", + eventsToReceive: []eventReceiver{ + {eventContext{Type: any, Source: any}, newSelector()}, + {eventContext{Type: eventType1, Source: any}, newSelector()}, + {eventContext{Type: any, Source: eventSource1}, newSelector()}, + {eventContext{Type: eventType1, Source: eventSource1}, newSelector()}, + }, + eventsToSend: []eventContext{ + {Type: eventType1, Source: eventSource1}, + {Type: eventType1, Source: eventSource2}, + {Type: eventType2, Source: eventSource1}, + {Type: eventType2, Source: eventSource2}, + }, + deprecatedTriggerFilter: true, + }, { + name: "test default broker with many attribute triggers", + eventsToReceive: []eventReceiver{ + {eventContext{Type: any, Source: any}, newSelector()}, + {eventContext{Type: eventType1, Source: any}, newSelector()}, + {eventContext{Type: any, Source: eventSource1}, newSelector()}, + {eventContext{Type: eventType1, Source: eventSource1}, newSelector()}, + }, + eventsToSend: []eventContext{ + {Type: eventType1, Source: eventSource1}, + {Type: eventType1, Source: eventSource2}, + {Type: eventType2, Source: eventSource1}, + {Type: eventType2, Source: eventSource2}, + }, + deprecatedTriggerFilter: false, + }, + { + name: "test default broker with many attribute and extension triggers", + eventsToReceive: []eventReceiver{ + {eventContext{Type: any, Source: any, Extensions: map[string]interface{}{extensionName1: extensionValue1}}, newSelector()}, + {eventContext{Type: any, Source: any, Extensions: map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue2}}, newSelector()}, + {eventContext{Type: any, Source: any, Extensions: map[string]interface{}{extensionName2: extensionValue2}}, newSelector()}, + {eventContext{Type: eventType1, Source: any, Extensions: map[string]interface{}{extensionName1: extensionValue1}}, newSelector()}, + {eventContext{Type: any, Source: any, Extensions: map[string]interface{}{extensionName1: any}}, newSelector()}, + {eventContext{Type: any, Source: eventSource1, Extensions: map[string]interface{}{extensionName1: extensionValue1}}, newSelector()}, + {eventContext{Type: any, Source: eventSource1, Extensions: map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue2}}, newSelector()}, + }, + eventsToSend: []eventContext{ + {Type: eventType1, Source: eventSource1, Extensions: map[string]interface{}{extensionName1: extensionValue1}}, + {Type: eventType1, Source: eventSource1, Extensions: map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue2}}, + {Type: eventType1, Source: eventSource1, Extensions: map[string]interface{}{extensionName2: extensionValue2}}, + {Type: eventType1, Source: eventSource2, Extensions: map[string]interface{}{extensionName1: extensionValue1}}, + {Type: eventType2, Source: eventSource1, Extensions: map[string]interface{}{extensionName1: "non.matching.ext.val"}}, + {Type: eventType2, Source: eventSource2, Extensions: map[string]interface{}{"non.matching.ext.name": extensionValue1}}, + {Type: eventType2, Source: eventSource2, Extensions: map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue2}}, + {Type: eventType2, Source: eventSource2, Extensions: map[string]interface{}{extensionName1: extensionValue1, "non.matching.ext.name": extensionValue2}}, + }, + deprecatedTriggerFilter: false, + }, } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + client := setup(t, true) + defer tearDown(client) - // Wait for default broker ready. - if err := client.WaitForResourceReady(defaultBrokerName, common.BrokerTypeMeta); err != nil { - t.Fatalf("Error waiting for default broker to become ready: %v", err) - } + // Label namespace so that it creates the default broker. + if err := client.LabelNamespace(map[string]string{"knative-eventing-injection": "enabled"}); err != nil { + t.Fatalf("Error annotating namespace: %v", err) + } - // These are the event types and sources that triggers will listen to, as well as the selectors - // to set in the subscriber and services pods. - eventsToReceive := []eventReceiver{ - {eventTypeAndSource{Type: any, Source: any}, newSelector()}, - {eventTypeAndSource{Type: eventType1, Source: any}, newSelector()}, - {eventTypeAndSource{Type: any, Source: eventSource1}, newSelector()}, - {eventTypeAndSource{Type: eventType1, Source: eventSource1}, newSelector()}, - } + // Wait for default broker ready. + if err := client.WaitForResourceReady(defaultBrokerName, common.BrokerTypeMeta); err != nil { + t.Fatalf("Error waiting for default broker to become ready: %v", err) + } - // Create subscribers. - for _, event := range eventsToReceive { - subscriberName := name("dumper", event.typeAndSource.Type, event.typeAndSource.Source) - pod := resources.EventLoggerPod(subscriberName) - client.CreatePodOrFail(pod, common.WithService(subscriberName)) - } + // Create subscribers. + for _, event := range test.eventsToReceive { + subscriberName := name("dumper", event.context.Type, event.context.Source, event.context.Extensions) + pod := resources.EventLoggerPod(subscriberName) + client.CreatePodOrFail(pod, common.WithService(subscriberName)) + } - // Create triggers. - for _, event := range eventsToReceive { - triggerName := name("trigger", event.typeAndSource.Type, event.typeAndSource.Source) - subscriberName := name("dumper", event.typeAndSource.Type, event.typeAndSource.Source) - client.CreateTriggerOrFail(triggerName, - resources.WithSubscriberRefForTrigger(subscriberName), - resources.WithTriggerFilter(event.typeAndSource.Source, event.typeAndSource.Type), - ) - } + // Create triggers. + for _, event := range test.eventsToReceive { + triggerName := name("trigger", event.context.Type, event.context.Source, event.context.Extensions) + subscriberName := name("dumper", event.context.Type, event.context.Source, event.context.Extensions) + triggerOption := getTriggerFilterOption(test.deprecatedTriggerFilter, event.context) + client.CreateTriggerOrFail(triggerName, + resources.WithSubscriberRefForTrigger(subscriberName), + triggerOption, + ) + } - // Wait for all test resources to become ready before sending the events. - if err := client.WaitForAllTestResourcesReady(); err != nil { - t.Fatalf("Failed to get all test resources ready: %v", err) - } + // Wait for all test resources to become ready before sending the events. + if err := client.WaitForAllTestResourcesReady(); err != nil { + t.Fatalf("Failed to get all test resources ready: %v", err) + } - // These are the event types and sources that will be send. - eventsToSend := []eventTypeAndSource{ - {eventType1, eventSource1}, - {eventType1, eventSource2}, - {eventType2, eventSource1}, - {eventType2, eventSource2}, - } - // Map to save the expected events per dumper so that we can verify the delivery. - expectedEvents := make(map[string][]string) - // Map to save the unexpected events per dumper so that we can verify that they weren't delivered. - unexpectedEvents := make(map[string][]string) - for _, eventToSend := range eventsToSend { - // Create cloud event. - // Using event type and source as part of the body for easier debugging. - body := fmt.Sprintf("Body-%s-%s", eventToSend.Type, eventToSend.Source) - cloudEvent := &resources.CloudEvent{ - Source: eventToSend.Source, - Type: eventToSend.Type, - Data: fmt.Sprintf(`{"msg":%q}`, body), - } - // Create sender pod. - senderPodName := name("sender", eventToSend.Type, eventToSend.Source) - if err := client.SendFakeEventToAddressable(senderPodName, defaultBrokerName, common.BrokerTypeMeta, cloudEvent); err != nil { - t.Fatalf("Error send cloud event to broker: %v", err) - } + // Map to save the expected events per dumper so that we can verify the delivery. + expectedEvents := make(map[string][]string) + // Map to save the unexpected events per dumper so that we can verify that they weren't delivered. + unexpectedEvents := make(map[string][]string) + for _, eventToSend := range test.eventsToSend { + // Create cloud event. + // Using event type, source and extensions as part of the body for easier debugging. + extensionsStr := joinSortedExtensions(eventToSend.Extensions) + body := fmt.Sprintf(("Body-%s-%s-%s"), eventToSend.Type, eventToSend.Source, extensionsStr) + cloudEvent := makeCloudEvent(eventToSend, body) + // Create sender pod. + senderPodName := name("sender", eventToSend.Type, eventToSend.Source, eventToSend.Extensions) + if err := client.SendFakeEventToAddressable(senderPodName, defaultBrokerName, common.BrokerTypeMeta, cloudEvent); err != nil { + t.Fatalf("Error send cloud event to broker: %v", err) + } - // Check on every dumper whether we should expect this event or not, and add its body - // to the expectedEvents/unexpectedEvents maps. - for _, eventToReceive := range eventsToReceive { - subscriberName := name("dumper", eventToReceive.typeAndSource.Type, eventToReceive.typeAndSource.Source) - if shouldExpectEvent(&eventToSend, &eventToReceive, t.Logf) { - expectedEvents[subscriberName] = append(expectedEvents[subscriberName], body) - } else { - unexpectedEvents[subscriberName] = append(unexpectedEvents[subscriberName], body) + // Check on every dumper whether we should expect this event or not, and add its body + // to the expectedEvents/unexpectedEvents maps. + for _, eventToReceive := range test.eventsToReceive { + subscriberName := name("dumper", eventToReceive.context.Type, eventToReceive.context.Source, eventToReceive.context.Extensions) + if shouldExpectEvent(&eventToSend, &eventToReceive, t.Logf) { + expectedEvents[subscriberName] = append(expectedEvents[subscriberName], body) + } else { + unexpectedEvents[subscriberName] = append(unexpectedEvents[subscriberName], body) + } + } } - } + + for _, event := range test.eventsToReceive { + subscriberName := name("dumper", event.context.Type, event.context.Source, event.context.Extensions) + if err := client.CheckLog(subscriberName, common.CheckerContainsAll(expectedEvents[subscriberName])); err != nil { + t.Fatalf("Event(s) not found in logs of subscriber pod %q: %v", subscriberName, err) + } + // At this point all the events should have been received in the pod. + // We check whether we find unexpected events. If so, then we fail. + found, err := client.FindAnyLogContents(subscriberName, unexpectedEvents[subscriberName]) + if err != nil { + t.Fatalf("Failed querying to find log contents in pod %q: %v", subscriberName, err) + } + if found { + t.Fatalf("Unexpected event(s) found in logs of subscriber pod %q", subscriberName) + } + } + }) } - for _, event := range eventsToReceive { - subscriberName := name("dumper", event.typeAndSource.Type, event.typeAndSource.Source) - if err := client.CheckLog(subscriberName, common.CheckerContainsAll(expectedEvents[subscriberName])); err != nil { - t.Fatalf("Event(s) not found in logs of subscriber pod %q: %v", subscriberName, err) - } - // At this point all the events should have been received in the pod. - // We check whether we find unexpected events. If so, then we fail. - found, err := client.FindAnyLogContents(subscriberName, unexpectedEvents[subscriberName]) - if err != nil { - t.Fatalf("Failed querying to find log contents in pod %q: %v", subscriberName, err) - } - if found { - t.Fatalf("Unexpected event(s) found in logs of subscriber pod %q", subscriberName) - } +} + +func makeCloudEvent(eventToSend eventContext, body string) *resources.CloudEvent { + return &resources.CloudEvent{ + Source: eventToSend.Source, + Type: eventToSend.Type, + Extensions: eventToSend.Extensions, + Data: fmt.Sprintf(`{"msg":%q}`, body)} +} + +func getTriggerFilterOption(deprecatedTriggerFilter bool, context eventContext) resources.TriggerOption { + if deprecatedTriggerFilter { + return resources.WithDeprecatedSourceAndTypeTriggerFilter(context.Source, context.Type) + } else { + return resources.WithAttributesTriggerFilter(context.Source, context.Type, context.Extensions) } } // Helper function to create names for different objects (e.g., triggers, services, etc.). -func name(obj, eventType, eventSource string) string { +func name(obj, eventType, eventSource string, extensions map[string]interface{}) string { // Pod names need to be lowercase. We might have an eventType as Any, that is why we lowercase them. if eventType == "" { eventType = "testany" @@ -171,7 +242,36 @@ func name(obj, eventType, eventSource string) string { if eventSource == "" { eventSource = "testany" } - return strings.ToLower(fmt.Sprintf("%s-%s-%s", obj, eventType, eventSource)) + name := strings.ToLower(fmt.Sprintf("%s-%s-%s", obj, eventType, eventSource)) + if len(extensions) > 0 { + name = strings.ToLower(fmt.Sprintf("%s-%s", name, joinSortedExtensions(extensions))) + } + return name +} + +func joinSortedExtensions(extensions map[string]interface{}) string { + var sb strings.Builder + sortedExtensionNames := sortedKeys(extensions) + for _, sortedExtensionName := range sortedExtensionNames { + sb.WriteString("-") + sb.WriteString(sortedExtensionName) + sb.WriteString("-") + vStr := fmt.Sprintf("%v", extensions[sortedExtensionName]) + if vStr == "" { + vStr = "testany" + } + sb.WriteString(vStr) + } + return sb.String() +} + +func sortedKeys(m map[string]interface{}) []string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + sort.Strings(keys) + return keys } // Returns a new selector with a random uuid. @@ -180,12 +280,24 @@ func newSelector() map[string]string { } // Checks whether we should expect to receive 'eventToSend' in 'eventReceiver' based on its type and source pattern. -func shouldExpectEvent(eventToSend *eventTypeAndSource, receiver *eventReceiver, logf logging.FormatLogger) bool { - if receiver.typeAndSource.Type != any && receiver.typeAndSource.Type != eventToSend.Type { +func shouldExpectEvent(eventToSend *eventContext, receiver *eventReceiver, logf logging.FormatLogger) bool { + if receiver.context.Type != any && receiver.context.Type != eventToSend.Type { return false } - if receiver.typeAndSource.Source != any && receiver.typeAndSource.Source != eventToSend.Source { + if receiver.context.Source != any && receiver.context.Source != eventToSend.Source { return false } + for k, v := range receiver.context.Extensions { + var value interface{} + value, ok := eventToSend.Extensions[k] + // If the attribute does not exist in the event, return false. + if !ok { + return false + } + // If the attribute is not set to any and is different than the one from the event, return false. + if v != any && v != value { + return false + } + } return true } diff --git a/test/e2e/helpers/broker_channel_flow_test_helper.go b/test/e2e/helpers/broker_channel_flow_test_helper.go index e59980f5697..0e7c0fbf7cb 100644 --- a/test/e2e/helpers/broker_channel_flow_test_helper.go +++ b/test/e2e/helpers/broker_channel_flow_test_helper.go @@ -79,7 +79,7 @@ func BrokerChannelFlowTestHelper(t *testing.T, channelTestRunner common.ChannelT client.CreateTriggerOrFail( triggerName1, resources.WithBroker(brokerName), - resources.WithTriggerFilter(eventSource1, eventType1), + resources.WithDeprecatedSourceAndTypeTriggerFilter(eventSource1, eventType1), resources.WithSubscriberRefForTrigger(transformationPodName), ) @@ -91,7 +91,7 @@ func BrokerChannelFlowTestHelper(t *testing.T, channelTestRunner common.ChannelT client.CreateTriggerOrFail( triggerName2, resources.WithBroker(brokerName), - resources.WithTriggerFilter(any, any), + resources.WithDeprecatedSourceAndTypeTriggerFilter(any, any), resources.WithSubscriberRefForTrigger(loggerPodName1), ) @@ -107,7 +107,7 @@ func BrokerChannelFlowTestHelper(t *testing.T, channelTestRunner common.ChannelT client.CreateTriggerOrFail( triggerName3, resources.WithBroker(brokerName), - resources.WithTriggerFilter(eventSource2, eventType2), + resources.WithDeprecatedSourceAndTypeTriggerFilter(eventSource2, eventType2), resources.WithSubscriberURIForTrigger(channelURL), ) diff --git a/test/e2e/helpers/broker_event_transformation_test_helper.go b/test/e2e/helpers/broker_event_transformation_test_helper.go index 2ce1e81279a..95d21fa0598 100644 --- a/test/e2e/helpers/broker_event_transformation_test_helper.go +++ b/test/e2e/helpers/broker_event_transformation_test_helper.go @@ -74,7 +74,7 @@ func EventTransformationForTriggerTestHelper(t *testing.T, channelTestRunner com client.CreateTriggerOrFail( triggerName1, resources.WithBroker(brokerName), - resources.WithTriggerFilter(eventSource1, eventType1), + resources.WithDeprecatedSourceAndTypeTriggerFilter(eventSource1, eventType1), resources.WithSubscriberRefForTrigger(transformationPodName), ) @@ -86,7 +86,7 @@ func EventTransformationForTriggerTestHelper(t *testing.T, channelTestRunner com client.CreateTriggerOrFail( triggerName2, resources.WithBroker(brokerName), - resources.WithTriggerFilter(eventSource2, eventType2), + resources.WithDeprecatedSourceAndTypeTriggerFilter(eventSource2, eventType2), resources.WithSubscriberRefForTrigger(loggerPodName), ) diff --git a/test/test_images/sendevents/main.go b/test/test_images/sendevents/main.go index 65bf6b570c5..7812013c48b 100644 --- a/test/test_images/sendevents/main.go +++ b/test/test_images/sendevents/main.go @@ -31,15 +31,16 @@ import ( ) var ( - sink string - eventID string - eventType string - eventSource string - eventData string - eventEncoding string - periodStr string - delayStr string - maxMsgStr string + sink string + eventID string + eventType string + eventSource string + eventExtensions string + eventData string + eventEncoding string + periodStr string + delayStr string + maxMsgStr string ) func init() { @@ -47,6 +48,7 @@ func init() { flag.StringVar(&eventID, "event-id", "", "Event ID to use. Defaults to a generated UUID") flag.StringVar(&eventType, "event-type", "knative.eventing.test.e2e", "The Event Type to use.") flag.StringVar(&eventSource, "event-source", "", "Source URI to use. Defaults to the current machine's hostname") + flag.StringVar(&eventExtensions, "event-extensions", "", "The extensions of event with json format.") flag.StringVar(&eventData, "event-data", `{"hello": "world!"}`, "Cloudevent data body.") flag.StringVar(&eventEncoding, "event-encoding", "binary", "The encoding of the cloud event, one of(binary, structured).") flag.StringVar(&periodStr, "period", "5", "The number of seconds between messages.") @@ -138,6 +140,15 @@ func main() { } event.SetType(eventType) event.SetSource(eventSource) + + var extensions map[string]interface{} + if err := json.Unmarshal([]byte(eventExtensions), &extensions); err != nil { + log.Fatalf("Encountered error when unmarshalling cloud event extensions to map[string]interface{}: %v", err) + } + for k, v := range extensions { + event.SetExtension(k, v) + } + if err := event.SetData(untyped); err != nil { log.Fatalf("failed to set data, %v", err) }