diff --git a/test/upgrade/prober/forwarder.go b/test/upgrade/prober/forwarder.go index a5242b870d8..2a7d9daa728 100644 --- a/test/upgrade/prober/forwarder.go +++ b/test/upgrade/prober/forwarder.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/wavesoftware/go-ensure" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" testlib "knative.dev/eventing/test/lib" @@ -27,24 +28,22 @@ import ( pkgTest "knative.dev/pkg/test" ) -var ( - forwarderName = "wathola-forwarder" -) +var forwarderName = "wathola-forwarder" func (p *prober) deployForwarder() { - p.log.Infof("Deploy forwarder knative service: %v", forwarderName) + p.log.Info("Deploy forwarder knative service: ", forwarderName) serving := p.client.Dynamic.Resource(resources.KServicesGVR).Namespace(p.client.Namespace) service := p.forwarderKService(forwarderName, p.client.Namespace) _, err := serving.Create(service, metav1.CreateOptions{}) ensure.NoError(err) sc := p.servingClient() - testlib.WaitFor(fmt.Sprintf("forwarder ksvc be ready: %v", forwarderName), func() error { + testlib.WaitFor(fmt.Sprint("forwarder knative service be ready: ", forwarderName), func() error { return duck.WaitForKServiceReady(sc, forwarderName, p.client.Namespace) }) if p.config.Serving.ScaleToZero { - testlib.WaitFor(fmt.Sprintf("forwarder scales to zero: %v", forwarderName), func() error { + testlib.WaitFor(fmt.Sprint("forwarder scales to zero: ", forwarderName), func() error { return duck.WaitForKServiceScales(sc, forwarderName, p.client.Namespace, func(scale int) bool { return scale == 0 }) @@ -53,49 +52,47 @@ func (p *prober) deployForwarder() { } func (p *prober) removeForwarder() { - p.log.Infof("Remove forwarder knative service: %v", forwarderName) + p.log.Info("Remove forwarder knative service: ", forwarderName) serving := p.client.Dynamic.Resource(resources.KServicesGVR).Namespace(p.client.Namespace) err := serving.Delete(forwarderName, &metav1.DeleteOptions{}) ensure.NoError(err) } func (p *prober) forwarderKService(name, namespace string) *unstructured.Unstructured { - obj := map[string]interface{}{ - "apiVersion": resources.KServiceType.APIVersion, - "kind": resources.KServiceType.Kind, - "metadata": map[string]interface{}{ - "name": name, - "namespace": namespace, - "labels": map[string]string{ - "serving.knative.dev/visibility": "cluster-local", - }, + return kService(metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: map[string]string{ + "serving.knative.dev/visibility": "cluster-local", }, - "spec": map[string]interface{}{ - "template": map[string]interface{}{ - "spec": map[string]interface{}{ - "containers": []map[string]interface{}{{ - "name": "forwarder", - "image": pkgTest.ImagePath(forwarderName), - "volumeMounts": []map[string]interface{}{{ - "name": p.config.ConfigMapName, - "mountPath": p.config.ConfigMountPoint, - "readOnly": true, - }}, - "readinessProbe": map[string]interface{}{ - "httpGet": map[string]interface{}{ - "path": p.config.HealthEndpoint, - }, - }, - }}, - "volumes": []map[string]interface{}{{ - "name": p.config.ConfigMapName, - "configMap": map[string]interface{}{ - "name": p.config.ConfigMapName, - }, - }}, + }, corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "forwarder", + Image: pkgTest.ImagePath(forwarderName), + VolumeMounts: []corev1.VolumeMount{ + { + Name: p.config.ConfigMapName, + MountPath: p.config.ConfigMountPoint, + ReadOnly: true, }, }, - }, - } - return &unstructured.Unstructured{Object: obj} + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: p.config.HealthEndpoint, + }, + }, + }, + }}, + Volumes: []corev1.Volume{{ + Name: p.config.ConfigMapName, + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: p.config.ConfigMapName, + }, + }, + }, + }}, + }) } diff --git a/test/upgrade/prober/kservice.go b/test/upgrade/prober/kservice.go new file mode 100644 index 00000000000..88cb38bcbbf --- /dev/null +++ b/test/upgrade/prober/kservice.go @@ -0,0 +1,75 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prober + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "knative.dev/eventing/test/lib/resources" +) + +func kService(meta metav1.ObjectMeta, spec corev1.PodSpec) *unstructured.Unstructured { + if len(spec.Containers) != 1 { + panic("kService func supports PodSpec with 1 container only") + } + container := spec.Containers[0] + if len(spec.Volumes) != 1 || len(container.VolumeMounts) != 1 { + panic("kService func supports PodSpec with 1 volume only") + } + volume := spec.Volumes[0] + volmount := container.VolumeMounts[0] + obj := map[string]interface{}{ + "apiVersion": resources.KServiceType.APIVersion, + "kind": resources.KServiceType.Kind, + "metadata": map[string]interface{}{ + "name": meta.Name, + "namespace": meta.Namespace, + "labels": meta.Labels, + }, + "spec": map[string]interface{}{ + "template": map[string]interface{}{ + "metadata": map[string]interface{}{ + "annotations": meta.Annotations, + }, + "spec": map[string]interface{}{ + "containers": []map[string]interface{}{{ + "name": container.Name, + "image": container.Image, + "volumeMounts": []map[string]interface{}{{ + "name": volmount.Name, + "mountPath": volmount.MountPath, + "readOnly": true, + }}, + "readinessProbe": map[string]interface{}{ + "httpGet": map[string]interface{}{ + "path": container.ReadinessProbe.HTTPGet.Path, + }, + }, + }}, + "volumes": []map[string]interface{}{{ + "name": volume.Name, + "configMap": map[string]interface{}{ + "name": volume.ConfigMap.Name, + }, + }}, + }, + }, + }, + } + return &unstructured.Unstructured{Object: obj} +} diff --git a/test/upgrade/prober/prober.go b/test/upgrade/prober/prober.go index 0a11372ab10..3855ec5c50b 100644 --- a/test/upgrade/prober/prober.go +++ b/test/upgrade/prober/prober.go @@ -123,6 +123,7 @@ func (p *prober) deploy() { func (p *prober) remove() { if p.config.Serving.Use { p.removeForwarder() + p.removeReceiverKService() } ensure.NoError(p.client.Tracker.Clean(true)) } diff --git a/test/upgrade/prober/receiver.go b/test/upgrade/prober/receiver.go index 331a7b8bd2e..b49517c0f28 100644 --- a/test/upgrade/prober/receiver.go +++ b/test/upgrade/prober/receiver.go @@ -17,6 +17,7 @@ package prober import ( "fmt" + "net/url" "github.com/wavesoftware/go-ensure" appsv1 "k8s.io/api/apps/v1" @@ -24,6 +25,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" testlib "knative.dev/eventing/test/lib" + "knative.dev/eventing/test/lib/duck" + "knative.dev/eventing/test/lib/resources" watholaconfig "knative.dev/eventing/test/upgrade/prober/wathola/config" pkgTest "knative.dev/pkg/test" ) @@ -34,8 +37,19 @@ var ( ) func (p *prober) deployReceiver() { - p.deployReceiverDeployment() - p.deployReceiverService() + if p.config.Serving.Use { + p.deployReceiverKService() + } else { + p.deployReceiverDeployment() + p.deployReceiverService() + } +} + +func (p *prober) removeReceiverKService() { + p.log.Info("Remove receiver knative service: ", receiverName) + serving := p.client.Dynamic.Resource(resources.KServicesGVR).Namespace(p.client.Namespace) + err := serving.Delete(receiverName, &metav1.DeleteOptions{}) + ensure.NoError(err) } func (p *prober) deployReceiverDeployment() { @@ -54,7 +68,7 @@ func (p *prober) deployReceiverDeployment() { } func (p *prober) deployReceiverService() { - p.log.Infof("Deploy of receiver service: %v", receiverName) + p.log.Info("Deploy of receiver service: ", receiverName) service := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: receiverName, @@ -93,6 +107,48 @@ func (p *prober) deployReceiverService() { } } +func (p *prober) deployReceiverKService() { + p.log.Info("Deploy of receiver knative service: ", receiverName) + deployment := p.createReceiverDeployment() + podSpec := deployment.Spec.Template.Spec + podSpec.Containers[0].ReadinessProbe.HTTPGet = &corev1.HTTPGetAction{ + Path: p.config.HealthEndpoint, + } + deployment.ObjectMeta.Annotations = map[string]string{ + "autoscaling.knative.dev/minScale": "1", + "autoscaling.knative.dev/maxScale": "1", + } + ksvc := kService(deployment.ObjectMeta, podSpec) + serving := p.client.Dynamic.Resource(resources.KServicesGVR).Namespace(p.client.Namespace) + _, err := serving.Create(ksvc, metav1.CreateOptions{}) + ensure.NoError(err) + + sc := p.servingClient() + testlib.WaitFor(fmt.Sprint("receiver knative service be ready: ", receiverName), func() error { + return duck.WaitForKServiceReady(sc, receiverName, p.client.Namespace) + }) +} + +func (p *prober) receiverKServiceAddress() *url.URL { + serving := p.client.Dynamic.Resource(resources.KServicesGVR).Namespace(p.client.Namespace) + ksvc, err := serving.Get(receiverName, metav1.GetOptions{}) + ensure.NoError(err) + content := ksvc.UnstructuredContent() + maybeStatus, ok := content["status"] + if !ok { + panic("no status on knative service") + } + status := maybeStatus.(map[string]interface{}) + maybeURL, ok := status["url"] + if !ok { + panic("no url on knative service status") + } + u, err := url.Parse(maybeURL.(string)) + ensure.NoError(err) + u.Path = "/report" + return u +} + func (p *prober) createReceiverDeployment() *appsv1.Deployment { var replicas int32 = 1 return &appsv1.Deployment{ diff --git a/test/upgrade/prober/verify.go b/test/upgrade/prober/verify.go index 7db5f69a434..d7e81034df8 100644 --- a/test/upgrade/prober/verify.go +++ b/test/upgrade/prober/verify.go @@ -21,21 +21,23 @@ import ( "errors" "fmt" "net/http" + "net/url" "strings" "github.com/wavesoftware/go-ensure" "go.uber.org/zap" - corev1 "k8s.io/api/core/v1" "knative.dev/eventing/test/lib/nodes" ) func (p *prober) Verify() ([]error, int) { - nc := nodes.Client(p.client.Kube.Kube, p.log) - node, err := nc.RandomWorkerNode() - ensure.NoError(err) - address := nc.GuessNodeExternalAddress(node) - p.log.Debugf("Address resolved: %v, type: %v", address.Address, address.Type) - report := fetchReceiverReport(address, p.log) + var urlResolver func() *url.URL + if p.config.Serving.Use { + urlResolver = p.receiverKServiceAddress + } else { + urlResolver = p.receiverAddressByWorkerExternalAddressNodePort + } + u := urlResolver() + report := fetchReceiverReport(u, p.log) p.log.Infof("Fetched receiver report. Events propagated: %v. "+ "State: %v", report.Events, report.State) if report.State == "active" { @@ -52,10 +54,22 @@ func (p *prober) Finish() { p.removeSender() } -func fetchReceiverReport(address *corev1.NodeAddress, log *zap.SugaredLogger) *Report { - u := fmt.Sprintf("http://%s:%d/report", address.Address, receiverNodePort) +func (p *prober) receiverAddressByWorkerExternalAddressNodePort() *url.URL { + nc := nodes.Client(p.client.Kube.Kube, p.log) + node, err := nc.RandomWorkerNode() + ensure.NoError(err) + address := nc.GuessNodeExternalAddress(node) + p.log.Debugf("Address resolved: %v, type: %v", address.Address, address.Type) + + u, err := url.Parse(fmt.Sprintf("http://%s:%d/report", + address.Address, receiverNodePort)) + ensure.NoError(err) + return u +} + +func fetchReceiverReport(u *url.URL, log *zap.SugaredLogger) *Report { log.Infof("Fetching receiver report from: %v", u) - resp, err := http.Get(u) + resp, err := http.Get(u.String()) ensure.NoError(err) if resp.StatusCode != 200 { var b strings.Builder