From f17285eeaf892d61b118d591927f2ddb0e5c3897 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Tue, 7 Jul 2020 14:58:48 -0400 Subject: [PATCH 1/5] Add Broker Data Plane Conformance tests for the Ingress and Consmer --- test/conformance/broker_data_plane_test.go | 33 ++ .../helpers/broker_data_plane_test_helper.go | 363 ++++++++++++++++++ 2 files changed, 396 insertions(+) create mode 100644 test/conformance/broker_data_plane_test.go create mode 100644 test/conformance/helpers/broker_data_plane_test_helper.go diff --git a/test/conformance/broker_data_plane_test.go b/test/conformance/broker_data_plane_test.go new file mode 100644 index 00000000000..5cbf282617e --- /dev/null +++ b/test/conformance/broker_data_plane_test.go @@ -0,0 +1,33 @@ +// +build e2e + +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package conformance + +import ( + "testing" + + "knative.dev/eventing/test/conformance/helpers" + "knative.dev/eventing/test/lib" +) + +func TestBrokerV1Beta1DataPlaneIngress(t *testing.T) { + helpers.BrokerV1Beta1IngressDataPlaneTestHelperWithChannelTestRunner(t, brokerClass, channelTestRunner, lib.SetupClientOptionNoop) +} +func TestBrokerV1Beta1DataPlaneConsumer(t *testing.T) { + helpers.BrokerV1Beta1ConsumerDataPlaneTestHelperWithChannelTestRunner(t, brokerClass, channelTestRunner, lib.SetupClientOptionNoop) +} diff --git a/test/conformance/helpers/broker_data_plane_test_helper.go b/test/conformance/helpers/broker_data_plane_test_helper.go new file mode 100644 index 00000000000..0635ed153d5 --- /dev/null +++ b/test/conformance/helpers/broker_data_plane_test_helper.go @@ -0,0 +1,363 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package helpers + +import ( + "fmt" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" + "knative.dev/eventing/test/lib" + testlib "knative.dev/eventing/test/lib" + "knative.dev/eventing/test/lib/recordevents" + "knative.dev/eventing/test/lib/resources" + "knative.dev/eventing/test/lib/sender" + + ce "github.com/cloudevents/sdk-go/v2" + cetest "github.com/cloudevents/sdk-go/v2/test" +) + +//At ingress +//Supports CE 0.3 or CE 1.0 via HTTP +//Supports structured or Binary mode +//Respond with 2xx on good CE +//Respond with 400 on bad CE +//Reject non-POST requests to publish URI +func BrokerV1Beta1IngressDataPlaneTestHelperWithChannelTestRunner( + t *testing.T, + brokerClass string, + channelTestRunner testlib.ComponentsTestRunner, + setupClient ...testlib.SetupClientOption, +) { + channelTestRunner.RunTests(t, testlib.FeatureBasic, func(t *testing.T, channel metav1.TypeMeta) { + client := testlib.Setup(t, true, setupClient...) + defer testlib.TearDown(client) + brokerName := "br" + triggerName := "trigger" + loggerName := "logger-pod" + eventTracker, _ := recordevents.StartEventRecordOrFail(client, loggerName) + defer eventTracker.Cleanup() + client.WaitForAllTestResourcesReadyOrFail() + + config := client.CreateBrokerConfigMapOrFail(brokerName, &channel) + + broker := client.CreateBrokerV1Beta1OrFail( + brokerName, + resources.WithBrokerClassForBrokerV1Beta1(brokerClass), + resources.WithConfigForBrokerV1Beta1(config), + ) + client.WaitForResourceReadyOrFail(broker.Name, testlib.BrokerTypeMeta) + + trigger := client.CreateTriggerOrFailV1Beta1( + triggerName, + resources.WithBrokerV1Beta1(broker.Name), + resources.WithAttributesTriggerFilterV1Beta1(eventingv1beta1.TriggerAnyFilter, eventingv1beta1.TriggerAnyFilter, nil), + resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerName), + ) + + client.WaitForResourceReadyOrFail(trigger.Name, testlib.TriggerTypeMeta) + + t.Run("Ingress Supports CE0.3", func(t *testing.T) { + eventID := "CE0.3" + event := ce.NewEvent() + event.SetID(eventID) + event.SetType(testlib.DefaultEventType) + event.SetSource("CE0.3") + body := fmt.Sprintf(`{"msg":%q}`, eventID) + + if err := event.SetData(ce.ApplicationJSON, []byte(body)); err != nil { + t.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } + event.Context.AsV03() + event.SetSpecVersion("0.3") + client.SendEventToAddressable("v03-test-sender", broker.Name, testlib.BrokerTypeMeta, event) + originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasSource(event.Source()), + cetest.HasSpecVersion("0.3"), + )) + eventTracker.AssertAtLeast(1, originalEventMatcher) + }) + + t.Run("Ingress Supports CE1.0", func(t *testing.T) { + eventID := "CE1.0" + event := ce.NewEvent() + event.SetID(eventID) + event.SetType(testlib.DefaultEventType) + event.SetSource("CE1.0") + body := fmt.Sprintf(`{"msg":%q}`, eventID) + if err := event.SetData(ce.ApplicationJSON, []byte(body)); err != nil { + t.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } + + client.SendEventToAddressable("v10-test-sender", broker.Name, testlib.BrokerTypeMeta, event) + originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasSource(event.Source()), + )) + eventTracker.AssertAtLeast(1, originalEventMatcher) + + }) + t.Run("Ingress Supports Structured Mode", func(t *testing.T) { + eventID := "Structured-Mode" + event := ce.NewEvent() + event.SetID(eventID) + event.SetType(testlib.DefaultEventType) + event.SetSource("CEStructured Mode") + body := fmt.Sprintf(`{"msg":%q}`, eventID) + if err := event.SetData(ce.EncodingStructured.String(), []byte(body)); err != nil { + t.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } + + client.SendEventToAddressable("structured-test-sender", broker.Name, testlib.BrokerTypeMeta, event) + originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasSource(event.Source()), + )) + eventTracker.AssertAtLeast(1, originalEventMatcher) + }) + + t.Run("Ingress Supports Binary Mode", func(t *testing.T) { + eventID := "Binary-Mode" + event := ce.NewEvent() + event.SetID(eventID) + event.SetType(testlib.DefaultEventType) + event.SetSource("CEBindary Mode") + body := fmt.Sprintf(`{"msg":%q}`, eventID) + if err := event.SetData(ce.EncodingBinary.String(), []byte(body)); err != nil { + t.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } + + client.SendEventToAddressable("binary-test-sender", broker.Name, testlib.BrokerTypeMeta, event) + originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasSource(event.Source()), + )) + eventTracker.AssertAtLeast(1, originalEventMatcher) + }) + + t.Run("Respond with 2XX on good CE", func(t *testing.T) { + eventID := "2hundred-on-good-ce" + body := fmt.Sprintf(`{"msg":%q}`, eventID) + responseSink := "http://" + client.GetServiceHost(loggerName) + client.SendRequestToAddressable("twohundred-test-sender", broker.Name, testlib.BrokerTypeMeta, + map[string]string{ + "ce-specversion": "1.0", + "ce-type": testlib.DefaultEventType, + "ce-source": "2XX on good CE", + "ce-id": eventID, + "content-type": ce.ApplicationJSON, + }, + body, + sender.WithResponseSink(responseSink), + ) + + eventTracker.AssertExact(1, recordevents.MatchEvent(sender.MatchStatusCode(202))) // should probably be a range + + }) + //Respond with 400 on bad CE + t.Run("Repsond with 400 on bad CE", func(t *testing.T) { + eventID := "four-hundred-on-bad-ce" + body := ";la}{kjsdf;oai2095{}{}8234092349807asdfashdf" + responseSink := "http://" + client.GetServiceHost(loggerName) + client.SendRequestToAddressable("fourhundres-test-sender", broker.Name, testlib.BrokerTypeMeta, + map[string]string{ + "ce-specversion": "9000.1", //its over 9,000! + "ce-type": testlib.DefaultEventType, + "ce-source": "400 on bad CE", + "ce-id": eventID, + "content-type": ce.ApplicationJSON, + }, + body, + sender.WithResponseSink(responseSink)) + eventTracker.AssertExact(1, recordevents.MatchEvent(sender.MatchStatusCode(400))) + }) + + }) +} + +//At consumer +//No upgrade of version +//Attributes received should be the same as produced (attributes may be added) +//Events are filtered +//Events are delivered to multiple subscribers +//Deliveries succeed at least once +//Replies are accepted and delivered +//Replies that are unsuccessfully forwarded cause initial message to be redelivered (Very difficult to test, can be ignored) +func BrokerV1Beta1ConsumerDataPlaneTestHelperWithChannelTestRunner( + t *testing.T, + brokerClass string, + channelTestRunner testlib.ComponentsTestRunner, + setupClient ...testlib.SetupClientOption, +) { + channelTestRunner.RunTests(t, testlib.FeatureBasic, func(t *testing.T, channel metav1.TypeMeta) { + client := lib.Setup(t, true, setupClient...) + defer lib.TearDown(client) + + brokerName := "br" + triggerName := "trigger" + secondTriggerName := "second-trigger" + loggerName := "logger-pod" + secondLoggerName := "second-logger-pod" + eventTracker, _ := recordevents.StartEventRecordOrFail(client, loggerName) + defer eventTracker.Cleanup() + secondTracker, _ := recordevents.StartEventRecordOrFail(client, secondLoggerName) + defer secondTracker.Cleanup() + client.WaitForAllTestResourcesReadyOrFail() + + config := client.CreateBrokerConfigMapOrFail(brokerName, &channel) + + broker := client.CreateBrokerV1Beta1OrFail( + brokerName, + resources.WithBrokerClassForBrokerV1Beta1(brokerClass), + resources.WithConfigForBrokerV1Beta1(config), + ) + client.WaitForResourceReadyOrFail(broker.Name, testlib.BrokerTypeMeta) + + trigger := client.CreateTriggerOrFailV1Beta1( + triggerName, + resources.WithBrokerV1Beta1(broker.Name), + resources.WithAttributesTriggerFilterV1Beta1(eventingv1beta1.TriggerAnyFilter, eventingv1beta1.TriggerAnyFilter, nil), + resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerName), + ) + + client.WaitForResourceReadyOrFail(trigger.Name, testlib.TriggerTypeMeta) + secondTrigger := client.CreateTriggerOrFailV1Beta1( + secondTriggerName, + resources.WithBrokerV1Beta1(broker.Name), + resources.WithAttributesTriggerFilterV1Beta1("filtered-event", eventingv1beta1.TriggerAnyFilter, nil), + resources.WithSubscriberServiceRefForTriggerV1Beta1(secondLoggerName), + ) + client.WaitForResourceReadyOrFail(secondTrigger.Name, testlib.TriggerTypeMeta) + eventID := "consumer-broker-tests" + baseSource := "consumer-test-sender" + baseEvent := ce.NewEvent() + baseEvent.SetID(eventID) + baseEvent.SetType(testlib.DefaultEventType) + baseEvent.SetSource(baseSource) + baseEvent.SetSpecVersion("1.0") + body := fmt.Sprintf(`{"msg":%q}`, eventID) + if err := baseEvent.SetData(ce.EncodingStructured.String(), []byte(body)); err != nil { + t.Fatalf("Cannot set the payload of the baseEvent: %s", err.Error()) + } + + t.Run("No upgrade of version", func(t *testing.T) { + event := baseEvent + source := "no-upgrade" + event.SetID(source) + event.Context = event.Context.AsV03() + + client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) + originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasSpecVersion("0.3"), + cetest.HasId("no-upgrade"), + )) + eventTracker.AssertExact(1, originalEventMatcher) + + }) + + t.Run("Attributes received should be the same as produced (attributes may be added)", func(t *testing.T) { + event := baseEvent + id := "identical-attibutes" + event.SetID(id) + client.SendEventToAddressable(id+"-sender", broker.Name, testlib.BrokerTypeMeta, event) + originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasId(id), + cetest.HasType(testlib.DefaultEventType), + cetest.HasSource(baseSource), + cetest.HasSpecVersion("1.0"), + )) + eventTracker.AssertExact(1, originalEventMatcher) + }) + + t.Run("Events are filtered", func(t *testing.T) { + event := baseEvent + source := "filtered-event" + event.SetSource(source) + secondEvent := baseEvent + + client.SendEventToAddressable("first-"+source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) + client.SendEventToAddressable("second-"+source+"-sender", broker.Name, testlib.BrokerTypeMeta, secondEvent) + filteredEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasSource(source), + )) + nonEventMatcher := recordevents.MatchEvent( + cetest.HasSource(baseSource), + ) + secondTracker.AssertAtLeast(1, filteredEventMatcher) + secondTracker.AssertNot(nonEventMatcher) + }) + + t.Run("Events are delivered to multiple subscribers", func(t *testing.T) { + event := baseEvent + source := "filtered-event" + event.SetSource(source) + client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) + filteredEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasSource(source), + )) + eventTracker.AssertAtLeast(1, filteredEventMatcher) + secondTracker.AssertAtLeast(1, filteredEventMatcher) + }) + + t.Run("Deliveries succeed at least once", func(t *testing.T) { + event := baseEvent + source := "delivery-check" + event.SetSource(source) + client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) + originalEventMatcher := recordevents.MatchEvent( + cetest.HasSource(source), + ) + eventTracker.AssertAtLeast(1, originalEventMatcher) + }) + + t.Run("Replies are accepted and delivered", func(t *testing.T) { + event := baseEvent + source := "origin-for-reply" + event.SetSource(source) + msg := []byte(`{"msg":"Transformed!"}`) + transformPod := resources.EventTransformationPod( + "tranformer-pod", + "reply-check-type", + "reply-check-source", + msg, + ) + client.CreatePodOrFail(transformPod, testlib.WithService("transformer-pod")) + transformTrigger := client.CreateTriggerOrFailV1Beta1( + "transform-trigger", + resources.WithBrokerV1Beta1(broker.Name), + resources.WithAttributesTriggerFilterV1Beta1(source, baseEvent.Type(), nil), + resources.WithSubscriberServiceRefForTriggerV1Beta1("transformer-pod"), + ) + client.WaitForResourceReadyOrFail(transformTrigger.Name, testlib.TriggerTypeMeta) + transformEventTracker, _ := recordevents.StartEventRecordOrFail(client, "transform-events-logger") + defer transformEventTracker.Cleanup() + + replyTrigger := client.CreateTriggerOrFailV1Beta1( + "reply-trigger", + resources.WithBrokerV1Beta1(broker.Name), + resources.WithAttributesTriggerFilterV1Beta1("reply-check-source", "reply-check-type", nil), + resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerName), + ) + client.WaitForResourceReadyOrFail(replyTrigger.Name, testlib.TriggerTypeMeta) + client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) + transformedEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasSource("reply-check-source"), + cetest.HasType("reply-check-type"), + cetest.HasData(msg), + )) + eventTracker.AssertAtLeast(2, transformedEventMatcher) + }) + }) +} From f2e6b8a21d9b52a7241de420bffe8ed191073002 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Mon, 20 Jul 2020 13:06:46 -0400 Subject: [PATCH 2/5] Move broker creation to its own function, allow different broker --- test/conformance/broker_data_plane_test.go | 14 +- .../helpers/broker_data_plane_test_helper.go | 581 +++++++++--------- test/conformance/main_test.go | 4 + test/e2e_flags.go | 6 + test/flags/eventing_environment.go | 3 + 5 files changed, 309 insertions(+), 299 deletions(-) diff --git a/test/conformance/broker_data_plane_test.go b/test/conformance/broker_data_plane_test.go index 5cbf282617e..83d15c1a2b3 100644 --- a/test/conformance/broker_data_plane_test.go +++ b/test/conformance/broker_data_plane_test.go @@ -22,12 +22,20 @@ import ( "testing" "knative.dev/eventing/test/conformance/helpers" - "knative.dev/eventing/test/lib" + testlib "knative.dev/eventing/test/lib" ) func TestBrokerV1Beta1DataPlaneIngress(t *testing.T) { - helpers.BrokerV1Beta1IngressDataPlaneTestHelperWithChannelTestRunner(t, brokerClass, channelTestRunner, lib.SetupClientOptionNoop) + client := testlib.Setup(t, true, testlib.SetupClientOptionNoop) + defer testlib.TearDown(client) + + broker := helpers.BrokerDataPlaneSetupHelper(client, brokerName, brokerNamespace, brokerClass) + helpers.BrokerV1Beta1IngressDataPlaneTestHelper(t, client, broker) } func TestBrokerV1Beta1DataPlaneConsumer(t *testing.T) { - helpers.BrokerV1Beta1ConsumerDataPlaneTestHelperWithChannelTestRunner(t, brokerClass, channelTestRunner, lib.SetupClientOptionNoop) + client := testlib.Setup(t, true, testlib.SetupClientOptionNoop) + defer testlib.TearDown(client) + + broker := helpers.BrokerDataPlaneSetupHelper(client, brokerName, brokerNamespace, brokerClass) + helpers.BrokerV1Beta1ConsumerDataPlaneTestHelper(t, client, broker) } diff --git a/test/conformance/helpers/broker_data_plane_test_helper.go b/test/conformance/helpers/broker_data_plane_test_helper.go index 0635ed153d5..8d14d275fac 100644 --- a/test/conformance/helpers/broker_data_plane_test_helper.go +++ b/test/conformance/helpers/broker_data_plane_test_helper.go @@ -22,7 +22,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" - "knative.dev/eventing/test/lib" testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" @@ -32,159 +31,165 @@ import ( cetest "github.com/cloudevents/sdk-go/v2/test" ) +func BrokerDataPlaneSetupHelper(client *testlib.Client, brokerName, brokerNamespace, brokerClass string) *eventingv1beta1.Broker { + var broker *eventingv1beta1.Broker + var err error + if brokerName == "" || brokerNamespace == "" { + brokerName = "br" + config := client.CreateBrokerConfigMapOrFail(brokerName, &testlib.DefaultChannel) + + broker = client.CreateBrokerV1Beta1OrFail( + brokerName, + resources.WithBrokerClassForBrokerV1Beta1(brokerClass), + resources.WithConfigForBrokerV1Beta1(config), + ) + client.WaitForResourceReadyOrFail(broker.Name, testlib.BrokerTypeMeta) + } else { + if broker, err = client.Eventing.EventingV1beta1().Brokers(brokerNamespace).Get(brokerName, metav1.GetOptions{}); err != nil { + client.T.Fatalf("Could not Get broker %s/%s: %v", brokerNamespace, brokerName, err) + } + } + return broker +} + //At ingress //Supports CE 0.3 or CE 1.0 via HTTP //Supports structured or Binary mode //Respond with 2xx on good CE //Respond with 400 on bad CE //Reject non-POST requests to publish URI -func BrokerV1Beta1IngressDataPlaneTestHelperWithChannelTestRunner( +func BrokerV1Beta1IngressDataPlaneTestHelper( t *testing.T, - brokerClass string, - channelTestRunner testlib.ComponentsTestRunner, - setupClient ...testlib.SetupClientOption, + client *testlib.Client, + broker *eventingv1beta1.Broker, ) { - channelTestRunner.RunTests(t, testlib.FeatureBasic, func(t *testing.T, channel metav1.TypeMeta) { - client := testlib.Setup(t, true, setupClient...) - defer testlib.TearDown(client) - brokerName := "br" - triggerName := "trigger" - loggerName := "logger-pod" - eventTracker, _ := recordevents.StartEventRecordOrFail(client, loggerName) - defer eventTracker.Cleanup() - client.WaitForAllTestResourcesReadyOrFail() - - config := client.CreateBrokerConfigMapOrFail(brokerName, &channel) - - broker := client.CreateBrokerV1Beta1OrFail( - brokerName, - resources.WithBrokerClassForBrokerV1Beta1(brokerClass), - resources.WithConfigForBrokerV1Beta1(config), - ) - client.WaitForResourceReadyOrFail(broker.Name, testlib.BrokerTypeMeta) + triggerName := "trigger" + loggerName := "logger-pod" + eventTracker, _ := recordevents.StartEventRecordOrFail(client, loggerName) + defer eventTracker.Cleanup() + client.WaitForAllTestResourcesReadyOrFail() + + trigger := client.CreateTriggerOrFailV1Beta1( + triggerName, + resources.WithBrokerV1Beta1(broker.Name), + resources.WithAttributesTriggerFilterV1Beta1(eventingv1beta1.TriggerAnyFilter, eventingv1beta1.TriggerAnyFilter, nil), + resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerName), + ) + + client.WaitForResourceReadyOrFail(trigger.Name, testlib.TriggerTypeMeta) + + t.Run("Ingress Supports CE0.3", func(t *testing.T) { + eventID := "CE0.3" + event := ce.NewEvent() + event.SetID(eventID) + event.SetType(testlib.DefaultEventType) + event.SetSource("CE0.3") + body := fmt.Sprintf(`{"msg":%q}`, eventID) - trigger := client.CreateTriggerOrFailV1Beta1( - triggerName, - resources.WithBrokerV1Beta1(broker.Name), - resources.WithAttributesTriggerFilterV1Beta1(eventingv1beta1.TriggerAnyFilter, eventingv1beta1.TriggerAnyFilter, nil), - resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerName), + if err := event.SetData(ce.ApplicationJSON, []byte(body)); err != nil { + t.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } + event.Context.AsV03() + event.SetSpecVersion("0.3") + client.SendEventToAddressable("v03-test-sender", broker.Name, testlib.BrokerTypeMeta, event) + originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasSource(event.Source()), + cetest.HasSpecVersion("0.3"), + )) + eventTracker.AssertAtLeast(1, originalEventMatcher) + }) + + t.Run("Ingress Supports CE1.0", func(t *testing.T) { + eventID := "CE1.0" + event := ce.NewEvent() + event.SetID(eventID) + event.SetType(testlib.DefaultEventType) + event.SetSource("CE1.0") + body := fmt.Sprintf(`{"msg":%q}`, eventID) + if err := event.SetData(ce.ApplicationJSON, []byte(body)); err != nil { + t.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } + + client.SendEventToAddressable("v10-test-sender", broker.Name, testlib.BrokerTypeMeta, event) + originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasSource(event.Source()), + )) + eventTracker.AssertAtLeast(1, originalEventMatcher) + + }) + t.Run("Ingress Supports Structured Mode", func(t *testing.T) { + eventID := "Structured-Mode" + event := ce.NewEvent() + event.SetID(eventID) + event.SetType(testlib.DefaultEventType) + event.SetSource("CEStructured Mode") + body := fmt.Sprintf(`{"msg":%q}`, eventID) + if err := event.SetData(ce.EncodingStructured.String(), []byte(body)); err != nil { + t.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } + + client.SendEventToAddressable("structured-test-sender", broker.Name, testlib.BrokerTypeMeta, event) + originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasSource(event.Source()), + )) + eventTracker.AssertAtLeast(1, originalEventMatcher) + }) + + t.Run("Ingress Supports Binary Mode", func(t *testing.T) { + eventID := "Binary-Mode" + event := ce.NewEvent() + event.SetID(eventID) + event.SetType(testlib.DefaultEventType) + event.SetSource("CEBindary Mode") + body := fmt.Sprintf(`{"msg":%q}`, eventID) + if err := event.SetData(ce.EncodingBinary.String(), []byte(body)); err != nil { + t.Fatalf("Cannot set the payload of the event: %s", err.Error()) + } + + client.SendEventToAddressable("binary-test-sender", broker.Name, testlib.BrokerTypeMeta, event) + originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasSource(event.Source()), + )) + eventTracker.AssertAtLeast(1, originalEventMatcher) + }) + + t.Run("Respond with 2XX on good CE", func(t *testing.T) { + eventID := "2hundred-on-good-ce" + body := fmt.Sprintf(`{"msg":%q}`, eventID) + responseSink := "http://" + client.GetServiceHost(loggerName) + client.SendRequestToAddressable("twohundred-test-sender", broker.Name, testlib.BrokerTypeMeta, + map[string]string{ + "ce-specversion": "1.0", + "ce-type": testlib.DefaultEventType, + "ce-source": "2XX on good CE", + "ce-id": eventID, + "content-type": ce.ApplicationJSON, + }, + body, + sender.WithResponseSink(responseSink), ) - client.WaitForResourceReadyOrFail(trigger.Name, testlib.TriggerTypeMeta) - - t.Run("Ingress Supports CE0.3", func(t *testing.T) { - eventID := "CE0.3" - event := ce.NewEvent() - event.SetID(eventID) - event.SetType(testlib.DefaultEventType) - event.SetSource("CE0.3") - body := fmt.Sprintf(`{"msg":%q}`, eventID) - - if err := event.SetData(ce.ApplicationJSON, []byte(body)); err != nil { - t.Fatalf("Cannot set the payload of the event: %s", err.Error()) - } - event.Context.AsV03() - event.SetSpecVersion("0.3") - client.SendEventToAddressable("v03-test-sender", broker.Name, testlib.BrokerTypeMeta, event) - originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( - cetest.HasSource(event.Source()), - cetest.HasSpecVersion("0.3"), - )) - eventTracker.AssertAtLeast(1, originalEventMatcher) - }) - - t.Run("Ingress Supports CE1.0", func(t *testing.T) { - eventID := "CE1.0" - event := ce.NewEvent() - event.SetID(eventID) - event.SetType(testlib.DefaultEventType) - event.SetSource("CE1.0") - body := fmt.Sprintf(`{"msg":%q}`, eventID) - if err := event.SetData(ce.ApplicationJSON, []byte(body)); err != nil { - t.Fatalf("Cannot set the payload of the event: %s", err.Error()) - } - - client.SendEventToAddressable("v10-test-sender", broker.Name, testlib.BrokerTypeMeta, event) - originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( - cetest.HasSource(event.Source()), - )) - eventTracker.AssertAtLeast(1, originalEventMatcher) - - }) - t.Run("Ingress Supports Structured Mode", func(t *testing.T) { - eventID := "Structured-Mode" - event := ce.NewEvent() - event.SetID(eventID) - event.SetType(testlib.DefaultEventType) - event.SetSource("CEStructured Mode") - body := fmt.Sprintf(`{"msg":%q}`, eventID) - if err := event.SetData(ce.EncodingStructured.String(), []byte(body)); err != nil { - t.Fatalf("Cannot set the payload of the event: %s", err.Error()) - } - - client.SendEventToAddressable("structured-test-sender", broker.Name, testlib.BrokerTypeMeta, event) - originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( - cetest.HasSource(event.Source()), - )) - eventTracker.AssertAtLeast(1, originalEventMatcher) - }) - - t.Run("Ingress Supports Binary Mode", func(t *testing.T) { - eventID := "Binary-Mode" - event := ce.NewEvent() - event.SetID(eventID) - event.SetType(testlib.DefaultEventType) - event.SetSource("CEBindary Mode") - body := fmt.Sprintf(`{"msg":%q}`, eventID) - if err := event.SetData(ce.EncodingBinary.String(), []byte(body)); err != nil { - t.Fatalf("Cannot set the payload of the event: %s", err.Error()) - } - - client.SendEventToAddressable("binary-test-sender", broker.Name, testlib.BrokerTypeMeta, event) - originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( - cetest.HasSource(event.Source()), - )) - eventTracker.AssertAtLeast(1, originalEventMatcher) - }) - - t.Run("Respond with 2XX on good CE", func(t *testing.T) { - eventID := "2hundred-on-good-ce" - body := fmt.Sprintf(`{"msg":%q}`, eventID) - responseSink := "http://" + client.GetServiceHost(loggerName) - client.SendRequestToAddressable("twohundred-test-sender", broker.Name, testlib.BrokerTypeMeta, - map[string]string{ - "ce-specversion": "1.0", - "ce-type": testlib.DefaultEventType, - "ce-source": "2XX on good CE", - "ce-id": eventID, - "content-type": ce.ApplicationJSON, - }, - body, - sender.WithResponseSink(responseSink), - ) - - eventTracker.AssertExact(1, recordevents.MatchEvent(sender.MatchStatusCode(202))) // should probably be a range - - }) - //Respond with 400 on bad CE - t.Run("Repsond with 400 on bad CE", func(t *testing.T) { - eventID := "four-hundred-on-bad-ce" - body := ";la}{kjsdf;oai2095{}{}8234092349807asdfashdf" - responseSink := "http://" + client.GetServiceHost(loggerName) - client.SendRequestToAddressable("fourhundres-test-sender", broker.Name, testlib.BrokerTypeMeta, - map[string]string{ - "ce-specversion": "9000.1", //its over 9,000! - "ce-type": testlib.DefaultEventType, - "ce-source": "400 on bad CE", - "ce-id": eventID, - "content-type": ce.ApplicationJSON, - }, - body, - sender.WithResponseSink(responseSink)) - eventTracker.AssertExact(1, recordevents.MatchEvent(sender.MatchStatusCode(400))) - }) + eventTracker.AssertExact(1, recordevents.MatchEvent(sender.MatchStatusCode(202))) // should probably be a range }) + //Respond with 400 on bad CE + t.Run("Repsond with 400 on bad CE", func(t *testing.T) { + eventID := "four-hundred-on-bad-ce" + body := ";la}{kjsdf;oai2095{}{}8234092349807asdfashdf" + responseSink := "http://" + client.GetServiceHost(loggerName) + client.SendRequestToAddressable("fourhundres-test-sender", broker.Name, testlib.BrokerTypeMeta, + map[string]string{ + "ce-specversion": "9000.1", //its over 9,000! + "ce-type": testlib.DefaultEventType, + "ce-source": "400 on bad CE", + "ce-id": eventID, + "content-type": ce.ApplicationJSON, + }, + body, + sender.WithResponseSink(responseSink)) + eventTracker.AssertExact(1, recordevents.MatchEvent(sender.MatchStatusCode(400))) + }) + } //At consumer @@ -195,169 +200,153 @@ func BrokerV1Beta1IngressDataPlaneTestHelperWithChannelTestRunner( //Deliveries succeed at least once //Replies are accepted and delivered //Replies that are unsuccessfully forwarded cause initial message to be redelivered (Very difficult to test, can be ignored) -func BrokerV1Beta1ConsumerDataPlaneTestHelperWithChannelTestRunner( +func BrokerV1Beta1ConsumerDataPlaneTestHelper( t *testing.T, - brokerClass string, - channelTestRunner testlib.ComponentsTestRunner, - setupClient ...testlib.SetupClientOption, + client *testlib.Client, + broker *eventingv1beta1.Broker, ) { - channelTestRunner.RunTests(t, testlib.FeatureBasic, func(t *testing.T, channel metav1.TypeMeta) { - client := lib.Setup(t, true, setupClient...) - defer lib.TearDown(client) - - brokerName := "br" - triggerName := "trigger" - secondTriggerName := "second-trigger" - loggerName := "logger-pod" - secondLoggerName := "second-logger-pod" - eventTracker, _ := recordevents.StartEventRecordOrFail(client, loggerName) - defer eventTracker.Cleanup() - secondTracker, _ := recordevents.StartEventRecordOrFail(client, secondLoggerName) - defer secondTracker.Cleanup() - client.WaitForAllTestResourcesReadyOrFail() - - config := client.CreateBrokerConfigMapOrFail(brokerName, &channel) - - broker := client.CreateBrokerV1Beta1OrFail( - brokerName, - resources.WithBrokerClassForBrokerV1Beta1(brokerClass), - resources.WithConfigForBrokerV1Beta1(config), + triggerName := "trigger" + secondTriggerName := "second-trigger" + loggerName := "logger-pod" + secondLoggerName := "second-logger-pod" + eventTracker, _ := recordevents.StartEventRecordOrFail(client, loggerName) + defer eventTracker.Cleanup() + secondTracker, _ := recordevents.StartEventRecordOrFail(client, secondLoggerName) + defer secondTracker.Cleanup() + client.WaitForAllTestResourcesReadyOrFail() + + trigger := client.CreateTriggerOrFailV1Beta1( + triggerName, + resources.WithBrokerV1Beta1(broker.Name), + resources.WithAttributesTriggerFilterV1Beta1(eventingv1beta1.TriggerAnyFilter, eventingv1beta1.TriggerAnyFilter, nil), + resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerName), + ) + + client.WaitForResourceReadyOrFail(trigger.Name, testlib.TriggerTypeMeta) + secondTrigger := client.CreateTriggerOrFailV1Beta1( + secondTriggerName, + resources.WithBrokerV1Beta1(broker.Name), + resources.WithAttributesTriggerFilterV1Beta1("filtered-event", eventingv1beta1.TriggerAnyFilter, nil), + resources.WithSubscriberServiceRefForTriggerV1Beta1(secondLoggerName), + ) + client.WaitForResourceReadyOrFail(secondTrigger.Name, testlib.TriggerTypeMeta) + eventID := "consumer-broker-tests" + baseSource := "consumer-test-sender" + baseEvent := ce.NewEvent() + baseEvent.SetID(eventID) + baseEvent.SetType(testlib.DefaultEventType) + baseEvent.SetSource(baseSource) + baseEvent.SetSpecVersion("1.0") + body := fmt.Sprintf(`{"msg":%q}`, eventID) + if err := baseEvent.SetData(ce.EncodingStructured.String(), []byte(body)); err != nil { + t.Fatalf("Cannot set the payload of the baseEvent: %s", err.Error()) + } + + t.Run("No upgrade of version", func(t *testing.T) { + event := baseEvent + source := "no-upgrade" + event.SetID(source) + event.Context = event.Context.AsV03() + + client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) + originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasSpecVersion("0.3"), + cetest.HasId("no-upgrade"), + )) + eventTracker.AssertExact(1, originalEventMatcher) + + }) + + t.Run("Attributes received should be the same as produced (attributes may be added)", func(t *testing.T) { + event := baseEvent + id := "identical-attibutes" + event.SetID(id) + client.SendEventToAddressable(id+"-sender", broker.Name, testlib.BrokerTypeMeta, event) + originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasId(id), + cetest.HasType(testlib.DefaultEventType), + cetest.HasSource(baseSource), + cetest.HasSpecVersion("1.0"), + )) + eventTracker.AssertExact(1, originalEventMatcher) + }) + + t.Run("Events are filtered", func(t *testing.T) { + event := baseEvent + source := "filtered-event" + event.SetSource(source) + secondEvent := baseEvent + + client.SendEventToAddressable("first-"+source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) + client.SendEventToAddressable("second-"+source+"-sender", broker.Name, testlib.BrokerTypeMeta, secondEvent) + filteredEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasSource(source), + )) + nonEventMatcher := recordevents.MatchEvent( + cetest.HasSource(baseSource), ) - client.WaitForResourceReadyOrFail(broker.Name, testlib.BrokerTypeMeta) + secondTracker.AssertAtLeast(1, filteredEventMatcher) + secondTracker.AssertNot(nonEventMatcher) + }) - trigger := client.CreateTriggerOrFailV1Beta1( - triggerName, - resources.WithBrokerV1Beta1(broker.Name), - resources.WithAttributesTriggerFilterV1Beta1(eventingv1beta1.TriggerAnyFilter, eventingv1beta1.TriggerAnyFilter, nil), - resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerName), + t.Run("Events are delivered to multiple subscribers", func(t *testing.T) { + event := baseEvent + source := "filtered-event" + event.SetSource(source) + client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) + filteredEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasSource(source), + )) + eventTracker.AssertAtLeast(1, filteredEventMatcher) + secondTracker.AssertAtLeast(1, filteredEventMatcher) + }) + + t.Run("Deliveries succeed at least once", func(t *testing.T) { + event := baseEvent + source := "delivery-check" + event.SetSource(source) + client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) + originalEventMatcher := recordevents.MatchEvent( + cetest.HasSource(source), ) + eventTracker.AssertAtLeast(1, originalEventMatcher) + }) - client.WaitForResourceReadyOrFail(trigger.Name, testlib.TriggerTypeMeta) - secondTrigger := client.CreateTriggerOrFailV1Beta1( - secondTriggerName, + t.Run("Replies are accepted and delivered", func(t *testing.T) { + event := baseEvent + source := "origin-for-reply" + event.SetSource(source) + msg := []byte(`{"msg":"Transformed!"}`) + transformPod := resources.EventTransformationPod( + "tranformer-pod", + "reply-check-type", + "reply-check-source", + msg, + ) + client.CreatePodOrFail(transformPod, testlib.WithService("transformer-pod")) + transformTrigger := client.CreateTriggerOrFailV1Beta1( + "transform-trigger", resources.WithBrokerV1Beta1(broker.Name), - resources.WithAttributesTriggerFilterV1Beta1("filtered-event", eventingv1beta1.TriggerAnyFilter, nil), - resources.WithSubscriberServiceRefForTriggerV1Beta1(secondLoggerName), + resources.WithAttributesTriggerFilterV1Beta1(source, baseEvent.Type(), nil), + resources.WithSubscriberServiceRefForTriggerV1Beta1("transformer-pod"), ) - client.WaitForResourceReadyOrFail(secondTrigger.Name, testlib.TriggerTypeMeta) - eventID := "consumer-broker-tests" - baseSource := "consumer-test-sender" - baseEvent := ce.NewEvent() - baseEvent.SetID(eventID) - baseEvent.SetType(testlib.DefaultEventType) - baseEvent.SetSource(baseSource) - baseEvent.SetSpecVersion("1.0") - body := fmt.Sprintf(`{"msg":%q}`, eventID) - if err := baseEvent.SetData(ce.EncodingStructured.String(), []byte(body)); err != nil { - t.Fatalf("Cannot set the payload of the baseEvent: %s", err.Error()) - } + client.WaitForResourceReadyOrFail(transformTrigger.Name, testlib.TriggerTypeMeta) + transformEventTracker, _ := recordevents.StartEventRecordOrFail(client, "transform-events-logger") + defer transformEventTracker.Cleanup() - t.Run("No upgrade of version", func(t *testing.T) { - event := baseEvent - source := "no-upgrade" - event.SetID(source) - event.Context = event.Context.AsV03() - - client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) - originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( - cetest.HasSpecVersion("0.3"), - cetest.HasId("no-upgrade"), - )) - eventTracker.AssertExact(1, originalEventMatcher) - - }) - - t.Run("Attributes received should be the same as produced (attributes may be added)", func(t *testing.T) { - event := baseEvent - id := "identical-attibutes" - event.SetID(id) - client.SendEventToAddressable(id+"-sender", broker.Name, testlib.BrokerTypeMeta, event) - originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( - cetest.HasId(id), - cetest.HasType(testlib.DefaultEventType), - cetest.HasSource(baseSource), - cetest.HasSpecVersion("1.0"), - )) - eventTracker.AssertExact(1, originalEventMatcher) - }) - - t.Run("Events are filtered", func(t *testing.T) { - event := baseEvent - source := "filtered-event" - event.SetSource(source) - secondEvent := baseEvent - - client.SendEventToAddressable("first-"+source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) - client.SendEventToAddressable("second-"+source+"-sender", broker.Name, testlib.BrokerTypeMeta, secondEvent) - filteredEventMatcher := recordevents.MatchEvent(cetest.AllOf( - cetest.HasSource(source), - )) - nonEventMatcher := recordevents.MatchEvent( - cetest.HasSource(baseSource), - ) - secondTracker.AssertAtLeast(1, filteredEventMatcher) - secondTracker.AssertNot(nonEventMatcher) - }) - - t.Run("Events are delivered to multiple subscribers", func(t *testing.T) { - event := baseEvent - source := "filtered-event" - event.SetSource(source) - client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) - filteredEventMatcher := recordevents.MatchEvent(cetest.AllOf( - cetest.HasSource(source), - )) - eventTracker.AssertAtLeast(1, filteredEventMatcher) - secondTracker.AssertAtLeast(1, filteredEventMatcher) - }) - - t.Run("Deliveries succeed at least once", func(t *testing.T) { - event := baseEvent - source := "delivery-check" - event.SetSource(source) - client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) - originalEventMatcher := recordevents.MatchEvent( - cetest.HasSource(source), - ) - eventTracker.AssertAtLeast(1, originalEventMatcher) - }) - - t.Run("Replies are accepted and delivered", func(t *testing.T) { - event := baseEvent - source := "origin-for-reply" - event.SetSource(source) - msg := []byte(`{"msg":"Transformed!"}`) - transformPod := resources.EventTransformationPod( - "tranformer-pod", - "reply-check-type", - "reply-check-source", - msg, - ) - client.CreatePodOrFail(transformPod, testlib.WithService("transformer-pod")) - transformTrigger := client.CreateTriggerOrFailV1Beta1( - "transform-trigger", - resources.WithBrokerV1Beta1(broker.Name), - resources.WithAttributesTriggerFilterV1Beta1(source, baseEvent.Type(), nil), - resources.WithSubscriberServiceRefForTriggerV1Beta1("transformer-pod"), - ) - client.WaitForResourceReadyOrFail(transformTrigger.Name, testlib.TriggerTypeMeta) - transformEventTracker, _ := recordevents.StartEventRecordOrFail(client, "transform-events-logger") - defer transformEventTracker.Cleanup() - - replyTrigger := client.CreateTriggerOrFailV1Beta1( - "reply-trigger", - resources.WithBrokerV1Beta1(broker.Name), - resources.WithAttributesTriggerFilterV1Beta1("reply-check-source", "reply-check-type", nil), - resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerName), - ) - client.WaitForResourceReadyOrFail(replyTrigger.Name, testlib.TriggerTypeMeta) - client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) - transformedEventMatcher := recordevents.MatchEvent(cetest.AllOf( - cetest.HasSource("reply-check-source"), - cetest.HasType("reply-check-type"), - cetest.HasData(msg), - )) - eventTracker.AssertAtLeast(2, transformedEventMatcher) - }) + replyTrigger := client.CreateTriggerOrFailV1Beta1( + "reply-trigger", + resources.WithBrokerV1Beta1(broker.Name), + resources.WithAttributesTriggerFilterV1Beta1("reply-check-source", "reply-check-type", nil), + resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerName), + ) + client.WaitForResourceReadyOrFail(replyTrigger.Name, testlib.TriggerTypeMeta) + client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) + transformedEventMatcher := recordevents.MatchEvent(cetest.AllOf( + cetest.HasSource("reply-check-source"), + cetest.HasType("reply-check-type"), + cetest.HasData(msg), + )) + eventTracker.AssertAtLeast(2, transformedEventMatcher) }) } diff --git a/test/conformance/main_test.go b/test/conformance/main_test.go index ee6f100f53f..ae9ab0a71f8 100644 --- a/test/conformance/main_test.go +++ b/test/conformance/main_test.go @@ -29,6 +29,8 @@ import ( var channelTestRunner testlib.ComponentsTestRunner var sourcesTestRunner testlib.ComponentsTestRunner var brokerClass string +var brokerName string +var brokerNamespace string func TestMain(m *testing.M) { os.Exit(func() int { @@ -41,6 +43,8 @@ func TestMain(m *testing.M) { ComponentsToTest: test.EventingFlags.Sources, } brokerClass = test.EventingFlags.BrokerClass + brokerName = test.EventingFlags.BrokerName + brokerNamespace = test.EventingFlags.BrokerNamespace // Any tests may SetupZipkinTracing, it will only actually be done once. This should be the ONLY // place that cleans it up. If an individual test calls this instead, then it will break other diff --git a/test/e2e_flags.go b/test/e2e_flags.go index 37b43efd973..f9e7e5d7b3f 100644 --- a/test/e2e_flags.go +++ b/test/e2e_flags.go @@ -38,6 +38,10 @@ const ( SourceUsage = "The names of the source type metas, separated by comma. " + "Example: \"sources.knative.dev/v1alpha1:ApiServerSource," + "sources.knative.dev/v1alpha1:PingSource\"." + BrokerNameUsage = "When testing a pre-existing broker, specify the Broker name so the conformance tests " + + "won't create their own." + BrokerNamespaceUsage = "When testing a pre-existing broker, this variable specifies the namespace the broker can be found in." + ) // EventingFlags holds the command line flags specific to knative/eventing. @@ -52,6 +56,8 @@ func InitializeEventingFlags() { flag.Var(&EventingFlags.Sources, "sources", SourceUsage) flag.StringVar(&EventingFlags.PipeFile, "pipefile", "/tmp/prober-signal", "Temporary file to write the prober signal into.") flag.StringVar(&EventingFlags.ReadyFile, "readyfile", "/tmp/prober-ready", "Temporary file to get the prober result.") + flag.StringVar(&EventingFlags.BrokerName, "brokername", "", BrokerNameUsage) + flag.StringVar(&EventingFlags.BrokerNamespace, "brokernamespace", "", BrokerNamespaceUsage) flag.Parse() // If no channel is passed through the flag, initialize it as the DefaultChannel. diff --git a/test/flags/eventing_environment.go b/test/flags/eventing_environment.go index e76ce4d7f28..c01cae97be4 100644 --- a/test/flags/eventing_environment.go +++ b/test/flags/eventing_environment.go @@ -23,4 +23,7 @@ type EventingEnvironmentFlags struct { Sources PipeFile string ReadyFile string + BrokerName string + BrokerNamespace string + } From 3ef4e9534608f75af5898d1a718645153882a62b Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Mon, 20 Jul 2020 13:07:34 -0400 Subject: [PATCH 3/5] Fix typo --- test/conformance/helpers/broker_data_plane_test_helper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/conformance/helpers/broker_data_plane_test_helper.go b/test/conformance/helpers/broker_data_plane_test_helper.go index 8d14d275fac..1c335eb418d 100644 --- a/test/conformance/helpers/broker_data_plane_test_helper.go +++ b/test/conformance/helpers/broker_data_plane_test_helper.go @@ -173,7 +173,7 @@ func BrokerV1Beta1IngressDataPlaneTestHelper( }) //Respond with 400 on bad CE - t.Run("Repsond with 400 on bad CE", func(t *testing.T) { + t.Run("Respond with 400 on bad CE", func(t *testing.T) { eventID := "four-hundred-on-bad-ce" body := ";la}{kjsdf;oai2095{}{}8234092349807asdfashdf" responseSink := "http://" + client.GetServiceHost(loggerName) From a0f8906396a302214391cf94d1d1df86a3d60b8a Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Mon, 20 Jul 2020 14:23:56 -0400 Subject: [PATCH 4/5] Suggested linting changes --- test/conformance/main_test.go | 2 +- test/e2e_flags.go | 1 - test/flags/eventing_environment.go | 5 ++--- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/test/conformance/main_test.go b/test/conformance/main_test.go index ae9ab0a71f8..53d79b1a3e7 100644 --- a/test/conformance/main_test.go +++ b/test/conformance/main_test.go @@ -44,7 +44,7 @@ func TestMain(m *testing.M) { } brokerClass = test.EventingFlags.BrokerClass brokerName = test.EventingFlags.BrokerName - brokerNamespace = test.EventingFlags.BrokerNamespace + brokerNamespace = test.EventingFlags.BrokerNamespace // Any tests may SetupZipkinTracing, it will only actually be done once. This should be the ONLY // place that cleans it up. If an individual test calls this instead, then it will break other diff --git a/test/e2e_flags.go b/test/e2e_flags.go index f9e7e5d7b3f..0698deb2df3 100644 --- a/test/e2e_flags.go +++ b/test/e2e_flags.go @@ -41,7 +41,6 @@ const ( BrokerNameUsage = "When testing a pre-existing broker, specify the Broker name so the conformance tests " + "won't create their own." BrokerNamespaceUsage = "When testing a pre-existing broker, this variable specifies the namespace the broker can be found in." - ) // EventingFlags holds the command line flags specific to knative/eventing. diff --git a/test/flags/eventing_environment.go b/test/flags/eventing_environment.go index c01cae97be4..1435108cfc9 100644 --- a/test/flags/eventing_environment.go +++ b/test/flags/eventing_environment.go @@ -21,9 +21,8 @@ type EventingEnvironmentFlags struct { BrokerClass string Channels Sources - PipeFile string - ReadyFile string + PipeFile string + ReadyFile string BrokerName string BrokerNamespace string - } From ec9f35d2fc46c6f4009ad184f5c441f50bc61350 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Mon, 20 Jul 2020 17:05:21 -0400 Subject: [PATCH 5/5] Feedback on CE sdk usage --- .../helpers/broker_data_plane_test_helper.go | 49 ++++++++++--------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/test/conformance/helpers/broker_data_plane_test_helper.go b/test/conformance/helpers/broker_data_plane_test_helper.go index 1c335eb418d..1e34098dd67 100644 --- a/test/conformance/helpers/broker_data_plane_test_helper.go +++ b/test/conformance/helpers/broker_data_plane_test_helper.go @@ -83,7 +83,7 @@ func BrokerV1Beta1IngressDataPlaneTestHelper( event := ce.NewEvent() event.SetID(eventID) event.SetType(testlib.DefaultEventType) - event.SetSource("CE0.3") + event.SetSource("0.3.event.sender.test.knative.dev") body := fmt.Sprintf(`{"msg":%q}`, eventID) if err := event.SetData(ce.ApplicationJSON, []byte(body)); err != nil { @@ -93,7 +93,7 @@ func BrokerV1Beta1IngressDataPlaneTestHelper( event.SetSpecVersion("0.3") client.SendEventToAddressable("v03-test-sender", broker.Name, testlib.BrokerTypeMeta, event) originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( - cetest.HasSource(event.Source()), + cetest.HasId(eventID), cetest.HasSpecVersion("0.3"), )) eventTracker.AssertAtLeast(1, originalEventMatcher) @@ -104,7 +104,7 @@ func BrokerV1Beta1IngressDataPlaneTestHelper( event := ce.NewEvent() event.SetID(eventID) event.SetType(testlib.DefaultEventType) - event.SetSource("CE1.0") + event.SetSource("1.0.event.sender.test.knative.dev") body := fmt.Sprintf(`{"msg":%q}`, eventID) if err := event.SetData(ce.ApplicationJSON, []byte(body)); err != nil { t.Fatalf("Cannot set the payload of the event: %s", err.Error()) @@ -112,7 +112,8 @@ func BrokerV1Beta1IngressDataPlaneTestHelper( client.SendEventToAddressable("v10-test-sender", broker.Name, testlib.BrokerTypeMeta, event) originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( - cetest.HasSource(event.Source()), + cetest.HasId(eventID), + cetest.HasSpecVersion("1.0"), )) eventTracker.AssertAtLeast(1, originalEventMatcher) @@ -122,15 +123,15 @@ func BrokerV1Beta1IngressDataPlaneTestHelper( event := ce.NewEvent() event.SetID(eventID) event.SetType(testlib.DefaultEventType) - event.SetSource("CEStructured Mode") + event.SetSource("structured.mode.event.sender.test.knative.dev") body := fmt.Sprintf(`{"msg":%q}`, eventID) - if err := event.SetData(ce.EncodingStructured.String(), []byte(body)); err != nil { + if err := event.SetData(ce.ApplicationJSON, []byte(body)); err != nil { t.Fatalf("Cannot set the payload of the event: %s", err.Error()) } - client.SendEventToAddressable("structured-test-sender", broker.Name, testlib.BrokerTypeMeta, event) + client.SendEventToAddressable("structured-test-sender", broker.Name, testlib.BrokerTypeMeta, event, sender.WithEncoding(ce.EncodingStructured)) originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( - cetest.HasSource(event.Source()), + cetest.HasId(eventID), )) eventTracker.AssertAtLeast(1, originalEventMatcher) }) @@ -140,15 +141,15 @@ func BrokerV1Beta1IngressDataPlaneTestHelper( event := ce.NewEvent() event.SetID(eventID) event.SetType(testlib.DefaultEventType) - event.SetSource("CEBindary Mode") + event.SetSource("binary.mode.event.sender.test.knative.dev") body := fmt.Sprintf(`{"msg":%q}`, eventID) - if err := event.SetData(ce.EncodingBinary.String(), []byte(body)); err != nil { + if err := event.SetData(ce.ApplicationJSON, []byte(body)); err != nil { t.Fatalf("Cannot set the payload of the event: %s", err.Error()) } - client.SendEventToAddressable("binary-test-sender", broker.Name, testlib.BrokerTypeMeta, event) + client.SendEventToAddressable("binary-test-sender", broker.Name, testlib.BrokerTypeMeta, event, sender.WithEncoding(ce.EncodingBinary)) originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( - cetest.HasSource(event.Source()), + cetest.HasId(eventID), )) eventTracker.AssertAtLeast(1, originalEventMatcher) }) @@ -161,7 +162,7 @@ func BrokerV1Beta1IngressDataPlaneTestHelper( map[string]string{ "ce-specversion": "1.0", "ce-type": testlib.DefaultEventType, - "ce-source": "2XX on good CE", + "ce-source": "2xx.request.sender.test.knative.dev", "ce-id": eventID, "content-type": ce.ApplicationJSON, }, @@ -181,7 +182,7 @@ func BrokerV1Beta1IngressDataPlaneTestHelper( map[string]string{ "ce-specversion": "9000.1", //its over 9,000! "ce-type": testlib.DefaultEventType, - "ce-source": "400 on bad CE", + "ce-source": "400.request.sender.test.knative.dev", "ce-id": eventID, "content-type": ce.ApplicationJSON, }, @@ -238,7 +239,7 @@ func BrokerV1Beta1ConsumerDataPlaneTestHelper( baseEvent.SetSource(baseSource) baseEvent.SetSpecVersion("1.0") body := fmt.Sprintf(`{"msg":%q}`, eventID) - if err := baseEvent.SetData(ce.EncodingStructured.String(), []byte(body)); err != nil { + if err := baseEvent.SetData(ce.ApplicationJSON, []byte(body)); err != nil { t.Fatalf("Cannot set the payload of the baseEvent: %s", err.Error()) } @@ -248,7 +249,7 @@ func BrokerV1Beta1ConsumerDataPlaneTestHelper( event.SetID(source) event.Context = event.Context.AsV03() - client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) + client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event, sender.WithEncoding(ce.EncodingStructured)) originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( cetest.HasSpecVersion("0.3"), cetest.HasId("no-upgrade"), @@ -262,12 +263,12 @@ func BrokerV1Beta1ConsumerDataPlaneTestHelper( id := "identical-attibutes" event.SetID(id) client.SendEventToAddressable(id+"-sender", broker.Name, testlib.BrokerTypeMeta, event) - originalEventMatcher := recordevents.MatchEvent(cetest.AllOf( + originalEventMatcher := recordevents.MatchEvent( cetest.HasId(id), cetest.HasType(testlib.DefaultEventType), cetest.HasSource(baseSource), cetest.HasSpecVersion("1.0"), - )) + ) eventTracker.AssertExact(1, originalEventMatcher) }) @@ -279,9 +280,9 @@ func BrokerV1Beta1ConsumerDataPlaneTestHelper( client.SendEventToAddressable("first-"+source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) client.SendEventToAddressable("second-"+source+"-sender", broker.Name, testlib.BrokerTypeMeta, secondEvent) - filteredEventMatcher := recordevents.MatchEvent(cetest.AllOf( + filteredEventMatcher := recordevents.MatchEvent( cetest.HasSource(source), - )) + ) nonEventMatcher := recordevents.MatchEvent( cetest.HasSource(baseSource), ) @@ -294,9 +295,9 @@ func BrokerV1Beta1ConsumerDataPlaneTestHelper( source := "filtered-event" event.SetSource(source) client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) - filteredEventMatcher := recordevents.MatchEvent(cetest.AllOf( + filteredEventMatcher := recordevents.MatchEvent( cetest.HasSource(source), - )) + ) eventTracker.AssertAtLeast(1, filteredEventMatcher) secondTracker.AssertAtLeast(1, filteredEventMatcher) }) @@ -342,11 +343,11 @@ func BrokerV1Beta1ConsumerDataPlaneTestHelper( ) client.WaitForResourceReadyOrFail(replyTrigger.Name, testlib.TriggerTypeMeta) client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event) - transformedEventMatcher := recordevents.MatchEvent(cetest.AllOf( + transformedEventMatcher := recordevents.MatchEvent( cetest.HasSource("reply-check-source"), cetest.HasType("reply-check-type"), cetest.HasData(msg), - )) + ) eventTracker.AssertAtLeast(2, transformedEventMatcher) }) }