diff --git a/test/lib/resources/eventing.go b/test/lib/resources/eventing.go index cf7667330ee..d4720d2b8f5 100644 --- a/test/lib/resources/eventing.go +++ b/test/lib/resources/eventing.go @@ -242,22 +242,31 @@ func WithDependencyAnnotationTrigger(dependencyAnnotation string) TriggerOption // WithSubscriberServiceRefForTrigger returns an option that adds a Subscriber Knative Service Ref for the given v1 Trigger. func WithSubscriberServiceRefForTrigger(name string) TriggerOption { - return func(t *eventingv1.Trigger) { - if name != "" { - t.Spec.Subscriber = duckv1.Destination{ - Ref: KnativeRefForService(name, t.Namespace), - } + return WithSubscriberDestination(func(t *eventingv1.Trigger) duckv1.Destination { + return duckv1.Destination{ + Ref: KnativeRefForService(name, t.Namespace), } - } + }) } // WithSubscriberURIForTrigger returns an option that adds a Subscriber URI for the given v1 Trigger. func WithSubscriberURIForTrigger(uri string) TriggerOption { - apisURI, _ := apis.ParseURL(uri) - return func(t *eventingv1.Trigger) { - t.Spec.Subscriber = duckv1.Destination{ + return WithSubscriberDestination(func(t *eventingv1.Trigger) duckv1.Destination { + apisURI, _ := apis.ParseURL(uri) + return duckv1.Destination{ URI: apisURI, } + }) +} + +// WithSubscriberDestination returns an option that adds a Subscriber for given +// duckv1.Destination. +func WithSubscriberDestination(destFactory func(t *eventingv1.Trigger) duckv1.Destination) TriggerOption { + return func(t *eventingv1.Trigger) { + dest := destFactory(t) + if dest.Ref != nil || dest.URI != nil { + t.Spec.Subscriber = dest + } } } diff --git a/test/upgrade/README.md b/test/upgrade/README.md index 99f103b9ffe..f8354a90337 100644 --- a/test/upgrade/README.md +++ b/test/upgrade/README.md @@ -6,7 +6,8 @@ Running these tests on every commit will ensure that we don’t introduce any non-upgradeable changes, so every commit should be releasable. This is inspired by kubernetes -[upgrade testing](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-testing/e2e-tests.md#version-skewed-and-upgrade-testing). +[upgrade testing](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-testing/e2e-tests.md#version-skewed-and-upgrade-testing) +. These tests are a pretty big hammer in that they cover more than just version changes, but it’s one of the only ways to make sure we don’t accidentally make @@ -26,15 +27,19 @@ At a high level, we want to do this: 1. Run any post-install jobs that apply for the release to be. 1. Test those resources, verify that we didn’t break anything. -To achieve that, we just have three separate build tags: +To achieve that, we created an upgrade framework (knative.dev/pkg/test/upgrade). +This framework will enforce running upgrade tests in specific order and supports +continual verification of system under test. In case of Eventing it is: 1. Install the latest release from GitHub. -1. Run the `preupgrade` tests in this directory. -1. Install at HEAD (`ko apply -f config/`). -1. Run the post-install job. For v0.15 we need to migrate storage versions. -1. Run the `postupgrade` tests in this directory. +1. Run the `preupgrade` smoke tests. +1. Start `continual` tests that will propagate events in the background, while + upgrading and downgrading. +1. Install at HEAD (`ko apply -f config/`) and run the post-install jobs. +1. Run the `postupgrade` smoke tests. 1. Install the latest release from GitHub. -1. Run the `postdowngrade` tests in this directory. +1. Run the `postdowngrade` smoke tests. +1. Stop and verify `continual` tests, checking if every event propagated well. ## Tests @@ -52,42 +57,47 @@ In order to verify that we don't have data-plane unavailability during our control-plane outages (when we're upgrading the knative/eventing installation), we run a prober test that continually sends events to a service during the entire upgrade/downgrade process. When the upgrade completes, we make sure that -all of those events propagated just once. - -To achieve that a [wathola tool](test/upgrade/prober/wathola) was prepared. It -consists of 4 components: _sender_, _forwarder_, _receiver_, and _fetcher_. -_Sender_ is the usual Kubernetes deployment that publishes events to the default -`broker` with given interval. When it terminates (by either `SIGTERM`, or +all of those events propagated at least once. + +To achieve that +a [wathola tool](https://pkg.go.dev/knative.dev/eventing/test/upgrade/prober/wathola) +was prepared. It consists of 4 components: _sender_, _forwarder_, _receiver_, +and _fetcher_. _Sender_ is the usual Kubernetes deployment that publishes events +to the System Under Tests (SUT). By default, SUT is a default `broker` +with two triggers for each type of events being sent. _Sender_ will send events +with given interval. When it terminates (by either `SIGTERM`, or `SIGINT`), a `finished` event is generated. _Forwarder_ is a knative serving service that scales up from zero to receive the sent events and forward them to given target which is the _receiver_ in our case. _Receiver_ is an ordinary -deployment that collects events from multiple forwarders and has an endpoint -`/report` that can be polled to get the status of received events. To fetch the -report from within the cluster _fetcher_ comes in. It's a simple one time job, -that will fetch the report from _receiver_ and print it on stdout as JSON. That -enables the test client to download _fetcher_ logs and parse the JSON to get the -final report. +deployment that collects events from multiple forwarders and has an +endpoint `/report` that can be polled to get the status of received events. To +fetch the report from within the cluster _fetcher_ comes in. It's a simple one +time job, that will fetch the report from _receiver_ and print it on stdout as +JSON. That enables the test client to download _fetcher_ logs and parse the JSON +to get the final report. Diagram below describe the setup: ``` K8s cluster | Test machine | - (deploym.) (ksvc) (deploym.) | +(deployment) (ksvc) (deployment) | +--------+ +-----------+ +----------+ | +------------+ | | | ++ | | | | | | Sender | +-->| Forwarder ||----->+ Receiver | | + TestProber | | | | | || | |<---+ | | | +---+----+ | +------------| +----------+ | | +------------+ | | +-----------+ | | - | | | | - | | +---------+ | - | +--+-----+ +---------+ | | | - +-----> | | +-+ + Fetcher | | - | Broker | < - > | Trigger | | | | | - | | | | | +---------+ | - +--------+ +---------+ | (job) | - (default) +----------+ | + | ```````|````````````````````````````` | | + | ` | ` +---------+ | + | ` +--+-----+ +---------+ ` | | | + +-----> | | +-+ ` | Fetcher | | + ` | Broker | < - > | Trigger | | ` | | | + ` | | | | | ` +---------+ | + ` +--------+ +---------+ | ` (job) | + ` (default) +----------+ ` | + ` (SUT) ` + ````````````````````````````````````` ``` #### Probe test configuration @@ -96,16 +106,16 @@ Probe test behavior can be influenced from outside without modifying its source code. That can be beneficial if one would like to run upgrade tests in different context. One such example might be running Eventing upgrade tests in place that have Serving and Eventing both installed. In such environment one can set -environment variable `E2E_UPGRADE_TESTS_SERVING_USE` to enable usage of ksvc -forwarder (which is disabled by default): +environment variable `EVENTING_UPGRADE_TESTS_SERVING_USE` to enable usage of +ksvc forwarder (which is disabled by default): ``` -$ export E2E_UPGRADE_TESTS_SERVING_USE=true +$ export EVENTING_UPGRADE_TESTS_SERVING_USE=true ``` Any option, apart from namespace, in [`knative.dev/eventing/test/upgrade/prober.Config`](https://github.com/knative/eventing/blob/022e281/test/upgrade/prober/prober.go#L52-L63) -struct can be influenced, by using `E2E_UPGRADE_TESTS_XXXXX` environmental +struct can be influenced, by using `EVENTING_UPGRADE_TESTS_XXXXX` environmental variable prefix (using [kelseyhightower/envconfig](https://github.com/kelseyhightower/envconfig#usage) usage). diff --git a/test/upgrade/continual.go b/test/upgrade/continual.go index e5ff5af2734..56d1bb54822 100644 --- a/test/upgrade/continual.go +++ b/test/upgrade/continual.go @@ -17,28 +17,12 @@ limitations under the License. package upgrade import ( - "context" - - testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/upgrade/prober" pkgupgrade "knative.dev/pkg/test/upgrade" ) +// ContinualTest will perform a continual validation of Eventing SUT. func ContinualTest() pkgupgrade.BackgroundOperation { - ctx := context.Background() - var client *testlib.Client - var probe prober.Prober - return pkgupgrade.NewBackgroundVerification("EventingContinualTest", - func(c pkgupgrade.Context) { - // setup - client = testlib.Setup(c.T, false) - config := prober.NewConfig(client.Namespace) - probe = prober.RunEventProber(ctx, c.Log, client, config) - }, - func(c pkgupgrade.Context) { - // verify - defer testlib.TearDown(client) - prober.AssertEventProber(ctx, c.T, probe) - }, - ) + return prober.NewContinualVerification("EventingContinualTest", + prober.ContinualVerificationOptions{}) } diff --git a/test/upgrade/prober/config.toml b/test/upgrade/prober/config.toml index ca5c1c7c993..20afa87d6fa 100644 --- a/test/upgrade/prober/config.toml +++ b/test/upgrade/prober/config.toml @@ -1,6 +1,6 @@ # logLevel = 'DEBUG' [sender] -address = '{{- .BrokerURL -}}' +address = '{{- .Endpoint -}}' interval = {{ .Config.Interval.Nanoseconds }} [forwarder] -target = 'http://wathola-receiver.{{- .Config.Namespace -}}.svc.cluster.local' +target = 'http://wathola-receiver.{{- .Namespace -}}.svc.cluster.local' diff --git a/test/upgrade/prober/configuration.go b/test/upgrade/prober/configuration.go index a195f7d4f14..ba1057afd1d 100644 --- a/test/upgrade/prober/configuration.go +++ b/test/upgrade/prober/configuration.go @@ -20,68 +20,77 @@ import ( "context" "fmt" "io/ioutil" + "os" "path" "runtime" + "strings" "text/template" "time" "github.com/kelseyhightower/envconfig" "github.com/wavesoftware/go-ensure" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" - testlib "knative.dev/eventing/test/lib" - "knative.dev/eventing/test/lib/duck" "knative.dev/eventing/test/lib/resources" - "knative.dev/pkg/apis" + "knative.dev/eventing/test/upgrade/prober/sut" + duckv1 "knative.dev/pkg/apis/duck/v1" pkgTest "knative.dev/pkg/test" + pkgupgrade "knative.dev/pkg/test/upgrade" ) const ( - defaultConfigName = "wathola-config" - defaultConfigHomedirPath = ".config/wathola" - defaultHomedir = "/home/nonroot" - defaultConfigFilename = "config.toml" - defaultWatholaEventsPrefix = "com.github.cardil.wathola" - defaultBrokerName = "default" - defaultHealthEndpoint = "/healthz" - defaultFinishedSleep = 5 * time.Second + defaultConfigName = "wathola-config" + defaultConfigHomedirPath = ".config/wathola" + defaultHomedir = "/home/nonroot" + defaultConfigFilename = "config.toml" + defaultHealthEndpoint = "/healthz" + defaultFinishedSleep = 5 * time.Second Silence DuplicateAction = "silence" Warn DuplicateAction = "warn" Error DuplicateAction = "error" + + prefix = "eventing_upgrade_tests" + + // TODO(ksuszyns): Remove this val in next release + // Deprecated: use 'eventing_upgrade_tests' prefix instead + deprecatedPrefix = "e2e_upgrade_tests" ) // DuplicateAction is the action to take in case of duplicated events type DuplicateAction string -var eventTypes = []string{"step", "finished"} - // Config represents a configuration for prober. type Config struct { Wathola - Namespace string Interval time.Duration FinishedSleep time.Duration Serving ServingConfig FailOnErrors bool OnDuplicate DuplicateAction - BrokerOpts []resources.BrokerOption + Ctx context.Context + // BrokerOpts holds opts for broker. + // TODO(ksuszyns): Remove this opt in next release + // Deprecated: use Wathola.SystemUnderTest instead. + BrokerOpts []resources.BrokerOption + // Namespace holds namespace in which test is about to be executed. + // TODO(ksuszyns): Remove this opt in next release + // Deprecated: namespace is about to be taken from testlib.Client created by + // NewRunner + Namespace string } // Wathola represents options related strictly to wathola testing tool. type Wathola struct { - ConfigMap + ConfigToml ImageResolver - EventsTypePrefix string - HealthEndpoint string - BrokerName string + SystemUnderTest sut.SystemUnderTest + HealthEndpoint string } // ImageResolver will resolve the container image for given component. type ImageResolver func(component string) string -// ConfigMap represents options of wathola config toml file. -type ConfigMap struct { +// ConfigToml represents options of wathola config toml file. +type ConfigToml struct { // ConfigTemplate is a template file that will be compiled to the configmap ConfigTemplate string ConfigMapName string @@ -95,123 +104,130 @@ type ServingConfig struct { ScaleToZero bool } +// NewConfigOrFail will create a prober.Config or fail trying. +func NewConfigOrFail(c pkgupgrade.Context) *Config { + errSink := func(err error) { + c.T.Fatal(err) + } + warnf := c.Log.Warnf + return newConfig(errSink, warnf) +} + // NewConfig creates a new configuration object with default values filled in. // Values can be influenced by kelseyhightower/envconfig with -// `e2e_upgrade_tests` prefix. -func NewConfig(namespace string) *Config { +// `eventing_upgrade_tests` prefix. +// TODO(ksuszyns): Remove this func in next release +// Deprecated: use NewContinualVerification or NewConfigOrFail +func NewConfig(namespace ...string) *Config { + errSink := func(err error) { + ensure.NoError(err) + } + warnf := func(template string, args ...interface{}) { + _, err := fmt.Fprintf(os.Stderr, template, args) + ensure.NoError(err) + } + config := newConfig(errSink, warnf) + if len(namespace) > 0 { + config.Namespace = strings.Join(namespace, ",") + } + return config +} + +func newConfig( + errSink func(err error), warnf func(template string, args ...interface{}), +) *Config { config := &Config{ - Namespace: "", Interval: Interval, FinishedSleep: defaultFinishedSleep, FailOnErrors: true, OnDuplicate: Warn, - BrokerOpts: make([]resources.BrokerOption, 0), + Ctx: context.Background(), Serving: ServingConfig{ Use: false, ScaleToZero: true, }, Wathola: Wathola{ ImageResolver: pkgTest.ImagePath, - ConfigMap: ConfigMap{ + ConfigToml: ConfigToml{ ConfigTemplate: defaultConfigFilename, ConfigMapName: defaultConfigName, ConfigMountPoint: fmt.Sprintf("%s/%s", defaultHomedir, defaultConfigHomedirPath), ConfigFilename: defaultConfigFilename, }, - EventsTypePrefix: defaultWatholaEventsPrefix, - HealthEndpoint: defaultHealthEndpoint, - BrokerName: defaultBrokerName, + HealthEndpoint: defaultHealthEndpoint, + SystemUnderTest: sut.NewDefault(), }, } - // FIXME: remove while fixing https://github.com/knative/eventing/issues/2665 - config.FailOnErrors = false + if err := envconfig.Process(prefix, config); err != nil { + errSink(err) + } - err := envconfig.Process("e2e_upgrade_tests", config) - ensure.NoError(err) - config.Namespace = namespace + // TODO(ksuszyns): Remove this block in next release + for _, enventry := range os.Environ() { + if strings.HasPrefix(strings.ToLower(enventry), deprecatedPrefix) { + warnf( + "DEPRECATED: using deprecated '%s' prefix. Use '%s' instead.", + deprecatedPrefix, prefix) + if err := envconfig.Process(deprecatedPrefix, config); err != nil { + errSink(err) + } + break + } + } return config } func (p *prober) deployConfiguration() { - p.deployBroker() - p.deployConfigMap() - p.deployTriggers() -} - -func (p *prober) deployBroker() { - p.client.CreateBrokerOrFail(p.config.BrokerName, p.config.BrokerOpts...) -} - -func (p *prober) fetchBrokerURL() (*apis.URL, error) { - namespace := p.config.Namespace - p.log.Debugf("Fetching %s broker URL for ns %s", - p.config.BrokerName, namespace) - meta := resources.NewMetaResource( - p.config.BrokerName, p.config.Namespace, testlib.BrokerTypeMeta, - ) - err := duck.WaitForResourceReady(p.client.Dynamic, meta) - if err != nil { - return nil, err + sc := sut.Context{ + Ctx: p.config.Ctx, + Log: p.log, + Client: p.client, } - broker, err := p.client.Eventing.EventingV1().Brokers(namespace).Get( - context.Background(), p.config.BrokerName, metav1.GetOptions{}, - ) - if err != nil { - return nil, err + ref := resources.KnativeRefForService(receiverName, p.client.Namespace) + if p.config.Serving.Use { + ref = resources.KnativeRefForKservice(forwarderName, p.client.Namespace) } - url := broker.Status.Address.URL - p.log.Debugf("%s broker URL for ns %s is %v", - p.config.BrokerName, namespace, url) - return url, nil + dest := duckv1.Destination{Ref: ref} + s := p.config.SystemUnderTest + endpoint := s.Deploy(sc, dest) + p.client.Cleanup(func() { + if tr, ok := s.(sut.HasTeardown); ok { + tr.Teardown(sc) + } + }) + p.deployConfigToml(endpoint) } -func (p *prober) deployConfigMap() { +func (p *prober) deployConfigToml(endpoint interface{}) { name := p.config.ConfigMapName - p.log.Infof("Deploying config map: \"%s/%s\"", p.config.Namespace, name) - brokerURL, err := p.fetchBrokerURL() - ensure.NoError(err) - configData := p.compileTemplate(p.config.ConfigTemplate, brokerURL) - p.client.CreateConfigMapOrFail(name, p.config.Namespace, map[string]string{ + p.log.Infof("Deploying config map: \"%s/%s\"", p.client.Namespace, name) + configData := p.compileTemplate(p.config.ConfigTemplate, endpoint) + p.client.CreateConfigMapOrFail(name, p.client.Namespace, map[string]string{ p.config.ConfigFilename: configData, }) } -func (p *prober) deployTriggers() { - for _, eventType := range eventTypes { - name := fmt.Sprintf("wathola-trigger-%v", eventType) - fullType := fmt.Sprintf("%v.%v", p.config.EventsTypePrefix, eventType) - subscriberOption := resources.WithSubscriberServiceRefForTrigger(receiverName) - if p.config.Serving.Use { - subscriberOption = resources.WithSubscriberKServiceRefForTrigger(forwarderName) - } - p.client.CreateTriggerOrFail(name, - resources.WithBroker(p.config.BrokerName), - resources.WithAttributesTriggerFilter( - eventingv1.TriggerAnyFilter, - fullType, - map[string]interface{}{}, - ), - subscriberOption, - ) - } -} - -func (p *prober) compileTemplate(templateName string, brokerURL fmt.Stringer) string { +func (p *prober) compileTemplate(templateName string, endpoint interface{}) string { _, filename, _, _ := runtime.Caller(0) templateFilepath := path.Join(path.Dir(filename), templateName) templateBytes, err := ioutil.ReadFile(templateFilepath) - ensure.NoError(err) + p.ensureNoError(err) tmpl, err := template.New(templateName).Parse(string(templateBytes)) - ensure.NoError(err) + p.ensureNoError(err) var buff bytes.Buffer data := struct { *Config + Namespace string + // Deprecated: use Endpoint BrokerURL string + Endpoint interface{} }{ p.config, - brokerURL.String(), + p.client.Namespace, + fmt.Sprintf("%v", endpoint), + endpoint, } - ensure.NoError(tmpl.Execute(&buff, data)) + p.ensureNoError(tmpl.Execute(&buff, data)) return buff.String() } diff --git a/test/upgrade/prober/continual.go b/test/upgrade/prober/continual.go new file mode 100644 index 00000000000..237fdb582e4 --- /dev/null +++ b/test/upgrade/prober/continual.go @@ -0,0 +1,59 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prober + +import ( + testlib "knative.dev/eventing/test/lib" + pkgupgrade "knative.dev/pkg/test/upgrade" +) + +// Configurator will customize default config. +type Configurator func(config *Config) error + +// ContinualVerificationOptions holds options for NewContinualVerification func. +type ContinualVerificationOptions struct { + Configurators []Configurator + ClientOptions []testlib.SetupClientOption +} + +// NewContinualVerification will create a new continual verification operation +// that will verify that SUT is propagating events well through the test +// process. It's a general pattern that can be used to validate upgrade or +// performance testing. +func NewContinualVerification( + name string, opts ContinualVerificationOptions, +) pkgupgrade.BackgroundOperation { + var config *Config + var runner Runner + setup := func(c pkgupgrade.Context) { + config = NewConfigOrFail(c) + if len(opts.Configurators) > 0 { + for _, fn := range opts.Configurators { + err := fn(config) + if err != nil { + c.T.Fatal(err) + } + } + } + runner = NewRunner(config, opts.ClientOptions...) + runner.Setup(c) + } + verify := func(c pkgupgrade.Context) { + runner.Verify(c) + } + return pkgupgrade.NewBackgroundVerification(name, setup, verify) +} diff --git a/test/upgrade/prober/errors.go b/test/upgrade/prober/errors.go new file mode 100644 index 00000000000..e70da17bd9b --- /dev/null +++ b/test/upgrade/prober/errors.go @@ -0,0 +1,23 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prober + +func (p *prober) ensureNoError(err error) { + if err != nil { + p.client.T.Fatal(err) + } +} diff --git a/test/upgrade/prober/forwarder.go b/test/upgrade/prober/forwarder.go index 3406254ca58..14965fafb88 100644 --- a/test/upgrade/prober/forwarder.go +++ b/test/upgrade/prober/forwarder.go @@ -16,10 +16,8 @@ package prober import ( - "context" "fmt" - "github.com/wavesoftware/go-ensure" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" testlib "knative.dev/eventing/test/lib" @@ -31,12 +29,13 @@ var ( forwarderName = "wathola-forwarder" ) -func (p *prober) deployForwarder(ctx context.Context) { +func (p *prober) deployForwarder() { p.log.Infof("Deploy forwarder knative service: %v", forwarderName) serving := p.client.Dynamic.Resource(resources.KServicesGVR).Namespace(p.client.Namespace) service := p.forwarderKService(forwarderName, p.client.Namespace) - _, err := serving.Create(context.Background(), service, metav1.CreateOptions{}) - ensure.NoError(err) + if _, err := serving.Create(p.config.Ctx, service, metav1.CreateOptions{}); err != nil { + p.client.T.Fatal(err) + } sc := p.servingClient() testlib.WaitFor(fmt.Sprintf("forwarder ksvc be ready: %v", forwarderName), func() error { @@ -45,7 +44,7 @@ func (p *prober) deployForwarder(ctx context.Context) { if p.config.Serving.ScaleToZero { testlib.WaitFor(fmt.Sprintf("forwarder scales to zero: %v", forwarderName), func() error { - return duck.WaitForKServiceScales(ctx, sc, forwarderName, p.client.Namespace, func(scale int) bool { + return duck.WaitForKServiceScales(p.config.Ctx, sc, forwarderName, p.client.Namespace, func(scale int) bool { return scale == 0 }) }) @@ -55,8 +54,8 @@ func (p *prober) deployForwarder(ctx context.Context) { func (p *prober) removeForwarder() { p.log.Infof("Remove forwarder knative service: %v", forwarderName) serving := p.client.Dynamic.Resource(resources.KServicesGVR).Namespace(p.client.Namespace) - err := serving.Delete(context.Background(), forwarderName, metav1.DeleteOptions{}) - ensure.NoError(err) + err := serving.Delete(p.config.Ctx, forwarderName, metav1.DeleteOptions{}) + p.ensureNoError(err) } func (p *prober) forwarderKService(name, namespace string) *unstructured.Unstructured { diff --git a/test/upgrade/prober/prober.go b/test/upgrade/prober/prober.go index bc6623bf59f..201dc686228 100644 --- a/test/upgrade/prober/prober.go +++ b/test/upgrade/prober/prober.go @@ -17,63 +17,144 @@ package prober import ( "context" + "reflect" "testing" "time" - "github.com/wavesoftware/go-ensure" "go.uber.org/zap" testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/resources" + "knative.dev/eventing/test/upgrade/prober/sut" + pkgupgrade "knative.dev/pkg/test/upgrade" ) var ( - // FIXME: Interval is set to 200 msec, as lower values will result in errors: knative/eventing#2357 - // Interval = 10 * time.Millisecond - Interval = 200 * time.Millisecond + // Interval is used to send events in specific rate. + Interval = 10 * time.Millisecond ) -// Prober is the interface for a prober, which checks the result of the probes when stopped. +// Prober is the interface for a prober, which checks the result of the probes +// when stopped. +// TODO(ksuszyns): Remove this interface in next release +// Deprecated: use Runner instead, create it with NewRunner func. type Prober interface { // Verify will verify prober state after finished has been send - Verify(ctx context.Context) ([]error, int, error) + Verify() ([]error, int) // Finish send finished event - Finish(ctx context.Context) + Finish() // ReportErrors will reports found errors in proper way - ReportErrors(t *testing.T, errors []error) + ReportErrors(errors []error) +} + +// Runner will run continual verification with provided configuration. +type Runner interface { + // Setup will start a continual prober in background. + Setup(ctx pkgupgrade.Context) - // deploy a prober to a cluster - deploy(ctx context.Context) - // remove a prober from cluster - remove() + // Verify will verify that all sent events propagated at least once. + Verify(ctx pkgupgrade.Context) } -// RunEventProber starts a single Prober of the given domain. -func RunEventProber(ctx context.Context, log *zap.SugaredLogger, client *testlib.Client, config *Config) Prober { - pm := newProber(log, client, config) - pm.deploy(ctx) - return pm +// NewRunner will create a runner compatible with NewContinualVerification +// func. +func NewRunner(config *Config, options ...testlib.SetupClientOption) Runner { + return &probeRunner{ + prober: &prober{config: config}, + options: options, + } } -// AssertEventProber will send finish event and then verify if all events propagated well -func AssertEventProber(ctx context.Context, t *testing.T, prober Prober) { - prober.Finish(ctx) - defer prober.remove() +type probeRunner struct { + *prober + options []testlib.SetupClientOption +} - waitAfterFinished(prober) +func (p *probeRunner) Setup(ctx pkgupgrade.Context) { + p.validate(ctx) + p.log = ctx.Log + p.client = testlib.Setup(ctx.T, false, p.options...) + p.deploy() +} - eventErrs, eventCount, err := prober.Verify(ctx) - if err != nil { - t.Fatal("fetch error:", err) +func (p *probeRunner) Verify(ctx pkgupgrade.Context) { + if p.client == nil { + ctx.T.Fatal("prober isn't initiated (client is nil)") + return } - if len(eventErrs) == 0 { - t.Logf("All %d events propagated well", eventCount) + // use T from new test + p.client.T = ctx.T + t := ctx.T + defer testlib.TearDown(p.client) + defer p.remove() + p.Finish() + waitAfterFinished(p.prober) + + errors, events := p.prober.Verify() + if len(errors) == 0 { + t.Logf("All %d events propagated well", events) } else { t.Logf("There were %d events propagated, but %d errors occurred. "+ - "Listing them below.", eventCount, len(eventErrs)) + "Listing them below.", events, len(errors)) + } + + p.ReportErrors(errors) +} + +func (p *probeRunner) validate(ctx pkgupgrade.Context) { + if p.config.Namespace != "" { + ctx.Log.Warnf( + "DEPRECATED: namespace set in Config: %s. Ignoring it.", + p.client.Namespace) + } + if len(p.config.BrokerOpts) > 0 { + ctx.Log.Warn( + "DEPRECATED: BrokerOpts set in Config. Use custom SystemUnderTest") + if reflect.ValueOf(p.config.Wathola.SystemUnderTest) == reflect.ValueOf(sut.NewDefault) { + bt := sut.NewDefault().(*sut.BrokerAndTriggers) + bt.Opts = append(bt.Opts, p.config.BrokerOpts...) + p.config.Wathola.SystemUnderTest = bt + } else { + ctx.T.Fatal("Can't use given BrokerOpts, as custom SUT is used as " + + "well. Drop using BrokerOpts in favor of custom SUT.") + } } - prober.ReportErrors(t, eventErrs) +} + +// RunEventProber starts a single Prober of the given domain. +// TODO(ksuszyns): Remove this func in next release +// Deprecated: use NewRunner func instead. +func RunEventProber(ctx context.Context, log *zap.SugaredLogger, client *testlib.Client, config *Config) Prober { + log.Warn("prober.RunEventProber is deprecated. Use NewRunner instead.") + config.Ctx = ctx + p := &prober{ + log: log, + client: client, + config: config, + } + p.deploy() + return p +} + +// AssertEventProber will send finish event and then verify if all events +// propagated well. +// TODO(ksuszyns): Remove this func in next release +// Deprecated: use NewRunner func instead. +func AssertEventProber(ctx context.Context, t *testing.T, probe Prober) { + t.Log("WARN: prober.AssertEventProber is deprecated. " + + "Use NewRunner instead.") + p := probe.(*prober) + p.client.T = t + p.config.Ctx = ctx + pr := &probeRunner{ + prober: p, + options: nil, + } + pr.Verify(pkgupgrade.Context{ + T: t, + Log: p.log, + }) } type prober struct { @@ -89,7 +170,8 @@ func (p *prober) servingClient() resources.ServingClient { } } -func (p *prober) ReportErrors(t *testing.T, errors []error) { +func (p *prober) ReportErrors(errors []error) { + t := p.client.T for _, err := range errors { if p.config.FailOnErrors { t.Error(err) @@ -105,17 +187,17 @@ func (p *prober) ReportErrors(t *testing.T, errors []error) { } } -func (p *prober) deploy(ctx context.Context) { +func (p *prober) deploy() { p.log.Infof("Using namespace for probe testing: %v", p.client.Namespace) p.deployConfiguration() - p.deployReceiver(ctx) + p.deployReceiver() if p.config.Serving.Use { - p.deployForwarder(ctx) + p.deployForwarder() } - p.client.WaitForAllTestResourcesReadyOrFail(ctx) + p.client.WaitForAllTestResourcesReadyOrFail(p.config.Ctx) - p.deploySender(ctx) - ensure.NoError(testlib.AwaitForAll(p.log)) + p.deploySender() + p.ensureNoError(testlib.AwaitForAll(p.log)) // allow sender to send at least some events, 2 sec wait time.Sleep(2 * time.Second) p.log.Infof("Prober is now sending events with interval of %v in "+ @@ -126,20 +208,10 @@ func (p *prober) remove() { if p.config.Serving.Use { p.removeForwarder() } - ensure.NoError(p.client.Tracker.Clean(true)) -} - -func newProber(log *zap.SugaredLogger, client *testlib.Client, config *Config) Prober { - return &prober{ - log: log, - client: client, - config: config, - } + p.ensureNoError(p.client.Tracker.Clean(true)) } -func waitAfterFinished(p Prober) { - s := p.(*prober) - cfg := s.config - s.log.Infof("Waiting %v after sender finished...", cfg.FinishedSleep) - time.Sleep(cfg.FinishedSleep) +func waitAfterFinished(p *prober) { + p.log.Infof("Waiting %v after sender finished...", p.config.FinishedSleep) + time.Sleep(p.config.FinishedSleep) } diff --git a/test/upgrade/prober/prober_test.go b/test/upgrade/prober/prober_test.go index 40666a72e48..db658cf1902 100644 --- a/test/upgrade/prober/prober_test.go +++ b/test/upgrade/prober/prober_test.go @@ -26,8 +26,8 @@ import ( const ( defaultConfigFilename = "config.toml" - servingEnvName = "E2E_UPGRADE_TESTS_SERVING_USE" - configFilenameEnvName = "E2E_UPGRADE_TESTS_CONFIGFILENAME" + servingEnvName = "EVENTING_UPGRADE_TESTS_SERVING_USE" + configFilenameEnvName = "EVENTING_UPGRADE_TESTS_CONFIGFILENAME" ) func TestNewConfig(t *testing.T) { @@ -48,12 +48,12 @@ func TestNewConfig(t *testing.T) { if s.panics { assert.Panics(t, func() { - prober.NewConfig("test-ns") + prober.NewConfig() }) return } - config := prober.NewConfig("test-ns") + config := prober.NewConfig() assert.Equal(t, s.servingUse, config.Serving.Use) assert.True(t, config.Serving.ScaleToZero) diff --git a/test/upgrade/prober/receiver.go b/test/upgrade/prober/receiver.go index f10145f8df3..d5537988191 100644 --- a/test/upgrade/prober/receiver.go +++ b/test/upgrade/prober/receiver.go @@ -16,10 +16,8 @@ package prober import ( - "context" "fmt" - "github.com/wavesoftware/go-ensure" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -30,36 +28,32 @@ import ( ) var ( - receiverName = "wathola-receiver" - receiverNodePort int32 = -1 + receiverName = "wathola-receiver" ) -func (p *prober) deployReceiver(ctx context.Context) { - p.deployReceiverDeployment(ctx) - p.deployReceiverService(ctx) +func (p *prober) deployReceiver() { + p.deployReceiverDeployment() + p.deployReceiverService() } -func (p *prober) deployReceiverDeployment(ctx context.Context) { +func (p *prober) deployReceiverDeployment() { p.log.Info("Deploy of receiver deployment: ", receiverName) deployment := p.createReceiverDeployment() - _, err := p.client.Kube.AppsV1(). - Deployments(deployment.Namespace). - Create(ctx, deployment, metav1.CreateOptions{}) - ensure.NoError(err) + p.client.CreateDeploymentOrFail(deployment) testlib.WaitFor(fmt.Sprint("receiver deployment be ready: ", receiverName), func() error { return pkgTest.WaitForDeploymentScale( - ctx, p.client.Kube, receiverName, p.client.Namespace, 1, + p.config.Ctx, p.client.Kube, receiverName, p.client.Namespace, 1, ) }) } -func (p *prober) deployReceiverService(ctx context.Context) { +func (p *prober) deployReceiverService() { p.log.Infof("Deploy of receiver service: %v", receiverName) service := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: receiverName, - Namespace: p.config.Namespace, + Namespace: p.client.Namespace, }, Spec: corev1.ServiceSpec{ Ports: []corev1.ServicePort{ @@ -76,22 +70,10 @@ func (p *prober) deployReceiverService(ctx context.Context) { Selector: map[string]string{ "app": receiverName, }, - Type: corev1.ServiceTypeNodePort, + Type: corev1.ServiceTypeClusterIP, }, } - created, err := p.client.Kube.CoreV1().Services(p.config.Namespace). - Create(ctx, service, metav1.CreateOptions{}) - ensure.NoError(err) - for _, portSpec := range created.Spec.Ports { - if portSpec.Port == 80 { - receiverNodePort = portSpec.NodePort - } - } - if receiverNodePort == -1 { - panic(fmt.Errorf("couldn't find a node port for service: %v", receiverName)) - } else { - p.log.Debugf("Node port for service: %v is %v", receiverName, receiverNodePort) - } + p.client.CreateServiceOrFail(service) } func (p *prober) createReceiverDeployment() *appsv1.Deployment { @@ -99,7 +81,7 @@ func (p *prober) createReceiverDeployment() *appsv1.Deployment { return &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: receiverName, - Namespace: p.config.Namespace, + Namespace: p.client.Namespace, }, Spec: appsv1.DeploymentSpec{ Replicas: &replicas, diff --git a/test/upgrade/prober/sender.go b/test/upgrade/prober/sender.go index bde438fab46..5f3bdd4ec9c 100644 --- a/test/upgrade/prober/sender.go +++ b/test/upgrade/prober/sender.go @@ -16,10 +16,8 @@ package prober import ( - "context" "fmt" - "github.com/wavesoftware/go-ensure" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -29,13 +27,13 @@ import ( var senderName = "wathola-sender" -func (p *prober) deploySender(ctx context.Context) { +func (p *prober) deploySender() { p.log.Info("Deploy sender deployment: ", senderName) var replicas int32 = 1 deployment := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: senderName, - Namespace: p.config.Namespace, + Namespace: p.client.Namespace, }, Spec: appsv1.DeploymentSpec{ Replicas: &replicas, @@ -77,21 +75,21 @@ func (p *prober) deploySender(ctx context.Context) { _, err := p.client.Kube.AppsV1(). Deployments(p.client.Namespace). - Create(ctx, deployment, metav1.CreateOptions{}) - ensure.NoError(err) + Create(p.config.Ctx, deployment, metav1.CreateOptions{}) + p.ensureNoError(err) testlib.WaitFor(fmt.Sprint("sender deployment be ready: ", senderName), func() error { return pkgTest.WaitForDeploymentScale( - ctx, p.client.Kube, senderName, p.client.Namespace, int(replicas), + p.config.Ctx, p.client.Kube, senderName, p.client.Namespace, int(replicas), ) }) } -func (p *prober) removeSender(ctx context.Context) { +func (p *prober) removeSender() { p.log.Info("Remove of sender deployment: ", senderName) err := p.client.Kube.AppsV1(). Deployments(p.client.Namespace). - Delete(ctx, senderName, metav1.DeleteOptions{}) - ensure.NoError(err) + Delete(p.config.Ctx, senderName, metav1.DeleteOptions{}) + p.ensureNoError(err) } diff --git a/test/upgrade/prober/sut/broker.go b/test/upgrade/prober/sut/broker.go new file mode 100644 index 00000000000..a468a3b4afc --- /dev/null +++ b/test/upgrade/prober/sut/broker.go @@ -0,0 +1,141 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sut + +import ( + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + testlib "knative.dev/eventing/test/lib" + "knative.dev/eventing/test/lib/duck" + "knative.dev/eventing/test/lib/resources" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +var ( + retryCount = int32(12) + backoffPolicy = eventingduckv1.BackoffPolicyExponential + backoffDelay = "PT1S" +) + +// BrokerAndTriggers will deploy a default broker and 2 triggers to route two types of +// events to receiver. +type BrokerAndTriggers struct { + Broker + Triggers +} + +// Broker will hold settings for broker itself +type Broker struct { + Name string + Opts []resources.BrokerOption +} + +// Triggers will hold settings for triggers +type Triggers struct { + Types []string +} + +// NewBrokerAndTriggers will create default configuration for BrokerAndTriggers +// based SUT. +func NewBrokerAndTriggers() SystemUnderTest { + return &BrokerAndTriggers{ + Broker: Broker{ + Name: "sut", + Opts: []resources.BrokerOption{ + resources.WithDeliveryForBroker( + &eventingduckv1.DeliverySpec{ + Retry: &retryCount, + BackoffPolicy: &backoffPolicy, + BackoffDelay: &backoffDelay, + }), + }, + }, + Triggers: Triggers{ + Types: eventTypes, + }, + } +} + +func (b *BrokerAndTriggers) Deploy(ctx Context, dest duckv1.Destination) interface{} { + b.deployBroker(ctx) + url := b.fetchURL(ctx) + b.deployTriggers(ctx, dest) + return url +} + +func (b *BrokerAndTriggers) deployBroker(ctx Context) { + ctx.Client.CreateBrokerOrFail(b.Name, b.Broker.Opts...) +} + +func (b *BrokerAndTriggers) fetchURL(ctx Context) *apis.URL { + namespace := ctx.Client.Namespace + ctx.Log.Debugf("Fetching \"%s\" broker URL for ns %s", + b.Name, namespace) + meta := resources.NewMetaResource( + b.Name, namespace, testlib.BrokerTypeMeta, + ) + err := duck.WaitForResourceReady(ctx.Client.Dynamic, meta) + if err != nil { + ctx.T.Fatal(err) + } + broker, err := ctx.Client.Eventing.EventingV1().Brokers(namespace).Get( + ctx.Ctx, b.Name, metav1.GetOptions{}, + ) + if err != nil { + ctx.T.Fatal(err) + } + url := broker.Status.Address.URL + ctx.Log.Debugf("\"%s\" broker URL for ns %s is %v", + b.Name, namespace, url) + return url +} + +func (b *BrokerAndTriggers) deployTriggers(ctx Context, dest duckv1.Destination) { + triggers := make([]*eventingv1.Trigger, 0, len(b.Triggers.Types)) + for _, eventType := range b.Triggers.Types { + name := fmt.Sprintf("%s-%s", b.Name, eventType) + subscriberOption := resources.WithSubscriberDestination(func(t *eventingv1.Trigger) duckv1.Destination { + return dest + }) + ctx.Log.Debugf("Creating trigger \"%s\" for type %s to route to %#v", + name, eventType, dest) + trgr := ctx.Client.CreateTriggerOrFail( + name, + resources.WithBroker(b.Name), + resources.WithAttributesTriggerFilter( + eventingv1.TriggerAnyFilter, + eventType, + map[string]interface{}{}, + ), + subscriberOption, + ) + triggers = append(triggers, trgr) + } + for _, trgr := range triggers { + meta := resources.NewMetaResource( + trgr.Name, trgr.Namespace, testlib.TriggerTypeMeta, + ) + err := duck.WaitForResourceReady(ctx.Client.Dynamic, meta) + if err != nil { + ctx.T.Fatal(err) + } + } +} diff --git a/test/upgrade/prober/sut/broker_e2e_test.go b/test/upgrade/prober/sut/broker_e2e_test.go new file mode 100644 index 00000000000..71c13602815 --- /dev/null +++ b/test/upgrade/prober/sut/broker_e2e_test.go @@ -0,0 +1,163 @@ +// +build e2e + +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sut_test + +import ( + "context" + "fmt" + "reflect" + "testing" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + "k8s.io/apiserver/pkg/storage/names" + testlib "knative.dev/eventing/test/lib" + "knative.dev/eventing/test/lib/recordevents" + "knative.dev/eventing/test/lib/resources" + "knative.dev/eventing/test/upgrade/prober/sut" + watholaevent "knative.dev/eventing/test/upgrade/prober/wathola/event" + watholasender "knative.dev/eventing/test/upgrade/prober/wathola/sender" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/signals" + pkgTest "knative.dev/pkg/test" +) + +// TestBrokerAndTriggers isn't executed by regular E2E tests as it's actually +// executed as part of the upgrade tests. This test here is to make it easy to +// create other SUTs for Eventing and quickly verify them act properly. +func TestBrokerAndTriggers(t *testing.T) { + ctx := signals.NewContext() + client := testlib.Setup(t, false) + defer testlib.TearDown(client) + s := sut.NewBrokerAndTriggers() + sutCtx := sut.Context{ + Ctx: ctx, + Log: log(t), + Client: client, + } + // create event logger pod and service as the subscriber + receiverName := "receiver" + eis, pod := recordevents.StartEventRecordOrFail(ctx, client, receiverName) + + ref := pkgTest.CoreV1ObjectReference( + resources.ServiceKind, resources.CoreAPIVersion, receiverName) + endpoint := s.Deploy(sutCtx, duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: ref.APIVersion, + Kind: ref.Kind, + Namespace: pod.Namespace, + Name: pod.Name, + }, + }) + if tr, ok := s.(sut.HasTeardown); ok { + defer tr.Teardown(sutCtx) + } + assert.NotEmpty(t, endpoint) + step1 := watholasender.NewCloudEvent(watholaevent.Step{Number: 1}, + watholaevent.StepType) + step2 := watholasender.NewCloudEvent(watholaevent.Step{Number: 2}, + watholaevent.StepType) + finished := watholasender.NewCloudEvent(watholaevent.Finished{EventsSent: 2}, + watholaevent.FinishedType) + + sender := testingSender{t: t, ctx: ctx, client: client, url: endpoint.(*apis.URL)} + sender.send(step1) + sender.send(step2) + sender.send(finished) + + eis.AssertAtLeast(2, eventType(watholaevent.StepType)) + eis.AssertAtLeast(1, eventType(watholaevent.FinishedType)) + eis.AssertAtLeast(1, stepEvent(watholaevent.Step{Number: 1})) + eis.AssertAtLeast(1, stepEvent(watholaevent.Step{Number: 2})) + eis.AssertAtLeast(1, finishedEvent(watholaevent.Finished{EventsSent: 2})) +} + +type testingSender struct { + t *testing.T + ctx context.Context + client *testlib.Client + url *apis.URL +} + +func (s *testingSender) send(event cloudevents.Event) { + s.t.Helper() + senderName := names.SimpleNameGenerator.GenerateName("sender-") + s.client.SendEvent(s.ctx, senderName, s.url.String(), event) +} + +func eventType(_type string) recordevents.EventInfoMatcher { + return func(info recordevents.EventInfo) error { + actual := info.Event.Type() + if actual == _type { + return nil + } + return fmt.Errorf("event type don't match. want: '%s', got: '%s'", + _type, actual) + } +} + +func finishedEvent(expected watholaevent.Finished) recordevents.EventInfoMatcher { + return func(info recordevents.EventInfo) error { + ce := info.Event + if ce.Type() == watholaevent.FinishedType { + actual := &watholaevent.Finished{} + err := ce.DataAs(actual) + if err != nil { + return err + } + if reflect.DeepEqual(actual, expected) { + return fmt.Errorf( + "finished event don't match. want: %#v, got: %#v", + expected, actual, + ) + } + } + return nil + } +} + +func stepEvent(expected watholaevent.Step) recordevents.EventInfoMatcher { + return func(info recordevents.EventInfo) error { + ce := info.Event + if ce.Type() == watholaevent.StepType { + actual := &watholaevent.Step{} + err := ce.DataAs(actual) + if err != nil { + return err + } + if reflect.DeepEqual(actual, expected) { + return fmt.Errorf( + "step event don't match. want: %#v, got: %#v", + expected, actual, + ) + } + } + return nil + } +} + +func log(t *testing.T) *zap.SugaredLogger { + l, err := zap.NewDevelopment() + if err != nil { + t.Fatal(err) + } + return l.Sugar() +} diff --git a/test/upgrade/prober/sut/default.go b/test/upgrade/prober/sut/default.go new file mode 100644 index 00000000000..1f11edbdd2c --- /dev/null +++ b/test/upgrade/prober/sut/default.go @@ -0,0 +1,22 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sut + +// NewDefault will return a default SUT for this repo. +func NewDefault() SystemUnderTest { + return NewBrokerAndTriggers() +} diff --git a/test/upgrade/prober/sut/default_test.go b/test/upgrade/prober/sut/default_test.go new file mode 100644 index 00000000000..11a45aa29e0 --- /dev/null +++ b/test/upgrade/prober/sut/default_test.go @@ -0,0 +1,32 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sut_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "knative.dev/eventing/test/upgrade/prober/sut" +) + +func TestNewDefault(t *testing.T) { + s := sut.NewDefault() + assert.Condition(t, func() bool { + _, ok := s.(*sut.BrokerAndTriggers) + return ok + }) +} diff --git a/test/upgrade/prober/sut/types.go b/test/upgrade/prober/sut/types.go new file mode 100644 index 00000000000..8f134101951 --- /dev/null +++ b/test/upgrade/prober/sut/types.go @@ -0,0 +1,54 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sut + +import ( + "context" + + "go.uber.org/zap" + testlib "knative.dev/eventing/test/lib" + watholaevent "knative.dev/eventing/test/upgrade/prober/wathola/event" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +var eventTypes = []string{ + watholaevent.Step{}.Type(), + watholaevent.Finished{}.Type(), +} + +// SystemUnderTest (SUT) represents a system that we'd like to test with +// continual prober. +type SystemUnderTest interface { + // Deploy is responsible for deploying SUT and returning a URL to feed + // events into. + Deploy(ctx Context, destination duckv1.Destination) interface{} +} + +// HasTeardown indicates that SystemUnderTest supports custom teardown that +// exceeds regular teardown via usage of testlib.Tracker. +type HasTeardown interface { + // Teardown will remove all deployed SUT resources. + Teardown(ctx Context) +} + +// Context represents a context of system under test that we'd +// like to deploy and teardown. +type Context struct { + Ctx context.Context + Log *zap.SugaredLogger + *testlib.Client +} diff --git a/test/upgrade/prober/verify.go b/test/upgrade/prober/verify.go index 1687753a09d..90ce927a688 100644 --- a/test/upgrade/prober/verify.go +++ b/test/upgrade/prober/verify.go @@ -22,7 +22,6 @@ import ( "fmt" "time" - "github.com/wavesoftware/go-ensure" "go.uber.org/zap" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -42,8 +41,8 @@ const ( ) // Verify will verify prober state after finished has been sent. -func (p *prober) Verify(ctx context.Context) (eventErrs []error, eventsSent int, fetchErr error) { - report := p.fetchReport(ctx) +func (p *prober) Verify() (eventErrs []error, eventsSent int) { + report := p.fetchReport() availRate := 0.0 if report.TotalRequests != 0 { availRate = float64(report.EventsSent*100) / float64(report.TotalRequests) @@ -53,7 +52,7 @@ func (p *prober) Verify(ctx context.Context) (eventErrs []error, eventsSent int, p.log.Infof("Availability: %.3f%%, Requests sent: %d.", availRate, report.TotalRequests) if report.State == "active" { - return nil, report.EventsSent, errors.New("report fetched too early, receiver is in active state") + p.client.T.Fatal("report fetched too early, receiver is in active state") } for _, t := range report.Thrown.Missing { eventErrs = append(eventErrs, errors.New(t)) @@ -71,16 +70,16 @@ func (p *prober) Verify(ctx context.Context) (eventErrs []error, eventsSent int, eventErrs = append(eventErrs, errors.New(t)) } } - return eventErrs, report.EventsSent, nil + return eventErrs, report.EventsSent } // Finish terminates sender which sends finished event. -func (p *prober) Finish(ctx context.Context) { - p.removeSender(ctx) +func (p *prober) Finish() { + p.removeSender() } -func (p *prober) fetchReport(ctx context.Context) *receiver.Report { - exec := p.fetchExecution(ctx) +func (p *prober) fetchReport() *receiver.Report { + exec := p.fetchExecution() replayLogs(p.log, exec) return exec.Report } @@ -100,14 +99,14 @@ func replayLogs(log *zap.SugaredLogger, exec *fetcher.Execution) { } } -func (p *prober) fetchExecution(ctx context.Context) *fetcher.Execution { - ns := p.config.Namespace - job := p.deployFetcher(ctx) - defer p.deleteFetcher(ctx) - pod, err := p.findSucceededPod(ctx, job) - ensure.NoError(err) - bytes, err := pkgTest.PodLogs(ctx, p.client.Kube, pod.Name, fetcherName, ns) - ensure.NoError(err) +func (p *prober) fetchExecution() *fetcher.Execution { + ns := p.client.Namespace + job := p.deployFetcher() + defer p.deleteFetcher() + pod, err := p.findSucceededPod(job) + p.ensureNoError(err) + bytes, err := pkgTest.PodLogs(p.config.Ctx, p.client.Kube, pod.Name, fetcherName, ns) + p.ensureNoError(err) ex := &fetcher.Execution{ Logs: []fetcher.LogEntry{}, Report: &receiver.Report{ @@ -123,13 +122,13 @@ func (p *prober) fetchExecution(ctx context.Context) *fetcher.Execution { }, } err = json.Unmarshal(bytes, ex) - ensure.NoError(err) + p.ensureNoError(err) return ex } -func (p *prober) deployFetcher(ctx context.Context) *batchv1.Job { +func (p *prober) deployFetcher() *batchv1.Job { p.log.Info("Deploying fetcher job: ", fetcherName) - jobs := p.client.Kube.BatchV1().Jobs(p.config.Namespace) + jobs := p.client.Kube.BatchV1().Jobs(p.client.Namespace) var replicas int32 = 1 fetcherJob := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ @@ -170,28 +169,28 @@ func (p *prober) deployFetcher(ctx context.Context) *batchv1.Job { }, }, } - created, err := jobs.Create(ctx, fetcherJob, metav1.CreateOptions{}) - ensure.NoError(err) + created, err := jobs.Create(p.config.Ctx, fetcherJob, metav1.CreateOptions{}) + p.ensureNoError(err) p.log.Info("Waiting for fetcher job to succeed: ", fetcherName) - err = waitForJobToComplete(ctx, p.client.Kube, fetcherName, p.config.Namespace) - ensure.NoError(err) + err = waitForJobToComplete(p.config.Ctx, p.client.Kube, fetcherName, p.client.Namespace) + p.ensureNoError(err) return created } -func (p *prober) deleteFetcher(ctx context.Context) { - ns := p.config.Namespace +func (p *prober) deleteFetcher() { + ns := p.client.Namespace jobs := p.client.Kube.BatchV1().Jobs(ns) - err := jobs.Delete(ctx, fetcherName, metav1.DeleteOptions{}) - ensure.NoError(err) + err := jobs.Delete(p.config.Ctx, fetcherName, metav1.DeleteOptions{}) + p.ensureNoError(err) } -func (p *prober) findSucceededPod(ctx context.Context, job *batchv1.Job) (*corev1.Pod, error) { +func (p *prober) findSucceededPod(job *batchv1.Job) (*corev1.Pod, error) { pods := p.client.Kube.CoreV1().Pods(job.Namespace) - podList, err := pods.List(ctx, metav1.ListOptions{ + podList, err := pods.List(p.config.Ctx, metav1.ListOptions{ LabelSelector: fmt.Sprint("job-name=", job.Name), }) - ensure.NoError(err) + p.ensureNoError(err) for _, pod := range podList.Items { if pod.Status.Phase == corev1.PodSucceeded { return &pod, nil diff --git a/test/upgrade/prober/wathola/config/defaults.go b/test/upgrade/prober/wathola/config/defaults.go index e25f0a92d35..77f300a61ef 100644 --- a/test/upgrade/prober/wathola/config/defaults.go +++ b/test/upgrade/prober/wathola/config/defaults.go @@ -62,7 +62,7 @@ func defaultValues() *Config { Duration: time.Second, }, Errors: ReceiverErrorConfig{ - UnavailablePeriodToReport: 5 * time.Second, + UnavailablePeriodToReport: time.Minute, }, }, Forwarder: ForwarderConfig{ diff --git a/test/upgrade/prober/wathola/event/types.go b/test/upgrade/prober/wathola/event/types.go index f6559dab5e8..2a7fe264bb8 100644 --- a/test/upgrade/prober/wathola/event/types.go +++ b/test/upgrade/prober/wathola/event/types.go @@ -23,9 +23,9 @@ import ( const ( // StepType is a string type representation of step event - StepType = "com.github.cardil.wathola.step" + StepType = "dev.knative.eventing.wathola.step" // FinishedType os a string type representation of finished event - FinishedType = "com.github.cardil.wathola.finished" + FinishedType = "dev.knative.eventing.wathola.finished" ) // Step is a event call at each step of verification