From 70313757d03c6d2c8262bbe63aeee3a63f24bad2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Wed, 28 Apr 2021 11:46:16 +0200 Subject: [PATCH 01/24] Introduction of SUT package --- test/e2e/broker_redelivery_test.go | 3 +- test/e2e/helpers/broker_test_helper.go | 3 +- test/lib/resources/eventing.go | 27 +++-- test/upgrade/README.md | 54 ++++++---- test/upgrade/continual.go | 8 +- test/upgrade/prober/config.toml | 2 +- test/upgrade/prober/configuration.go | 104 ++++++------------ test/upgrade/prober/forwarder.go | 9 +- test/upgrade/prober/prober.go | 32 +++--- test/upgrade/prober/prober_test.go | 4 +- test/upgrade/prober/receiver.go | 23 ++-- test/upgrade/prober/sender.go | 13 ++- test/upgrade/prober/sut/broker.go | 139 +++++++++++++++++++++++++ test/upgrade/prober/sut/default.go | 22 ++++ test/upgrade/prober/sut/types.go | 49 +++++++++ test/upgrade/prober/verify.go | 42 ++++---- 16 files changed, 356 insertions(+), 178 deletions(-) create mode 100644 test/upgrade/prober/sut/broker.go create mode 100644 test/upgrade/prober/sut/default.go create mode 100644 test/upgrade/prober/sut/types.go diff --git a/test/e2e/broker_redelivery_test.go b/test/e2e/broker_redelivery_test.go index 0da4b79a4cb..60a61cf19e2 100644 --- a/test/e2e/broker_redelivery_test.go +++ b/test/e2e/broker_redelivery_test.go @@ -44,7 +44,8 @@ func ChannelBasedBrokerCreator(channel metav1.TypeMeta, brokerClass string) help backoff := eventingduckv1.BackoffPolicyLinear // create a new broker. - client.CreateBrokerOrFail(brokerName, + client.CreateBrokerOrFail( + brokerName, resources.WithBrokerClassForBroker(brokerClass), resources.WithConfigForBroker(config), func(broker *v1.Broker) { diff --git a/test/e2e/helpers/broker_test_helper.go b/test/e2e/helpers/broker_test_helper.go index de6a8ef4212..8c24695493f 100644 --- a/test/e2e/helpers/broker_test_helper.go +++ b/test/e2e/helpers/broker_test_helper.go @@ -107,7 +107,8 @@ func ChannelBasedBrokerCreator(channel metav1.TypeMeta, brokerClass string) Brok switch version { case "v1": - client.CreateBrokerOrFail(brokerName, + client.CreateBrokerOrFail( + brokerName, resources.WithBrokerClassForBroker(brokerClass), resources.WithConfigForBroker(config), ) diff --git a/test/lib/resources/eventing.go b/test/lib/resources/eventing.go index 6c71ca99bc3..216934369aa 100644 --- a/test/lib/resources/eventing.go +++ b/test/lib/resources/eventing.go @@ -240,22 +240,31 @@ func WithDependencyAnnotationTrigger(dependencyAnnotation string) TriggerOption // WithSubscriberServiceRefForTrigger returns an option that adds a Subscriber Knative Service Ref for the given v1 Trigger. func WithSubscriberServiceRefForTrigger(name string) TriggerOption { - return func(t *eventingv1.Trigger) { - if name != "" { - t.Spec.Subscriber = duckv1.Destination{ - Ref: KnativeRefForService(name, t.Namespace), - } + return WithSubscriberDestination(func(t *eventingv1.Trigger) duckv1.Destination { + return duckv1.Destination{ + Ref: KnativeRefForService(name, t.Namespace), } - } + }) } // WithSubscriberURIForTrigger returns an option that adds a Subscriber URI for the given v1 Trigger. func WithSubscriberURIForTrigger(uri string) TriggerOption { - apisURI, _ := apis.ParseURL(uri) - return func(t *eventingv1.Trigger) { - t.Spec.Subscriber = duckv1.Destination{ + return WithSubscriberDestination(func(t *eventingv1.Trigger) duckv1.Destination { + apisURI, _ := apis.ParseURL(uri) + return duckv1.Destination{ URI: apisURI, } + }) +} + +// WithSubscriberDestination returns an option that adds a Subscriber for given +// duckv1.Destination. +func WithSubscriberDestination(destFactory func(t *eventingv1.Trigger) duckv1.Destination) TriggerOption { + return func(t *eventingv1.Trigger) { + dest := destFactory(t) + if dest.Ref != nil || dest.URI != nil { + t.Spec.Subscriber = dest + } } } diff --git a/test/upgrade/README.md b/test/upgrade/README.md index 99f103b9ffe..ed7a5f9da00 100644 --- a/test/upgrade/README.md +++ b/test/upgrade/README.md @@ -6,7 +6,8 @@ Running these tests on every commit will ensure that we don’t introduce any non-upgradeable changes, so every commit should be releasable. This is inspired by kubernetes -[upgrade testing](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-testing/e2e-tests.md#version-skewed-and-upgrade-testing). +[upgrade testing](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-testing/e2e-tests.md#version-skewed-and-upgrade-testing) +. These tests are a pretty big hammer in that they cover more than just version changes, but it’s one of the only ways to make sure we don’t accidentally make @@ -26,15 +27,19 @@ At a high level, we want to do this: 1. Run any post-install jobs that apply for the release to be. 1. Test those resources, verify that we didn’t break anything. -To achieve that, we just have three separate build tags: +To achieve that, we created an upgrade framework (knative.dev/pkg/test/upgrade). +This framework will enforce running upgrade tests in specific order and supports +continual verification of system under test. In case of Eventing Kafka it is: 1. Install the latest release from GitHub. -1. Run the `preupgrade` tests in this directory. -1. Install at HEAD (`ko apply -f config/`). -1. Run the post-install job. For v0.15 we need to migrate storage versions. -1. Run the `postupgrade` tests in this directory. +1. Run the `preupgrade` smoke tests. +1. Start `continual` tests that will propagate events in the background, while + upgrading and downgrading. +1. Install at HEAD (`ko apply -f config/`) and run the post-install jobs. +1. Run the `postupgrade` smoke tests. 1. Install the latest release from GitHub. -1. Run the `postdowngrade` tests in this directory. +1. Run the `postdowngrade` smoke tests. +1. Stop and verify `continual` tests, checking if every event propagated well. ## Tests @@ -52,21 +57,24 @@ In order to verify that we don't have data-plane unavailability during our control-plane outages (when we're upgrading the knative/eventing installation), we run a prober test that continually sends events to a service during the entire upgrade/downgrade process. When the upgrade completes, we make sure that -all of those events propagated just once. - -To achieve that a [wathola tool](test/upgrade/prober/wathola) was prepared. It -consists of 4 components: _sender_, _forwarder_, _receiver_, and _fetcher_. -_Sender_ is the usual Kubernetes deployment that publishes events to the default -`broker` with given interval. When it terminates (by either `SIGTERM`, or +all of those events propagated at least once. + +To achieve that +a [wathola tool](https://pkg.go.dev/knative.dev/eventing/test/upgrade/prober/wathola) +was prepared. It consists of 4 components: _sender_, _forwarder_, _receiver_, +and _fetcher_. _Sender_ is the usual Kubernetes deployment that publishes events +to the System under Tests (SUT). By default, SUT is a default `broker` +with two triggers for each type of events being sent. _Sender_ will send events +with given interval. When it terminates (by either `SIGTERM`, or `SIGINT`), a `finished` event is generated. _Forwarder_ is a knative serving service that scales up from zero to receive the sent events and forward them to given target which is the _receiver_ in our case. _Receiver_ is an ordinary -deployment that collects events from multiple forwarders and has an endpoint -`/report` that can be polled to get the status of received events. To fetch the -report from within the cluster _fetcher_ comes in. It's a simple one time job, -that will fetch the report from _receiver_ and print it on stdout as JSON. That -enables the test client to download _fetcher_ logs and parse the JSON to get the -final report. +deployment that collects events from multiple forwarders and has an +endpoint `/report` that can be polled to get the status of received events. To +fetch the report from within the cluster _fetcher_ comes in. It's a simple one +time job, that will fetch the report from _receiver_ and print it on stdout as +JSON. That enables the test client to download _fetcher_ logs and parse the JSON +to get the final report. Diagram below describe the setup: @@ -96,16 +104,16 @@ Probe test behavior can be influenced from outside without modifying its source code. That can be beneficial if one would like to run upgrade tests in different context. One such example might be running Eventing upgrade tests in place that have Serving and Eventing both installed. In such environment one can set -environment variable `E2E_UPGRADE_TESTS_SERVING_USE` to enable usage of ksvc -forwarder (which is disabled by default): +environment variable `EVENTING_UPGRADE_TESTS_SERVING_USE` to enable usage of +ksvc forwarder (which is disabled by default): ``` -$ export E2E_UPGRADE_TESTS_SERVING_USE=true +$ export EVENTING_UPGRADE_TESTS_SERVING_USE=true ``` Any option, apart from namespace, in [`knative.dev/eventing/test/upgrade/prober.Config`](https://github.com/knative/eventing/blob/022e281/test/upgrade/prober/prober.go#L52-L63) -struct can be influenced, by using `E2E_UPGRADE_TESTS_XXXXX` environmental +struct can be influenced, by using `EVENTING_UPGRADE_TESTS_XXXXX` environmental variable prefix (using [kelseyhightower/envconfig](https://github.com/kelseyhightower/envconfig#usage) usage). diff --git a/test/upgrade/continual.go b/test/upgrade/continual.go index e5ff5af2734..08c78df112b 100644 --- a/test/upgrade/continual.go +++ b/test/upgrade/continual.go @@ -17,15 +17,13 @@ limitations under the License. package upgrade import ( - "context" - testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/upgrade/prober" pkgupgrade "knative.dev/pkg/test/upgrade" ) +// ContinualTest will perform a continual validation of Eventing SUT. func ContinualTest() pkgupgrade.BackgroundOperation { - ctx := context.Background() var client *testlib.Client var probe prober.Prober return pkgupgrade.NewBackgroundVerification("EventingContinualTest", @@ -33,12 +31,12 @@ func ContinualTest() pkgupgrade.BackgroundOperation { // setup client = testlib.Setup(c.T, false) config := prober.NewConfig(client.Namespace) - probe = prober.RunEventProber(ctx, c.Log, client, config) + probe = prober.RunEventProber(c.Log, client, config) }, func(c pkgupgrade.Context) { // verify defer testlib.TearDown(client) - prober.AssertEventProber(ctx, c.T, probe) + prober.AssertEventProber(c.T, probe) }, ) } diff --git a/test/upgrade/prober/config.toml b/test/upgrade/prober/config.toml index 02adec7f347..9029beda9d4 100644 --- a/test/upgrade/prober/config.toml +++ b/test/upgrade/prober/config.toml @@ -3,4 +3,4 @@ address = '{{- .BrokerURL -}}' interval = {{ .Config.Interval.Nanoseconds }} [forwarder] -target = 'http://wathola-receiver.{{- .Config.Namespace -}}.svc.cluster.local' +target = 'http://wathola-receiver.{{- .Namespace -}}.svc.cluster.local' diff --git a/test/upgrade/prober/configuration.go b/test/upgrade/prober/configuration.go index 4818214077b..99f515c3aca 100644 --- a/test/upgrade/prober/configuration.go +++ b/test/upgrade/prober/configuration.go @@ -27,12 +27,10 @@ import ( "github.com/kelseyhightower/envconfig" "github.com/wavesoftware/go-ensure" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" - testlib "knative.dev/eventing/test/lib" - "knative.dev/eventing/test/lib/duck" "knative.dev/eventing/test/lib/resources" + "knative.dev/eventing/test/upgrade/prober/sut" "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" ) const ( @@ -40,8 +38,7 @@ const ( defaultConfigHomedirPath = ".config/wathola" defaultHomedir = "/home/nonroot" defaultConfigFilename = "config.toml" - defaultWatholaEventsPrefix = "com.github.cardil.wathola" - defaultBrokerName = "default" + defaultWatholaEventsPrefix = "dev.knative.eventing.wathola" defaultHealthEndpoint = "/healthz" defaultFinishedSleep = 5 * time.Second @@ -58,25 +55,24 @@ var eventTypes = []string{"step", "finished"} // Config represents a configuration for prober. type Config struct { Wathola - Namespace string Interval time.Duration FinishedSleep time.Duration Serving ServingConfig FailOnErrors bool OnDuplicate DuplicateAction - BrokerOpts []resources.BrokerOption + Ctx context.Context } // Wathola represents options related strictly to wathola testing tool. type Wathola struct { - ConfigMap + ConfigToml + sut.SystemUnderTest EventsTypePrefix string HealthEndpoint string - BrokerName string } -// ConfigMap represents options of wathola config toml file. -type ConfigMap struct { +// ConfigToml represents options of wathola config toml file. +type ConfigToml struct { // ConfigTemplate is a template file that will be compiled to the configmap ConfigTemplate string ConfigMapName string @@ -95,18 +91,17 @@ type ServingConfig struct { // `e2e_upgrade_tests` prefix. func NewConfig(namespace string) *Config { config := &Config{ - Namespace: "", Interval: Interval, FinishedSleep: defaultFinishedSleep, FailOnErrors: true, OnDuplicate: Warn, - BrokerOpts: make([]resources.BrokerOption, 0), + Ctx: context.Background(), Serving: ServingConfig{ Use: false, ScaleToZero: true, }, Wathola: Wathola{ - ConfigMap: ConfigMap{ + ConfigToml: ConfigToml{ ConfigTemplate: defaultConfigFilename, ConfigMapName: defaultConfigName, ConfigMountPoint: fmt.Sprintf("%s/%s", defaultHomedir, defaultConfigHomedirPath), @@ -114,83 +109,42 @@ func NewConfig(namespace string) *Config { }, EventsTypePrefix: defaultWatholaEventsPrefix, HealthEndpoint: defaultHealthEndpoint, - BrokerName: defaultBrokerName, + SystemUnderTest: sut.NewDefault(namespace, eventTypes), }, } - // FIXME: remove while fixing https://github.com/knative/eventing/issues/2665 - config.FailOnErrors = false - - err := envconfig.Process("e2e_upgrade_tests", config) + err := envconfig.Process("eventing_upgrade_tests", config) ensure.NoError(err) - config.Namespace = namespace return config } func (p *prober) deployConfiguration() { - p.deployBroker() - p.deployConfigMap() - p.deployTriggers() -} - -func (p *prober) deployBroker() { - p.client.CreateBrokerOrFail(p.config.BrokerName, p.config.BrokerOpts...) -} - -func (p *prober) fetchBrokerURL() (*apis.URL, error) { - namespace := p.config.Namespace - p.log.Debugf("Fetching %s broker URL for ns %s", - p.config.BrokerName, namespace) - meta := resources.NewMetaResource( - p.config.BrokerName, p.config.Namespace, testlib.BrokerTypeMeta, - ) - err := duck.WaitForResourceReady(p.client.Dynamic, meta) - if err != nil { - return nil, err + sc := sut.Context{ + Ctx: p.config.Ctx, + Log: p.log, + Client: p.client, } - broker, err := p.client.Eventing.EventingV1().Brokers(namespace).Get( - context.Background(), p.config.BrokerName, metav1.GetOptions{}, - ) + ref := resources.KnativeRefForService(receiverName, p.client.Namespace) + if p.config.Serving.Use { + ref = resources.KnativeRefForKservice(forwarderName, p.client.Namespace) + } + dest := duckv1.Destination{Ref: ref} + url, err := p.config.SystemUnderTest.Deploy(sc, dest) if err != nil { - return nil, err + p.client.T.Fatal(err) } - url := broker.Status.Address.URL - p.log.Debugf("%s broker URL for ns %s is %v", - p.config.BrokerName, namespace, url) - return url, nil + p.deployConfigToml(url) } -func (p *prober) deployConfigMap() { +func (p *prober) deployConfigToml(url *apis.URL) { name := p.config.ConfigMapName - p.log.Infof("Deploying config map: \"%s/%s\"", p.config.Namespace, name) - brokerURL, err := p.fetchBrokerURL() - ensure.NoError(err) - configData := p.compileTemplate(p.config.ConfigTemplate, brokerURL) - p.client.CreateConfigMapOrFail(name, p.config.Namespace, map[string]string{ + p.log.Infof("Deploying config map: \"%s/%s\"", p.client.Namespace, name) + configData := p.compileTemplate(p.config.ConfigTemplate, url) + p.client.CreateConfigMapOrFail(name, p.client.Namespace, map[string]string{ p.config.ConfigFilename: configData, }) } -func (p *prober) deployTriggers() { - for _, eventType := range eventTypes { - name := fmt.Sprintf("wathola-trigger-%v", eventType) - fullType := fmt.Sprintf("%v.%v", p.config.EventsTypePrefix, eventType) - subscriberOption := resources.WithSubscriberServiceRefForTrigger(receiverName) - if p.config.Serving.Use { - subscriberOption = resources.WithSubscriberKServiceRefForTrigger(forwarderName) - } - p.client.CreateTriggerOrFail(name, - resources.WithBroker(p.config.BrokerName), - resources.WithAttributesTriggerFilter( - eventingv1.TriggerAnyFilter, - fullType, - map[string]interface{}{}, - ), - subscriberOption, - ) - } -} - func (p *prober) compileTemplate(templateName string, brokerURL fmt.Stringer) string { _, filename, _, _ := runtime.Caller(0) templateFilepath := path.Join(path.Dir(filename), templateName) @@ -201,9 +155,11 @@ func (p *prober) compileTemplate(templateName string, brokerURL fmt.Stringer) st var buff bytes.Buffer data := struct { *Config + Namespace string BrokerURL string }{ p.config, + p.client.Namespace, brokerURL.String(), } ensure.NoError(tmpl.Execute(&buff, data)) diff --git a/test/upgrade/prober/forwarder.go b/test/upgrade/prober/forwarder.go index cfda56554b3..8e3bd765883 100644 --- a/test/upgrade/prober/forwarder.go +++ b/test/upgrade/prober/forwarder.go @@ -16,7 +16,6 @@ package prober import ( - "context" "fmt" "github.com/wavesoftware/go-ensure" @@ -32,11 +31,11 @@ var ( forwarderName = "wathola-forwarder" ) -func (p *prober) deployForwarder(ctx context.Context) { +func (p *prober) deployForwarder() { p.log.Infof("Deploy forwarder knative service: %v", forwarderName) serving := p.client.Dynamic.Resource(resources.KServicesGVR).Namespace(p.client.Namespace) service := p.forwarderKService(forwarderName, p.client.Namespace) - _, err := serving.Create(context.Background(), service, metav1.CreateOptions{}) + _, err := serving.Create(p.config.Ctx, service, metav1.CreateOptions{}) ensure.NoError(err) sc := p.servingClient() @@ -46,7 +45,7 @@ func (p *prober) deployForwarder(ctx context.Context) { if p.config.Serving.ScaleToZero { testlib.WaitFor(fmt.Sprintf("forwarder scales to zero: %v", forwarderName), func() error { - return duck.WaitForKServiceScales(ctx, sc, forwarderName, p.client.Namespace, func(scale int) bool { + return duck.WaitForKServiceScales(p.config.Ctx, sc, forwarderName, p.client.Namespace, func(scale int) bool { return scale == 0 }) }) @@ -56,7 +55,7 @@ func (p *prober) deployForwarder(ctx context.Context) { func (p *prober) removeForwarder() { p.log.Infof("Remove forwarder knative service: %v", forwarderName) serving := p.client.Dynamic.Resource(resources.KServicesGVR).Namespace(p.client.Namespace) - err := serving.Delete(context.Background(), forwarderName, metav1.DeleteOptions{}) + err := serving.Delete(p.config.Ctx, forwarderName, metav1.DeleteOptions{}) ensure.NoError(err) } diff --git a/test/upgrade/prober/prober.go b/test/upgrade/prober/prober.go index bc6623bf59f..3b195c056ce 100644 --- a/test/upgrade/prober/prober.go +++ b/test/upgrade/prober/prober.go @@ -16,7 +16,6 @@ package prober import ( - "context" "testing" "time" @@ -27,43 +26,42 @@ import ( ) var ( - // FIXME: Interval is set to 200 msec, as lower values will result in errors: knative/eventing#2357 - // Interval = 10 * time.Millisecond - Interval = 200 * time.Millisecond + // Interval is used to send events in specific rate. + Interval = 10 * time.Millisecond ) // Prober is the interface for a prober, which checks the result of the probes when stopped. type Prober interface { // Verify will verify prober state after finished has been send - Verify(ctx context.Context) ([]error, int, error) + Verify() ([]error, int, error) // Finish send finished event - Finish(ctx context.Context) + Finish() // ReportErrors will reports found errors in proper way ReportErrors(t *testing.T, errors []error) // deploy a prober to a cluster - deploy(ctx context.Context) + deploy() // remove a prober from cluster remove() } // RunEventProber starts a single Prober of the given domain. -func RunEventProber(ctx context.Context, log *zap.SugaredLogger, client *testlib.Client, config *Config) Prober { +func RunEventProber(log *zap.SugaredLogger, client *testlib.Client, config *Config) Prober { pm := newProber(log, client, config) - pm.deploy(ctx) + pm.deploy() return pm } // AssertEventProber will send finish event and then verify if all events propagated well -func AssertEventProber(ctx context.Context, t *testing.T, prober Prober) { - prober.Finish(ctx) +func AssertEventProber(t *testing.T, prober Prober) { + prober.Finish() defer prober.remove() waitAfterFinished(prober) - eventErrs, eventCount, err := prober.Verify(ctx) + eventErrs, eventCount, err := prober.Verify() if err != nil { t.Fatal("fetch error:", err) } @@ -105,16 +103,16 @@ func (p *prober) ReportErrors(t *testing.T, errors []error) { } } -func (p *prober) deploy(ctx context.Context) { +func (p *prober) deploy() { p.log.Infof("Using namespace for probe testing: %v", p.client.Namespace) p.deployConfiguration() - p.deployReceiver(ctx) + p.deployReceiver() if p.config.Serving.Use { - p.deployForwarder(ctx) + p.deployForwarder() } - p.client.WaitForAllTestResourcesReadyOrFail(ctx) + p.client.WaitForAllTestResourcesReadyOrFail(p.config.Ctx) - p.deploySender(ctx) + p.deploySender() ensure.NoError(testlib.AwaitForAll(p.log)) // allow sender to send at least some events, 2 sec wait time.Sleep(2 * time.Second) diff --git a/test/upgrade/prober/prober_test.go b/test/upgrade/prober/prober_test.go index 40666a72e48..d1e36cf5172 100644 --- a/test/upgrade/prober/prober_test.go +++ b/test/upgrade/prober/prober_test.go @@ -26,8 +26,8 @@ import ( const ( defaultConfigFilename = "config.toml" - servingEnvName = "E2E_UPGRADE_TESTS_SERVING_USE" - configFilenameEnvName = "E2E_UPGRADE_TESTS_CONFIGFILENAME" + servingEnvName = "EVENTING_UPGRADE_TESTS_SERVING_USE" + configFilenameEnvName = "EVENTING_UPGRADE_TESTS_CONFIGFILENAME" ) func TestNewConfig(t *testing.T) { diff --git a/test/upgrade/prober/receiver.go b/test/upgrade/prober/receiver.go index 2b4f5883cf9..f558448a061 100644 --- a/test/upgrade/prober/receiver.go +++ b/test/upgrade/prober/receiver.go @@ -16,7 +16,6 @@ package prober import ( - "context" "fmt" "github.com/wavesoftware/go-ensure" @@ -34,32 +33,32 @@ var ( receiverNodePort int32 = -1 ) -func (p *prober) deployReceiver(ctx context.Context) { - p.deployReceiverDeployment(ctx) - p.deployReceiverService(ctx) +func (p *prober) deployReceiver() { + p.deployReceiverDeployment() + p.deployReceiverService() } -func (p *prober) deployReceiverDeployment(ctx context.Context) { +func (p *prober) deployReceiverDeployment() { p.log.Info("Deploy of receiver deployment: ", receiverName) deployment := p.createReceiverDeployment() _, err := p.client.Kube.AppsV1(). Deployments(deployment.Namespace). - Create(ctx, deployment, metav1.CreateOptions{}) + Create(p.config.Ctx, deployment, metav1.CreateOptions{}) ensure.NoError(err) testlib.WaitFor(fmt.Sprint("receiver deployment be ready: ", receiverName), func() error { return pkgTest.WaitForDeploymentScale( - ctx, p.client.Kube, receiverName, p.client.Namespace, 1, + p.config.Ctx, p.client.Kube, receiverName, p.client.Namespace, 1, ) }) } -func (p *prober) deployReceiverService(ctx context.Context) { +func (p *prober) deployReceiverService() { p.log.Infof("Deploy of receiver service: %v", receiverName) service := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: receiverName, - Namespace: p.config.Namespace, + Namespace: p.client.Namespace, }, Spec: corev1.ServiceSpec{ Ports: []corev1.ServicePort{ @@ -79,8 +78,8 @@ func (p *prober) deployReceiverService(ctx context.Context) { Type: corev1.ServiceTypeNodePort, }, } - created, err := p.client.Kube.CoreV1().Services(p.config.Namespace). - Create(ctx, service, metav1.CreateOptions{}) + created, err := p.client.Kube.CoreV1().Services(p.client.Namespace). + Create(p.config.Ctx, service, metav1.CreateOptions{}) ensure.NoError(err) for _, portSpec := range created.Spec.Ports { if portSpec.Port == 80 { @@ -99,7 +98,7 @@ func (p *prober) createReceiverDeployment() *appsv1.Deployment { return &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: receiverName, - Namespace: p.config.Namespace, + Namespace: p.client.Namespace, }, Spec: appsv1.DeploymentSpec{ Replicas: &replicas, diff --git a/test/upgrade/prober/sender.go b/test/upgrade/prober/sender.go index 0d5be701881..a4c0cfd0e0e 100644 --- a/test/upgrade/prober/sender.go +++ b/test/upgrade/prober/sender.go @@ -16,7 +16,6 @@ package prober import ( - "context" "fmt" "github.com/wavesoftware/go-ensure" @@ -29,13 +28,13 @@ import ( var senderName = "wathola-sender" -func (p *prober) deploySender(ctx context.Context) { +func (p *prober) deploySender() { p.log.Info("Deploy sender deployment: ", senderName) var replicas int32 = 1 deployment := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: senderName, - Namespace: p.config.Namespace, + Namespace: p.client.Namespace, }, Spec: appsv1.DeploymentSpec{ Replicas: &replicas, @@ -77,21 +76,21 @@ func (p *prober) deploySender(ctx context.Context) { _, err := p.client.Kube.AppsV1(). Deployments(p.client.Namespace). - Create(ctx, deployment, metav1.CreateOptions{}) + Create(p.config.Ctx, deployment, metav1.CreateOptions{}) ensure.NoError(err) testlib.WaitFor(fmt.Sprint("sender deployment be ready: ", senderName), func() error { return pkgTest.WaitForDeploymentScale( - ctx, p.client.Kube, senderName, p.client.Namespace, int(replicas), + p.config.Ctx, p.client.Kube, senderName, p.client.Namespace, int(replicas), ) }) } -func (p *prober) removeSender(ctx context.Context) { +func (p *prober) removeSender() { p.log.Info("Remove of sender deployment: ", senderName) err := p.client.Kube.AppsV1(). Deployments(p.client.Namespace). - Delete(ctx, senderName, metav1.DeleteOptions{}) + Delete(p.config.Ctx, senderName, metav1.DeleteOptions{}) ensure.NoError(err) } diff --git a/test/upgrade/prober/sut/broker.go b/test/upgrade/prober/sut/broker.go new file mode 100644 index 00000000000..8bab97428a4 --- /dev/null +++ b/test/upgrade/prober/sut/broker.go @@ -0,0 +1,139 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sut + +import ( + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + testlib "knative.dev/eventing/test/lib" + "knative.dev/eventing/test/lib/duck" + "knative.dev/eventing/test/lib/resources" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +var ( + retryCount = int32(12) + backoffPolicy = eventingduckv1.BackoffPolicyExponential + backoffDelay = "PT1S" +) + +// BrokerAndTriggers will deploy a default broker and 2 triggers to route two types of +// events to receiver. +type BrokerAndTriggers struct { + Broker + Triggers + Namespace string +} + +// Broker will hold settings for broker itself +type Broker struct { + Name string + Opts []resources.BrokerOption +} + +// Triggers will hold settings for triggers +type Triggers struct { + Types []string + TypePrefix string +} + +func newBrokerAndTriggers(namespace string, eventTypes []string) SystemUnderTest { + return &BrokerAndTriggers{ + Namespace: namespace, + Broker: Broker{ + Name: "sut", + Opts: []resources.BrokerOption{ + resources.WithDeliveryForBroker( + &eventingduckv1.DeliverySpec{ + Retry: &retryCount, + BackoffPolicy: &backoffPolicy, + BackoffDelay: &backoffDelay, + }), + }, + }, + Triggers: Triggers{ + Types: eventTypes, + TypePrefix: defaulEventsPrefix, + }, + } +} + +func (b *BrokerAndTriggers) Deploy(ctx Context, dest duckv1.Destination) (*apis.URL, error) { + b.deployBroker(ctx) + url, err := b.fetchURL(ctx) + if err != nil { + return nil, err + } + b.deployTriggers(ctx, dest) + return url, nil +} + +func (b *BrokerAndTriggers) Teardown(ctx Context) error { + ctx.Log.Debug("BrokerAndTriggers SUT should automatically teardown") + return nil +} + +func (b *BrokerAndTriggers) deployBroker(ctx Context) { + ctx.Client.CreateBrokerOrFail(b.Name, b.Broker.Opts...) +} + +func (b *BrokerAndTriggers) fetchURL(ctx Context) (*apis.URL, error) { + namespace := b.Namespace + ctx.Log.Debugf("Fetching %s broker URL for ns %s", + b.Name, namespace) + meta := resources.NewMetaResource( + b.Name, b.Namespace, testlib.BrokerTypeMeta, + ) + err := duck.WaitForResourceReady(ctx.Client.Dynamic, meta) + if err != nil { + return nil, err + } + broker, err := ctx.Client.Eventing.EventingV1().Brokers(namespace).Get( + ctx.Ctx, b.Name, metav1.GetOptions{}, + ) + if err != nil { + return nil, err + } + url := broker.Status.Address.URL + ctx.Log.Debugf("%s broker URL for ns %s is %v", b.Name, namespace, url) + return url, nil +} + +func (b *BrokerAndTriggers) deployTriggers(ctx Context, dest duckv1.Destination) { + for _, eventType := range b.Triggers.Types { + name := fmt.Sprintf("%s-%s", b.Name, eventType) + fullType := fmt.Sprintf("%s.%s", b.Triggers.TypePrefix, eventType) + subscriberOption := resources.WithSubscriberDestination(func(t *eventingv1.Trigger) duckv1.Destination { + return dest + }) + ctx.Log.Debugf("Creating trigger %s for type %s to route to %#v", name, eventType, dest) + ctx.Client.CreateTriggerOrFail( + name, + resources.WithBroker(b.Name), + resources.WithAttributesTriggerFilter( + eventingv1.TriggerAnyFilter, + fullType, + map[string]interface{}{}, + ), + subscriberOption, + ) + } +} diff --git a/test/upgrade/prober/sut/default.go b/test/upgrade/prober/sut/default.go new file mode 100644 index 00000000000..7d9286b5d2e --- /dev/null +++ b/test/upgrade/prober/sut/default.go @@ -0,0 +1,22 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sut + +// NewDefault will return a default SUT for this repo. +func NewDefault(namespace string, eventTypes []string) SystemUnderTest { + return newBrokerAndTriggers(namespace, eventTypes) +} diff --git a/test/upgrade/prober/sut/types.go b/test/upgrade/prober/sut/types.go new file mode 100644 index 00000000000..72fc42ec40f --- /dev/null +++ b/test/upgrade/prober/sut/types.go @@ -0,0 +1,49 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sut + +import ( + "context" + + "go.uber.org/zap" + testlib "knative.dev/eventing/test/lib" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +const ( + defaulEventsPrefix = "dev.knative.eventing.prober.sut" +) + +// SystemUnderTest (SUT) represents a system that we'd like to test with +// continual prober. +type SystemUnderTest interface { + // Deploy is responsible for deploying SUT and returning a URL to feed + // events into. + Deploy(ctx Context, destination duckv1.Destination) (*apis.URL, error) + + // Teardown will remove all deployed SUT resources. + Teardown(ctx Context) error +} + +// Context represents a context of system under test that we'd +// like to deploy and teardown. +type Context struct { + Ctx context.Context + Log *zap.SugaredLogger + *testlib.Client +} diff --git a/test/upgrade/prober/verify.go b/test/upgrade/prober/verify.go index 2b282e4ccf0..0e7e8494e79 100644 --- a/test/upgrade/prober/verify.go +++ b/test/upgrade/prober/verify.go @@ -42,8 +42,8 @@ const ( ) // Verify will verify prober state after finished has been sent. -func (p *prober) Verify(ctx context.Context) (eventErrs []error, eventsSent int, fetchErr error) { - report := p.fetchReport(ctx) +func (p *prober) Verify() (eventErrs []error, eventsSent int, fetchErr error) { + report := p.fetchReport() availRate := 0.0 if report.TotalRequests != 0 { availRate = float64(report.EventsSent*100) / float64(report.TotalRequests) @@ -75,12 +75,12 @@ func (p *prober) Verify(ctx context.Context) (eventErrs []error, eventsSent int, } // Finish terminates sender which sends finished event. -func (p *prober) Finish(ctx context.Context) { - p.removeSender(ctx) +func (p *prober) Finish() { + p.removeSender() } -func (p *prober) fetchReport(ctx context.Context) *receiver.Report { - exec := p.fetchExecution(ctx) +func (p *prober) fetchReport() *receiver.Report { + exec := p.fetchExecution() replayLogs(p.log, exec) return exec.Report } @@ -100,13 +100,13 @@ func replayLogs(log *zap.SugaredLogger, exec *fetcher.Execution) { } } -func (p *prober) fetchExecution(ctx context.Context) *fetcher.Execution { - ns := p.config.Namespace - job := p.deployFetcher(ctx) - defer p.deleteFetcher(ctx) - pod, err := p.findSucceededPod(ctx, job) +func (p *prober) fetchExecution() *fetcher.Execution { + ns := p.client.Namespace + job := p.deployFetcher() + defer p.deleteFetcher() + pod, err := p.findSucceededPod(job) ensure.NoError(err) - bytes, err := pkgTest.PodLogs(ctx, p.client.Kube, pod.Name, fetcherName, ns) + bytes, err := pkgTest.PodLogs(p.config.Ctx, p.client.Kube, pod.Name, fetcherName, ns) ensure.NoError(err) ex := &fetcher.Execution{ Logs: []fetcher.LogEntry{}, @@ -127,9 +127,9 @@ func (p *prober) fetchExecution(ctx context.Context) *fetcher.Execution { return ex } -func (p *prober) deployFetcher(ctx context.Context) *batchv1.Job { +func (p *prober) deployFetcher() *batchv1.Job { p.log.Info("Deploying fetcher job: ", fetcherName) - jobs := p.client.Kube.BatchV1().Jobs(p.config.Namespace) + jobs := p.client.Kube.BatchV1().Jobs(p.client.Namespace) var replicas int32 = 1 fetcherJob := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ @@ -170,25 +170,25 @@ func (p *prober) deployFetcher(ctx context.Context) *batchv1.Job { }, }, } - created, err := jobs.Create(ctx, fetcherJob, metav1.CreateOptions{}) + created, err := jobs.Create(p.config.Ctx, fetcherJob, metav1.CreateOptions{}) ensure.NoError(err) p.log.Info("Waiting for fetcher job to succeed: ", fetcherName) - err = waitForJobToComplete(ctx, p.client.Kube, fetcherName, p.config.Namespace) + err = waitForJobToComplete(p.config.Ctx, p.client.Kube, fetcherName, p.client.Namespace) ensure.NoError(err) return created } -func (p *prober) deleteFetcher(ctx context.Context) { - ns := p.config.Namespace +func (p *prober) deleteFetcher() { + ns := p.client.Namespace jobs := p.client.Kube.BatchV1().Jobs(ns) - err := jobs.Delete(ctx, fetcherName, metav1.DeleteOptions{}) + err := jobs.Delete(p.config.Ctx, fetcherName, metav1.DeleteOptions{}) ensure.NoError(err) } -func (p *prober) findSucceededPod(ctx context.Context, job *batchv1.Job) (*corev1.Pod, error) { +func (p *prober) findSucceededPod(job *batchv1.Job) (*corev1.Pod, error) { pods := p.client.Kube.CoreV1().Pods(job.Namespace) - podList, err := pods.List(ctx, metav1.ListOptions{ + podList, err := pods.List(p.config.Ctx, metav1.ListOptions{ LabelSelector: fmt.Sprint("job-name=", job.Name), }) ensure.NoError(err) From 172557870dfd22119e80a16738fc7e456fad402d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Wed, 5 May 2021 13:53:07 +0200 Subject: [PATCH 02/24] Refactor to keep used interfaces and deprecate them first --- test/upgrade/continual.go | 21 +--- test/upgrade/prober/configuration.go | 13 +- test/upgrade/prober/prober.go | 125 ++++++++++++++------ test/upgrade/prober/prober_test.go | 4 +- test/upgrade/prober/sut/broker.go | 2 +- test/upgrade/prober/sut/default.go | 4 +- test/upgrade/prober/sut/default_e2e_test.go | 66 +++++++++++ test/upgrade/prober/sut/types.go | 2 + test/upgrade/prober/verify.go | 6 +- 9 files changed, 176 insertions(+), 67 deletions(-) create mode 100644 test/upgrade/prober/sut/default_e2e_test.go diff --git a/test/upgrade/continual.go b/test/upgrade/continual.go index 08c78df112b..957a04a2789 100644 --- a/test/upgrade/continual.go +++ b/test/upgrade/continual.go @@ -17,26 +17,17 @@ limitations under the License. package upgrade import ( - testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/upgrade/prober" pkgupgrade "knative.dev/pkg/test/upgrade" ) // ContinualTest will perform a continual validation of Eventing SUT. func ContinualTest() pkgupgrade.BackgroundOperation { - var client *testlib.Client - var probe prober.Prober - return pkgupgrade.NewBackgroundVerification("EventingContinualTest", - func(c pkgupgrade.Context) { - // setup - client = testlib.Setup(c.T, false) - config := prober.NewConfig(client.Namespace) - probe = prober.RunEventProber(c.Log, client, config) - }, - func(c pkgupgrade.Context) { - // verify - defer testlib.TearDown(client) - prober.AssertEventProber(c.T, probe) - }, + config := prober.NewConfig() + pr := prober.CreateProbeRunner(config) + return pkgupgrade.NewBackgroundVerification( + "EventingContinualTest", + pr.Setup, + pr.Verify, ) } diff --git a/test/upgrade/prober/configuration.go b/test/upgrade/prober/configuration.go index 99f515c3aca..9c268663dbd 100644 --- a/test/upgrade/prober/configuration.go +++ b/test/upgrade/prober/configuration.go @@ -50,8 +50,6 @@ const ( // DuplicateAction is the action to take in case of duplicated events type DuplicateAction string -var eventTypes = []string{"step", "finished"} - // Config represents a configuration for prober. type Config struct { Wathola @@ -66,7 +64,7 @@ type Config struct { // Wathola represents options related strictly to wathola testing tool. type Wathola struct { ConfigToml - sut.SystemUnderTest + SystemUnderTest func(namespace string) sut.SystemUnderTest EventsTypePrefix string HealthEndpoint string } @@ -88,8 +86,8 @@ type ServingConfig struct { // NewConfig creates a new configuration object with default values filled in. // Values can be influenced by kelseyhightower/envconfig with -// `e2e_upgrade_tests` prefix. -func NewConfig(namespace string) *Config { +// `eventing_upgrade_tests` prefix. +func NewConfig() *Config { config := &Config{ Interval: Interval, FinishedSleep: defaultFinishedSleep, @@ -109,7 +107,7 @@ func NewConfig(namespace string) *Config { }, EventsTypePrefix: defaultWatholaEventsPrefix, HealthEndpoint: defaultHealthEndpoint, - SystemUnderTest: sut.NewDefault(namespace, eventTypes), + SystemUnderTest: sut.NewDefault, }, } @@ -129,7 +127,8 @@ func (p *prober) deployConfiguration() { ref = resources.KnativeRefForKservice(forwarderName, p.client.Namespace) } dest := duckv1.Destination{Ref: ref} - url, err := p.config.SystemUnderTest.Deploy(sc, dest) + s := p.config.SystemUnderTest(p.client.Namespace) + url, err := s.Deploy(sc, dest) if err != nil { p.client.T.Fatal(err) } diff --git a/test/upgrade/prober/prober.go b/test/upgrade/prober/prober.go index 3b195c056ce..20dea54c910 100644 --- a/test/upgrade/prober/prober.go +++ b/test/upgrade/prober/prober.go @@ -16,13 +16,16 @@ package prober import ( + "context" "testing" "time" + "github.com/prometheus/common/log" "github.com/wavesoftware/go-ensure" "go.uber.org/zap" testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/resources" + pkgupgrade "knative.dev/pkg/test/upgrade" ) var ( @@ -33,45 +36,102 @@ var ( // Prober is the interface for a prober, which checks the result of the probes when stopped. type Prober interface { // Verify will verify prober state after finished has been send - Verify() ([]error, int, error) + Verify() ([]error, int) // Finish send finished event Finish() // ReportErrors will reports found errors in proper way - ReportErrors(t *testing.T, errors []error) + ReportErrors(errors []error) +} + +// ProbeRunner will run continual verification with provided configuration. +type ProbeRunner interface { + // Setup will start a continual prober in background. + Setup(ctx pkgupgrade.Context) - // deploy a prober to a cluster - deploy() - // remove a prober from cluster - remove() + // Verify will verify that all sent events propagated at least once. + Verify(ctx pkgupgrade.Context) } -// RunEventProber starts a single Prober of the given domain. -func RunEventProber(log *zap.SugaredLogger, client *testlib.Client, config *Config) Prober { - pm := newProber(log, client, config) - pm.deploy() - return pm +// CreateProbeRunner will create a runner compatibile with +// pkgupgrade.BackgroundVerification interface. +func CreateProbeRunner( + config *Config, + options ...testlib.SetupClientOption, +) ProbeRunner { + return &probeRunner{ + prober: &prober{ + config: config, + }, + options: options, + } } -// AssertEventProber will send finish event and then verify if all events propagated well -func AssertEventProber(t *testing.T, prober Prober) { - prober.Finish() - defer prober.remove() +type probeRunner struct { + *prober + options []testlib.SetupClientOption +} - waitAfterFinished(prober) +func (p *probeRunner) Setup(ctx pkgupgrade.Context) { + p.log = ctx.Log + p.client = testlib.Setup(ctx.T, false, p.options...) + p.deploy() +} - eventErrs, eventCount, err := prober.Verify() - if err != nil { - t.Fatal("fetch error:", err) +func (p *probeRunner) Verify(ctx pkgupgrade.Context) { + if p.client == nil { + ctx.T.Fatal("prober isn't initiated (client is nil)") + return } - if len(eventErrs) == 0 { - t.Logf("All %d events propagated well", eventCount) + // use T from new test + p.client.T = ctx.T + t := ctx.T + defer testlib.TearDown(p.client) + defer p.remove() + p.Finish() + waitAfterFinished(p.prober) + + errors, events := p.prober.Verify() + if len(errors) == 0 { + t.Logf("All %d events propagated well", events) } else { t.Logf("There were %d events propagated, but %d errors occurred. "+ - "Listing them below.", eventCount, len(eventErrs)) + "Listing them below.", events, len(errors)) } - prober.ReportErrors(t, eventErrs) + + p.ReportErrors(errors) +} + +// RunEventProber starts a single Prober of the given domain. +// Deprecated: use CreateProbeRunner func instead. +func RunEventProber(ctx context.Context, log *zap.SugaredLogger, client *testlib.Client, config *Config) Prober { + log.Warn("prober.RunEventProber is deprecated. Use CreateProbeRunner instead.") + config.Ctx = ctx + p := &prober{ + log: log, + client: client, + config: config, + } + p.deploy() + return p +} + +// AssertEventProber will send finish event and then verify if all events propagated well +// Deprecated: use CreateProbeRunner func instead. +func AssertEventProber(ctx context.Context, t *testing.T, probe Prober) { + log.Warn("prober.AssertEventProber is deprecated. Use CreateProbeRunner instead.") + p := probe.(*prober) + p.client.T = t + p.config.Ctx = ctx + pr := &probeRunner{ + prober: p, + options: nil, + } + pr.Verify(pkgupgrade.Context{ + T: t, + Log: p.log, + }) } type prober struct { @@ -87,7 +147,8 @@ func (p *prober) servingClient() resources.ServingClient { } } -func (p *prober) ReportErrors(t *testing.T, errors []error) { +func (p *prober) ReportErrors(errors []error) { + t := p.client.T for _, err := range errors { if p.config.FailOnErrors { t.Error(err) @@ -127,17 +188,7 @@ func (p *prober) remove() { ensure.NoError(p.client.Tracker.Clean(true)) } -func newProber(log *zap.SugaredLogger, client *testlib.Client, config *Config) Prober { - return &prober{ - log: log, - client: client, - config: config, - } -} - -func waitAfterFinished(p Prober) { - s := p.(*prober) - cfg := s.config - s.log.Infof("Waiting %v after sender finished...", cfg.FinishedSleep) - time.Sleep(cfg.FinishedSleep) +func waitAfterFinished(p *prober) { + p.log.Infof("Waiting %v after sender finished...", p.config.FinishedSleep) + time.Sleep(p.config.FinishedSleep) } diff --git a/test/upgrade/prober/prober_test.go b/test/upgrade/prober/prober_test.go index d1e36cf5172..db658cf1902 100644 --- a/test/upgrade/prober/prober_test.go +++ b/test/upgrade/prober/prober_test.go @@ -48,12 +48,12 @@ func TestNewConfig(t *testing.T) { if s.panics { assert.Panics(t, func() { - prober.NewConfig("test-ns") + prober.NewConfig() }) return } - config := prober.NewConfig("test-ns") + config := prober.NewConfig() assert.Equal(t, s.servingUse, config.Serving.Use) assert.True(t, config.Serving.ScaleToZero) diff --git a/test/upgrade/prober/sut/broker.go b/test/upgrade/prober/sut/broker.go index 8bab97428a4..48197c8b6a1 100644 --- a/test/upgrade/prober/sut/broker.go +++ b/test/upgrade/prober/sut/broker.go @@ -55,7 +55,7 @@ type Triggers struct { TypePrefix string } -func newBrokerAndTriggers(namespace string, eventTypes []string) SystemUnderTest { +func newBrokerAndTriggers(namespace string) SystemUnderTest { return &BrokerAndTriggers{ Namespace: namespace, Broker: Broker{ diff --git a/test/upgrade/prober/sut/default.go b/test/upgrade/prober/sut/default.go index 7d9286b5d2e..e2dfdf3bd99 100644 --- a/test/upgrade/prober/sut/default.go +++ b/test/upgrade/prober/sut/default.go @@ -17,6 +17,6 @@ limitations under the License. package sut // NewDefault will return a default SUT for this repo. -func NewDefault(namespace string, eventTypes []string) SystemUnderTest { - return newBrokerAndTriggers(namespace, eventTypes) +func NewDefault(namespace string) SystemUnderTest { + return newBrokerAndTriggers(namespace) } diff --git a/test/upgrade/prober/sut/default_e2e_test.go b/test/upgrade/prober/sut/default_e2e_test.go new file mode 100644 index 00000000000..593d979d585 --- /dev/null +++ b/test/upgrade/prober/sut/default_e2e_test.go @@ -0,0 +1,66 @@ +// +build e2e + +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sut_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + testlib "knative.dev/eventing/test/lib" + "knative.dev/eventing/test/lib/recordevents" + "knative.dev/eventing/test/upgrade/prober/sut" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +func TestNewDefaultE2E(t *testing.T) { + client := testlib.Setup(t, false) + defer testlib.TearDown(client) + s := sut.NewDefault(client.Namespace) + ctx := sut.Context{ + Ctx: context.Background(), + Log: log(t), + Client: client, + } + // create event logger pod and service as the subscriber + pod := recordevents.DeployEventRecordOrFail(ctx.Ctx, client, "record") + url, err := s.Deploy(ctx, duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: pod.Kind, + Namespace: pod.Namespace, + Name: pod.Name, + APIVersion: pod.APIVersion, + }, + }) + defer func() { + err := s.Teardown(ctx) + assert.NoError(t, err) + }() + assert.NoError(t, err) + assert.NotEmpty(t, url) +} + +func log(t *testing.T) *zap.SugaredLogger { + l, err := zap.NewDevelopment() + if err != nil { + t.Fatal(err) + } + return l.Sugar() +} diff --git a/test/upgrade/prober/sut/types.go b/test/upgrade/prober/sut/types.go index 72fc42ec40f..f7bc9e1aac6 100644 --- a/test/upgrade/prober/sut/types.go +++ b/test/upgrade/prober/sut/types.go @@ -29,6 +29,8 @@ const ( defaulEventsPrefix = "dev.knative.eventing.prober.sut" ) +var eventTypes = []string{"step", "finished"} + // SystemUnderTest (SUT) represents a system that we'd like to test with // continual prober. type SystemUnderTest interface { diff --git a/test/upgrade/prober/verify.go b/test/upgrade/prober/verify.go index 0e7e8494e79..9f4c3bc929a 100644 --- a/test/upgrade/prober/verify.go +++ b/test/upgrade/prober/verify.go @@ -42,7 +42,7 @@ const ( ) // Verify will verify prober state after finished has been sent. -func (p *prober) Verify() (eventErrs []error, eventsSent int, fetchErr error) { +func (p *prober) Verify() (eventErrs []error, eventsSent int) { report := p.fetchReport() availRate := 0.0 if report.TotalRequests != 0 { @@ -53,7 +53,7 @@ func (p *prober) Verify() (eventErrs []error, eventsSent int, fetchErr error) { p.log.Infof("Availability: %.3f%%, Requests sent: %d.", availRate, report.TotalRequests) if report.State == "active" { - return nil, report.EventsSent, errors.New("report fetched too early, receiver is in active state") + p.client.T.Fatal("report fetched too early, receiver is in active state") } for _, t := range report.Thrown.Missing { eventErrs = append(eventErrs, errors.New(t)) @@ -71,7 +71,7 @@ func (p *prober) Verify() (eventErrs []error, eventsSent int, fetchErr error) { eventErrs = append(eventErrs, errors.New(t)) } } - return eventErrs, report.EventsSent, nil + return eventErrs, report.EventsSent } // Finish terminates sender which sends finished event. From a6660f79864ce6de88bb153eb15ebbe1f7a40454 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Wed, 5 May 2021 14:07:14 +0200 Subject: [PATCH 03/24] Update-codegen --- go.mod | 1 + go.sum | 1 - test/upgrade/prober/prober.go | 2 +- vendor/modules.txt | 1 + 4 files changed, 3 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index d594d170fb6..37b578deaf2 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/pelletier/go-toml v1.8.0 github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 github.com/pkg/errors v0.9.1 + github.com/prometheus/common v0.20.0 github.com/rickb777/date v1.13.0 github.com/robfig/cron/v3 v3.0.1 github.com/rogpeppe/fastuuid v1.2.0 diff --git a/go.sum b/go.sum index 1142beec3f1..8441ade61e5 100644 --- a/go.sum +++ b/go.sum @@ -1121,7 +1121,6 @@ knative.dev/hack v0.0.0-20210325223819-b6ab329907d3 h1:km0Rrh0T9/wA2pivQm1hqSPVw knative.dev/hack v0.0.0-20210325223819-b6ab329907d3/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI= knative.dev/hack/schema v0.0.0-20210325223819-b6ab329907d3 h1:F/pVm+rB+WpyVhH9cmVn3Lh53+UI24qlnjaYiqaw1pw= knative.dev/hack/schema v0.0.0-20210325223819-b6ab329907d3/go.mod h1:ffjwmdcrH5vN3mPhO8RrF2KfNnbHeCE2C60A+2cv3U0= -knative.dev/pkg v0.0.0-20210420053235-1afd04993622 h1:wSyDPp/LuOLeDCpvUHgKXqb4DfmCEPelsaYzC0Fojm0= knative.dev/pkg v0.0.0-20210420053235-1afd04993622/go.mod h1:UtcSLHy2XIz5blWoPTA40F87zk4O7erxkCwv+7Tsmws= knative.dev/pkg v0.0.0-20210422210038-0c5259d6504d h1:ilIOXb2KfleKeW/JXqcoQK/BKi3D4X8HVmrKYek32Go= knative.dev/pkg v0.0.0-20210422210038-0c5259d6504d/go.mod h1:UtcSLHy2XIz5blWoPTA40F87zk4O7erxkCwv+7Tsmws= diff --git a/test/upgrade/prober/prober.go b/test/upgrade/prober/prober.go index 20dea54c910..6d2465187c4 100644 --- a/test/upgrade/prober/prober.go +++ b/test/upgrade/prober/prober.go @@ -54,7 +54,7 @@ type ProbeRunner interface { Verify(ctx pkgupgrade.Context) } -// CreateProbeRunner will create a runner compatibile with +// CreateProbeRunner will create a runner compatible with // pkgupgrade.BackgroundVerification interface. func CreateProbeRunner( config *Config, diff --git a/vendor/modules.txt b/vendor/modules.txt index 34dbf06164c..65217b135a2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -259,6 +259,7 @@ github.com/prometheus/client_golang/prometheus/promhttp # github.com/prometheus/client_model v0.2.0 github.com/prometheus/client_model/go # github.com/prometheus/common v0.20.0 +## explicit github.com/prometheus/common/expfmt github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg github.com/prometheus/common/log From 4291ff32cabf69bb732bb2fdfa7a62da09799065 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Wed, 5 May 2021 20:15:52 +0200 Subject: [PATCH 04/24] Restoring configuration options, to have Deprecated status instead --- test/upgrade/continual.go | 2 +- test/upgrade/prober/configuration.go | 13 ++++- test/upgrade/prober/prober.go | 55 +++++++++++++++------ test/upgrade/prober/sut/default_e2e_test.go | 25 +++++++++- 4 files changed, 76 insertions(+), 19 deletions(-) diff --git a/test/upgrade/continual.go b/test/upgrade/continual.go index 957a04a2789..6503e29b766 100644 --- a/test/upgrade/continual.go +++ b/test/upgrade/continual.go @@ -24,7 +24,7 @@ import ( // ContinualTest will perform a continual validation of Eventing SUT. func ContinualTest() pkgupgrade.BackgroundOperation { config := prober.NewConfig() - pr := prober.CreateProbeRunner(config) + pr := prober.CreateRunner(config) return pkgupgrade.NewBackgroundVerification( "EventingContinualTest", pr.Setup, diff --git a/test/upgrade/prober/configuration.go b/test/upgrade/prober/configuration.go index 9c268663dbd..3e950f1a55f 100644 --- a/test/upgrade/prober/configuration.go +++ b/test/upgrade/prober/configuration.go @@ -22,6 +22,7 @@ import ( "io/ioutil" "path" "runtime" + "strings" "text/template" "time" @@ -59,6 +60,13 @@ type Config struct { FailOnErrors bool OnDuplicate DuplicateAction Ctx context.Context + // BrokerOpts holds opts for broker. + // Deprecated: use Wathola.SystemUnderTest instead. + BrokerOpts []resources.BrokerOption + // Namespace holds namespace in which test is about to be executed. + // Deprecated: namespace is about to be taken from testlib.Client created by + // CreateRunner + Namespace string } // Wathola represents options related strictly to wathola testing tool. @@ -87,7 +95,7 @@ type ServingConfig struct { // NewConfig creates a new configuration object with default values filled in. // Values can be influenced by kelseyhightower/envconfig with // `eventing_upgrade_tests` prefix. -func NewConfig() *Config { +func NewConfig(namespace ...string) *Config { config := &Config{ Interval: Interval, FinishedSleep: defaultFinishedSleep, @@ -113,6 +121,9 @@ func NewConfig() *Config { err := envconfig.Process("eventing_upgrade_tests", config) ensure.NoError(err) + if len(namespace) > 0 { + config.Namespace = strings.Join(namespace, ",") + } return config } diff --git a/test/upgrade/prober/prober.go b/test/upgrade/prober/prober.go index 6d2465187c4..49bc5fcda0d 100644 --- a/test/upgrade/prober/prober.go +++ b/test/upgrade/prober/prober.go @@ -17,6 +17,7 @@ package prober import ( "context" + "reflect" "testing" "time" @@ -25,6 +26,7 @@ import ( "go.uber.org/zap" testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/resources" + "knative.dev/eventing/test/upgrade/prober/sut" pkgupgrade "knative.dev/pkg/test/upgrade" ) @@ -33,7 +35,9 @@ var ( Interval = 10 * time.Millisecond ) -// Prober is the interface for a prober, which checks the result of the probes when stopped. +// Prober is the interface for a prober, which checks the result of the probes +// when stopped. +// Deprecated: use Runner instead, create it with CreateRunner func. type Prober interface { // Verify will verify prober state after finished has been send Verify() ([]error, int) @@ -45,8 +49,8 @@ type Prober interface { ReportErrors(errors []error) } -// ProbeRunner will run continual verification with provided configuration. -type ProbeRunner interface { +// Runner will run continual verification with provided configuration. +type Runner interface { // Setup will start a continual prober in background. Setup(ctx pkgupgrade.Context) @@ -54,16 +58,11 @@ type ProbeRunner interface { Verify(ctx pkgupgrade.Context) } -// CreateProbeRunner will create a runner compatible with -// pkgupgrade.BackgroundVerification interface. -func CreateProbeRunner( - config *Config, - options ...testlib.SetupClientOption, -) ProbeRunner { +// CreateRunner will create a runner compatible with +// pkgupgrade.NewBackgroundVerification func. +func CreateRunner(config *Config, options ...testlib.SetupClientOption) Runner { return &probeRunner{ - prober: &prober{ - config: config, - }, + prober: &prober{config: config}, options: options, } } @@ -74,6 +73,7 @@ type probeRunner struct { } func (p *probeRunner) Setup(ctx pkgupgrade.Context) { + p.validate(ctx) p.log = ctx.Log p.client = testlib.Setup(ctx.T, false, p.options...) p.deploy() @@ -103,10 +103,33 @@ func (p *probeRunner) Verify(ctx pkgupgrade.Context) { p.ReportErrors(errors) } +func (p *probeRunner) validate(ctx pkgupgrade.Context) { + if p.client.Namespace != "" { + ctx.Log.Warnf( + "DEPRECATED: namespace set in Config: %s. Ignoring it.", + p.client.Namespace) + } + if len(p.config.BrokerOpts) > 0 { + ctx.Log.Warn( + "DEPRECATED: BrokerOpts set in Config. Use custom SystemUnderTest") + if reflect.ValueOf(p.config.Wathola.SystemUnderTest) == reflect.ValueOf(sut.NewDefault) { + p.config.Wathola.SystemUnderTest = func(namespace string) sut.SystemUnderTest { + s := sut.NewDefault(namespace) + bt := s.(*sut.BrokerAndTriggers) + bt.Opts = append(bt.Opts, p.config.BrokerOpts...) + return bt + } + } else { + ctx.T.Fatal("Can't use given BrokerOpts, as custom SUT is used as " + + "well. Drop using BrokerOpts in favor of custom SUT.") + } + } +} + // RunEventProber starts a single Prober of the given domain. -// Deprecated: use CreateProbeRunner func instead. +// Deprecated: use CreateRunner func instead. func RunEventProber(ctx context.Context, log *zap.SugaredLogger, client *testlib.Client, config *Config) Prober { - log.Warn("prober.RunEventProber is deprecated. Use CreateProbeRunner instead.") + log.Warn("prober.RunEventProber is deprecated. Use CreateRunner instead.") config.Ctx = ctx p := &prober{ log: log, @@ -118,9 +141,9 @@ func RunEventProber(ctx context.Context, log *zap.SugaredLogger, client *testlib } // AssertEventProber will send finish event and then verify if all events propagated well -// Deprecated: use CreateProbeRunner func instead. +// Deprecated: use CreateRunner func instead. func AssertEventProber(ctx context.Context, t *testing.T, probe Prober) { - log.Warn("prober.AssertEventProber is deprecated. Use CreateProbeRunner instead.") + log.Warn("prober.AssertEventProber is deprecated. Use CreateRunner instead.") p := probe.(*prober) p.client.T = t p.config.Ctx = ctx diff --git a/test/upgrade/prober/sut/default_e2e_test.go b/test/upgrade/prober/sut/default_e2e_test.go index 593d979d585..6e04dc3ac4d 100644 --- a/test/upgrade/prober/sut/default_e2e_test.go +++ b/test/upgrade/prober/sut/default_e2e_test.go @@ -20,8 +20,11 @@ package sut_test import ( "context" + "fmt" "testing" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "go.uber.org/zap" testlib "knative.dev/eventing/test/lib" @@ -40,7 +43,7 @@ func TestNewDefaultE2E(t *testing.T) { Client: client, } // create event logger pod and service as the subscriber - pod := recordevents.DeployEventRecordOrFail(ctx.Ctx, client, "record") + eis, pod := recordevents.StartEventRecordOrFail(ctx.Ctx, client, "record") url, err := s.Deploy(ctx, duckv1.Destination{ Ref: &duckv1.KReference{ Kind: pod.Kind, @@ -55,6 +58,26 @@ func TestNewDefaultE2E(t *testing.T) { }() assert.NoError(t, err) assert.NotEmpty(t, url) + ceClient, err := cloudevents.NewClientHTTP() + assert.NoError(t, err) + id := uuid.NewString() + event := cloudevents.NewEvent() + event.SetID(id) + err = ceClient.Send(ctx.Ctx, event) + assert.NoError(t, err) + eis.AssertExact(1, hasID(id)) +} + +func hasID(id string) recordevents.EventInfoMatcher { + return func(info recordevents.EventInfo) error { + if id != info.Event.ID() { + return fmt.Errorf( + "event ID don't match. Expected: %#v, Actual: %#v", + id, info.Event.ID(), + ) + } + return nil + } } func log(t *testing.T) *zap.SugaredLogger { From ea1404b11865f18eda09ddef445aa431dedd4479 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Thu, 6 May 2021 01:32:32 +0200 Subject: [PATCH 05/24] Working e2e test --- test/upgrade/prober/configuration.go | 8 +- test/upgrade/prober/prober.go | 2 +- test/upgrade/prober/sut/broker.go | 41 ++++--- test/upgrade/prober/sut/default_e2e_test.go | 121 +++++++++++++++----- test/upgrade/prober/sut/types.go | 6 +- test/upgrade/prober/wathola/event/types.go | 4 +- 6 files changed, 130 insertions(+), 52 deletions(-) diff --git a/test/upgrade/prober/configuration.go b/test/upgrade/prober/configuration.go index 3e950f1a55f..085191cc1ac 100644 --- a/test/upgrade/prober/configuration.go +++ b/test/upgrade/prober/configuration.go @@ -139,10 +139,10 @@ func (p *prober) deployConfiguration() { } dest := duckv1.Destination{Ref: ref} s := p.config.SystemUnderTest(p.client.Namespace) - url, err := s.Deploy(sc, dest) - if err != nil { - p.client.T.Fatal(err) - } + url := s.Deploy(sc, dest) + p.client.Cleanup(func() { + s.Teardown(sc) + }) p.deployConfigToml(url) } diff --git a/test/upgrade/prober/prober.go b/test/upgrade/prober/prober.go index 49bc5fcda0d..95089595209 100644 --- a/test/upgrade/prober/prober.go +++ b/test/upgrade/prober/prober.go @@ -60,7 +60,7 @@ type Runner interface { // CreateRunner will create a runner compatible with // pkgupgrade.NewBackgroundVerification func. -func CreateRunner(config *Config, options ...testlib.SetupClientOption) Runner { +func CreateRunner(config *Config, options ...testlib.SetupClientOption) Runner { return &probeRunner{ prober: &prober{config: config}, options: options, diff --git a/test/upgrade/prober/sut/broker.go b/test/upgrade/prober/sut/broker.go index 48197c8b6a1..fc78c6eb106 100644 --- a/test/upgrade/prober/sut/broker.go +++ b/test/upgrade/prober/sut/broker.go @@ -76,56 +76,55 @@ func newBrokerAndTriggers(namespace string) SystemUnderTest { } } -func (b *BrokerAndTriggers) Deploy(ctx Context, dest duckv1.Destination) (*apis.URL, error) { +func (b *BrokerAndTriggers) Deploy(ctx Context, dest duckv1.Destination) *apis.URL { b.deployBroker(ctx) - url, err := b.fetchURL(ctx) - if err != nil { - return nil, err - } + url := b.fetchURL(ctx) b.deployTriggers(ctx, dest) - return url, nil + return url } -func (b *BrokerAndTriggers) Teardown(ctx Context) error { +func (b *BrokerAndTriggers) Teardown(ctx Context) { ctx.Log.Debug("BrokerAndTriggers SUT should automatically teardown") - return nil } func (b *BrokerAndTriggers) deployBroker(ctx Context) { ctx.Client.CreateBrokerOrFail(b.Name, b.Broker.Opts...) } -func (b *BrokerAndTriggers) fetchURL(ctx Context) (*apis.URL, error) { +func (b *BrokerAndTriggers) fetchURL(ctx Context) *apis.URL { namespace := b.Namespace - ctx.Log.Debugf("Fetching %s broker URL for ns %s", + ctx.Log.Debugf("Fetching \"%s\" broker URL for ns %s", b.Name, namespace) meta := resources.NewMetaResource( b.Name, b.Namespace, testlib.BrokerTypeMeta, ) err := duck.WaitForResourceReady(ctx.Client.Dynamic, meta) if err != nil { - return nil, err + ctx.T.Fatal(err) } broker, err := ctx.Client.Eventing.EventingV1().Brokers(namespace).Get( ctx.Ctx, b.Name, metav1.GetOptions{}, ) if err != nil { - return nil, err + ctx.T.Fatal(err) } url := broker.Status.Address.URL - ctx.Log.Debugf("%s broker URL for ns %s is %v", b.Name, namespace, url) - return url, nil + ctx.Log.Debugf("\"%s\" broker URL for ns %s is %v", + b.Name, namespace, url) + return url } func (b *BrokerAndTriggers) deployTriggers(ctx Context, dest duckv1.Destination) { + triggers := make([]*eventingv1.Trigger, 0, len(b.Triggers.Types)) for _, eventType := range b.Triggers.Types { name := fmt.Sprintf("%s-%s", b.Name, eventType) fullType := fmt.Sprintf("%s.%s", b.Triggers.TypePrefix, eventType) subscriberOption := resources.WithSubscriberDestination(func(t *eventingv1.Trigger) duckv1.Destination { return dest }) - ctx.Log.Debugf("Creating trigger %s for type %s to route to %#v", name, eventType, dest) - ctx.Client.CreateTriggerOrFail( + ctx.Log.Debugf("Creating trigger \"%s\" for type %s to route to %#v", + name, eventType, dest) + trgr := ctx.Client.CreateTriggerOrFail( name, resources.WithBroker(b.Name), resources.WithAttributesTriggerFilter( @@ -135,5 +134,15 @@ func (b *BrokerAndTriggers) deployTriggers(ctx Context, dest duckv1.Destination) ), subscriberOption, ) + triggers = append(triggers, trgr) + } + for _, trgr := range triggers { + meta := resources.NewMetaResource( + trgr.Name, trgr.Namespace, testlib.TriggerTypeMeta, + ) + err := duck.WaitForResourceReady(ctx.Client.Dynamic, meta) + if err != nil { + ctx.T.Fatal(err) + } } } diff --git a/test/upgrade/prober/sut/default_e2e_test.go b/test/upgrade/prober/sut/default_e2e_test.go index 6e04dc3ac4d..661b78652f4 100644 --- a/test/upgrade/prober/sut/default_e2e_test.go +++ b/test/upgrade/prober/sut/default_e2e_test.go @@ -21,60 +21,129 @@ package sut_test import ( "context" "fmt" + "reflect" "testing" cloudevents "github.com/cloudevents/sdk-go/v2" - "github.com/google/uuid" "github.com/stretchr/testify/assert" "go.uber.org/zap" + "k8s.io/apiserver/pkg/storage/names" testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/recordevents" + "knative.dev/eventing/test/lib/resources" "knative.dev/eventing/test/upgrade/prober/sut" + watholaevent "knative.dev/eventing/test/upgrade/prober/wathola/event" + watholasender "knative.dev/eventing/test/upgrade/prober/wathola/sender" + "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/signals" + pkgTest "knative.dev/pkg/test" ) func TestNewDefaultE2E(t *testing.T) { + ctx := signals.NewContext() client := testlib.Setup(t, false) defer testlib.TearDown(client) s := sut.NewDefault(client.Namespace) - ctx := sut.Context{ - Ctx: context.Background(), + sutCtx := sut.Context{ + Ctx: ctx, Log: log(t), Client: client, } // create event logger pod and service as the subscriber - eis, pod := recordevents.StartEventRecordOrFail(ctx.Ctx, client, "record") - url, err := s.Deploy(ctx, duckv1.Destination{ + receiverName := "receiver" + eis, pod := recordevents.StartEventRecordOrFail(ctx, client, receiverName) + + ref := pkgTest.CoreV1ObjectReference( + resources.ServiceKind, resources.CoreAPIVersion, receiverName) + url := s.Deploy(sutCtx, duckv1.Destination{ Ref: &duckv1.KReference{ - Kind: pod.Kind, + APIVersion: ref.APIVersion, + Kind: ref.Kind, Namespace: pod.Namespace, Name: pod.Name, - APIVersion: pod.APIVersion, }, }) - defer func() { - err := s.Teardown(ctx) - assert.NoError(t, err) - }() - assert.NoError(t, err) + defer s.Teardown(sutCtx) assert.NotEmpty(t, url) - ceClient, err := cloudevents.NewClientHTTP() - assert.NoError(t, err) - id := uuid.NewString() - event := cloudevents.NewEvent() - event.SetID(id) - err = ceClient.Send(ctx.Ctx, event) - assert.NoError(t, err) - eis.AssertExact(1, hasID(id)) + step1 := watholasender.NewCloudEvent(watholaevent.Step{Number: 1}, + watholaevent.StepType) + step2 := watholasender.NewCloudEvent(watholaevent.Step{Number: 2}, + watholaevent.StepType) + finished := watholasender.NewCloudEvent(watholaevent.Finished{EventsSent: 2}, + watholaevent.FinishedType) + + sender := testingSender{t: t, ctx: ctx, client: client, url: url} + sender.send(step1) + sender.send(step2) + sender.send(finished) + + eis.AssertAtLeast(2, eventType(watholaevent.StepType)) + eis.AssertAtLeast(1, eventType(watholaevent.FinishedType)) + eis.AssertAtLeast(1, stepEvent(watholaevent.Step{Number: 1})) + eis.AssertAtLeast(1, stepEvent(watholaevent.Step{Number: 2})) + eis.AssertAtLeast(1, finishedEvent(watholaevent.Finished{EventsSent: 2})) +} + +type testingSender struct { + t *testing.T + ctx context.Context + client *testlib.Client + url *apis.URL +} + +func (s *testingSender) send(event cloudevents.Event) { + s.t.Helper() + senderName := names.SimpleNameGenerator.GenerateName("sender-") + s.client.SendEvent(s.ctx, senderName, s.url.String(), event) +} + +func eventType(_type string) recordevents.EventInfoMatcher { + return func(info recordevents.EventInfo) error { + actual := info.Event.Type() + if actual == _type { + return nil + } + return fmt.Errorf("event type don't match. want: '%s', got: '%s'", + _type, actual) + } +} + +func finishedEvent(expected watholaevent.Finished) recordevents.EventInfoMatcher { + return func(info recordevents.EventInfo) error { + ce := info.Event + if ce.Type() == watholaevent.FinishedType { + actual := &watholaevent.Finished{} + err := ce.DataAs(actual) + if err != nil { + return err + } + if reflect.DeepEqual(actual, expected) { + return fmt.Errorf( + "finished event don't match. want: %#v, got: %#v", + expected, actual, + ) + } + } + return nil + } } -func hasID(id string) recordevents.EventInfoMatcher { +func stepEvent(expected watholaevent.Step) recordevents.EventInfoMatcher { return func(info recordevents.EventInfo) error { - if id != info.Event.ID() { - return fmt.Errorf( - "event ID don't match. Expected: %#v, Actual: %#v", - id, info.Event.ID(), - ) + ce := info.Event + if ce.Type() == watholaevent.StepType { + actual := &watholaevent.Step{} + err := ce.DataAs(actual) + if err != nil { + return err + } + if reflect.DeepEqual(actual, expected) { + return fmt.Errorf( + "step event don't match. want: %#v, got: %#v", + expected, actual, + ) + } } return nil } diff --git a/test/upgrade/prober/sut/types.go b/test/upgrade/prober/sut/types.go index f7bc9e1aac6..40e9768f0eb 100644 --- a/test/upgrade/prober/sut/types.go +++ b/test/upgrade/prober/sut/types.go @@ -26,7 +26,7 @@ import ( ) const ( - defaulEventsPrefix = "dev.knative.eventing.prober.sut" + defaulEventsPrefix = "dev.knative.eventing.wathola" ) var eventTypes = []string{"step", "finished"} @@ -36,10 +36,10 @@ var eventTypes = []string{"step", "finished"} type SystemUnderTest interface { // Deploy is responsible for deploying SUT and returning a URL to feed // events into. - Deploy(ctx Context, destination duckv1.Destination) (*apis.URL, error) + Deploy(ctx Context, destination duckv1.Destination) *apis.URL // Teardown will remove all deployed SUT resources. - Teardown(ctx Context) error + Teardown(ctx Context) } // Context represents a context of system under test that we'd diff --git a/test/upgrade/prober/wathola/event/types.go b/test/upgrade/prober/wathola/event/types.go index f6559dab5e8..2a7fe264bb8 100644 --- a/test/upgrade/prober/wathola/event/types.go +++ b/test/upgrade/prober/wathola/event/types.go @@ -23,9 +23,9 @@ import ( const ( // StepType is a string type representation of step event - StepType = "com.github.cardil.wathola.step" + StepType = "dev.knative.eventing.wathola.step" // FinishedType os a string type representation of finished event - FinishedType = "com.github.cardil.wathola.finished" + FinishedType = "dev.knative.eventing.wathola.finished" ) // Step is a event call at each step of verification From 94b20643ab0dfd4079d15c953b8e04ad822ed8b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Thu, 6 May 2021 12:52:46 +0200 Subject: [PATCH 06/24] Fixing NPE in upgrade tests --- test/upgrade/prober/prober.go | 2 +- test/upgrade/prober/sut/default_e2e_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/test/upgrade/prober/prober.go b/test/upgrade/prober/prober.go index 95089595209..84d7b799d0e 100644 --- a/test/upgrade/prober/prober.go +++ b/test/upgrade/prober/prober.go @@ -104,7 +104,7 @@ func (p *probeRunner) Verify(ctx pkgupgrade.Context) { } func (p *probeRunner) validate(ctx pkgupgrade.Context) { - if p.client.Namespace != "" { + if p.config.Namespace != "" { ctx.Log.Warnf( "DEPRECATED: namespace set in Config: %s. Ignoring it.", p.client.Namespace) diff --git a/test/upgrade/prober/sut/default_e2e_test.go b/test/upgrade/prober/sut/default_e2e_test.go index 661b78652f4..1d9b10a5542 100644 --- a/test/upgrade/prober/sut/default_e2e_test.go +++ b/test/upgrade/prober/sut/default_e2e_test.go @@ -1,4 +1,4 @@ -// +build e2e +// +build upgrade /* Copyright 2021 The Knative Authors @@ -40,7 +40,7 @@ import ( pkgTest "knative.dev/pkg/test" ) -func TestNewDefaultE2E(t *testing.T) { +func TestSUTNewDefaultE2E(t *testing.T) { ctx := signals.NewContext() client := testlib.Setup(t, false) defer testlib.TearDown(client) From 51be9e0ed11c24e76ff9024361aa2f097df427db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Thu, 6 May 2021 13:52:00 +0200 Subject: [PATCH 07/24] Introducing new interfaces for configuration and easy execution --- test/e2e/broker_redelivery_test.go | 3 +- test/e2e/helpers/broker_test_helper.go | 3 +- test/upgrade/continual.go | 9 +--- test/upgrade/prober/configuration.go | 59 +++++++++++++++++++++++--- test/upgrade/prober/continual.go | 57 +++++++++++++++++++++++++ test/upgrade/prober/prober.go | 22 ++++++---- 6 files changed, 128 insertions(+), 25 deletions(-) create mode 100644 test/upgrade/prober/continual.go diff --git a/test/e2e/broker_redelivery_test.go b/test/e2e/broker_redelivery_test.go index 60a61cf19e2..0da4b79a4cb 100644 --- a/test/e2e/broker_redelivery_test.go +++ b/test/e2e/broker_redelivery_test.go @@ -44,8 +44,7 @@ func ChannelBasedBrokerCreator(channel metav1.TypeMeta, brokerClass string) help backoff := eventingduckv1.BackoffPolicyLinear // create a new broker. - client.CreateBrokerOrFail( - brokerName, + client.CreateBrokerOrFail(brokerName, resources.WithBrokerClassForBroker(brokerClass), resources.WithConfigForBroker(config), func(broker *v1.Broker) { diff --git a/test/e2e/helpers/broker_test_helper.go b/test/e2e/helpers/broker_test_helper.go index 8c24695493f..de6a8ef4212 100644 --- a/test/e2e/helpers/broker_test_helper.go +++ b/test/e2e/helpers/broker_test_helper.go @@ -107,8 +107,7 @@ func ChannelBasedBrokerCreator(channel metav1.TypeMeta, brokerClass string) Brok switch version { case "v1": - client.CreateBrokerOrFail( - brokerName, + client.CreateBrokerOrFail(brokerName, resources.WithBrokerClassForBroker(brokerClass), resources.WithConfigForBroker(config), ) diff --git a/test/upgrade/continual.go b/test/upgrade/continual.go index 6503e29b766..56d1bb54822 100644 --- a/test/upgrade/continual.go +++ b/test/upgrade/continual.go @@ -23,11 +23,6 @@ import ( // ContinualTest will perform a continual validation of Eventing SUT. func ContinualTest() pkgupgrade.BackgroundOperation { - config := prober.NewConfig() - pr := prober.CreateRunner(config) - return pkgupgrade.NewBackgroundVerification( - "EventingContinualTest", - pr.Setup, - pr.Verify, - ) + return prober.NewContinualVerification("EventingContinualTest", + prober.ContinualVerificationOptions{}) } diff --git a/test/upgrade/prober/configuration.go b/test/upgrade/prober/configuration.go index 085191cc1ac..df95693e31e 100644 --- a/test/upgrade/prober/configuration.go +++ b/test/upgrade/prober/configuration.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io/ioutil" + "os" "path" "runtime" "strings" @@ -32,6 +33,7 @@ import ( "knative.dev/eventing/test/upgrade/prober/sut" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" + pkgupgrade "knative.dev/pkg/test/upgrade" ) const ( @@ -46,6 +48,12 @@ const ( Silence DuplicateAction = "silence" Warn DuplicateAction = "warn" Error DuplicateAction = "error" + + prefix = "eventing_upgrade_tests" + + // TODO(ksuszyns): Remove this val in next release + // Deprecated: use 'eventing_upgrade_tests' prefix instead + deprecatedPrefix = "e2e_upgrade_tests" ) // DuplicateAction is the action to take in case of duplicated events @@ -61,11 +69,13 @@ type Config struct { OnDuplicate DuplicateAction Ctx context.Context // BrokerOpts holds opts for broker. + // TODO(ksuszyns): Remove this opt in next release // Deprecated: use Wathola.SystemUnderTest instead. BrokerOpts []resources.BrokerOption // Namespace holds namespace in which test is about to be executed. + // TODO(ksuszyns): Remove this opt in next release // Deprecated: namespace is about to be taken from testlib.Client created by - // CreateRunner + // NewRunner Namespace string } @@ -92,10 +102,38 @@ type ServingConfig struct { ScaleToZero bool } +// NewConfigOrFail will create a prober.Config or fail trying. +func NewConfigOrFail(c pkgupgrade.Context) *Config { + errSink := func(err error) { + c.T.Fatal(err) + } + warnf := c.Log.Warnf + return newConfig(errSink, warnf) +} + // NewConfig creates a new configuration object with default values filled in. // Values can be influenced by kelseyhightower/envconfig with // `eventing_upgrade_tests` prefix. +// TODO(ksuszyns): Remove this func in next release +// Deprecated: use NewContinualVerification or NewConfigOrFail func NewConfig(namespace ...string) *Config { + errSink := func(err error) { + ensure.NoError(err) + } + warnf := func(template string, args ...interface{}) { + _, err := fmt.Fprintf(os.Stderr, template, args) + ensure.NoError(err) + } + config := newConfig(errSink, warnf) + if len(namespace) > 0 { + config.Namespace = strings.Join(namespace, ",") + } + return config +} + +func newConfig( + errSink func(err error), warnf func(template string, args ...interface{}), +) *Config { config := &Config{ Interval: Interval, FinishedSleep: defaultFinishedSleep, @@ -119,10 +157,21 @@ func NewConfig(namespace ...string) *Config { }, } - err := envconfig.Process("eventing_upgrade_tests", config) - ensure.NoError(err) - if len(namespace) > 0 { - config.Namespace = strings.Join(namespace, ",") + if err := envconfig.Process(prefix, config); err != nil { + errSink(err) + } + + // TODO(ksuszyns): Remove this block in next release + for _, enventry := range os.Environ() { + if strings.HasPrefix(strings.ToLower(enventry), deprecatedPrefix) { + warnf( + "DEPRECATED: using deprecated '%s' prefix. Use '%s' instead.", + deprecatedPrefix, prefix) + if err := envconfig.Process(deprecatedPrefix, config); err != nil { + errSink(err) + } + break + } } return config } diff --git a/test/upgrade/prober/continual.go b/test/upgrade/prober/continual.go new file mode 100644 index 00000000000..ce7a2a88845 --- /dev/null +++ b/test/upgrade/prober/continual.go @@ -0,0 +1,57 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prober + +import ( + testlib "knative.dev/eventing/test/lib" + pkgupgrade "knative.dev/pkg/test/upgrade" +) + +// Configurator will customize default config. +type Configurator func(config *Config) error + +// ContinualVerificationOptions holds options for NewContinualVerification func. +type ContinualVerificationOptions struct { + Configurators []Configurator + ClientOptions []testlib.SetupClientOption +} + +// NewContinualVerification will create a new continual verification operation +// that will verify that SUT is propagating events well through the test +// process. It's a general pattern that can be used to validate upgrade or +// performance testing. +func NewContinualVerification( + name string, opts ContinualVerificationOptions, +) pkgupgrade.BackgroundOperation { + var config *Config + var runner Runner + setup := func(c pkgupgrade.Context) { + config = NewConfigOrFail(c) + if len(opts.Configurators) > 0 { + for _, fn := range opts.Configurators { + err := fn(config) + if err != nil { + c.T.Fatal(err) + } + } + } + runner = NewRunner(config, opts.ClientOptions...) + runner.Setup(c) + } + verify := runner.Verify + return pkgupgrade.NewBackgroundVerification(name, setup, verify) +} diff --git a/test/upgrade/prober/prober.go b/test/upgrade/prober/prober.go index 84d7b799d0e..96fc2a26944 100644 --- a/test/upgrade/prober/prober.go +++ b/test/upgrade/prober/prober.go @@ -37,7 +37,8 @@ var ( // Prober is the interface for a prober, which checks the result of the probes // when stopped. -// Deprecated: use Runner instead, create it with CreateRunner func. +// TODO(ksuszyns): Remove this interface in next release +// Deprecated: use Runner instead, create it with NewRunner func. type Prober interface { // Verify will verify prober state after finished has been send Verify() ([]error, int) @@ -58,9 +59,9 @@ type Runner interface { Verify(ctx pkgupgrade.Context) } -// CreateRunner will create a runner compatible with -// pkgupgrade.NewBackgroundVerification func. -func CreateRunner(config *Config, options ...testlib.SetupClientOption) Runner { +// NewRunner will create a runner compatible with NewContinualVerification +// func. +func NewRunner(config *Config, options ...testlib.SetupClientOption) Runner { return &probeRunner{ prober: &prober{config: config}, options: options, @@ -127,9 +128,10 @@ func (p *probeRunner) validate(ctx pkgupgrade.Context) { } // RunEventProber starts a single Prober of the given domain. -// Deprecated: use CreateRunner func instead. +// TODO(ksuszyns): Remove this func in next release +// Deprecated: use NewRunner func instead. func RunEventProber(ctx context.Context, log *zap.SugaredLogger, client *testlib.Client, config *Config) Prober { - log.Warn("prober.RunEventProber is deprecated. Use CreateRunner instead.") + log.Warn("prober.RunEventProber is deprecated. Use NewRunner instead.") config.Ctx = ctx p := &prober{ log: log, @@ -140,10 +142,12 @@ func RunEventProber(ctx context.Context, log *zap.SugaredLogger, client *testlib return p } -// AssertEventProber will send finish event and then verify if all events propagated well -// Deprecated: use CreateRunner func instead. +// AssertEventProber will send finish event and then verify if all events +// propagated well. +// TODO(ksuszyns): Remove this func in next release +// Deprecated: use NewRunner func instead. func AssertEventProber(ctx context.Context, t *testing.T, probe Prober) { - log.Warn("prober.AssertEventProber is deprecated. Use CreateRunner instead.") + log.Warn("prober.AssertEventProber is deprecated. Use NewRunner instead.") p := probe.(*prober) p.client.T = t p.config.Ctx = ctx From 2f3cb5708c71ead26edf8adbc665fb65a9f96245 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Thu, 6 May 2021 14:13:40 +0200 Subject: [PATCH 08/24] NPE fix in continual.go@55 --- test/upgrade/prober/continual.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/upgrade/prober/continual.go b/test/upgrade/prober/continual.go index ce7a2a88845..237fdb582e4 100644 --- a/test/upgrade/prober/continual.go +++ b/test/upgrade/prober/continual.go @@ -52,6 +52,8 @@ func NewContinualVerification( runner = NewRunner(config, opts.ClientOptions...) runner.Setup(c) } - verify := runner.Verify + verify := func(c pkgupgrade.Context) { + runner.Verify(c) + } return pkgupgrade.NewBackgroundVerification(name, setup, verify) } From 01fb8da8554ea01535d254ce6c2183d810b159da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Thu, 6 May 2021 14:28:12 +0200 Subject: [PATCH 09/24] Removal of ensure.NoError --- test/upgrade/prober/configuration.go | 6 +++--- test/upgrade/prober/errors.go | 23 +++++++++++++++++++++++ test/upgrade/prober/forwarder.go | 8 ++++---- test/upgrade/prober/prober.go | 5 ++--- test/upgrade/prober/receiver.go | 25 ++++--------------------- test/upgrade/prober/sender.go | 5 ++--- test/upgrade/prober/verify.go | 15 +++++++-------- 7 files changed, 45 insertions(+), 42 deletions(-) create mode 100644 test/upgrade/prober/errors.go diff --git a/test/upgrade/prober/configuration.go b/test/upgrade/prober/configuration.go index df95693e31e..3d79ec69059 100644 --- a/test/upgrade/prober/configuration.go +++ b/test/upgrade/prober/configuration.go @@ -208,9 +208,9 @@ func (p *prober) compileTemplate(templateName string, brokerURL fmt.Stringer) st _, filename, _, _ := runtime.Caller(0) templateFilepath := path.Join(path.Dir(filename), templateName) templateBytes, err := ioutil.ReadFile(templateFilepath) - ensure.NoError(err) + p.ensureNoError(err) tmpl, err := template.New(templateName).Parse(string(templateBytes)) - ensure.NoError(err) + p.ensureNoError(err) var buff bytes.Buffer data := struct { *Config @@ -221,6 +221,6 @@ func (p *prober) compileTemplate(templateName string, brokerURL fmt.Stringer) st p.client.Namespace, brokerURL.String(), } - ensure.NoError(tmpl.Execute(&buff, data)) + p.ensureNoError(tmpl.Execute(&buff, data)) return buff.String() } diff --git a/test/upgrade/prober/errors.go b/test/upgrade/prober/errors.go new file mode 100644 index 00000000000..e70da17bd9b --- /dev/null +++ b/test/upgrade/prober/errors.go @@ -0,0 +1,23 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prober + +func (p *prober) ensureNoError(err error) { + if err != nil { + p.client.T.Fatal(err) + } +} diff --git a/test/upgrade/prober/forwarder.go b/test/upgrade/prober/forwarder.go index 8e3bd765883..98fafb3b235 100644 --- a/test/upgrade/prober/forwarder.go +++ b/test/upgrade/prober/forwarder.go @@ -18,7 +18,6 @@ package prober import ( "fmt" - "github.com/wavesoftware/go-ensure" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" testlib "knative.dev/eventing/test/lib" @@ -35,8 +34,9 @@ func (p *prober) deployForwarder() { p.log.Infof("Deploy forwarder knative service: %v", forwarderName) serving := p.client.Dynamic.Resource(resources.KServicesGVR).Namespace(p.client.Namespace) service := p.forwarderKService(forwarderName, p.client.Namespace) - _, err := serving.Create(p.config.Ctx, service, metav1.CreateOptions{}) - ensure.NoError(err) + if _, err := serving.Create(p.config.Ctx, service, metav1.CreateOptions{}); err != nil { + p.client.T.Fatal(err) + } sc := p.servingClient() testlib.WaitFor(fmt.Sprintf("forwarder ksvc be ready: %v", forwarderName), func() error { @@ -56,7 +56,7 @@ func (p *prober) removeForwarder() { p.log.Infof("Remove forwarder knative service: %v", forwarderName) serving := p.client.Dynamic.Resource(resources.KServicesGVR).Namespace(p.client.Namespace) err := serving.Delete(p.config.Ctx, forwarderName, metav1.DeleteOptions{}) - ensure.NoError(err) + p.ensureNoError(err) } func (p *prober) forwarderKService(name, namespace string) *unstructured.Unstructured { diff --git a/test/upgrade/prober/prober.go b/test/upgrade/prober/prober.go index 96fc2a26944..09ef86a231c 100644 --- a/test/upgrade/prober/prober.go +++ b/test/upgrade/prober/prober.go @@ -22,7 +22,6 @@ import ( "time" "github.com/prometheus/common/log" - "github.com/wavesoftware/go-ensure" "go.uber.org/zap" testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/resources" @@ -201,7 +200,7 @@ func (p *prober) deploy() { p.client.WaitForAllTestResourcesReadyOrFail(p.config.Ctx) p.deploySender() - ensure.NoError(testlib.AwaitForAll(p.log)) + p.ensureNoError(testlib.AwaitForAll(p.log)) // allow sender to send at least some events, 2 sec wait time.Sleep(2 * time.Second) p.log.Infof("Prober is now sending events with interval of %v in "+ @@ -212,7 +211,7 @@ func (p *prober) remove() { if p.config.Serving.Use { p.removeForwarder() } - ensure.NoError(p.client.Tracker.Clean(true)) + p.ensureNoError(p.client.Tracker.Clean(true)) } func waitAfterFinished(p *prober) { diff --git a/test/upgrade/prober/receiver.go b/test/upgrade/prober/receiver.go index f558448a061..06ab3cf829a 100644 --- a/test/upgrade/prober/receiver.go +++ b/test/upgrade/prober/receiver.go @@ -18,7 +18,6 @@ package prober import ( "fmt" - "github.com/wavesoftware/go-ensure" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -29,8 +28,7 @@ import ( ) var ( - receiverName = "wathola-receiver" - receiverNodePort int32 = -1 + receiverName = "wathola-receiver" ) func (p *prober) deployReceiver() { @@ -41,10 +39,7 @@ func (p *prober) deployReceiver() { func (p *prober) deployReceiverDeployment() { p.log.Info("Deploy of receiver deployment: ", receiverName) deployment := p.createReceiverDeployment() - _, err := p.client.Kube.AppsV1(). - Deployments(deployment.Namespace). - Create(p.config.Ctx, deployment, metav1.CreateOptions{}) - ensure.NoError(err) + p.client.CreateDeploymentOrFail(deployment) testlib.WaitFor(fmt.Sprint("receiver deployment be ready: ", receiverName), func() error { return pkgTest.WaitForDeploymentScale( @@ -75,22 +70,10 @@ func (p *prober) deployReceiverService() { Selector: map[string]string{ "app": receiverName, }, - Type: corev1.ServiceTypeNodePort, + Type: corev1.ServiceTypeClusterIP, }, } - created, err := p.client.Kube.CoreV1().Services(p.client.Namespace). - Create(p.config.Ctx, service, metav1.CreateOptions{}) - ensure.NoError(err) - for _, portSpec := range created.Spec.Ports { - if portSpec.Port == 80 { - receiverNodePort = portSpec.NodePort - } - } - if receiverNodePort == -1 { - panic(fmt.Errorf("couldn't find a node port for service: %v", receiverName)) - } else { - p.log.Debugf("Node port for service: %v is %v", receiverName, receiverNodePort) - } + p.client.CreateServiceOrFail(service) } func (p *prober) createReceiverDeployment() *appsv1.Deployment { diff --git a/test/upgrade/prober/sender.go b/test/upgrade/prober/sender.go index a4c0cfd0e0e..ec5b83ecb05 100644 --- a/test/upgrade/prober/sender.go +++ b/test/upgrade/prober/sender.go @@ -18,7 +18,6 @@ package prober import ( "fmt" - "github.com/wavesoftware/go-ensure" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -77,7 +76,7 @@ func (p *prober) deploySender() { _, err := p.client.Kube.AppsV1(). Deployments(p.client.Namespace). Create(p.config.Ctx, deployment, metav1.CreateOptions{}) - ensure.NoError(err) + p.ensureNoError(err) testlib.WaitFor(fmt.Sprint("sender deployment be ready: ", senderName), func() error { return pkgTest.WaitForDeploymentScale( @@ -92,5 +91,5 @@ func (p *prober) removeSender() { err := p.client.Kube.AppsV1(). Deployments(p.client.Namespace). Delete(p.config.Ctx, senderName, metav1.DeleteOptions{}) - ensure.NoError(err) + p.ensureNoError(err) } diff --git a/test/upgrade/prober/verify.go b/test/upgrade/prober/verify.go index 9f4c3bc929a..ec9d7989fc9 100644 --- a/test/upgrade/prober/verify.go +++ b/test/upgrade/prober/verify.go @@ -22,7 +22,6 @@ import ( "fmt" "time" - "github.com/wavesoftware/go-ensure" "go.uber.org/zap" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -105,9 +104,9 @@ func (p *prober) fetchExecution() *fetcher.Execution { job := p.deployFetcher() defer p.deleteFetcher() pod, err := p.findSucceededPod(job) - ensure.NoError(err) + p.ensureNoError(err) bytes, err := pkgTest.PodLogs(p.config.Ctx, p.client.Kube, pod.Name, fetcherName, ns) - ensure.NoError(err) + p.ensureNoError(err) ex := &fetcher.Execution{ Logs: []fetcher.LogEntry{}, Report: &receiver.Report{ @@ -123,7 +122,7 @@ func (p *prober) fetchExecution() *fetcher.Execution { }, } err = json.Unmarshal(bytes, ex) - ensure.NoError(err) + p.ensureNoError(err) return ex } @@ -171,10 +170,10 @@ func (p *prober) deployFetcher() *batchv1.Job { }, } created, err := jobs.Create(p.config.Ctx, fetcherJob, metav1.CreateOptions{}) - ensure.NoError(err) + p.ensureNoError(err) p.log.Info("Waiting for fetcher job to succeed: ", fetcherName) err = waitForJobToComplete(p.config.Ctx, p.client.Kube, fetcherName, p.client.Namespace) - ensure.NoError(err) + p.ensureNoError(err) return created } @@ -183,7 +182,7 @@ func (p *prober) deleteFetcher() { ns := p.client.Namespace jobs := p.client.Kube.BatchV1().Jobs(ns) err := jobs.Delete(p.config.Ctx, fetcherName, metav1.DeleteOptions{}) - ensure.NoError(err) + p.ensureNoError(err) } func (p *prober) findSucceededPod(job *batchv1.Job) (*corev1.Pod, error) { @@ -191,7 +190,7 @@ func (p *prober) findSucceededPod(job *batchv1.Job) (*corev1.Pod, error) { podList, err := pods.List(p.config.Ctx, metav1.ListOptions{ LabelSelector: fmt.Sprint("job-name=", job.Name), }) - ensure.NoError(err) + p.ensureNoError(err) for _, pod := range podList.Items { if pod.Status.Phase == corev1.PodSucceeded { return &pod, nil From 9c65cce43fde6817f9f685f065e360b752737e07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Thu, 6 May 2021 15:52:08 +0200 Subject: [PATCH 10/24] Raising UnavailablePeriodToReport to 10s to make tests more stable --- test/upgrade/prober/wathola/config/defaults.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/upgrade/prober/wathola/config/defaults.go b/test/upgrade/prober/wathola/config/defaults.go index a76a7c20c7a..c821d15124a 100644 --- a/test/upgrade/prober/wathola/config/defaults.go +++ b/test/upgrade/prober/wathola/config/defaults.go @@ -62,7 +62,7 @@ func defaultValues() *Config { Duration: time.Second, }, Errors: ReceiverErrorConfig{ - UnavailablePeriodToReport: 5 * time.Second, + UnavailablePeriodToReport: 10 * time.Second, }, }, Forwarder: ForwarderConfig{ From 2172493a38152c4dd2732ee148cf801e1cdb691b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Fri, 7 May 2021 12:05:30 +0200 Subject: [PATCH 11/24] Running all e2e tagged tests --- test/e2e-tests.sh | 2 +- test/upgrade/prober/sut/default_e2e_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/e2e-tests.sh b/test/e2e-tests.sh index f1fabed9204..8731be48768 100755 --- a/test/e2e-tests.sh +++ b/test/e2e-tests.sh @@ -33,7 +33,7 @@ source "$(dirname "$0")/e2e-common.sh" initialize $@ --skip-istio-addon echo "Running E2E tests for: Multi Tenant Channel Based Broker, Channel (v1), InMemoryChannel (v1) , ApiServerSource (v1), ContainerSource (v1) and PingSource (v1beta2)" -go_test_e2e -timeout=30m -parallel=20 ./test/e2e \ +go_test_e2e -timeout=30m -parallel=20 ./test/... \ -brokerclass=MTChannelBasedBroker \ -channels=messaging.knative.dev/v1:Channel,messaging.knative.dev/v1:InMemoryChannel \ -sources=sources.knative.dev/v1beta2:PingSource,sources.knative.dev/v1:ApiServerSource,sources.knative.dev/v1:ContainerSource \ diff --git a/test/upgrade/prober/sut/default_e2e_test.go b/test/upgrade/prober/sut/default_e2e_test.go index 1d9b10a5542..418f9bc4a85 100644 --- a/test/upgrade/prober/sut/default_e2e_test.go +++ b/test/upgrade/prober/sut/default_e2e_test.go @@ -1,4 +1,4 @@ -// +build upgrade +// +build e2e /* Copyright 2021 The Knative Authors From fb39a4b455cdccda642818b15d79a811a08bd061 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Fri, 7 May 2021 12:06:20 +0200 Subject: [PATCH 12/24] Raising unavailibility period to 60s as load & retries makes it longer. --- test/upgrade/prober/wathola/config/defaults.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/upgrade/prober/wathola/config/defaults.go b/test/upgrade/prober/wathola/config/defaults.go index c821d15124a..386ee822156 100644 --- a/test/upgrade/prober/wathola/config/defaults.go +++ b/test/upgrade/prober/wathola/config/defaults.go @@ -62,7 +62,7 @@ func defaultValues() *Config { Duration: time.Second, }, Errors: ReceiverErrorConfig{ - UnavailablePeriodToReport: 10 * time.Second, + UnavailablePeriodToReport: time.Minute, }, }, Forwarder: ForwarderConfig{ From e1a63a04e56f6f3a382c57e967095328510e030a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Fri, 7 May 2021 18:53:09 +0200 Subject: [PATCH 13/24] Execute upgrade e2e tests with additional run --- test/e2e-tests.sh | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/e2e-tests.sh b/test/e2e-tests.sh index 8731be48768..b31036000e7 100755 --- a/test/e2e-tests.sh +++ b/test/e2e-tests.sh @@ -33,10 +33,13 @@ source "$(dirname "$0")/e2e-common.sh" initialize $@ --skip-istio-addon echo "Running E2E tests for: Multi Tenant Channel Based Broker, Channel (v1), InMemoryChannel (v1) , ApiServerSource (v1), ContainerSource (v1) and PingSource (v1beta2)" -go_test_e2e -timeout=30m -parallel=20 ./test/... \ +go_test_e2e -timeout=30m -parallel=20 ./test/e2e \ -brokerclass=MTChannelBasedBroker \ -channels=messaging.knative.dev/v1:Channel,messaging.knative.dev/v1:InMemoryChannel \ -sources=sources.knative.dev/v1beta2:PingSource,sources.knative.dev/v1:ApiServerSource,sources.knative.dev/v1:ContainerSource \ || fail_test +echo "Running E2E tests for upgrade tests" +go_test_e2e -timeout=10m -parallel=20 ./test/upgrade/... || fail_test + success From 3ff62f9b60c04bbe4ad2e7ec54ed853db5438d7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Mon, 10 May 2021 11:25:26 +0200 Subject: [PATCH 14/24] Using SutURL in wathola config instead of BrokerURL --- test/upgrade/prober/config.toml | 2 +- test/upgrade/prober/configuration.go | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/test/upgrade/prober/config.toml b/test/upgrade/prober/config.toml index 9029beda9d4..52f14de9d05 100644 --- a/test/upgrade/prober/config.toml +++ b/test/upgrade/prober/config.toml @@ -1,6 +1,6 @@ # logLevel = 5 # DEBUG(5) [sender] -address = '{{- .BrokerURL -}}' +address = '{{- .SutURL -}}' interval = {{ .Config.Interval.Nanoseconds }} [forwarder] target = 'http://wathola-receiver.{{- .Namespace -}}.svc.cluster.local' diff --git a/test/upgrade/prober/configuration.go b/test/upgrade/prober/configuration.go index 3d79ec69059..1ed31d1ad7a 100644 --- a/test/upgrade/prober/configuration.go +++ b/test/upgrade/prober/configuration.go @@ -31,7 +31,6 @@ import ( "github.com/wavesoftware/go-ensure" "knative.dev/eventing/test/lib/resources" "knative.dev/eventing/test/upgrade/prober/sut" - "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" pkgupgrade "knative.dev/pkg/test/upgrade" ) @@ -195,31 +194,34 @@ func (p *prober) deployConfiguration() { p.deployConfigToml(url) } -func (p *prober) deployConfigToml(url *apis.URL) { +func (p *prober) deployConfigToml(sutUrl fmt.Stringer) { name := p.config.ConfigMapName p.log.Infof("Deploying config map: \"%s/%s\"", p.client.Namespace, name) - configData := p.compileTemplate(p.config.ConfigTemplate, url) + configData := p.compileTemplate(p.config.ConfigTemplate, sutUrl) p.client.CreateConfigMapOrFail(name, p.client.Namespace, map[string]string{ p.config.ConfigFilename: configData, }) } -func (p *prober) compileTemplate(templateName string, brokerURL fmt.Stringer) string { +func (p *prober) compileTemplate(templateName string, sutUrl fmt.Stringer) string { _, filename, _, _ := runtime.Caller(0) templateFilepath := path.Join(path.Dir(filename), templateName) templateBytes, err := ioutil.ReadFile(templateFilepath) p.ensureNoError(err) tmpl, err := template.New(templateName).Parse(string(templateBytes)) p.ensureNoError(err) + u := sutUrl.String() var buff bytes.Buffer data := struct { *Config Namespace string + // Deprecated: use SutURL BrokerURL string + SutURL string }{ p.config, p.client.Namespace, - brokerURL.String(), + u, u, } p.ensureNoError(tmpl.Execute(&buff, data)) return buff.String() From bd2c4826d1ea0f43481c575a90e1a0f2c357dc25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Mon, 10 May 2021 13:34:48 +0200 Subject: [PATCH 15/24] Removal of redundant namespace option --- test/upgrade/prober/configuration.go | 6 +++--- test/upgrade/prober/sut/broker.go | 8 +++----- test/upgrade/prober/sut/default.go | 4 ++-- test/upgrade/prober/sut/default_e2e_test.go | 2 +- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/test/upgrade/prober/configuration.go b/test/upgrade/prober/configuration.go index 1ed31d1ad7a..d66a1b94bbc 100644 --- a/test/upgrade/prober/configuration.go +++ b/test/upgrade/prober/configuration.go @@ -81,7 +81,7 @@ type Config struct { // Wathola represents options related strictly to wathola testing tool. type Wathola struct { ConfigToml - SystemUnderTest func(namespace string) sut.SystemUnderTest + SystemUnderTest sut.SystemUnderTest EventsTypePrefix string HealthEndpoint string } @@ -152,7 +152,7 @@ func newConfig( }, EventsTypePrefix: defaultWatholaEventsPrefix, HealthEndpoint: defaultHealthEndpoint, - SystemUnderTest: sut.NewDefault, + SystemUnderTest: sut.NewDefault(), }, } @@ -186,7 +186,7 @@ func (p *prober) deployConfiguration() { ref = resources.KnativeRefForKservice(forwarderName, p.client.Namespace) } dest := duckv1.Destination{Ref: ref} - s := p.config.SystemUnderTest(p.client.Namespace) + s := p.config.SystemUnderTest url := s.Deploy(sc, dest) p.client.Cleanup(func() { s.Teardown(sc) diff --git a/test/upgrade/prober/sut/broker.go b/test/upgrade/prober/sut/broker.go index fc78c6eb106..236a3d88021 100644 --- a/test/upgrade/prober/sut/broker.go +++ b/test/upgrade/prober/sut/broker.go @@ -40,7 +40,6 @@ var ( type BrokerAndTriggers struct { Broker Triggers - Namespace string } // Broker will hold settings for broker itself @@ -55,9 +54,8 @@ type Triggers struct { TypePrefix string } -func newBrokerAndTriggers(namespace string) SystemUnderTest { +func newBrokerAndTriggers() SystemUnderTest { return &BrokerAndTriggers{ - Namespace: namespace, Broker: Broker{ Name: "sut", Opts: []resources.BrokerOption{ @@ -92,11 +90,11 @@ func (b *BrokerAndTriggers) deployBroker(ctx Context) { } func (b *BrokerAndTriggers) fetchURL(ctx Context) *apis.URL { - namespace := b.Namespace + namespace := ctx.Client.Namespace ctx.Log.Debugf("Fetching \"%s\" broker URL for ns %s", b.Name, namespace) meta := resources.NewMetaResource( - b.Name, b.Namespace, testlib.BrokerTypeMeta, + b.Name, namespace, testlib.BrokerTypeMeta, ) err := duck.WaitForResourceReady(ctx.Client.Dynamic, meta) if err != nil { diff --git a/test/upgrade/prober/sut/default.go b/test/upgrade/prober/sut/default.go index e2dfdf3bd99..1a9561a2c77 100644 --- a/test/upgrade/prober/sut/default.go +++ b/test/upgrade/prober/sut/default.go @@ -17,6 +17,6 @@ limitations under the License. package sut // NewDefault will return a default SUT for this repo. -func NewDefault(namespace string) SystemUnderTest { - return newBrokerAndTriggers(namespace) +func NewDefault() SystemUnderTest { + return newBrokerAndTriggers() } diff --git a/test/upgrade/prober/sut/default_e2e_test.go b/test/upgrade/prober/sut/default_e2e_test.go index 418f9bc4a85..67237be7c98 100644 --- a/test/upgrade/prober/sut/default_e2e_test.go +++ b/test/upgrade/prober/sut/default_e2e_test.go @@ -44,7 +44,7 @@ func TestSUTNewDefaultE2E(t *testing.T) { ctx := signals.NewContext() client := testlib.Setup(t, false) defer testlib.TearDown(client) - s := sut.NewDefault(client.Namespace) + s := sut.NewDefault() sutCtx := sut.Context{ Ctx: ctx, Log: log(t), From 1c548d2dce2d5a1531a6e271e653639b527743b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Tue, 11 May 2021 19:30:47 +0200 Subject: [PATCH 16/24] Switch to using interface{} as endpoint address representation --- test/upgrade/prober/config.toml | 2 +- test/upgrade/prober/configuration.go | 23 ++++++++++++--------- test/upgrade/prober/sut/broker.go | 8 ++----- test/upgrade/prober/sut/default_e2e_test.go | 10 +++++---- test/upgrade/prober/sut/types.go | 14 +++++++++---- 5 files changed, 32 insertions(+), 25 deletions(-) diff --git a/test/upgrade/prober/config.toml b/test/upgrade/prober/config.toml index 52f14de9d05..d0367a44c23 100644 --- a/test/upgrade/prober/config.toml +++ b/test/upgrade/prober/config.toml @@ -1,6 +1,6 @@ # logLevel = 5 # DEBUG(5) [sender] -address = '{{- .SutURL -}}' +address = '{{- .Endpoint -}}' interval = {{ .Config.Interval.Nanoseconds }} [forwarder] target = 'http://wathola-receiver.{{- .Namespace -}}.svc.cluster.local' diff --git a/test/upgrade/prober/configuration.go b/test/upgrade/prober/configuration.go index d66a1b94bbc..233d0afb7f9 100644 --- a/test/upgrade/prober/configuration.go +++ b/test/upgrade/prober/configuration.go @@ -187,41 +187,44 @@ func (p *prober) deployConfiguration() { } dest := duckv1.Destination{Ref: ref} s := p.config.SystemUnderTest - url := s.Deploy(sc, dest) + endpoint := s.Deploy(sc, dest) p.client.Cleanup(func() { - s.Teardown(sc) + if tr, ok := s.(sut.HasManualTeardown); ok { + + tr.Teardown(sc) + } }) - p.deployConfigToml(url) + p.deployConfigToml(endpoint) } -func (p *prober) deployConfigToml(sutUrl fmt.Stringer) { +func (p *prober) deployConfigToml(endpoint interface{}) { name := p.config.ConfigMapName p.log.Infof("Deploying config map: \"%s/%s\"", p.client.Namespace, name) - configData := p.compileTemplate(p.config.ConfigTemplate, sutUrl) + configData := p.compileTemplate(p.config.ConfigTemplate, endpoint) p.client.CreateConfigMapOrFail(name, p.client.Namespace, map[string]string{ p.config.ConfigFilename: configData, }) } -func (p *prober) compileTemplate(templateName string, sutUrl fmt.Stringer) string { +func (p *prober) compileTemplate(templateName string, endpoint interface{}) string { _, filename, _, _ := runtime.Caller(0) templateFilepath := path.Join(path.Dir(filename), templateName) templateBytes, err := ioutil.ReadFile(templateFilepath) p.ensureNoError(err) tmpl, err := template.New(templateName).Parse(string(templateBytes)) p.ensureNoError(err) - u := sutUrl.String() var buff bytes.Buffer data := struct { *Config Namespace string - // Deprecated: use SutURL + // Deprecated: use Endpoint BrokerURL string - SutURL string + Endpoint interface{} }{ p.config, p.client.Namespace, - u, u, + fmt.Sprintf("%v", endpoint), + endpoint, } p.ensureNoError(tmpl.Execute(&buff, data)) return buff.String() diff --git a/test/upgrade/prober/sut/broker.go b/test/upgrade/prober/sut/broker.go index 236a3d88021..d4614a07902 100644 --- a/test/upgrade/prober/sut/broker.go +++ b/test/upgrade/prober/sut/broker.go @@ -69,22 +69,18 @@ func newBrokerAndTriggers() SystemUnderTest { }, Triggers: Triggers{ Types: eventTypes, - TypePrefix: defaulEventsPrefix, + TypePrefix: defaultEventsPrefix, }, } } -func (b *BrokerAndTriggers) Deploy(ctx Context, dest duckv1.Destination) *apis.URL { +func (b *BrokerAndTriggers) Deploy(ctx Context, dest duckv1.Destination) interface{} { b.deployBroker(ctx) url := b.fetchURL(ctx) b.deployTriggers(ctx, dest) return url } -func (b *BrokerAndTriggers) Teardown(ctx Context) { - ctx.Log.Debug("BrokerAndTriggers SUT should automatically teardown") -} - func (b *BrokerAndTriggers) deployBroker(ctx Context) { ctx.Client.CreateBrokerOrFail(b.Name, b.Broker.Opts...) } diff --git a/test/upgrade/prober/sut/default_e2e_test.go b/test/upgrade/prober/sut/default_e2e_test.go index 67237be7c98..410610fde7d 100644 --- a/test/upgrade/prober/sut/default_e2e_test.go +++ b/test/upgrade/prober/sut/default_e2e_test.go @@ -56,7 +56,7 @@ func TestSUTNewDefaultE2E(t *testing.T) { ref := pkgTest.CoreV1ObjectReference( resources.ServiceKind, resources.CoreAPIVersion, receiverName) - url := s.Deploy(sutCtx, duckv1.Destination{ + endpoint := s.Deploy(sutCtx, duckv1.Destination{ Ref: &duckv1.KReference{ APIVersion: ref.APIVersion, Kind: ref.Kind, @@ -64,8 +64,10 @@ func TestSUTNewDefaultE2E(t *testing.T) { Name: pod.Name, }, }) - defer s.Teardown(sutCtx) - assert.NotEmpty(t, url) + if tr, ok := s.(sut.HasManualTeardown); ok { + defer tr.Teardown(sutCtx) + } + assert.NotEmpty(t, endpoint) step1 := watholasender.NewCloudEvent(watholaevent.Step{Number: 1}, watholaevent.StepType) step2 := watholasender.NewCloudEvent(watholaevent.Step{Number: 2}, @@ -73,7 +75,7 @@ func TestSUTNewDefaultE2E(t *testing.T) { finished := watholasender.NewCloudEvent(watholaevent.Finished{EventsSent: 2}, watholaevent.FinishedType) - sender := testingSender{t: t, ctx: ctx, client: client, url: url} + sender := testingSender{t: t, ctx: ctx, client: client, url: endpoint.(*apis.URL)} sender.send(step1) sender.send(step2) sender.send(finished) diff --git a/test/upgrade/prober/sut/types.go b/test/upgrade/prober/sut/types.go index 40e9768f0eb..0fbf5a8623c 100644 --- a/test/upgrade/prober/sut/types.go +++ b/test/upgrade/prober/sut/types.go @@ -21,23 +21,29 @@ import ( "go.uber.org/zap" testlib "knative.dev/eventing/test/lib" - "knative.dev/pkg/apis" + watholaevent "knative.dev/eventing/test/upgrade/prober/wathola/event" duckv1 "knative.dev/pkg/apis/duck/v1" ) const ( - defaulEventsPrefix = "dev.knative.eventing.wathola" + defaultEventsPrefix = "dev.knative.eventing.wathola" ) -var eventTypes = []string{"step", "finished"} +var eventTypes = []string{ + watholaevent.Step{}.Type(), + watholaevent.Finished{}.Type(), +} // SystemUnderTest (SUT) represents a system that we'd like to test with // continual prober. type SystemUnderTest interface { // Deploy is responsible for deploying SUT and returning a URL to feed // events into. - Deploy(ctx Context, destination duckv1.Destination) *apis.URL + Deploy(ctx Context, destination duckv1.Destination) interface{} +} +// HasManualTeardown indicates that SystemUnderTest supports manual teardown. +type HasManualTeardown interface { // Teardown will remove all deployed SUT resources. Teardown(ctx Context) } From 8dad306ecf2c9c28f27c113b08b57a48f9e6fc72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Tue, 11 May 2021 20:00:51 +0200 Subject: [PATCH 17/24] Fixing compile error on prober --- test/upgrade/prober/prober.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/test/upgrade/prober/prober.go b/test/upgrade/prober/prober.go index 09ef86a231c..dabd7a2ac22 100644 --- a/test/upgrade/prober/prober.go +++ b/test/upgrade/prober/prober.go @@ -113,12 +113,9 @@ func (p *probeRunner) validate(ctx pkgupgrade.Context) { ctx.Log.Warn( "DEPRECATED: BrokerOpts set in Config. Use custom SystemUnderTest") if reflect.ValueOf(p.config.Wathola.SystemUnderTest) == reflect.ValueOf(sut.NewDefault) { - p.config.Wathola.SystemUnderTest = func(namespace string) sut.SystemUnderTest { - s := sut.NewDefault(namespace) - bt := s.(*sut.BrokerAndTriggers) - bt.Opts = append(bt.Opts, p.config.BrokerOpts...) - return bt - } + bt := sut.NewDefault().(*sut.BrokerAndTriggers) + bt.Opts = append(bt.Opts, p.config.BrokerOpts...) + p.config.Wathola.SystemUnderTest = bt } else { ctx.T.Fatal("Can't use given BrokerOpts, as custom SUT is used as " + "well. Drop using BrokerOpts in favor of custom SUT.") From 490b182828496281073a4dd5fd6871cbef1404f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Tue, 18 May 2021 17:01:01 +0200 Subject: [PATCH 18/24] Use full event type only --- test/upgrade/prober/sut/broker.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/test/upgrade/prober/sut/broker.go b/test/upgrade/prober/sut/broker.go index d4614a07902..661fda3ad6a 100644 --- a/test/upgrade/prober/sut/broker.go +++ b/test/upgrade/prober/sut/broker.go @@ -50,8 +50,7 @@ type Broker struct { // Triggers will hold settings for triggers type Triggers struct { - Types []string - TypePrefix string + Types []string } func newBrokerAndTriggers() SystemUnderTest { @@ -68,8 +67,7 @@ func newBrokerAndTriggers() SystemUnderTest { }, }, Triggers: Triggers{ - Types: eventTypes, - TypePrefix: defaultEventsPrefix, + Types: eventTypes, }, } } @@ -112,7 +110,6 @@ func (b *BrokerAndTriggers) deployTriggers(ctx Context, dest duckv1.Destination) triggers := make([]*eventingv1.Trigger, 0, len(b.Triggers.Types)) for _, eventType := range b.Triggers.Types { name := fmt.Sprintf("%s-%s", b.Name, eventType) - fullType := fmt.Sprintf("%s.%s", b.Triggers.TypePrefix, eventType) subscriberOption := resources.WithSubscriberDestination(func(t *eventingv1.Trigger) duckv1.Destination { return dest }) @@ -123,7 +120,7 @@ func (b *BrokerAndTriggers) deployTriggers(ctx Context, dest duckv1.Destination) resources.WithBroker(b.Name), resources.WithAttributesTriggerFilter( eventingv1.TriggerAnyFilter, - fullType, + eventType, map[string]interface{}{}, ), subscriberOption, From 0cb872f5ccbd62e6307017f0507c4fe733a091a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Tue, 18 May 2021 20:44:04 +0200 Subject: [PATCH 19/24] Remove unused event prefix --- test/upgrade/prober/sut/types.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/test/upgrade/prober/sut/types.go b/test/upgrade/prober/sut/types.go index 0fbf5a8623c..4868807e4f1 100644 --- a/test/upgrade/prober/sut/types.go +++ b/test/upgrade/prober/sut/types.go @@ -25,10 +25,6 @@ import ( duckv1 "knative.dev/pkg/apis/duck/v1" ) -const ( - defaultEventsPrefix = "dev.knative.eventing.wathola" -) - var eventTypes = []string{ watholaevent.Step{}.Type(), watholaevent.Finished{}.Type(), From 599a1f8da5f8add8999f19b726078a471336be24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Tue, 18 May 2021 21:53:13 +0200 Subject: [PATCH 20/24] Renaming after code review --- test/upgrade/README.md | 2 +- test/upgrade/prober/configuration.go | 3 +-- test/upgrade/prober/sut/default_e2e_test.go | 2 +- test/upgrade/prober/sut/types.go | 5 +++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/test/upgrade/README.md b/test/upgrade/README.md index ed7a5f9da00..920560df889 100644 --- a/test/upgrade/README.md +++ b/test/upgrade/README.md @@ -63,7 +63,7 @@ To achieve that a [wathola tool](https://pkg.go.dev/knative.dev/eventing/test/upgrade/prober/wathola) was prepared. It consists of 4 components: _sender_, _forwarder_, _receiver_, and _fetcher_. _Sender_ is the usual Kubernetes deployment that publishes events -to the System under Tests (SUT). By default, SUT is a default `broker` +to the System Under Tests (SUT). By default, SUT is a default `broker` with two triggers for each type of events being sent. _Sender_ will send events with given interval. When it terminates (by either `SIGTERM`, or `SIGINT`), a `finished` event is generated. _Forwarder_ is a knative serving diff --git a/test/upgrade/prober/configuration.go b/test/upgrade/prober/configuration.go index 233d0afb7f9..755e4de9438 100644 --- a/test/upgrade/prober/configuration.go +++ b/test/upgrade/prober/configuration.go @@ -189,8 +189,7 @@ func (p *prober) deployConfiguration() { s := p.config.SystemUnderTest endpoint := s.Deploy(sc, dest) p.client.Cleanup(func() { - if tr, ok := s.(sut.HasManualTeardown); ok { - + if tr, ok := s.(sut.HasTeardown); ok { tr.Teardown(sc) } }) diff --git a/test/upgrade/prober/sut/default_e2e_test.go b/test/upgrade/prober/sut/default_e2e_test.go index 410610fde7d..22871d0eb7a 100644 --- a/test/upgrade/prober/sut/default_e2e_test.go +++ b/test/upgrade/prober/sut/default_e2e_test.go @@ -64,7 +64,7 @@ func TestSUTNewDefaultE2E(t *testing.T) { Name: pod.Name, }, }) - if tr, ok := s.(sut.HasManualTeardown); ok { + if tr, ok := s.(sut.HasTeardown); ok { defer tr.Teardown(sutCtx) } assert.NotEmpty(t, endpoint) diff --git a/test/upgrade/prober/sut/types.go b/test/upgrade/prober/sut/types.go index 4868807e4f1..8f134101951 100644 --- a/test/upgrade/prober/sut/types.go +++ b/test/upgrade/prober/sut/types.go @@ -38,8 +38,9 @@ type SystemUnderTest interface { Deploy(ctx Context, destination duckv1.Destination) interface{} } -// HasManualTeardown indicates that SystemUnderTest supports manual teardown. -type HasManualTeardown interface { +// HasTeardown indicates that SystemUnderTest supports custom teardown that +// exceeds regular teardown via usage of testlib.Tracker. +type HasTeardown interface { // Teardown will remove all deployed SUT resources. Teardown(ctx Context) } From afe37cdac9aad72a3676786212c3703594df0abc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Wed, 19 May 2021 18:08:34 +0200 Subject: [PATCH 21/24] Removal of unnecessary github.com/prometheus/common --- go.mod | 1 - test/upgrade/prober/prober.go | 4 ++-- vendor/modules.txt | 1 - 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index ff3f9480a2c..d9a268d78ec 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,6 @@ require ( github.com/pelletier/go-toml v1.8.0 github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 github.com/pkg/errors v0.9.1 - github.com/prometheus/common v0.20.0 github.com/rickb777/date v1.13.0 github.com/robfig/cron/v3 v3.0.1 github.com/rogpeppe/fastuuid v1.2.0 diff --git a/test/upgrade/prober/prober.go b/test/upgrade/prober/prober.go index dabd7a2ac22..201dc686228 100644 --- a/test/upgrade/prober/prober.go +++ b/test/upgrade/prober/prober.go @@ -21,7 +21,6 @@ import ( "testing" "time" - "github.com/prometheus/common/log" "go.uber.org/zap" testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/resources" @@ -143,7 +142,8 @@ func RunEventProber(ctx context.Context, log *zap.SugaredLogger, client *testlib // TODO(ksuszyns): Remove this func in next release // Deprecated: use NewRunner func instead. func AssertEventProber(ctx context.Context, t *testing.T, probe Prober) { - log.Warn("prober.AssertEventProber is deprecated. Use NewRunner instead.") + t.Log("WARN: prober.AssertEventProber is deprecated. " + + "Use NewRunner instead.") p := probe.(*prober) p.client.T = t p.config.Ctx = ctx diff --git a/vendor/modules.txt b/vendor/modules.txt index b14db368a57..568f5e3d295 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -259,7 +259,6 @@ github.com/prometheus/client_golang/prometheus/promhttp # github.com/prometheus/client_model v0.2.0 github.com/prometheus/client_model/go # github.com/prometheus/common v0.20.0 -## explicit github.com/prometheus/common/expfmt github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg github.com/prometheus/common/log From 49eb644b9635043d6ba5dd3548994a00eb2865df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Wed, 19 May 2021 18:09:33 +0200 Subject: [PATCH 22/24] Remove Kafka from README and SUT to graph --- test/upgrade/README.md | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/test/upgrade/README.md b/test/upgrade/README.md index 920560df889..f8354a90337 100644 --- a/test/upgrade/README.md +++ b/test/upgrade/README.md @@ -29,7 +29,7 @@ At a high level, we want to do this: To achieve that, we created an upgrade framework (knative.dev/pkg/test/upgrade). This framework will enforce running upgrade tests in specific order and supports -continual verification of system under test. In case of Eventing Kafka it is: +continual verification of system under test. In case of Eventing it is: 1. Install the latest release from GitHub. 1. Run the `preupgrade` smoke tests. @@ -81,21 +81,23 @@ Diagram below describe the setup: ``` K8s cluster | Test machine | - (deploym.) (ksvc) (deploym.) | +(deployment) (ksvc) (deployment) | +--------+ +-----------+ +----------+ | +------------+ | | | ++ | | | | | | Sender | +-->| Forwarder ||----->+ Receiver | | + TestProber | | | | | || | |<---+ | | | +---+----+ | +------------| +----------+ | | +------------+ | | +-----------+ | | - | | | | - | | +---------+ | - | +--+-----+ +---------+ | | | - +-----> | | +-+ + Fetcher | | - | Broker | < - > | Trigger | | | | | - | | | | | +---------+ | - +--------+ +---------+ | (job) | - (default) +----------+ | + | ```````|````````````````````````````` | | + | ` | ` +---------+ | + | ` +--+-----+ +---------+ ` | | | + +-----> | | +-+ ` | Fetcher | | + ` | Broker | < - > | Trigger | | ` | | | + ` | | | | | ` +---------+ | + ` +--------+ +---------+ | ` (job) | + ` (default) +----------+ ` | + ` (SUT) ` + ````````````````````````````````````` ``` #### Probe test configuration From a682cbb80d57c5499b5f1ea7dc1bd3d980d609ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Wed, 19 May 2021 18:32:04 +0200 Subject: [PATCH 23/24] Remove unused EventsTypePrefix option --- test/upgrade/prober/configuration.go | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/test/upgrade/prober/configuration.go b/test/upgrade/prober/configuration.go index 755e4de9438..b72d21a1599 100644 --- a/test/upgrade/prober/configuration.go +++ b/test/upgrade/prober/configuration.go @@ -36,13 +36,12 @@ import ( ) const ( - defaultConfigName = "wathola-config" - defaultConfigHomedirPath = ".config/wathola" - defaultHomedir = "/home/nonroot" - defaultConfigFilename = "config.toml" - defaultWatholaEventsPrefix = "dev.knative.eventing.wathola" - defaultHealthEndpoint = "/healthz" - defaultFinishedSleep = 5 * time.Second + defaultConfigName = "wathola-config" + defaultConfigHomedirPath = ".config/wathola" + defaultHomedir = "/home/nonroot" + defaultConfigFilename = "config.toml" + defaultHealthEndpoint = "/healthz" + defaultFinishedSleep = 5 * time.Second Silence DuplicateAction = "silence" Warn DuplicateAction = "warn" @@ -81,9 +80,8 @@ type Config struct { // Wathola represents options related strictly to wathola testing tool. type Wathola struct { ConfigToml - SystemUnderTest sut.SystemUnderTest - EventsTypePrefix string - HealthEndpoint string + SystemUnderTest sut.SystemUnderTest + HealthEndpoint string } // ConfigToml represents options of wathola config toml file. @@ -150,9 +148,8 @@ func newConfig( ConfigMountPoint: fmt.Sprintf("%s/%s", defaultHomedir, defaultConfigHomedirPath), ConfigFilename: defaultConfigFilename, }, - EventsTypePrefix: defaultWatholaEventsPrefix, - HealthEndpoint: defaultHealthEndpoint, - SystemUnderTest: sut.NewDefault(), + HealthEndpoint: defaultHealthEndpoint, + SystemUnderTest: sut.NewDefault(), }, } From c761cc17f11555154edc531a11029e809234884c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Chris=20Suszy=C5=84ski?= Date: Wed, 19 May 2021 19:06:25 +0200 Subject: [PATCH 24/24] Remove execution of TestBrokerAndTriggers helper test --- test/e2e-tests.sh | 3 -- test/upgrade/prober/sut/broker.go | 4 ++- ...default_e2e_test.go => broker_e2e_test.go} | 7 ++-- test/upgrade/prober/sut/default.go | 2 +- test/upgrade/prober/sut/default_test.go | 32 +++++++++++++++++++ 5 files changed, 41 insertions(+), 7 deletions(-) rename test/upgrade/prober/sut/{default_e2e_test.go => broker_e2e_test.go} (93%) create mode 100644 test/upgrade/prober/sut/default_test.go diff --git a/test/e2e-tests.sh b/test/e2e-tests.sh index b31036000e7..f1fabed9204 100755 --- a/test/e2e-tests.sh +++ b/test/e2e-tests.sh @@ -39,7 +39,4 @@ go_test_e2e -timeout=30m -parallel=20 ./test/e2e \ -sources=sources.knative.dev/v1beta2:PingSource,sources.knative.dev/v1:ApiServerSource,sources.knative.dev/v1:ContainerSource \ || fail_test -echo "Running E2E tests for upgrade tests" -go_test_e2e -timeout=10m -parallel=20 ./test/upgrade/... || fail_test - success diff --git a/test/upgrade/prober/sut/broker.go b/test/upgrade/prober/sut/broker.go index 661fda3ad6a..a468a3b4afc 100644 --- a/test/upgrade/prober/sut/broker.go +++ b/test/upgrade/prober/sut/broker.go @@ -53,7 +53,9 @@ type Triggers struct { Types []string } -func newBrokerAndTriggers() SystemUnderTest { +// NewBrokerAndTriggers will create default configuration for BrokerAndTriggers +// based SUT. +func NewBrokerAndTriggers() SystemUnderTest { return &BrokerAndTriggers{ Broker: Broker{ Name: "sut", diff --git a/test/upgrade/prober/sut/default_e2e_test.go b/test/upgrade/prober/sut/broker_e2e_test.go similarity index 93% rename from test/upgrade/prober/sut/default_e2e_test.go rename to test/upgrade/prober/sut/broker_e2e_test.go index 22871d0eb7a..71c13602815 100644 --- a/test/upgrade/prober/sut/default_e2e_test.go +++ b/test/upgrade/prober/sut/broker_e2e_test.go @@ -40,11 +40,14 @@ import ( pkgTest "knative.dev/pkg/test" ) -func TestSUTNewDefaultE2E(t *testing.T) { +// TestBrokerAndTriggers isn't executed by regular E2E tests as it's actually +// executed as part of the upgrade tests. This test here is to make it easy to +// create other SUTs for Eventing and quickly verify them act properly. +func TestBrokerAndTriggers(t *testing.T) { ctx := signals.NewContext() client := testlib.Setup(t, false) defer testlib.TearDown(client) - s := sut.NewDefault() + s := sut.NewBrokerAndTriggers() sutCtx := sut.Context{ Ctx: ctx, Log: log(t), diff --git a/test/upgrade/prober/sut/default.go b/test/upgrade/prober/sut/default.go index 1a9561a2c77..1f11edbdd2c 100644 --- a/test/upgrade/prober/sut/default.go +++ b/test/upgrade/prober/sut/default.go @@ -18,5 +18,5 @@ package sut // NewDefault will return a default SUT for this repo. func NewDefault() SystemUnderTest { - return newBrokerAndTriggers() + return NewBrokerAndTriggers() } diff --git a/test/upgrade/prober/sut/default_test.go b/test/upgrade/prober/sut/default_test.go new file mode 100644 index 00000000000..11a45aa29e0 --- /dev/null +++ b/test/upgrade/prober/sut/default_test.go @@ -0,0 +1,32 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sut_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "knative.dev/eventing/test/upgrade/prober/sut" +) + +func TestNewDefault(t *testing.T) { + s := sut.NewDefault() + assert.Condition(t, func() bool { + _, ok := s.(*sut.BrokerAndTriggers) + return ok + }) +}