From ae3bffbcb04de2bf354fdf0faf54554a57a400d7 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Thu, 15 Aug 2019 14:44:52 -0700 Subject: [PATCH 01/10] refactor broker_default_test.go to table driven structure --- test/e2e/broker_default_test.go | 195 +++++++++++++++++--------------- 1 file changed, 103 insertions(+), 92 deletions(-) diff --git a/test/e2e/broker_default_test.go b/test/e2e/broker_default_test.go index db6002dc789..c2c68ade1f5 100644 --- a/test/e2e/broker_default_test.go +++ b/test/e2e/broker_default_test.go @@ -20,16 +20,15 @@ package e2e import ( "fmt" + "knative.dev/eventing/test/base/resources" + "knative.dev/eventing/test/common" "strings" "testing" "time" + "k8s.io/apimachinery/pkg/util/uuid" "knative.dev/eventing/pkg/apis/eventing/v1alpha1" pkgResources "knative.dev/eventing/pkg/reconciler/namespace/resources" - "knative.dev/eventing/test/base/resources" - "knative.dev/eventing/test/common" - - "k8s.io/apimachinery/pkg/util/uuid" "knative.dev/pkg/test/logging" ) @@ -63,103 +62,115 @@ type eventReceiver struct { // and sends different events to the broker's address. Finally, it verifies that only // the appropriate events are routed to the subscribers. func TestDefaultBrokerWithManyTriggers(t *testing.T) { - client := setup(t, true) - defer tearDown(client) - - // Label namespace so that it creates the default broker. - if err := client.LabelNamespace(map[string]string{"knative-eventing-injection": "enabled"}); err != nil { - t.Fatalf("Error annotating namespace: %v", err) - } + tests := []struct { + name string + eventsToReceive []eventReceiver // These are the event types and sources that triggers will listen to, as well as the selectors + // to set in the subscriber and services pods. + eventsToSend []eventTypeAndSource // These are the event types and sources that will be send. + deprecatedTriggerFilter bool //TriggerFilter with DeprecatedSourceAndType or not + extension map[string]interface{} //optional event extension + }{{ + name: "test default broker with many deprecated source and type triggers", + eventsToReceive: []eventReceiver{ + {eventTypeAndSource{Type: any, Source: any}, newSelector()}, + {eventTypeAndSource{Type: eventType1, Source: any}, newSelector()}, + {eventTypeAndSource{Type: any, Source: eventSource1}, newSelector()}, + {eventTypeAndSource{Type: eventType1, Source: eventSource1}, newSelector()}, + }, + eventsToSend: []eventTypeAndSource{ + {eventType1, eventSource1}, + {eventType1, eventSource2}, + {eventType2, eventSource1}, + {eventType2, eventSource2}, + }, + deprecatedTriggerFilter: true, + },} + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + client := setup(t, true) + defer tearDown(client) + + // Label namespace so that it creates the default broker. + if err := client.LabelNamespace(map[string]string{"knative-eventing-injection": "enabled"}); err != nil { + t.Fatalf("Error annotating namespace: %v", err) + } - // Wait for default broker ready. - if err := client.WaitForResourceReady(defaultBrokerName, common.BrokerTypeMeta); err != nil { - t.Fatalf("Error waiting for default broker to become ready: %v", err) - } + // Wait for default broker ready. + if err := client.WaitForResourceReady(defaultBrokerName, common.BrokerTypeMeta); err != nil { + t.Fatalf("Error waiting for default broker to become ready: %v", err) + } - // These are the event types and sources that triggers will listen to, as well as the selectors - // to set in the subscriber and services pods. - eventsToReceive := []eventReceiver{ - {eventTypeAndSource{Type: any, Source: any}, newSelector()}, - {eventTypeAndSource{Type: eventType1, Source: any}, newSelector()}, - {eventTypeAndSource{Type: any, Source: eventSource1}, newSelector()}, - {eventTypeAndSource{Type: eventType1, Source: eventSource1}, newSelector()}, - } + // Create subscribers. + for _, event := range test.eventsToReceive { + subscriberName := name("dumper", event.typeAndSource.Type, event.typeAndSource.Source) + pod := resources.EventLoggerPod(subscriberName) + client.CreatePodOrFail(pod, common.WithService(subscriberName)) + } - // Create subscribers. - for _, event := range eventsToReceive { - subscriberName := name("dumper", event.typeAndSource.Type, event.typeAndSource.Source) - pod := resources.EventLoggerPod(subscriberName) - client.CreatePodOrFail(pod, common.WithService(subscriberName)) - } + // Create triggers. + for _, event := range test.eventsToReceive { + triggerName := name("trigger", event.typeAndSource.Type, event.typeAndSource.Source) + subscriberName := name("dumper", event.typeAndSource.Type, event.typeAndSource.Source) + client.CreateTriggerOrFail(triggerName, + resources.WithSubscriberRefForTrigger(subscriberName), + resources.WithTriggerFilter(event.typeAndSource.Source, event.typeAndSource.Type), + ) + } - // Create triggers. - for _, event := range eventsToReceive { - triggerName := name("trigger", event.typeAndSource.Type, event.typeAndSource.Source) - subscriberName := name("dumper", event.typeAndSource.Type, event.typeAndSource.Source) - client.CreateTriggerOrFail(triggerName, - resources.WithSubscriberRefForTrigger(subscriberName), - resources.WithTriggerFilter(event.typeAndSource.Source, event.typeAndSource.Type), - ) - } + // Wait for all test resources to become ready before sending the events. + if err := client.WaitForAllTestResourcesReady(); err != nil { + t.Fatalf("Failed to get all test resources ready: %v", err) + } - // Wait for all test resources to become ready before sending the events. - if err := client.WaitForAllTestResourcesReady(); err != nil { - t.Fatalf("Failed to get all test resources ready: %v", err) - } + // Map to save the expected events per dumper so that we can verify the delivery. + expectedEvents := make(map[string][]string) + // Map to save the unexpected events per dumper so that we can verify that they weren't delivered. + unexpectedEvents := make(map[string][]string) + for _, eventToSend := range test.eventsToSend { + // Create cloud event. + // Using event type and source as part of the body for easier debugging. + body := fmt.Sprintf("Body-%s-%s", eventToSend.Type, eventToSend.Source) + cloudEvent := &resources.CloudEvent{ + Source: eventToSend.Source, + Type: eventToSend.Type, + Data: fmt.Sprintf(`{"msg":%q}`, body), + } + // Create sender pod. + senderPodName := name("sender", eventToSend.Type, eventToSend.Source) + if err := client.SendFakeEventToAddressable(senderPodName, defaultBrokerName, common.BrokerTypeMeta, cloudEvent); err != nil { + t.Fatalf("Error send cloud event to broker: %v", err) + } + + // Check on every dumper whether we should expect this event or not, and add its body + // to the expectedEvents/unexpectedEvents maps. + for _, eventToReceive := range test.eventsToReceive { + subscriberName := name("dumper", eventToReceive.typeAndSource.Type, eventToReceive.typeAndSource.Source) + if shouldExpectEvent(&eventToSend, &eventToReceive, t.Logf) { + expectedEvents[subscriberName] = append(expectedEvents[subscriberName], body) + } else { + unexpectedEvents[subscriberName] = append(unexpectedEvents[subscriberName], body) + } + } + } - // These are the event types and sources that will be send. - eventsToSend := []eventTypeAndSource{ - {eventType1, eventSource1}, - {eventType1, eventSource2}, - {eventType2, eventSource1}, - {eventType2, eventSource2}, - } - // Map to save the expected events per dumper so that we can verify the delivery. - expectedEvents := make(map[string][]string) - // Map to save the unexpected events per dumper so that we can verify that they weren't delivered. - unexpectedEvents := make(map[string][]string) - for _, eventToSend := range eventsToSend { - // Create cloud event. - // Using event type and source as part of the body for easier debugging. - body := fmt.Sprintf("Body-%s-%s", eventToSend.Type, eventToSend.Source) - cloudEvent := &resources.CloudEvent{ - Source: eventToSend.Source, - Type: eventToSend.Type, - Data: fmt.Sprintf(`{"msg":%q}`, body), - } - // Create sender pod. - senderPodName := name("sender", eventToSend.Type, eventToSend.Source) - if err := client.SendFakeEventToAddressable(senderPodName, defaultBrokerName, common.BrokerTypeMeta, cloudEvent); err != nil { - t.Fatalf("Error send cloud event to broker: %v", err) - } - - // Check on every dumper whether we should expect this event or not, and add its body - // to the expectedEvents/unexpectedEvents maps. - for _, eventToReceive := range eventsToReceive { - subscriberName := name("dumper", eventToReceive.typeAndSource.Type, eventToReceive.typeAndSource.Source) - if shouldExpectEvent(&eventToSend, &eventToReceive, t.Logf) { - expectedEvents[subscriberName] = append(expectedEvents[subscriberName], body) - } else { - unexpectedEvents[subscriberName] = append(unexpectedEvents[subscriberName], body) + for _, event := range test.eventsToReceive { + subscriberName := name("dumper", event.typeAndSource.Type, event.typeAndSource.Source) + if err := client.CheckLog(subscriberName, common.CheckerContainsAll(expectedEvents[subscriberName])); err != nil { + t.Fatalf("Event(s) not found in logs of subscriber pod %q: %v", subscriberName, err) + } + // At this point all the events should have been received in the pod. + // We check whether we find unexpected events. If so, then we fail. + found, err := client.FindAnyLogContents(subscriberName, unexpectedEvents[subscriberName]) + if err != nil { + t.Fatalf("Failed querying to find log contents in pod %q: %v", subscriberName, err) + } + if found { + t.Fatalf("Unexpected event(s) found in logs of subscriber pod %q", subscriberName) + } } - } + }) } - for _, event := range eventsToReceive { - subscriberName := name("dumper", event.typeAndSource.Type, event.typeAndSource.Source) - if err := client.CheckLog(subscriberName, common.CheckerContainsAll(expectedEvents[subscriberName])); err != nil { - t.Fatalf("Event(s) not found in logs of subscriber pod %q: %v", subscriberName, err) - } - // At this point all the events should have been received in the pod. - // We check whether we find unexpected events. If so, then we fail. - found, err := client.FindAnyLogContents(subscriberName, unexpectedEvents[subscriberName]) - if err != nil { - t.Fatalf("Failed querying to find log contents in pod %q: %v", subscriberName, err) - } - if found { - t.Fatalf("Unexpected event(s) found in logs of subscriber pod %q", subscriberName) - } - } } // Helper function to create names for different objects (e.g., triggers, services, etc.). From 68daf9f27454e26f46138cc5de7bdcc6c8e662fb Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Thu, 15 Aug 2019 16:39:08 -0700 Subject: [PATCH 02/10] added end to end test for many triggers with attributes --- test/base/resources/eventing.go | 30 ++++++++++- test/e2e/broker_channel_flow_test.go | 6 +-- test/e2e/broker_default_test.go | 52 ++++++++++++++++++-- test/e2e/broker_event_transformation_test.go | 4 +- 4 files changed, 80 insertions(+), 12 deletions(-) diff --git a/test/base/resources/eventing.go b/test/base/resources/eventing.go index db2b4cdc053..355c65a2956 100644 --- a/test/base/resources/eventing.go +++ b/test/base/resources/eventing.go @@ -105,8 +105,8 @@ func Broker(name string, options ...BrokerOption) *eventingv1alpha1.Broker { return broker } -// WithTriggerFilter returns an option that adds a TriggerFilter for the given Trigger. -func WithTriggerFilter(eventSource, eventType string) TriggerOption { +// WithDeprecatedSourceAndTypeTriggerFilter returns an option that adds a TriggerFilter with DeprecatedSourceAndType for the given Trigger. +func WithDeprecatedSourceAndTypeTriggerFilter(eventSource, eventType string) TriggerOption { return func(t *eventingv1alpha1.Trigger) { triggerFilter := &eventingv1alpha1.TriggerFilter{ DeprecatedSourceAndType: &eventingv1alpha1.TriggerFilterSourceAndType{ @@ -118,6 +118,32 @@ func WithTriggerFilter(eventSource, eventType string) TriggerOption { } } +// WithAttributesTriggerFilter returns an option that adds a TriggerFilter with Attributes for the given Trigger. +func WithAttributesTriggerFilter(eventSource, eventType string) TriggerOption { + return func(t *eventingv1alpha1.Trigger) { + triggerFilter := &eventingv1alpha1.TriggerFilter{ + Attributes: &eventingv1alpha1.TriggerFilterAttributes{ + "type": eventType, + "source": eventSource, + }, + } + t.Spec.Filter = triggerFilter + } +} +// WithAttributesAndExtensionTriggerFilter returns an option that adds a TriggerFilter with Attributes AndExtension. +func WithAttributesAndExtensionTriggerFilter(eventSource, eventType, extensionName, extensionValue string) TriggerOption { + return func(t *eventingv1alpha1.Trigger) { + triggerFilter := &eventingv1alpha1.TriggerFilter{ + Attributes: &eventingv1alpha1.TriggerFilterAttributes{ + "type": eventType, + "source": eventSource, + extensionName:extensionValue, + }, + } + t.Spec.Filter = triggerFilter + } +} + // WithBroker returns an option that adds a Broker for the given Trigger. func WithBroker(brokerName string) TriggerOption { return func(t *eventingv1alpha1.Trigger) { diff --git a/test/e2e/broker_channel_flow_test.go b/test/e2e/broker_channel_flow_test.go index 91908161ef9..3679a5782c1 100644 --- a/test/e2e/broker_channel_flow_test.go +++ b/test/e2e/broker_channel_flow_test.go @@ -104,7 +104,7 @@ func BrokerChannelFlowTestHelper(t *testing.T, channels []string) { client.CreateTriggerOrFail( triggerName1, resources.WithBroker(brokerName), - resources.WithTriggerFilter(eventSource1, eventType1), + resources.WithDeprecatedSourceAndTypeTriggerFilter(eventSource1, eventType1), resources.WithSubscriberRefForTrigger(transformationPodName), ) @@ -116,7 +116,7 @@ func BrokerChannelFlowTestHelper(t *testing.T, channels []string) { client.CreateTriggerOrFail( triggerName2, resources.WithBroker(brokerName), - resources.WithTriggerFilter(any, any), + resources.WithDeprecatedSourceAndTypeTriggerFilter(any, any), resources.WithSubscriberRefForTrigger(loggerPodName1), ) @@ -132,7 +132,7 @@ func BrokerChannelFlowTestHelper(t *testing.T, channels []string) { client.CreateTriggerOrFail( triggerName3, resources.WithBroker(brokerName), - resources.WithTriggerFilter(eventSource2, eventType2), + resources.WithDeprecatedSourceAndTypeTriggerFilter(eventSource2, eventType2), resources.WithSubscriberURIForTrigger(channelURL), ) diff --git a/test/e2e/broker_default_test.go b/test/e2e/broker_default_test.go index c2c68ade1f5..fcf578ce341 100644 --- a/test/e2e/broker_default_test.go +++ b/test/e2e/broker_default_test.go @@ -44,12 +44,22 @@ const ( eventSource2 = "source2" ) +var ( + emptyEventExtension = eventExtension{} +) + // eventTypeAndSource specifies the type and source of an Event. type eventTypeAndSource struct { Type string Source string } +// eventExtension specifies the name and value of an Event extension. +type eventExtension struct { + name string + value string +} + // Helper struct to tie the type and sources of the events we expect to receive // in subscribers with the selectors we use when creating their pods. type eventReceiver struct { @@ -66,9 +76,9 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { name string eventsToReceive []eventReceiver // These are the event types and sources that triggers will listen to, as well as the selectors // to set in the subscriber and services pods. - eventsToSend []eventTypeAndSource // These are the event types and sources that will be send. - deprecatedTriggerFilter bool //TriggerFilter with DeprecatedSourceAndType or not - extension map[string]interface{} //optional event extension + eventsToSend []eventTypeAndSource // These are the event types and sources that will be send. + deprecatedTriggerFilter bool //TriggerFilter with DeprecatedSourceAndType or not + eventExtension eventExtension //optional event extension }{{ name: "test default broker with many deprecated source and type triggers", eventsToReceive: []eventReceiver{ @@ -84,7 +94,25 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { {eventType2, eventSource2}, }, deprecatedTriggerFilter: true, - },} + eventExtension: emptyEventExtension, + }, { + name: "test default broker with many attribute triggers", + eventsToReceive: []eventReceiver{ + {eventTypeAndSource{Type: any, Source: any}, newSelector()}, + {eventTypeAndSource{Type: eventType1, Source: any}, newSelector()}, + {eventTypeAndSource{Type: any, Source: eventSource1}, newSelector()}, + {eventTypeAndSource{Type: eventType1, Source: eventSource1}, newSelector()}, + }, + eventsToSend: []eventTypeAndSource{ + {eventType1, eventSource1}, + {eventType1, eventSource2}, + {eventType2, eventSource1}, + {eventType2, eventSource2}, + }, + deprecatedTriggerFilter: false, + eventExtension: eventExtension{}, + }, + } for _, test := range tests { t.Run(test.name, func(t *testing.T) { client := setup(t, true) @@ -106,14 +134,16 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { pod := resources.EventLoggerPod(subscriberName) client.CreatePodOrFail(pod, common.WithService(subscriberName)) } + extensionExists := test.eventExtension == eventExtension{} // Create triggers. for _, event := range test.eventsToReceive { triggerName := name("trigger", event.typeAndSource.Type, event.typeAndSource.Source) subscriberName := name("dumper", event.typeAndSource.Type, event.typeAndSource.Source) + triggerOption := getTriggerFilterOption(test.deprecatedTriggerFilter, extensionExists, event.typeAndSource, test.eventExtension) client.CreateTriggerOrFail(triggerName, resources.WithSubscriberRefForTrigger(subscriberName), - resources.WithTriggerFilter(event.typeAndSource.Source, event.typeAndSource.Type), + triggerOption, ) } @@ -173,6 +203,18 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { } +func getTriggerFilterOption(deprecatedTriggerFilter, extensionExists bool, typeAndSource eventTypeAndSource, extension eventExtension) resources.TriggerOption { + if deprecatedTriggerFilter { + return resources.WithDeprecatedSourceAndTypeTriggerFilter(typeAndSource.Source, typeAndSource.Type) + } else { + if extensionExists { + return resources.WithAttributesAndExtensionTriggerFilter(typeAndSource.Source, typeAndSource.Type, extension.name, extension.value) + } else { + return resources.WithAttributesTriggerFilter(typeAndSource.Source, typeAndSource.Type) + } + } +} + // Helper function to create names for different objects (e.g., triggers, services, etc.). func name(obj, eventType, eventSource string) string { // Pod names need to be lowercase. We might have an eventType as Any, that is why we lowercase them. diff --git a/test/e2e/broker_event_transformation_test.go b/test/e2e/broker_event_transformation_test.go index bdae2dc634c..2d160cc0684 100644 --- a/test/e2e/broker_event_transformation_test.go +++ b/test/e2e/broker_event_transformation_test.go @@ -93,7 +93,7 @@ func EventTransformationForTriggerTestHelper(t *testing.T, channels []string) { client.CreateTriggerOrFail( triggerName1, resources.WithBroker(brokerName), - resources.WithTriggerFilter(eventSource1, eventType1), + resources.WithDeprecatedSourceAndTypeTriggerFilter(eventSource1, eventType1), resources.WithSubscriberRefForTrigger(transformationPodName), ) @@ -105,7 +105,7 @@ func EventTransformationForTriggerTestHelper(t *testing.T, channels []string) { client.CreateTriggerOrFail( triggerName2, resources.WithBroker(brokerName), - resources.WithTriggerFilter(eventSource2, eventType2), + resources.WithDeprecatedSourceAndTypeTriggerFilter(eventSource2, eventType2), resources.WithSubscriberRefForTrigger(loggerPodName), ) From f9d1780b73ac124081e8f0f657b728fd6ee22901 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Fri, 16 Aug 2019 10:22:34 -0700 Subject: [PATCH 03/10] added e2e test for trigger filter with extension in broker_default_test.go --- test/base/resources/cloud_event.go | 2 + test/base/resources/kube.go | 4 + test/e2e/broker_default_test.go | 166 ++++++++++++++++++---------- test/test_images/sendevents/main.go | 7 ++ 4 files changed, 120 insertions(+), 59 deletions(-) diff --git a/test/base/resources/cloud_event.go b/test/base/resources/cloud_event.go index d370be3be87..7f6d8b1990c 100644 --- a/test/base/resources/cloud_event.go +++ b/test/base/resources/cloud_event.go @@ -21,6 +21,8 @@ type CloudEvent struct { ID string Type string Source string + ExtensionName string + ExtensionValue string Data string // must be in json format Encoding string // binary or structured } diff --git a/test/base/resources/kube.go b/test/base/resources/kube.go index 24ba3c31929..8d5e15524e1 100644 --- a/test/base/resources/kube.go +++ b/test/base/resources/kube.go @@ -51,6 +51,10 @@ func EventSenderPod(name string, sink string, event *CloudEvent) *corev1.Pod { event.Type, "-event-source", event.Source, + "-event-extension-name", + event.ExtensionName, + "-event-extension-value", + event.ExtensionValue, "-event-data", event.Data, "-event-encoding", diff --git a/test/e2e/broker_default_test.go b/test/e2e/broker_default_test.go index fcf578ce341..70284014fd1 100644 --- a/test/e2e/broker_default_test.go +++ b/test/e2e/broker_default_test.go @@ -42,29 +42,35 @@ const ( eventType2 = "type2" eventSource1 = "source1" eventSource2 = "source2" -) - -var ( - emptyEventExtension = eventExtension{} + nilString = "nil" + extensionName = `my-extension` + extensionValue = `my-extension-value` ) // eventTypeAndSource specifies the type and source of an Event. -type eventTypeAndSource struct { - Type string - Source string -} +//type eventTypeAndSource struct { +// Type string +// Source string +//} // eventExtension specifies the name and value of an Event extension. -type eventExtension struct { - name string - value string +//type eventExtension struct { +// name string +// value string +//} + +type eventMeta struct { + Type string + Source string + ExtensionName string + ExtensionValue string } // Helper struct to tie the type and sources of the events we expect to receive // in subscribers with the selectors we use when creating their pods. type eventReceiver struct { - typeAndSource eventTypeAndSource - selector map[string]string + meta eventMeta + selector map[string]string } // This test annotates the testing namespace so that a default broker is created. @@ -76,42 +82,59 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { name string eventsToReceive []eventReceiver // These are the event types and sources that triggers will listen to, as well as the selectors // to set in the subscriber and services pods. - eventsToSend []eventTypeAndSource // These are the event types and sources that will be send. - deprecatedTriggerFilter bool //TriggerFilter with DeprecatedSourceAndType or not - eventExtension eventExtension //optional event extension + eventsToSend []eventMeta // These are the event types and sources that will be send. + deprecatedTriggerFilter bool //TriggerFilter with DeprecatedSourceAndType or not + extensionExists bool }{{ name: "test default broker with many deprecated source and type triggers", eventsToReceive: []eventReceiver{ - {eventTypeAndSource{Type: any, Source: any}, newSelector()}, - {eventTypeAndSource{Type: eventType1, Source: any}, newSelector()}, - {eventTypeAndSource{Type: any, Source: eventSource1}, newSelector()}, - {eventTypeAndSource{Type: eventType1, Source: eventSource1}, newSelector()}, + {eventMeta{Type: any, Source: any, ExtensionName: nilString, ExtensionValue: nilString}, newSelector()}, + {eventMeta{Type: eventType1, Source: any, ExtensionName: nilString, ExtensionValue: nilString}, newSelector()}, + {eventMeta{Type: any, Source: eventSource1, ExtensionName: nilString, ExtensionValue: nilString}, newSelector()}, + {eventMeta{Type: eventType1, Source: eventSource1, ExtensionName: nilString, ExtensionValue: nilString}, newSelector()}, }, - eventsToSend: []eventTypeAndSource{ - {eventType1, eventSource1}, - {eventType1, eventSource2}, - {eventType2, eventSource1}, - {eventType2, eventSource2}, + eventsToSend: []eventMeta{ + {eventType1, eventSource1, nilString, nilString}, + {eventType1, eventSource2, nilString, nilString}, + {eventType2, eventSource1, nilString, nilString}, + {eventType2, eventSource2, nilString, nilString}, }, deprecatedTriggerFilter: true, - eventExtension: emptyEventExtension, + extensionExists: false, }, { name: "test default broker with many attribute triggers", eventsToReceive: []eventReceiver{ - {eventTypeAndSource{Type: any, Source: any}, newSelector()}, - {eventTypeAndSource{Type: eventType1, Source: any}, newSelector()}, - {eventTypeAndSource{Type: any, Source: eventSource1}, newSelector()}, - {eventTypeAndSource{Type: eventType1, Source: eventSource1}, newSelector()}, + {eventMeta{Type: any, Source: any, ExtensionName: nilString, ExtensionValue: nilString}, newSelector()}, + {eventMeta{Type: eventType1, Source: any, ExtensionName: nilString, ExtensionValue: nilString}, newSelector()}, + {eventMeta{Type: any, Source: eventSource1, ExtensionName: nilString, ExtensionValue: nilString}, newSelector()}, + {eventMeta{Type: eventType1, Source: eventSource1, ExtensionName: nilString, ExtensionValue: nilString}, newSelector()}, }, - eventsToSend: []eventTypeAndSource{ - {eventType1, eventSource1}, - {eventType1, eventSource2}, - {eventType2, eventSource1}, - {eventType2, eventSource2}, + eventsToSend: []eventMeta{ + {eventType1, eventSource1, nilString, nilString}, + {eventType1, eventSource2, nilString, nilString}, + {eventType2, eventSource1, nilString, nilString}, + {eventType2, eventSource2, nilString, nilString}, }, deprecatedTriggerFilter: false, - eventExtension: eventExtension{}, + extensionExists: false, }, + { + name: "test default broker with many attribute and extension triggers", + eventsToReceive: []eventReceiver{ + {eventMeta{Type: any, Source: any, ExtensionName: extensionName, ExtensionValue: extensionValue}, newSelector()}, + {eventMeta{Type: eventType1, Source: any, ExtensionName: extensionName, ExtensionValue: extensionValue}, newSelector()}, + {eventMeta{Type: any, Source: any, ExtensionName: extensionName, ExtensionValue: any}, newSelector()}, + {eventMeta{Type: any, Source: eventSource1, ExtensionName: extensionName, ExtensionValue: extensionValue}, newSelector()}, + }, + eventsToSend: []eventMeta{ + {eventType1, eventSource1, extensionName, extensionValue}, + {eventType1, eventSource2, extensionName, extensionValue}, + {eventType2, eventSource1, extensionName, "non matching extension value"}, + {eventType2, eventSource2, "non matching extension name", extensionValue}, + }, + deprecatedTriggerFilter: false, + extensionExists: true, + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { @@ -130,17 +153,16 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { // Create subscribers. for _, event := range test.eventsToReceive { - subscriberName := name("dumper", event.typeAndSource.Type, event.typeAndSource.Source) + subscriberName := name("dumper", event.meta.Type, event.meta.Source, event.meta.ExtensionName, event.meta.ExtensionValue) pod := resources.EventLoggerPod(subscriberName) client.CreatePodOrFail(pod, common.WithService(subscriberName)) } - extensionExists := test.eventExtension == eventExtension{} // Create triggers. for _, event := range test.eventsToReceive { - triggerName := name("trigger", event.typeAndSource.Type, event.typeAndSource.Source) - subscriberName := name("dumper", event.typeAndSource.Type, event.typeAndSource.Source) - triggerOption := getTriggerFilterOption(test.deprecatedTriggerFilter, extensionExists, event.typeAndSource, test.eventExtension) + triggerName := name("trigger", event.meta.Type, event.meta.Source, event.meta.ExtensionName, event.meta.ExtensionValue) + subscriberName := name("dumper", event.meta.Type, event.meta.Source, event.meta.ExtensionName, event.meta.ExtensionValue) + triggerOption := getTriggerFilterOption(test.deprecatedTriggerFilter, test.extensionExists, event.meta) client.CreateTriggerOrFail(triggerName, resources.WithSubscriberRefForTrigger(subscriberName), triggerOption, @@ -159,14 +181,11 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { for _, eventToSend := range test.eventsToSend { // Create cloud event. // Using event type and source as part of the body for easier debugging. - body := fmt.Sprintf("Body-%s-%s", eventToSend.Type, eventToSend.Source) - cloudEvent := &resources.CloudEvent{ - Source: eventToSend.Source, - Type: eventToSend.Type, - Data: fmt.Sprintf(`{"msg":%q}`, body), - } + body := fmt.Sprintf("Body:eventType[%s]-eventSource[%s]-eventExtensionName[%s]-eventExtensionValue[%s]", + eventToSend.Type, eventToSend.Source, eventToSend.ExtensionName, eventToSend.ExtensionValue) + cloudEvent := makeCloudEvent(eventToSend, body) // Create sender pod. - senderPodName := name("sender", eventToSend.Type, eventToSend.Source) + senderPodName := name("sender", eventToSend.Type, eventToSend.Source, eventToSend.ExtensionName, eventToSend.ExtensionValue) if err := client.SendFakeEventToAddressable(senderPodName, defaultBrokerName, common.BrokerTypeMeta, cloudEvent); err != nil { t.Fatalf("Error send cloud event to broker: %v", err) } @@ -174,7 +193,7 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { // Check on every dumper whether we should expect this event or not, and add its body // to the expectedEvents/unexpectedEvents maps. for _, eventToReceive := range test.eventsToReceive { - subscriberName := name("dumper", eventToReceive.typeAndSource.Type, eventToReceive.typeAndSource.Source) + subscriberName := name("dumper", eventToReceive.meta.Type, eventToReceive.meta.Source, eventToReceive.meta.ExtensionName, eventToReceive.meta.ExtensionValue) if shouldExpectEvent(&eventToSend, &eventToReceive, t.Logf) { expectedEvents[subscriberName] = append(expectedEvents[subscriberName], body) } else { @@ -184,7 +203,7 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { } for _, event := range test.eventsToReceive { - subscriberName := name("dumper", event.typeAndSource.Type, event.typeAndSource.Source) + subscriberName := name("dumper", event.meta.Type, event.meta.Source, event.meta.ExtensionName, event.meta.ExtensionValue) if err := client.CheckLog(subscriberName, common.CheckerContainsAll(expectedEvents[subscriberName])); err != nil { t.Fatalf("Event(s) not found in logs of subscriber pod %q: %v", subscriberName, err) } @@ -203,20 +222,31 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { } -func getTriggerFilterOption(deprecatedTriggerFilter, extensionExists bool, typeAndSource eventTypeAndSource, extension eventExtension) resources.TriggerOption { +func makeCloudEvent(eventToSend eventMeta, body string) *resources.CloudEvent { + cloudEvent := &resources.CloudEvent{ + Source: eventToSend.Source, + Type: eventToSend.Type, + ExtensionName:eventToSend.ExtensionName, + ExtensionValue:eventToSend.ExtensionValue, + Data: fmt.Sprintf(`{"msg":%q}`, body), + } + return cloudEvent +} + +func getTriggerFilterOption(deprecatedTriggerFilter, extensionExists bool, eventMeta eventMeta) resources.TriggerOption { if deprecatedTriggerFilter { - return resources.WithDeprecatedSourceAndTypeTriggerFilter(typeAndSource.Source, typeAndSource.Type) + return resources.WithDeprecatedSourceAndTypeTriggerFilter(eventMeta.Source, eventMeta.Type) } else { if extensionExists { - return resources.WithAttributesAndExtensionTriggerFilter(typeAndSource.Source, typeAndSource.Type, extension.name, extension.value) + return resources.WithAttributesAndExtensionTriggerFilter(eventMeta.Source, eventMeta.Type, eventMeta.ExtensionValue, eventMeta.ExtensionValue) } else { - return resources.WithAttributesTriggerFilter(typeAndSource.Source, typeAndSource.Type) + return resources.WithAttributesTriggerFilter(eventMeta.Source, eventMeta.Type) } } } // Helper function to create names for different objects (e.g., triggers, services, etc.). -func name(obj, eventType, eventSource string) string { +func name(obj, eventType, eventSource, eventExtensionName, eventExtensionValue string) string { // Pod names need to be lowercase. We might have an eventType as Any, that is why we lowercase them. if eventType == "" { eventType = "testany" @@ -224,7 +254,22 @@ func name(obj, eventType, eventSource string) string { if eventSource == "" { eventSource = "testany" } - return strings.ToLower(fmt.Sprintf("%s-%s-%s", obj, eventType, eventSource)) + if eventExtensionName == nilString { + eventExtensionName = "not exists" + } + if eventExtensionValue == "" { + eventExtensionValue = "testany" + } + if eventExtensionValue == nilString { + eventExtensionValue = "not exists" + } + return strings.ToLower(fmt.Sprintf( + "obj[%s]-eventType[%s]-eventSource[%s]-eventExtensionName[%s]-eventExtensionValue[%s]", + obj, + eventType, + eventSource, + eventExtensionName, + eventExtensionValue)) } // Returns a new selector with a random uuid. @@ -233,11 +278,14 @@ func newSelector() map[string]string { } // Checks whether we should expect to receive 'eventToSend' in 'eventReceiver' based on its type and source pattern. -func shouldExpectEvent(eventToSend *eventTypeAndSource, receiver *eventReceiver, logf logging.FormatLogger) bool { - if receiver.typeAndSource.Type != any && receiver.typeAndSource.Type != eventToSend.Type { +func shouldExpectEvent(eventToSend *eventMeta, receiver *eventReceiver, logf logging.FormatLogger) bool { + if receiver.meta.Type != any && receiver.meta.Type != eventToSend.Type { + return false + } + if receiver.meta.Source != any && receiver.meta.Source != eventToSend.Source { return false } - if receiver.typeAndSource.Source != any && receiver.typeAndSource.Source != eventToSend.Source { + if receiver.meta.ExtensionName != nilString && receiver.meta.ExtensionValue != any && (receiver.meta.ExtensionName != eventToSend.ExtensionName || receiver.meta.ExtensionValue != eventToSend.ExtensionValue) { return false } return true diff --git a/test/test_images/sendevents/main.go b/test/test_images/sendevents/main.go index 65bf6b570c5..edeef43871b 100644 --- a/test/test_images/sendevents/main.go +++ b/test/test_images/sendevents/main.go @@ -35,6 +35,8 @@ var ( eventID string eventType string eventSource string + eventExtensionName string + eventExtensionValue string eventData string eventEncoding string periodStr string @@ -47,6 +49,8 @@ func init() { flag.StringVar(&eventID, "event-id", "", "Event ID to use. Defaults to a generated UUID") flag.StringVar(&eventType, "event-type", "knative.eventing.test.e2e", "The Event Type to use.") flag.StringVar(&eventSource, "event-source", "", "Source URI to use. Defaults to the current machine's hostname") + flag.StringVar(&eventExtensionName, "-event-extension-name", "", "The Event Type to use.") + flag.StringVar(&eventExtensionValue, "-event-extension-value", "", "Source URI to use. Defaults to the current machine's hostname") flag.StringVar(&eventData, "event-data", `{"hello": "world!"}`, "Cloudevent data body.") flag.StringVar(&eventEncoding, "event-encoding", "binary", "The encoding of the cloud event, one of(binary, structured).") flag.StringVar(&periodStr, "period", "5", "The number of seconds between messages.") @@ -138,6 +142,9 @@ func main() { } event.SetType(eventType) event.SetSource(eventSource) + if eventExtensionName != "" { + event.SetExtension(eventExtensionName, eventExtensionValue) + } if err := event.SetData(untyped); err != nil { log.Fatalf("failed to set data, %v", err) } From e46c3b0332bafa356a4b5f9b14cb627ccde23007 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Fri, 16 Aug 2019 12:13:16 -0700 Subject: [PATCH 04/10] fixed invalid resource name --- test/e2e/broker_default_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/e2e/broker_default_test.go b/test/e2e/broker_default_test.go index 70284014fd1..06e8a79d68d 100644 --- a/test/e2e/broker_default_test.go +++ b/test/e2e/broker_default_test.go @@ -43,8 +43,8 @@ const ( eventSource1 = "source1" eventSource2 = "source2" nilString = "nil" - extensionName = `my-extension` - extensionValue = `my-extension-value` + extensionName = `myextname` + extensionValue = `myextval` ) // eventTypeAndSource specifies the type and source of an Event. @@ -86,7 +86,7 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { deprecatedTriggerFilter bool //TriggerFilter with DeprecatedSourceAndType or not extensionExists bool }{{ - name: "test default broker with many deprecated source and type triggers", + name: "test default broker with many deprecated triggers", eventsToReceive: []eventReceiver{ {eventMeta{Type: any, Source: any, ExtensionName: nilString, ExtensionValue: nilString}, newSelector()}, {eventMeta{Type: eventType1, Source: any, ExtensionName: nilString, ExtensionValue: nilString}, newSelector()}, @@ -238,7 +238,7 @@ func getTriggerFilterOption(deprecatedTriggerFilter, extensionExists bool, event return resources.WithDeprecatedSourceAndTypeTriggerFilter(eventMeta.Source, eventMeta.Type) } else { if extensionExists { - return resources.WithAttributesAndExtensionTriggerFilter(eventMeta.Source, eventMeta.Type, eventMeta.ExtensionValue, eventMeta.ExtensionValue) + return resources.WithAttributesAndExtensionTriggerFilter(eventMeta.Source, eventMeta.Type, eventMeta.ExtensionName, eventMeta.ExtensionValue) } else { return resources.WithAttributesTriggerFilter(eventMeta.Source, eventMeta.Type) } @@ -264,7 +264,7 @@ func name(obj, eventType, eventSource, eventExtensionName, eventExtensionValue s eventExtensionValue = "not exists" } return strings.ToLower(fmt.Sprintf( - "obj[%s]-eventType[%s]-eventSource[%s]-eventExtensionName[%s]-eventExtensionValue[%s]", + "%s-%s-%s-%s-%s", obj, eventType, eventSource, From 6f7252abfbe1c26939ba6d22d7ce1717bce5e710 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Mon, 19 Aug 2019 14:02:06 -0700 Subject: [PATCH 05/10] fixed e2e tests failure --- test/e2e/broker_default_test.go | 52 +++++++++++++++-------------- test/test_images/sendevents/main.go | 4 +-- 2 files changed, 29 insertions(+), 27 deletions(-) diff --git a/test/e2e/broker_default_test.go b/test/e2e/broker_default_test.go index 06e8a79d68d..62910fc083b 100644 --- a/test/e2e/broker_default_test.go +++ b/test/e2e/broker_default_test.go @@ -42,23 +42,11 @@ const ( eventType2 = "type2" eventSource1 = "source1" eventSource2 = "source2" - nilString = "nil" + nilString = "nilstring" extensionName = `myextname` extensionValue = `myextval` ) -// eventTypeAndSource specifies the type and source of an Event. -//type eventTypeAndSource struct { -// Type string -// Source string -//} - -// eventExtension specifies the name and value of an Event extension. -//type eventExtension struct { -// name string -// value string -//} - type eventMeta struct { Type string Source string @@ -129,8 +117,8 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { eventsToSend: []eventMeta{ {eventType1, eventSource1, extensionName, extensionValue}, {eventType1, eventSource2, extensionName, extensionValue}, - {eventType2, eventSource1, extensionName, "non matching extension value"}, - {eventType2, eventSource2, "non matching extension name", extensionValue}, + {eventType2, eventSource1, extensionName, "non.matching.ext.val"}, + {eventType2, eventSource2, "non.matching.ext.name", extensionValue}, }, deprecatedTriggerFilter: false, extensionExists: true, @@ -223,14 +211,21 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { } func makeCloudEvent(eventToSend eventMeta, body string) *resources.CloudEvent { - cloudEvent := &resources.CloudEvent{ - Source: eventToSend.Source, - Type: eventToSend.Type, - ExtensionName:eventToSend.ExtensionName, - ExtensionValue:eventToSend.ExtensionValue, - Data: fmt.Sprintf(`{"msg":%q}`, body), + if eventToSend.ExtensionName != nilString { + return &resources.CloudEvent{ + Source: eventToSend.Source, + Type: eventToSend.Type, + ExtensionName: eventToSend.ExtensionName, + ExtensionValue: eventToSend.ExtensionValue, + Data: fmt.Sprintf(`{"msg":%q}`, body), + } + } else { + return &resources.CloudEvent{ + Source: eventToSend.Source, + Type: eventToSend.Type, + Data: fmt.Sprintf(`{"msg":%q}`, body), + } } - return cloudEvent } func getTriggerFilterOption(deprecatedTriggerFilter, extensionExists bool, eventMeta eventMeta) resources.TriggerOption { @@ -255,13 +250,13 @@ func name(obj, eventType, eventSource, eventExtensionName, eventExtensionValue s eventSource = "testany" } if eventExtensionName == nilString { - eventExtensionName = "not exists" + eventExtensionName = "notexists" } if eventExtensionValue == "" { eventExtensionValue = "testany" } if eventExtensionValue == nilString { - eventExtensionValue = "not exists" + eventExtensionValue = "notexists" } return strings.ToLower(fmt.Sprintf( "%s-%s-%s-%s-%s", @@ -285,7 +280,14 @@ func shouldExpectEvent(eventToSend *eventMeta, receiver *eventReceiver, logf log if receiver.meta.Source != any && receiver.meta.Source != eventToSend.Source { return false } - if receiver.meta.ExtensionName != nilString && receiver.meta.ExtensionValue != any && (receiver.meta.ExtensionName != eventToSend.ExtensionName || receiver.meta.ExtensionValue != eventToSend.ExtensionValue) { + //event extension does not exists, return True + if receiver.meta.ExtensionName == nilString { + return true + } + if receiver.meta.ExtensionName != eventToSend.ExtensionName { + return false + } + if receiver.meta.ExtensionValue != any && receiver.meta.ExtensionValue != eventToSend.ExtensionValue { return false } return true diff --git a/test/test_images/sendevents/main.go b/test/test_images/sendevents/main.go index edeef43871b..02c51563397 100644 --- a/test/test_images/sendevents/main.go +++ b/test/test_images/sendevents/main.go @@ -49,8 +49,8 @@ func init() { flag.StringVar(&eventID, "event-id", "", "Event ID to use. Defaults to a generated UUID") flag.StringVar(&eventType, "event-type", "knative.eventing.test.e2e", "The Event Type to use.") flag.StringVar(&eventSource, "event-source", "", "Source URI to use. Defaults to the current machine's hostname") - flag.StringVar(&eventExtensionName, "-event-extension-name", "", "The Event Type to use.") - flag.StringVar(&eventExtensionValue, "-event-extension-value", "", "Source URI to use. Defaults to the current machine's hostname") + flag.StringVar(&eventExtensionName, "event-extension-name", "", "The extension name of event.") + flag.StringVar(&eventExtensionValue, "event-extension-value", "", "The extension value of event") flag.StringVar(&eventData, "event-data", `{"hello": "world!"}`, "Cloudevent data body.") flag.StringVar(&eventEncoding, "event-encoding", "binary", "The encoding of the cloud event, one of(binary, structured).") flag.StringVar(&periodStr, "period", "5", "The number of seconds between messages.") From ede45aa3c2790ce77cae173675edd5e7a16224ed Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Tue, 20 Aug 2019 18:42:09 -0700 Subject: [PATCH 06/10] added multiple extensions support, changed fuzzy log content check to extact log content check --- test/base/resources/cloud_event.go | 13 +- test/base/resources/eventing.go | 26 +-- test/base/resources/kube.go | 19 ++- test/common/operation.go | 5 +- test/common/validation.go | 26 ++- test/e2e/broker_default_test.go | 249 +++++++++++++++------------- test/test_images/sendevents/main.go | 35 ++-- 7 files changed, 211 insertions(+), 162 deletions(-) diff --git a/test/base/resources/cloud_event.go b/test/base/resources/cloud_event.go index 7f6d8b1990c..a14af298fbf 100644 --- a/test/base/resources/cloud_event.go +++ b/test/base/resources/cloud_event.go @@ -18,13 +18,12 @@ package resources // CloudEvent specifies the arguments for a CloudEvent used by the sendevents or transformevents image. type CloudEvent struct { - ID string - Type string - Source string - ExtensionName string - ExtensionValue string - Data string // must be in json format - Encoding string // binary or structured + ID string + Type string + Source string + Extensions map[string]interface{} + Data string // must be in json format + Encoding string // binary or structured } // CloudEventBaseData defines a simple struct that can be used as data of a CloudEvent. diff --git a/test/base/resources/eventing.go b/test/base/resources/eventing.go index 355c65a2956..143a55d01b8 100644 --- a/test/base/resources/eventing.go +++ b/test/base/resources/eventing.go @@ -19,6 +19,7 @@ package resources // This file contains functions that construct Eventing resources. import ( + "fmt" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" @@ -119,26 +120,17 @@ func WithDeprecatedSourceAndTypeTriggerFilter(eventSource, eventType string) Tri } // WithAttributesTriggerFilter returns an option that adds a TriggerFilter with Attributes for the given Trigger. -func WithAttributesTriggerFilter(eventSource, eventType string) TriggerOption { - return func(t *eventingv1alpha1.Trigger) { - triggerFilter := &eventingv1alpha1.TriggerFilter{ - Attributes: &eventingv1alpha1.TriggerFilterAttributes{ - "type": eventType, - "source": eventSource, - }, - } - t.Spec.Filter = triggerFilter +func WithAttributesTriggerFilter(eventSource, eventType string, extensions map[string]interface{}) TriggerOption { + attrs := make(map[string]string) + attrs["type"] = eventType + attrs["source"] = eventSource + for k, v := range extensions { + attrs[k] = fmt.Sprintf("%v", v) } -} -// WithAttributesAndExtensionTriggerFilter returns an option that adds a TriggerFilter with Attributes AndExtension. -func WithAttributesAndExtensionTriggerFilter(eventSource, eventType, extensionName, extensionValue string) TriggerOption { + triggerFilterAttributes := eventingv1alpha1.TriggerFilterAttributes(attrs) return func(t *eventingv1alpha1.Trigger) { triggerFilter := &eventingv1alpha1.TriggerFilter{ - Attributes: &eventingv1alpha1.TriggerFilterAttributes{ - "type": eventType, - "source": eventSource, - extensionName:extensionValue, - }, + Attributes: &triggerFilterAttributes, } t.Spec.Filter = triggerFilter } diff --git a/test/base/resources/kube.go b/test/base/resources/kube.go index 8d5e15524e1..7b30352e913 100644 --- a/test/base/resources/kube.go +++ b/test/base/resources/kube.go @@ -19,6 +19,8 @@ package resources // This file contains functions that construct common Kubernetes resources. import ( + "encoding/json" + "fmt" "strconv" corev1 "k8s.io/api/core/v1" @@ -30,11 +32,18 @@ import ( ) // EventSenderPod creates a Pod that sends a single event to the given address. -func EventSenderPod(name string, sink string, event *CloudEvent) *corev1.Pod { +func EventSenderPod(name string, sink string, event *CloudEvent) (*corev1.Pod, error) { const imageName = "sendevents" if event.Encoding == "" { event.Encoding = CloudEventEncodingBinary } + eventExtensionsBytes, error := json.Marshal(event.Extensions) + eventExtensions := string(eventExtensionsBytes) + if error != nil { + + return nil, fmt.Errorf("Encountered error when we marshall cloud event extensions %v", error) + } + return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -51,10 +60,8 @@ func EventSenderPod(name string, sink string, event *CloudEvent) *corev1.Pod { event.Type, "-event-source", event.Source, - "-event-extension-name", - event.ExtensionName, - "-event-extension-value", - event.ExtensionValue, + "-event-extensions", + eventExtensions, "-event-data", event.Data, "-event-encoding", @@ -66,7 +73,7 @@ func EventSenderPod(name string, sink string, event *CloudEvent) *corev1.Pod { //TODO restart on failure? RestartPolicy: corev1.RestartPolicyNever, }, - } + }, nil } // EventLoggerPod creates a Pod that logs events received. diff --git a/test/common/operation.go b/test/common/operation.go index 22ff6d46f54..06f74f4cb24 100644 --- a/test/common/operation.go +++ b/test/common/operation.go @@ -71,7 +71,10 @@ func (client *Client) sendFakeEventToAddress( event *resources.CloudEvent, ) error { namespace := client.Namespace - pod := resources.EventSenderPod(senderName, uri, event) + pod, err := resources.EventSenderPod(senderName, uri, event) + if err != nil { + return err + } client.CreatePodOrFail(pod) if err := pkgTest.WaitForPodRunning(client.Kube, senderName, namespace); err != nil { return err diff --git a/test/common/validation.go b/test/common/validation.go index f375c86fc4d..3c07aa81f96 100644 --- a/test/common/validation.go +++ b/test/common/validation.go @@ -17,6 +17,8 @@ limitations under the License. package common import ( + "encoding/json" + "regexp" "strings" "time" @@ -92,14 +94,36 @@ func (client *Client) FindAnyLogContents(podName string, contents []string) (boo if err != nil { return false, err } + eventContentsSet, err := parseEventContentsFromPodLogs(string(logs)) + if err != nil { + return false, err + } for _, content := range contents { - if strings.Contains(string(logs), content) { + _, ok := eventContentsSet[content] + if ok { return true, nil } } return false, nil } +func parseEventContentsFromPodLogs(logs string) (map[string]bool, error) { + re := regexp.MustCompile(`{.+?}`) + matches := re.FindAllString(logs, -1) + eventContentsSet := make(map[string]bool) + for _, match := range matches { + var matchedLogs map[string]string + err := json.Unmarshal([]byte(match), &matchedLogs) + if err != nil { + return nil, err + } else { + eventContent := matchedLogs["msg"] + eventContentsSet[eventContent] = true + } + } + return eventContentsSet, nil +} + // getContainerName gets name of the first container of the given pod. // Now our logger pod only contains one single container, and is only used for receiving events and validation. func (client *Client) getContainerName(podName, namespace string) (string, error) { diff --git a/test/e2e/broker_default_test.go b/test/e2e/broker_default_test.go index 62910fc083b..277947739a1 100644 --- a/test/e2e/broker_default_test.go +++ b/test/e2e/broker_default_test.go @@ -22,6 +22,7 @@ import ( "fmt" "knative.dev/eventing/test/base/resources" "knative.dev/eventing/test/common" + "sort" "strings" "testing" "time" @@ -42,22 +43,22 @@ const ( eventType2 = "type2" eventSource1 = "source1" eventSource2 = "source2" - nilString = "nilstring" - extensionName = `myextname` - extensionValue = `myextval` + extensionName1 = "extname1" + extensionValue1 = "extval1" + extensionName2 = "extname2" + extensionValue2 = "extvalue2" ) -type eventMeta struct { - Type string - Source string - ExtensionName string - ExtensionValue string +type eventContext struct { + Type string + Source string + Extensions map[string]interface{} } // Helper struct to tie the type and sources of the events we expect to receive // in subscribers with the selectors we use when creating their pods. type eventReceiver struct { - meta eventMeta + context eventContext selector map[string]string } @@ -68,60 +69,64 @@ type eventReceiver struct { func TestDefaultBrokerWithManyTriggers(t *testing.T) { tests := []struct { name string - eventsToReceive []eventReceiver // These are the event types and sources that triggers will listen to, as well as the selectors - // to set in the subscriber and services pods. - eventsToSend []eventMeta // These are the event types and sources that will be send. - deprecatedTriggerFilter bool //TriggerFilter with DeprecatedSourceAndType or not - extensionExists bool - }{{ - name: "test default broker with many deprecated triggers", - eventsToReceive: []eventReceiver{ - {eventMeta{Type: any, Source: any, ExtensionName: nilString, ExtensionValue: nilString}, newSelector()}, - {eventMeta{Type: eventType1, Source: any, ExtensionName: nilString, ExtensionValue: nilString}, newSelector()}, - {eventMeta{Type: any, Source: eventSource1, ExtensionName: nilString, ExtensionValue: nilString}, newSelector()}, - {eventMeta{Type: eventType1, Source: eventSource1, ExtensionName: nilString, ExtensionValue: nilString}, newSelector()}, - }, - eventsToSend: []eventMeta{ - {eventType1, eventSource1, nilString, nilString}, - {eventType1, eventSource2, nilString, nilString}, - {eventType2, eventSource1, nilString, nilString}, - {eventType2, eventSource2, nilString, nilString}, - }, - deprecatedTriggerFilter: true, - extensionExists: false, - }, { - name: "test default broker with many attribute triggers", - eventsToReceive: []eventReceiver{ - {eventMeta{Type: any, Source: any, ExtensionName: nilString, ExtensionValue: nilString}, newSelector()}, - {eventMeta{Type: eventType1, Source: any, ExtensionName: nilString, ExtensionValue: nilString}, newSelector()}, - {eventMeta{Type: any, Source: eventSource1, ExtensionName: nilString, ExtensionValue: nilString}, newSelector()}, - {eventMeta{Type: eventType1, Source: eventSource1, ExtensionName: nilString, ExtensionValue: nilString}, newSelector()}, - }, - eventsToSend: []eventMeta{ - {eventType1, eventSource1, nilString, nilString}, - {eventType1, eventSource2, nilString, nilString}, - {eventType2, eventSource1, nilString, nilString}, - {eventType2, eventSource2, nilString, nilString}, + eventsToReceive []eventReceiver // These are the event context attributes and extension attributes that triggers will listen to, + // to set in the subscriber and services pod + eventsToSend []eventContext // These are the event context attributes and extension attributes that will be send. + deprecatedTriggerFilter bool //TriggerFilter with DeprecatedSourceAndType or not + }{ + { + name: "test default broker with many deprecated triggers", + eventsToReceive: []eventReceiver{ + {eventContext{Type: any, Source: any, Extensions: nil}, newSelector()}, + {eventContext{Type: eventType1, Source: any, Extensions: nil}, newSelector()}, + {eventContext{Type: any, Source: eventSource1, Extensions: nil}, newSelector()}, + {eventContext{Type: eventType1, Source: eventSource1, Extensions: nil}, newSelector()}, + }, + eventsToSend: []eventContext{ + {Type: eventType1, Source: eventSource1, Extensions: nil}, + {Type: eventType1, Source: eventSource2, Extensions: nil}, + {Type: eventType2, Source: eventSource1, Extensions: nil}, + {Type: eventType2, Source: eventSource2, Extensions: nil}, + }, + deprecatedTriggerFilter: true, + }, { + name: "test default broker with many attribute triggers", + eventsToReceive: []eventReceiver{ + {eventContext{Type: any, Source: any, Extensions: nil}, newSelector()}, + {eventContext{Type: eventType1, Source: any, Extensions: nil}, newSelector()}, + {eventContext{Type: any, Source: eventSource1, Extensions: nil}, newSelector()}, + {eventContext{Type: eventType1, Source: eventSource1, Extensions: nil}, newSelector()}, + }, + eventsToSend: []eventContext{ + {Type: eventType1, Source: eventSource1, Extensions: nil}, + {Type: eventType1, Source: eventSource2, Extensions: nil}, + {Type: eventType2, Source: eventSource1, Extensions: nil}, + {Type: eventType2, Source: eventSource2, Extensions: nil}, + }, + deprecatedTriggerFilter: false, }, - deprecatedTriggerFilter: false, - extensionExists: false, - }, { name: "test default broker with many attribute and extension triggers", eventsToReceive: []eventReceiver{ - {eventMeta{Type: any, Source: any, ExtensionName: extensionName, ExtensionValue: extensionValue}, newSelector()}, - {eventMeta{Type: eventType1, Source: any, ExtensionName: extensionName, ExtensionValue: extensionValue}, newSelector()}, - {eventMeta{Type: any, Source: any, ExtensionName: extensionName, ExtensionValue: any}, newSelector()}, - {eventMeta{Type: any, Source: eventSource1, ExtensionName: extensionName, ExtensionValue: extensionValue}, newSelector()}, + {eventContext{Type: any, Source: any, Extensions: map[string]interface{}{extensionName1: extensionValue1,},}, newSelector()}, + {eventContext{Type: any, Source: any, Extensions: map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue2},}, newSelector()}, + {eventContext{Type: any, Source: any, Extensions: map[string]interface{}{extensionName2: extensionValue2},}, newSelector()}, + {eventContext{Type: eventType1, Source: any, Extensions: map[string]interface{}{extensionName1: extensionValue1,},}, newSelector()}, + {eventContext{Type: any, Source: any, Extensions: map[string]interface{}{extensionName1: any,},}, newSelector()}, + {eventContext{Type: any, Source: eventSource1, Extensions: map[string]interface{}{extensionName1: extensionValue1,},}, newSelector()}, + {eventContext{Type: any, Source: eventSource1, Extensions: map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue2},}, newSelector()}, }, - eventsToSend: []eventMeta{ - {eventType1, eventSource1, extensionName, extensionValue}, - {eventType1, eventSource2, extensionName, extensionValue}, - {eventType2, eventSource1, extensionName, "non.matching.ext.val"}, - {eventType2, eventSource2, "non.matching.ext.name", extensionValue}, + eventsToSend: []eventContext{ + {Type: eventType1, Source: eventSource1, Extensions: map[string]interface{}{extensionName1: extensionValue1,}}, + {Type: eventType1, Source: eventSource1, Extensions: map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue2}}, + {Type: eventType1, Source: eventSource1, Extensions: map[string]interface{}{extensionName2: extensionValue2}}, + {Type: eventType1, Source: eventSource2, Extensions: map[string]interface{}{extensionName1: extensionValue1,}}, + {Type: eventType2, Source: eventSource1, Extensions: map[string]interface{}{extensionName1: "non.matching.ext.val",}}, + {Type: eventType2, Source: eventSource2, Extensions: map[string]interface{}{"non.matching.ext.name": extensionValue1,}}, + {Type: eventType2, Source: eventSource2, Extensions: map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue2}}, + {Type: eventType2, Source: eventSource2, Extensions: map[string]interface{}{extensionName1: extensionValue1, "non.matching.ext.name": extensionValue2}}, }, deprecatedTriggerFilter: false, - extensionExists: true, }, } for _, test := range tests { @@ -141,16 +146,16 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { // Create subscribers. for _, event := range test.eventsToReceive { - subscriberName := name("dumper", event.meta.Type, event.meta.Source, event.meta.ExtensionName, event.meta.ExtensionValue) + subscriberName := name("dumper", event.context.Type, event.context.Source, event.context.Extensions) pod := resources.EventLoggerPod(subscriberName) client.CreatePodOrFail(pod, common.WithService(subscriberName)) } // Create triggers. for _, event := range test.eventsToReceive { - triggerName := name("trigger", event.meta.Type, event.meta.Source, event.meta.ExtensionName, event.meta.ExtensionValue) - subscriberName := name("dumper", event.meta.Type, event.meta.Source, event.meta.ExtensionName, event.meta.ExtensionValue) - triggerOption := getTriggerFilterOption(test.deprecatedTriggerFilter, test.extensionExists, event.meta) + triggerName := name("trigger", event.context.Type, event.context.Source, event.context.Extensions) + subscriberName := name("dumper", event.context.Type, event.context.Source, event.context.Extensions) + triggerOption := getTriggerFilterOption(test.deprecatedTriggerFilter, event.context) client.CreateTriggerOrFail(triggerName, resources.WithSubscriberRefForTrigger(subscriberName), triggerOption, @@ -168,12 +173,12 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { unexpectedEvents := make(map[string][]string) for _, eventToSend := range test.eventsToSend { // Create cloud event. - // Using event type and source as part of the body for easier debugging. - body := fmt.Sprintf("Body:eventType[%s]-eventSource[%s]-eventExtensionName[%s]-eventExtensionValue[%s]", - eventToSend.Type, eventToSend.Source, eventToSend.ExtensionName, eventToSend.ExtensionValue) + // Using event type, source and extensions as part of the body for easier debugging. + extensionsStr := joinSortedExtensions(eventToSend.Extensions) + body := fmt.Sprintf(("Body-%s-%s-%s"), eventToSend.Type, eventToSend.Source, extensionsStr) cloudEvent := makeCloudEvent(eventToSend, body) // Create sender pod. - senderPodName := name("sender", eventToSend.Type, eventToSend.Source, eventToSend.ExtensionName, eventToSend.ExtensionValue) + senderPodName := name("sender", eventToSend.Type, eventToSend.Source, eventToSend.Extensions) if err := client.SendFakeEventToAddressable(senderPodName, defaultBrokerName, common.BrokerTypeMeta, cloudEvent); err != nil { t.Fatalf("Error send cloud event to broker: %v", err) } @@ -181,7 +186,7 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { // Check on every dumper whether we should expect this event or not, and add its body // to the expectedEvents/unexpectedEvents maps. for _, eventToReceive := range test.eventsToReceive { - subscriberName := name("dumper", eventToReceive.meta.Type, eventToReceive.meta.Source, eventToReceive.meta.ExtensionName, eventToReceive.meta.ExtensionValue) + subscriberName := name("dumper", eventToReceive.context.Type, eventToReceive.context.Source, eventToReceive.context.Extensions) if shouldExpectEvent(&eventToSend, &eventToReceive, t.Logf) { expectedEvents[subscriberName] = append(expectedEvents[subscriberName], body) } else { @@ -191,7 +196,7 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { } for _, event := range test.eventsToReceive { - subscriberName := name("dumper", event.meta.Type, event.meta.Source, event.meta.ExtensionName, event.meta.ExtensionValue) + subscriberName := name("dumper", event.context.Type, event.context.Source, event.context.Extensions) if err := client.CheckLog(subscriberName, common.CheckerContainsAll(expectedEvents[subscriberName])); err != nil { t.Fatalf("Event(s) not found in logs of subscriber pod %q: %v", subscriberName, err) } @@ -210,38 +215,24 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { } -func makeCloudEvent(eventToSend eventMeta, body string) *resources.CloudEvent { - if eventToSend.ExtensionName != nilString { - return &resources.CloudEvent{ - Source: eventToSend.Source, - Type: eventToSend.Type, - ExtensionName: eventToSend.ExtensionName, - ExtensionValue: eventToSend.ExtensionValue, - Data: fmt.Sprintf(`{"msg":%q}`, body), - } - } else { - return &resources.CloudEvent{ - Source: eventToSend.Source, - Type: eventToSend.Type, - Data: fmt.Sprintf(`{"msg":%q}`, body), - } - } +func makeCloudEvent(eventToSend eventContext, body string) *resources.CloudEvent { + return &resources.CloudEvent{ + Source: eventToSend.Source, + Type: eventToSend.Type, + Extensions: eventToSend.Extensions, + Data: fmt.Sprintf(`{"msg":%q}`, body),} } -func getTriggerFilterOption(deprecatedTriggerFilter, extensionExists bool, eventMeta eventMeta) resources.TriggerOption { +func getTriggerFilterOption(deprecatedTriggerFilter bool, context eventContext) resources.TriggerOption { if deprecatedTriggerFilter { - return resources.WithDeprecatedSourceAndTypeTriggerFilter(eventMeta.Source, eventMeta.Type) + return resources.WithDeprecatedSourceAndTypeTriggerFilter(context.Source, context.Type) } else { - if extensionExists { - return resources.WithAttributesAndExtensionTriggerFilter(eventMeta.Source, eventMeta.Type, eventMeta.ExtensionName, eventMeta.ExtensionValue) - } else { - return resources.WithAttributesTriggerFilter(eventMeta.Source, eventMeta.Type) - } + return resources.WithAttributesTriggerFilter(context.Source, context.Type, context.Extensions) } } // Helper function to create names for different objects (e.g., triggers, services, etc.). -func name(obj, eventType, eventSource, eventExtensionName, eventExtensionValue string) string { +func name(obj, eventType, eventSource string, extensions map[string]interface{}) string { // Pod names need to be lowercase. We might have an eventType as Any, that is why we lowercase them. if eventType == "" { eventType = "testany" @@ -249,22 +240,48 @@ func name(obj, eventType, eventSource, eventExtensionName, eventExtensionValue s if eventSource == "" { eventSource = "testany" } - if eventExtensionName == nilString { - eventExtensionName = "notexists" + if len(extensions) == 0 { + return strings.ToLower(fmt.Sprintf( + "%s-%s-%s", + obj, + eventType, + eventSource)) + } else { + extensionsStr := joinSortedExtensions(extensions) + return strings.ToLower(fmt.Sprintf( + "%s-%s-%s-%s", + obj, + eventType, + eventSource, + extensionsStr)) } - if eventExtensionValue == "" { - eventExtensionValue = "testany" +} + +func joinSortedExtensions(extensions map[string]interface{}) string { + var sb strings.Builder + sortedExtensionNames := sortedKeys(extensions) + for _, sortedExtensionName := range sortedExtensionNames { + sb.WriteString("-") + sb.WriteString(sortedExtensionName) + sb.WriteString("-") + vStr := fmt.Sprintf("%v", extensions[sortedExtensionName]) + if vStr == "" { + vStr = "testany" + } + sb.WriteString(vStr) } - if eventExtensionValue == nilString { - eventExtensionValue = "notexists" + return sb.String() +} + +func sortedKeys(m map[string]interface{}) []string { + keys := make([]string, len(m)) + i := 0 + for k := range m { + keys[i] = k + i++ } - return strings.ToLower(fmt.Sprintf( - "%s-%s-%s-%s-%s", - obj, - eventType, - eventSource, - eventExtensionName, - eventExtensionValue)) + sort.Strings(keys) + return keys } // Returns a new selector with a random uuid. @@ -273,22 +290,24 @@ func newSelector() map[string]string { } // Checks whether we should expect to receive 'eventToSend' in 'eventReceiver' based on its type and source pattern. -func shouldExpectEvent(eventToSend *eventMeta, receiver *eventReceiver, logf logging.FormatLogger) bool { - if receiver.meta.Type != any && receiver.meta.Type != eventToSend.Type { - return false - } - if receiver.meta.Source != any && receiver.meta.Source != eventToSend.Source { +func shouldExpectEvent(eventToSend *eventContext, receiver *eventReceiver, logf logging.FormatLogger) bool { + if receiver.context.Type != any && receiver.context.Type != eventToSend.Type { return false } - //event extension does not exists, return True - if receiver.meta.ExtensionName == nilString { - return true - } - if receiver.meta.ExtensionName != eventToSend.ExtensionName { + if receiver.context.Source != any && receiver.context.Source != eventToSend.Source { return false } - if receiver.meta.ExtensionValue != any && receiver.meta.ExtensionValue != eventToSend.ExtensionValue { - return false + for k, v := range receiver.context.Extensions { + var value interface{} + value, ok := eventToSend.Extensions[k] + // If the attribute does not exist in the event, return false. + if !ok { + return false + } + // If the attribute is not set to any and is different than the one from the event, return false. + if v != any && v != value { + return false + } } return true } diff --git a/test/test_images/sendevents/main.go b/test/test_images/sendevents/main.go index 02c51563397..34e10359d53 100644 --- a/test/test_images/sendevents/main.go +++ b/test/test_images/sendevents/main.go @@ -31,17 +31,16 @@ import ( ) var ( - sink string - eventID string - eventType string - eventSource string - eventExtensionName string - eventExtensionValue string - eventData string - eventEncoding string - periodStr string - delayStr string - maxMsgStr string + sink string + eventID string + eventType string + eventSource string + eventExtensionsMapStr string + eventData string + eventEncoding string + periodStr string + delayStr string + maxMsgStr string ) func init() { @@ -49,8 +48,7 @@ func init() { flag.StringVar(&eventID, "event-id", "", "Event ID to use. Defaults to a generated UUID") flag.StringVar(&eventType, "event-type", "knative.eventing.test.e2e", "The Event Type to use.") flag.StringVar(&eventSource, "event-source", "", "Source URI to use. Defaults to the current machine's hostname") - flag.StringVar(&eventExtensionName, "event-extension-name", "", "The extension name of event.") - flag.StringVar(&eventExtensionValue, "event-extension-value", "", "The extension value of event") + flag.StringVar(&eventExtensionsMapStr, "event-extensions", "", "The extensions of event with json format.") flag.StringVar(&eventData, "event-data", `{"hello": "world!"}`, "Cloudevent data body.") flag.StringVar(&eventEncoding, "event-encoding", "binary", "The encoding of the cloud event, one of(binary, structured).") flag.StringVar(&periodStr, "period", "5", "The number of seconds between messages.") @@ -142,9 +140,16 @@ func main() { } event.SetType(eventType) event.SetSource(eventSource) - if eventExtensionName != "" { - event.SetExtension(eventExtensionName, eventExtensionValue) + + var eventExtensions map[string]interface{} + if err := json.Unmarshal([]byte(eventExtensionsMapStr), &eventExtensions); err != nil { + log.Println("Can't unmarshall cloud event extensions to map[string]interface{}") + os.Exit(1) + } + for k, v := range eventExtensions { + event.SetExtension(k, v) } + if err := event.SetData(untyped); err != nil { log.Fatalf("failed to set data, %v", err) } From 7e0e23633f9ed2a4f2bac634b02280d326501cb0 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Wed, 21 Aug 2019 16:04:14 -0700 Subject: [PATCH 07/10] edited the code based on code review, changed some format --- test/base/resources/kube.go | 3 +- test/common/validation.go | 7 +++- test/e2e/broker_default_test.go | 58 ++++++++++++----------------- test/test_images/sendevents/main.go | 31 ++++++++------- 4 files changed, 45 insertions(+), 54 deletions(-) diff --git a/test/base/resources/kube.go b/test/base/resources/kube.go index 7b30352e913..d60053809a9 100644 --- a/test/base/resources/kube.go +++ b/test/base/resources/kube.go @@ -40,8 +40,7 @@ func EventSenderPod(name string, sink string, event *CloudEvent) (*corev1.Pod, e eventExtensionsBytes, error := json.Marshal(event.Extensions) eventExtensions := string(eventExtensionsBytes) if error != nil { - - return nil, fmt.Errorf("Encountered error when we marshall cloud event extensions %v", error) + return nil, fmt.Errorf("encountered error when we marshall cloud event extensions %v", error) } return &corev1.Pod{ diff --git a/test/common/validation.go b/test/common/validation.go index 3c07aa81f96..38bd6226079 100644 --- a/test/common/validation.go +++ b/test/common/validation.go @@ -99,14 +99,17 @@ func (client *Client) FindAnyLogContents(podName string, contents []string) (boo return false, err } for _, content := range contents { - _, ok := eventContentsSet[content] - if ok { + if _, ok := eventContentsSet[content]; ok { return true, nil } } return false, nil } +//Example log entry: 2019/08/21 22:46:38 {"msg":"Body-type1-source1--extname1-extval1-extname2-extvalue2","sequence":"1"} +//Use regex to get the event content with json format: {"msg":"Body-type1-source1--extname1-extval1-extname2-extvalue2","sequence":"1"} +//Get the eventContent with key "msg" +//Returns a set(map implementation) with all unique event contents func parseEventContentsFromPodLogs(logs string) (map[string]bool, error) { re := regexp.MustCompile(`{.+?}`) matches := re.FindAllString(logs, -1) diff --git a/test/e2e/broker_default_test.go b/test/e2e/broker_default_test.go index 277947739a1..6becd69e143 100644 --- a/test/e2e/broker_default_test.go +++ b/test/e2e/broker_default_test.go @@ -43,6 +43,8 @@ const ( eventType2 = "type2" eventSource1 = "source1" eventSource2 = "source2" + //Be careful with the length of extension name and values, + // we use extension name and value as a part of the name of resources like subscriber and trigger, the maximum characters allowed of resource name is 63 extensionName1 = "extname1" extensionValue1 = "extval1" extensionName2 = "extname2" @@ -77,31 +79,31 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { { name: "test default broker with many deprecated triggers", eventsToReceive: []eventReceiver{ - {eventContext{Type: any, Source: any, Extensions: nil}, newSelector()}, - {eventContext{Type: eventType1, Source: any, Extensions: nil}, newSelector()}, - {eventContext{Type: any, Source: eventSource1, Extensions: nil}, newSelector()}, - {eventContext{Type: eventType1, Source: eventSource1, Extensions: nil}, newSelector()}, + {eventContext{Type: any, Source: any}, newSelector()}, + {eventContext{Type: eventType1, Source: any}, newSelector()}, + {eventContext{Type: any, Source: eventSource1}, newSelector()}, + {eventContext{Type: eventType1, Source: eventSource1}, newSelector()}, }, eventsToSend: []eventContext{ - {Type: eventType1, Source: eventSource1, Extensions: nil}, - {Type: eventType1, Source: eventSource2, Extensions: nil}, - {Type: eventType2, Source: eventSource1, Extensions: nil}, - {Type: eventType2, Source: eventSource2, Extensions: nil}, + {Type: eventType1, Source: eventSource1}, + {Type: eventType1, Source: eventSource2}, + {Type: eventType2, Source: eventSource1}, + {Type: eventType2, Source: eventSource2}, }, deprecatedTriggerFilter: true, }, { name: "test default broker with many attribute triggers", eventsToReceive: []eventReceiver{ - {eventContext{Type: any, Source: any, Extensions: nil}, newSelector()}, - {eventContext{Type: eventType1, Source: any, Extensions: nil}, newSelector()}, - {eventContext{Type: any, Source: eventSource1, Extensions: nil}, newSelector()}, - {eventContext{Type: eventType1, Source: eventSource1, Extensions: nil}, newSelector()}, + {eventContext{Type: any, Source: any}, newSelector()}, + {eventContext{Type: eventType1, Source: any}, newSelector()}, + {eventContext{Type: any, Source: eventSource1}, newSelector()}, + {eventContext{Type: eventType1, Source: eventSource1}, newSelector()}, }, eventsToSend: []eventContext{ - {Type: eventType1, Source: eventSource1, Extensions: nil}, - {Type: eventType1, Source: eventSource2, Extensions: nil}, - {Type: eventType2, Source: eventSource1, Extensions: nil}, - {Type: eventType2, Source: eventSource2, Extensions: nil}, + {Type: eventType1, Source: eventSource1}, + {Type: eventType1, Source: eventSource2}, + {Type: eventType2, Source: eventSource1}, + {Type: eventType2, Source: eventSource2}, }, deprecatedTriggerFilter: false, }, @@ -240,21 +242,11 @@ func name(obj, eventType, eventSource string, extensions map[string]interface{}) if eventSource == "" { eventSource = "testany" } - if len(extensions) == 0 { - return strings.ToLower(fmt.Sprintf( - "%s-%s-%s", - obj, - eventType, - eventSource)) - } else { - extensionsStr := joinSortedExtensions(extensions) - return strings.ToLower(fmt.Sprintf( - "%s-%s-%s-%s", - obj, - eventType, - eventSource, - extensionsStr)) + name := strings.ToLower(fmt.Sprintf("%s-%s-%s", obj, eventType, eventSource)) + if len(extensions) > 0 { + name = strings.ToLower(fmt.Sprintf("%s-%s", name, joinSortedExtensions(extensions))) } + return name } func joinSortedExtensions(extensions map[string]interface{}) string { @@ -274,11 +266,9 @@ func joinSortedExtensions(extensions map[string]interface{}) string { } func sortedKeys(m map[string]interface{}) []string { - keys := make([]string, len(m)) - i := 0 + keys := make([]string, 0, len(m)) for k := range m { - keys[i] = k - i++ + keys = append(keys, k) } sort.Strings(keys) return keys diff --git a/test/test_images/sendevents/main.go b/test/test_images/sendevents/main.go index 34e10359d53..7812013c48b 100644 --- a/test/test_images/sendevents/main.go +++ b/test/test_images/sendevents/main.go @@ -31,16 +31,16 @@ import ( ) var ( - sink string - eventID string - eventType string - eventSource string - eventExtensionsMapStr string - eventData string - eventEncoding string - periodStr string - delayStr string - maxMsgStr string + sink string + eventID string + eventType string + eventSource string + eventExtensions string + eventData string + eventEncoding string + periodStr string + delayStr string + maxMsgStr string ) func init() { @@ -48,7 +48,7 @@ func init() { flag.StringVar(&eventID, "event-id", "", "Event ID to use. Defaults to a generated UUID") flag.StringVar(&eventType, "event-type", "knative.eventing.test.e2e", "The Event Type to use.") flag.StringVar(&eventSource, "event-source", "", "Source URI to use. Defaults to the current machine's hostname") - flag.StringVar(&eventExtensionsMapStr, "event-extensions", "", "The extensions of event with json format.") + flag.StringVar(&eventExtensions, "event-extensions", "", "The extensions of event with json format.") flag.StringVar(&eventData, "event-data", `{"hello": "world!"}`, "Cloudevent data body.") flag.StringVar(&eventEncoding, "event-encoding", "binary", "The encoding of the cloud event, one of(binary, structured).") flag.StringVar(&periodStr, "period", "5", "The number of seconds between messages.") @@ -141,12 +141,11 @@ func main() { event.SetType(eventType) event.SetSource(eventSource) - var eventExtensions map[string]interface{} - if err := json.Unmarshal([]byte(eventExtensionsMapStr), &eventExtensions); err != nil { - log.Println("Can't unmarshall cloud event extensions to map[string]interface{}") - os.Exit(1) + var extensions map[string]interface{} + if err := json.Unmarshal([]byte(eventExtensions), &extensions); err != nil { + log.Fatalf("Encountered error when unmarshalling cloud event extensions to map[string]interface{}: %v", err) } - for k, v := range eventExtensions { + for k, v := range extensions { event.SetExtension(k, v) } From fe1c97e2ac371e3cfb2699cfeb2e7be91cecc02c Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Wed, 21 Aug 2019 17:52:50 -0700 Subject: [PATCH 08/10] changed comment format, changed set with map implementation to k8s set --- test/base/resources/eventing.go | 3 +-- test/common/validation.go | 18 ++++++++++-------- test/e2e/broker_default_test.go | 2 +- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/test/base/resources/eventing.go b/test/base/resources/eventing.go index 143a55d01b8..55f5bdedbe4 100644 --- a/test/base/resources/eventing.go +++ b/test/base/resources/eventing.go @@ -129,10 +129,9 @@ func WithAttributesTriggerFilter(eventSource, eventType string, extensions map[s } triggerFilterAttributes := eventingv1alpha1.TriggerFilterAttributes(attrs) return func(t *eventingv1alpha1.Trigger) { - triggerFilter := &eventingv1alpha1.TriggerFilter{ + t.Spec.Filter = &eventingv1alpha1.TriggerFilter{ Attributes: &triggerFilterAttributes, } - t.Spec.Filter = triggerFilter } } diff --git a/test/common/validation.go b/test/common/validation.go index 38bd6226079..4e5aeb96b5d 100644 --- a/test/common/validation.go +++ b/test/common/validation.go @@ -23,6 +23,7 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" ) @@ -99,21 +100,22 @@ func (client *Client) FindAnyLogContents(podName string, contents []string) (boo return false, err } for _, content := range contents { - if _, ok := eventContentsSet[content]; ok { + if eventContentsSet.Has(content) { return true, nil } } return false, nil } -//Example log entry: 2019/08/21 22:46:38 {"msg":"Body-type1-source1--extname1-extval1-extname2-extvalue2","sequence":"1"} -//Use regex to get the event content with json format: {"msg":"Body-type1-source1--extname1-extval1-extname2-extvalue2","sequence":"1"} -//Get the eventContent with key "msg" -//Returns a set(map implementation) with all unique event contents -func parseEventContentsFromPodLogs(logs string) (map[string]bool, error) { +// parseEventContentsFromPodLogs extracts the contents of events from a Pod logs +// Example log entry: 2019/08/21 22:46:38 {"msg":"Body-type1-source1--extname1-extval1-extname2-extvalue2","sequence":"1"} +// Use regex to get the event content with json format: {"msg":"Body-type1-source1--extname1-extval1-extname2-extvalue2","sequence":"1"} +// Get the eventContent with key "msg" +// Returns a set with all unique event contents +func parseEventContentsFromPodLogs(logs string) (sets.String, error) { re := regexp.MustCompile(`{.+?}`) matches := re.FindAllString(logs, -1) - eventContentsSet := make(map[string]bool) + eventContentsSet := sets.String{} for _, match := range matches { var matchedLogs map[string]string err := json.Unmarshal([]byte(match), &matchedLogs) @@ -121,7 +123,7 @@ func parseEventContentsFromPodLogs(logs string) (map[string]bool, error) { return nil, err } else { eventContent := matchedLogs["msg"] - eventContentsSet[eventContent] = true + eventContentsSet.Insert(eventContent) } } return eventContentsSet, nil diff --git a/test/e2e/broker_default_test.go b/test/e2e/broker_default_test.go index 6becd69e143..831ceaeaa1f 100644 --- a/test/e2e/broker_default_test.go +++ b/test/e2e/broker_default_test.go @@ -43,7 +43,7 @@ const ( eventType2 = "type2" eventSource1 = "source1" eventSource2 = "source2" - //Be careful with the length of extension name and values, + // Be careful with the length of extension name and values, // we use extension name and value as a part of the name of resources like subscriber and trigger, the maximum characters allowed of resource name is 63 extensionName1 = "extname1" extensionValue1 = "extval1" From 4a429168e54d29f8bdce708ab7a081dc3af81701 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Wed, 21 Aug 2019 22:09:22 -0700 Subject: [PATCH 09/10] use gofmt to change formatting --- test/e2e/broker_default_test.go | 36 ++++++++++++++++----------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/test/e2e/broker_default_test.go b/test/e2e/broker_default_test.go index 831ceaeaa1f..d8eccc13b83 100644 --- a/test/e2e/broker_default_test.go +++ b/test/e2e/broker_default_test.go @@ -20,8 +20,6 @@ package e2e import ( "fmt" - "knative.dev/eventing/test/base/resources" - "knative.dev/eventing/test/common" "sort" "strings" "testing" @@ -30,6 +28,8 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "knative.dev/eventing/pkg/apis/eventing/v1alpha1" pkgResources "knative.dev/eventing/pkg/reconciler/namespace/resources" + "knative.dev/eventing/test/base/resources" + "knative.dev/eventing/test/common" "knative.dev/pkg/test/logging" ) @@ -45,10 +45,10 @@ const ( eventSource2 = "source2" // Be careful with the length of extension name and values, // we use extension name and value as a part of the name of resources like subscriber and trigger, the maximum characters allowed of resource name is 63 - extensionName1 = "extname1" - extensionValue1 = "extval1" - extensionName2 = "extname2" - extensionValue2 = "extvalue2" + extensionName1 = "extname1" + extensionValue1 = "extval1" + extensionName2 = "extname2" + extensionValue2 = "extvalue2" ) type eventContext struct { @@ -110,21 +110,21 @@ func TestDefaultBrokerWithManyTriggers(t *testing.T) { { name: "test default broker with many attribute and extension triggers", eventsToReceive: []eventReceiver{ - {eventContext{Type: any, Source: any, Extensions: map[string]interface{}{extensionName1: extensionValue1,},}, newSelector()}, - {eventContext{Type: any, Source: any, Extensions: map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue2},}, newSelector()}, - {eventContext{Type: any, Source: any, Extensions: map[string]interface{}{extensionName2: extensionValue2},}, newSelector()}, - {eventContext{Type: eventType1, Source: any, Extensions: map[string]interface{}{extensionName1: extensionValue1,},}, newSelector()}, - {eventContext{Type: any, Source: any, Extensions: map[string]interface{}{extensionName1: any,},}, newSelector()}, - {eventContext{Type: any, Source: eventSource1, Extensions: map[string]interface{}{extensionName1: extensionValue1,},}, newSelector()}, - {eventContext{Type: any, Source: eventSource1, Extensions: map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue2},}, newSelector()}, + {eventContext{Type: any, Source: any, Extensions: map[string]interface{}{extensionName1: extensionValue1}}, newSelector()}, + {eventContext{Type: any, Source: any, Extensions: map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue2}}, newSelector()}, + {eventContext{Type: any, Source: any, Extensions: map[string]interface{}{extensionName2: extensionValue2}}, newSelector()}, + {eventContext{Type: eventType1, Source: any, Extensions: map[string]interface{}{extensionName1: extensionValue1}}, newSelector()}, + {eventContext{Type: any, Source: any, Extensions: map[string]interface{}{extensionName1: any}}, newSelector()}, + {eventContext{Type: any, Source: eventSource1, Extensions: map[string]interface{}{extensionName1: extensionValue1}}, newSelector()}, + {eventContext{Type: any, Source: eventSource1, Extensions: map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue2}}, newSelector()}, }, eventsToSend: []eventContext{ - {Type: eventType1, Source: eventSource1, Extensions: map[string]interface{}{extensionName1: extensionValue1,}}, + {Type: eventType1, Source: eventSource1, Extensions: map[string]interface{}{extensionName1: extensionValue1}}, {Type: eventType1, Source: eventSource1, Extensions: map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue2}}, {Type: eventType1, Source: eventSource1, Extensions: map[string]interface{}{extensionName2: extensionValue2}}, - {Type: eventType1, Source: eventSource2, Extensions: map[string]interface{}{extensionName1: extensionValue1,}}, - {Type: eventType2, Source: eventSource1, Extensions: map[string]interface{}{extensionName1: "non.matching.ext.val",}}, - {Type: eventType2, Source: eventSource2, Extensions: map[string]interface{}{"non.matching.ext.name": extensionValue1,}}, + {Type: eventType1, Source: eventSource2, Extensions: map[string]interface{}{extensionName1: extensionValue1}}, + {Type: eventType2, Source: eventSource1, Extensions: map[string]interface{}{extensionName1: "non.matching.ext.val"}}, + {Type: eventType2, Source: eventSource2, Extensions: map[string]interface{}{"non.matching.ext.name": extensionValue1}}, {Type: eventType2, Source: eventSource2, Extensions: map[string]interface{}{extensionName1: extensionValue1, extensionName2: extensionValue2}}, {Type: eventType2, Source: eventSource2, Extensions: map[string]interface{}{extensionName1: extensionValue1, "non.matching.ext.name": extensionValue2}}, }, @@ -222,7 +222,7 @@ func makeCloudEvent(eventToSend eventContext, body string) *resources.CloudEvent Source: eventToSend.Source, Type: eventToSend.Type, Extensions: eventToSend.Extensions, - Data: fmt.Sprintf(`{"msg":%q}`, body),} + Data: fmt.Sprintf(`{"msg":%q}`, body)} } func getTriggerFilterOption(deprecatedTriggerFilter bool, context eventContext) resources.TriggerOption { From 9ad9709c7d58df02a722d8067b23d3fa55c2f06c Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Thu, 22 Aug 2019 16:48:04 -0700 Subject: [PATCH 10/10] changed regex to parse event contents from logs --- test/common/validation.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/common/validation.go b/test/common/validation.go index 4e5aeb96b5d..ad41cc7591c 100644 --- a/test/common/validation.go +++ b/test/common/validation.go @@ -113,7 +113,7 @@ func (client *Client) FindAnyLogContents(podName string, contents []string) (boo // Get the eventContent with key "msg" // Returns a set with all unique event contents func parseEventContentsFromPodLogs(logs string) (sets.String, error) { - re := regexp.MustCompile(`{.+?}`) + re := regexp.MustCompile(`{.+}`) matches := re.FindAllString(logs, -1) eventContentsSet := sets.String{} for _, match := range matches {