diff --git a/test/conformance/broker_data_plane_test.go b/test/conformance/broker_data_plane_test.go new file mode 100644 index 00000000000..83d15c1a2b3 --- /dev/null +++ b/test/conformance/broker_data_plane_test.go @@ -0,0 +1,41 @@ +// +build e2e + +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package conformance + +import ( + "testing" + + "knative.dev/eventing/test/conformance/helpers" + testlib "knative.dev/eventing/test/lib" +) + +func TestBrokerV1Beta1DataPlaneIngress(t *testing.T) { + client := testlib.Setup(t, true, testlib.SetupClientOptionNoop) + defer testlib.TearDown(client) + + broker := helpers.BrokerDataPlaneSetupHelper(client, brokerName, brokerNamespace, brokerClass) + helpers.BrokerV1Beta1IngressDataPlaneTestHelper(t, client, broker) +} +func TestBrokerV1Beta1DataPlaneConsumer(t *testing.T) { + client := testlib.Setup(t, true, testlib.SetupClientOptionNoop) + defer testlib.TearDown(client) + + broker := helpers.BrokerDataPlaneSetupHelper(client, brokerName, brokerNamespace, brokerClass) + helpers.BrokerV1Beta1ConsumerDataPlaneTestHelper(t, client, broker) +} diff --git a/test/conformance/helpers/broker_data_plane_test_helper.go b/test/conformance/helpers/broker_data_plane_test_helper.go new file mode 100644 index 00000000000..1e34098dd67 --- /dev/null +++ b/test/conformance/helpers/broker_data_plane_test_helper.go @@ -0,0 +1,353 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package helpers + +import ( + "fmt" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" + testlib "knative.dev/eventing/test/lib" + "knative.dev/eventing/test/lib/recordevents" + "knative.dev/eventing/test/lib/resources" + "knative.dev/eventing/test/lib/sender" + + ce "github.com/cloudevents/sdk-go/v2" + cetest "github.com/cloudevents/sdk-go/v2/test" +) + +func BrokerDataPlaneSetupHelper(client *testlib.Client, brokerName, brokerNamespace, brokerClass string) *eventingv1beta1.Broker { + var broker *eventingv1beta1.Broker + var err error + if brokerName == "" || brokerNamespace == "" { + brokerName = "br" + config := client.CreateBrokerConfigMapOrFail(brokerName, &testlib.DefaultChannel) + + broker = client.CreateBrokerV1Beta1OrFail( + brokerName, + resources.WithBrokerClassForBrokerV1Beta1(brokerClass), + resources.WithConfigForBrokerV1Beta1(config), + ) + client.WaitForResourceReadyOrFail(broker.Name, testlib.BrokerTypeMeta) + } else { + if broker, err = client.Eventing.EventingV1beta1().Brokers(brokerNamespace).Get(brokerName, metav1.GetOptions{}); err != nil { + client.T.Fatalf("Could not Get broker %s/%s: %v", brokerNamespace, brokerName, err) + } + } + return broker +} + +//At ingress +//Supports CE 0.3 or CE 1.0 via HTTP +//Supports structured or Binary mode +//Respond with 2xx on good CE +//Respond with 400 on bad CE +//Reject non-POST requests to publish URI +func BrokerV1Beta1IngressDataPlaneTestHelper( + t *testing.T, + client *testlib.Client, + broker *eventingv1beta1.Broker, +) { + triggerName := "trigger" + loggerName := "logger-pod" + eventTracker, _ := recordevents.StartEventRecordOrFail(client, loggerName) + defer eventTracker.Cleanup() + client.WaitForAllTestResourcesReadyOrFail() + + trigger := client.CreateTriggerOrFailV1Beta1( + triggerName, + resources.WithBrokerV1Beta1(broker.Name), + resources.WithAttributesTriggerFilterV1Beta1(eventingv1beta1.TriggerAnyFilter, eventingv1beta1.TriggerAnyFilter, nil), + resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerName), + ) + + client.WaitForResourceReadyOrFail(trigger.Name, testlib.TriggerTypeMeta) + + t.Run("Ingress Supports CE0.3", func(t *testing.T) { + eventID := "CE0.3" + event := ce.NewEvent() + event.SetID(eventID) + event.SetType(testlib.DefaultEventType) + event.SetSource("0.3.event.sender.test.knative.dev") + body := fmt.Sprintf(`{"msg":%q}`, eventID) + + if err := event.SetData(ce.ApplicationJSON, []byte(body)); err != nil { + t.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } + event.Context.AsV03() + event.SetSpecVersion("0.3") + client.SendEventToAddressable("v03-test-sender", broker.Name, testlib.BrokerTypeMeta, event) + originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasId(eventID), + cetest.HasSpecVersion("0.3"), + )) + eventTracker.AssertAtLeast(1, originalEventMatcher) + }) + + t.Run("Ingress Supports CE1.0", func(t *testing.T) { + eventID := "CE1.0" + event := ce.NewEvent() + event.SetID(eventID) + event.SetType(testlib.DefaultEventType) + event.SetSource("1.0.event.sender.test.knative.dev") + body := fmt.Sprintf(`{"msg":%q}`, eventID) + if err := event.SetData(ce.ApplicationJSON, []byte(body)); err != nil { + t.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } + + client.SendEventToAddressable("v10-test-sender", broker.Name, testlib.BrokerTypeMeta, event) + originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasId(eventID), + cetest.HasSpecVersion("1.0"), + )) + eventTracker.AssertAtLeast(1, originalEventMatcher) + + }) + t.Run("Ingress Supports Structured Mode", func(t *testing.T) { + eventID := "Structured-Mode" + event := ce.NewEvent() + event.SetID(eventID) + event.SetType(testlib.DefaultEventType) + event.SetSource("structured.mode.event.sender.test.knative.dev") + body := fmt.Sprintf(`{"msg":%q}`, eventID) + if err := event.SetData(ce.ApplicationJSON, []byte(body)); err != nil { + t.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } + + client.SendEventToAddressable("structured-test-sender", broker.Name, testlib.BrokerTypeMeta, event, sender.WithEncoding(ce.EncodingStructured)) + originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasId(eventID), + )) + eventTracker.AssertAtLeast(1, originalEventMatcher) + }) + + t.Run("Ingress Supports Binary Mode", func(t *testing.T) { + eventID := "Binary-Mode" + event := ce.NewEvent() + event.SetID(eventID) + event.SetType(testlib.DefaultEventType) + event.SetSource("binary.mode.event.sender.test.knative.dev") + body := fmt.Sprintf(`{"msg":%q}`, eventID) + if err := event.SetData(ce.ApplicationJSON, []byte(body)); err != nil { + t.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } + + client.SendEventToAddressable("binary-test-sender", broker.Name, testlib.BrokerTypeMeta, event, sender.WithEncoding(ce.EncodingBinary)) + originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasId(eventID), + )) + eventTracker.AssertAtLeast(1, originalEventMatcher) + }) + + t.Run("Respond with 2XX on good CE", func(t *testing.T) { + eventID := "2hundred-on-good-ce" + body := fmt.Sprintf(`{"msg":%q}`, eventID) + responseSink := "http://" + client.GetServiceHost(loggerName) + client.SendRequestToAddressable("twohundred-test-sender", broker.Name, testlib.BrokerTypeMeta, + map[string]string{ + "ce-specversion": "1.0", + "ce-type": testlib.DefaultEventType, + "ce-source": "2xx.request.sender.test.knative.dev", + "ce-id": eventID, + "content-type": ce.ApplicationJSON, + }, + body, + sender.WithResponseSink(responseSink), + ) + + eventTracker.AssertExact(1, recordevents.MatchEvent(sender.MatchStatusCode(202))) // should probably be a range + + }) + //Respond with 400 on bad CE + t.Run("Respond with 400 on bad CE", func(t *testing.T) { + eventID := "four-hundred-on-bad-ce" + body := ";la}{kjsdf;oai2095{}{}8234092349807asdfashdf" + responseSink := "http://" + client.GetServiceHost(loggerName) + client.SendRequestToAddressable("fourhundres-test-sender", broker.Name, testlib.BrokerTypeMeta, + map[string]string{ + "ce-specversion": "9000.1", //its over 9,000! + "ce-type": testlib.DefaultEventType, + "ce-source": "400.request.sender.test.knative.dev", + "ce-id": eventID, + "content-type": ce.ApplicationJSON, + }, + body, + sender.WithResponseSink(responseSink)) + eventTracker.AssertExact(1, recordevents.MatchEvent(sender.MatchStatusCode(400))) + }) + +} + +//At consumer +//No upgrade of version +//Attributes received should be the same as produced (attributes may be added) +//Events are filtered +//Events are delivered to multiple subscribers +//Deliveries succeed at least once +//Replies are accepted and delivered +//Replies that are unsuccessfully forwarded cause initial message to be redelivered (Very difficult to test, can be ignored) +func BrokerV1Beta1ConsumerDataPlaneTestHelper( + t *testing.T, + client *testlib.Client, + broker *eventingv1beta1.Broker, +) { + triggerName := "trigger" + secondTriggerName := "second-trigger" + loggerName := "logger-pod" + secondLoggerName := "second-logger-pod" + eventTracker, _ := recordevents.StartEventRecordOrFail(client, loggerName) + defer eventTracker.Cleanup() + secondTracker, _ := recordevents.StartEventRecordOrFail(client, secondLoggerName) + defer secondTracker.Cleanup() + client.WaitForAllTestResourcesReadyOrFail() + + trigger := client.CreateTriggerOrFailV1Beta1( + triggerName, + resources.WithBrokerV1Beta1(broker.Name), + resources.WithAttributesTriggerFilterV1Beta1(eventingv1beta1.TriggerAnyFilter, eventingv1beta1.TriggerAnyFilter, nil), + resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerName), + ) + + client.WaitForResourceReadyOrFail(trigger.Name, testlib.TriggerTypeMeta) + secondTrigger := client.CreateTriggerOrFailV1Beta1( + secondTriggerName, + resources.WithBrokerV1Beta1(broker.Name), + resources.WithAttributesTriggerFilterV1Beta1("filtered-event", eventingv1beta1.TriggerAnyFilter, nil), + resources.WithSubscriberServiceRefForTriggerV1Beta1(secondLoggerName), + ) + client.WaitForResourceReadyOrFail(secondTrigger.Name, testlib.TriggerTypeMeta) + eventID := "consumer-broker-tests" + baseSource := "consumer-test-sender" + baseEvent := ce.NewEvent() + baseEvent.SetID(eventID) + baseEvent.SetType(testlib.DefaultEventType) + baseEvent.SetSource(baseSource) + baseEvent.SetSpecVersion("1.0") + body := fmt.Sprintf(`{"msg":%q}`, eventID) + if err := baseEvent.SetData(ce.ApplicationJSON, []byte(body)); err != nil { + t.Fatalf("Cannot set the payload of the baseEvent: %s", err.Error()) + } + + t.Run("No upgrade of version", func(t *testing.T) { + event := baseEvent + source := "no-upgrade" + event.SetID(source) + event.Context = event.Context.AsV03() + + client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event, sender.WithEncoding(ce.EncodingStructured)) + originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasSpecVersion("0.3"), + cetest.HasId("no-upgrade"), + )) + eventTracker.AssertExact(1, originalEventMatcher) + + }) + + t.Run("Attributes received should be the same as produced (attributes may be added)", func(t *testing.T) { + event := baseEvent + id := "identical-attibutes" + event.SetID(id) + client.SendEventToAddressable(id+"-sender", broker.Name, testlib.BrokerTypeMeta, event) + originalEventMatcher := recordevents.MatchEvent( + cetest.HasId(id), + cetest.HasType(testlib.DefaultEventType), + cetest.HasSource(baseSource), + cetest.HasSpecVersion("1.0"), + ) + eventTracker.AssertExact(1, originalEventMatcher) + }) + + t.Run("Events are filtered", func(t *testing.T) { + event := baseEvent + source := "filtered-event" + event.SetSource(source) + secondEvent := baseEvent + + client.SendEventToAddressable("first-"+source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) + client.SendEventToAddressable("second-"+source+"-sender", broker.Name, testlib.BrokerTypeMeta, secondEvent) + filteredEventMatcher := recordevents.MatchEvent( + cetest.HasSource(source), + ) + nonEventMatcher := recordevents.MatchEvent( + cetest.HasSource(baseSource), + ) + secondTracker.AssertAtLeast(1, filteredEventMatcher) + secondTracker.AssertNot(nonEventMatcher) + }) + + t.Run("Events are delivered to multiple subscribers", func(t *testing.T) { + event := baseEvent + source := "filtered-event" + event.SetSource(source) + client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) + filteredEventMatcher := recordevents.MatchEvent( + cetest.HasSource(source), + ) + eventTracker.AssertAtLeast(1, filteredEventMatcher) + secondTracker.AssertAtLeast(1, filteredEventMatcher) + }) + + t.Run("Deliveries succeed at least once", func(t *testing.T) { + event := baseEvent + source := "delivery-check" + event.SetSource(source) + client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) + originalEventMatcher := recordevents.MatchEvent( + cetest.HasSource(source), + ) + eventTracker.AssertAtLeast(1, originalEventMatcher) + }) + + t.Run("Replies are accepted and delivered", func(t *testing.T) { + event := baseEvent + source := "origin-for-reply" + event.SetSource(source) + msg := []byte(`{"msg":"Transformed!"}`) + transformPod := resources.EventTransformationPod( + "tranformer-pod", + "reply-check-type", + "reply-check-source", + msg, + ) + client.CreatePodOrFail(transformPod, testlib.WithService("transformer-pod")) + transformTrigger := client.CreateTriggerOrFailV1Beta1( + "transform-trigger", + resources.WithBrokerV1Beta1(broker.Name), + resources.WithAttributesTriggerFilterV1Beta1(source, baseEvent.Type(), nil), + resources.WithSubscriberServiceRefForTriggerV1Beta1("transformer-pod"), + ) + client.WaitForResourceReadyOrFail(transformTrigger.Name, testlib.TriggerTypeMeta) + transformEventTracker, _ := recordevents.StartEventRecordOrFail(client, "transform-events-logger") + defer transformEventTracker.Cleanup() + + replyTrigger := client.CreateTriggerOrFailV1Beta1( + "reply-trigger", + resources.WithBrokerV1Beta1(broker.Name), + resources.WithAttributesTriggerFilterV1Beta1("reply-check-source", "reply-check-type", nil), + resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerName), + ) + client.WaitForResourceReadyOrFail(replyTrigger.Name, testlib.TriggerTypeMeta) + client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) + transformedEventMatcher := recordevents.MatchEvent( + cetest.HasSource("reply-check-source"), + cetest.HasType("reply-check-type"), + cetest.HasData(msg), + ) + eventTracker.AssertAtLeast(2, transformedEventMatcher) + }) +} diff --git a/test/conformance/main_test.go b/test/conformance/main_test.go index ee6f100f53f..53d79b1a3e7 100644 --- a/test/conformance/main_test.go +++ b/test/conformance/main_test.go @@ -29,6 +29,8 @@ import ( var channelTestRunner testlib.ComponentsTestRunner var sourcesTestRunner testlib.ComponentsTestRunner var brokerClass string +var brokerName string +var brokerNamespace string func TestMain(m *testing.M) { os.Exit(func() int { @@ -41,6 +43,8 @@ func TestMain(m *testing.M) { ComponentsToTest: test.EventingFlags.Sources, } brokerClass = test.EventingFlags.BrokerClass + brokerName = test.EventingFlags.BrokerName + brokerNamespace = test.EventingFlags.BrokerNamespace // Any tests may SetupZipkinTracing, it will only actually be done once. This should be the ONLY // place that cleans it up. If an individual test calls this instead, then it will break other diff --git a/test/e2e_flags.go b/test/e2e_flags.go index 37b43efd973..0698deb2df3 100644 --- a/test/e2e_flags.go +++ b/test/e2e_flags.go @@ -38,6 +38,9 @@ const ( SourceUsage = "The names of the source type metas, separated by comma. " + "Example: \"sources.knative.dev/v1alpha1:ApiServerSource," + "sources.knative.dev/v1alpha1:PingSource\"." + BrokerNameUsage = "When testing a pre-existing broker, specify the Broker name so the conformance tests " + + "won't create their own." + BrokerNamespaceUsage = "When testing a pre-existing broker, this variable specifies the namespace the broker can be found in." ) // EventingFlags holds the command line flags specific to knative/eventing. @@ -52,6 +55,8 @@ func InitializeEventingFlags() { flag.Var(&EventingFlags.Sources, "sources", SourceUsage) flag.StringVar(&EventingFlags.PipeFile, "pipefile", "/tmp/prober-signal", "Temporary file to write the prober signal into.") flag.StringVar(&EventingFlags.ReadyFile, "readyfile", "/tmp/prober-ready", "Temporary file to get the prober result.") + flag.StringVar(&EventingFlags.BrokerName, "brokername", "", BrokerNameUsage) + flag.StringVar(&EventingFlags.BrokerNamespace, "brokernamespace", "", BrokerNamespaceUsage) flag.Parse() // If no channel is passed through the flag, initialize it as the DefaultChannel. diff --git a/test/flags/eventing_environment.go b/test/flags/eventing_environment.go index e76ce4d7f28..1435108cfc9 100644 --- a/test/flags/eventing_environment.go +++ b/test/flags/eventing_environment.go @@ -21,6 +21,8 @@ type EventingEnvironmentFlags struct { BrokerClass string Channels Sources - PipeFile string - ReadyFile string + PipeFile string + ReadyFile string + BrokerName string + BrokerNamespace string }