diff --git a/config/post-install/v0.15.0/storage-version-migration.yaml b/config/post-install/v0.15.0/storage-version-migration.yaml index b39392ace2a..3e6e337bfef 100644 --- a/config/post-install/v0.15.0/storage-version-migration.yaml +++ b/config/post-install/v0.15.0/storage-version-migration.yaml @@ -27,6 +27,8 @@ spec: metadata: labels: app: "storage-version-migration" + annotations: + sidecar.istio.io/inject: "false" spec: serviceAccountName: knative-eventing-post-install-job restartPolicy: OnFailure diff --git a/config/pre-install/v0.16.0/storage-version-migration.yaml b/config/pre-install/v0.16.0/storage-version-migration.yaml index 3b1bd08cc3a..e0af598e837 100644 --- a/config/pre-install/v0.16.0/storage-version-migration.yaml +++ b/config/pre-install/v0.16.0/storage-version-migration.yaml @@ -15,10 +15,10 @@ apiVersion: batch/v1 kind: Job metadata: - name: storage-version-migration-v016 + name: storage-version-migration-eventing namespace: knative-eventing labels: - app: "storage-version-migration" + app: "storage-version-migration-eventing" eventing.knative.dev/release: devel spec: ttlSecondsAfterFinished: 600 @@ -26,7 +26,9 @@ spec: template: metadata: labels: - app: "storage-version-migration" + app: "storage-version-migration-eventing" + annotations: + sidecar.istio.io/inject: "false" spec: serviceAccountName: knative-eventing-pre-install-job restartPolicy: OnFailure diff --git a/pkg/apis/flows/v1/parallel_lifecycle.go b/pkg/apis/flows/v1/parallel_lifecycle.go index 18857479ab3..cccd20503e0 100644 --- a/pkg/apis/flows/v1/parallel_lifecycle.go +++ b/pkg/apis/flows/v1/parallel_lifecycle.go @@ -77,7 +77,7 @@ func (ps *ParallelStatus) InitializeConditions() { // PropagateSubscriptionStatuses sets the ParallelConditionSubscriptionsReady based on // the status of the incoming subscriptions. func (ps *ParallelStatus) PropagateSubscriptionStatuses(filterSubscriptions []*messagingv1.Subscription, subscriptions []*messagingv1.Subscription) { - if ps.BranchStatuses == nil { + if ps.BranchStatuses == nil || len(subscriptions) != len(ps.BranchStatuses) { ps.BranchStatuses = make([]ParallelBranchStatus, len(subscriptions)) } allReady := true @@ -136,7 +136,7 @@ func (ps *ParallelStatus) PropagateSubscriptionStatuses(filterSubscriptions []*m // PropagateChannelStatuses sets the ChannelStatuses and ParallelConditionChannelsReady based on the // status of the incoming channels. func (ps *ParallelStatus) PropagateChannelStatuses(ingressChannel *duckv1.Channelable, channels []*duckv1.Channelable) { - if ps.BranchStatuses == nil { + if ps.BranchStatuses == nil || len(channels) != len(ps.BranchStatuses) { ps.BranchStatuses = make([]ParallelBranchStatus, len(channels)) } allReady := true diff --git a/pkg/apis/flows/v1/parallel_lifecycle_test.go b/pkg/apis/flows/v1/parallel_lifecycle_test.go index d5fd532ec48..95320fd8f6c 100644 --- a/pkg/apis/flows/v1/parallel_lifecycle_test.go +++ b/pkg/apis/flows/v1/parallel_lifecycle_test.go @@ -293,6 +293,37 @@ func TestParallelPropagateChannelStatuses(t *testing.T) { } } +func TestParallelPropagateChannelStatusUpdated(t *testing.T) { + inChannel := getChannelable(true) + initialChannels := []*eventingduckv1.Channelable{getChannelable(true)} + afterChannels := []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)} + ps := ParallelStatus{} + ps.PropagateChannelStatuses(inChannel, initialChannels) + if len(ps.BranchStatuses) != 1 { + t.Errorf("unexpected branchstatuses want 1 got %d", len(ps.BranchStatuses)) + } + ps.PropagateChannelStatuses(inChannel, afterChannels) + if len(ps.BranchStatuses) != 2 { + t.Errorf("unexpected branchstatuses want 2 got %d", len(ps.BranchStatuses)) + } +} + +func TestParallelPropagateSubscriptionStatusUpdated(t *testing.T) { + initialFsubs := []*messagingv1.Subscription{getSubscription("fsub0", true)} + initialSubs := []*messagingv1.Subscription{getSubscription("sub0", true)} + afterFsubs := []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)} + afterSubs := []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)} + ps := ParallelStatus{} + ps.PropagateSubscriptionStatuses(initialFsubs, initialSubs) + if len(ps.BranchStatuses) != 1 { + t.Errorf("unexpected branchstatuses want 1 got %d", len(ps.BranchStatuses)) + } + ps.PropagateSubscriptionStatuses(afterFsubs, afterSubs) + if len(ps.BranchStatuses) != 2 { + t.Errorf("unexpected branchstatuses want 2 got %d", len(ps.BranchStatuses)) + } +} + func TestParallelReady(t *testing.T) { tests := []struct { name string diff --git a/pkg/mtbroker/filter/filter_handler.go b/pkg/mtbroker/filter/filter_handler.go index 80b49f7b5da..70252c76710 100644 --- a/pkg/mtbroker/filter/filter_handler.go +++ b/pkg/mtbroker/filter/filter_handler.go @@ -255,8 +255,12 @@ func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target str err = fmt.Errorf("failed to dispatch message: %w", err) } - _ = h.reporter.ReportEventDispatchTime(reporterArgs, resp.StatusCode, dispatchTime) + sc := 0 + if resp != nil { + sc = resp.StatusCode + } + _ = h.reporter.ReportEventDispatchTime(reporterArgs, sc, dispatchTime) return resp, err } diff --git a/test/e2e-conformance-tests.sh b/test/e2e-conformance-tests.sh new file mode 100755 index 00000000000..04b63db2a0a --- /dev/null +++ b/test/e2e-conformance-tests.sh @@ -0,0 +1,43 @@ +#!/usr/bin/env bash + +# Copyright 2020 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This script runs the end-to-end tests against eventing built from source. + +# If you already have the *_OVERRIDE environment variables set, call +# this script with the --run-tests arguments and it will use the cluster +# and run the tests. + +# Calling this script without arguments will create a new cluster in +# project $PROJECT_ID, start Knative eventing system, run the tests and +# delete the cluster. + +export GO111MODULE=on + +source "$(dirname "$0")/e2e-common.sh" + +# Script entry point. + +initialize $@ --skip-istio-addon + + +install_mt_broker || fail_test "Could not install MT Channel Based Broker" + +install_sugar || fail_test "Could not install Sugar Controller" + +echo "Running tests with Multi Tenant Channel Based Broker" +go_test_e2e -timeout=30m -parallel=12 ./test/conformance -brokerclass=MTChannelBasedBroker -channels=messaging.knative.dev/v1beta1:Channel,messaging.knative.dev/v1beta1:InMemoryChannel,messaging.knative.dev/v1:Channel,messaging.knative.dev/v1:InMemoryChannel -sources=sources.knative.dev/v1alpha2:ApiServerSource,sources.knative.dev/v1alpha2:ContainerSource,sources.knative.dev/v1alpha2:PingSource || fail_test + +success diff --git a/test/e2e-tests.sh b/test/e2e-tests.sh index 10797079551..4c3e993ba37 100755 --- a/test/e2e-tests.sh +++ b/test/e2e-tests.sh @@ -37,6 +37,6 @@ install_mt_broker || fail_test "Could not install MT Channel Based Broker" install_sugar || fail_test "Could not install Sugar Controller" echo "Running tests with Multi Tenant Channel Based Broker" -go_test_e2e -timeout=30m -parallel=12 ./test/e2e ./test/conformance -brokerclass=MTChannelBasedBroker -channels=messaging.knative.dev/v1beta1:Channel,messaging.knative.dev/v1beta1:InMemoryChannel,messaging.knative.dev/v1:Channel,messaging.knative.dev/v1:InMemoryChannel -sources=sources.knative.dev/v1alpha2:ApiServerSource,sources.knative.dev/v1alpha2:ContainerSource,sources.knative.dev/v1alpha2:PingSource || fail_test +go_test_e2e -timeout=30m -parallel=12 ./test/e2e -brokerclass=MTChannelBasedBroker -channels=messaging.knative.dev/v1beta1:Channel,messaging.knative.dev/v1beta1:InMemoryChannel,messaging.knative.dev/v1:Channel,messaging.knative.dev/v1:InMemoryChannel -sources=sources.knative.dev/v1alpha2:ApiServerSource,sources.knative.dev/v1alpha2:ContainerSource,sources.knative.dev/v1alpha2:PingSource || fail_test success diff --git a/test/lib/duck/resource_inspectors.go b/test/lib/duck/resource_inspectors.go index 6df11cd6d5d..392b8a12b0e 100644 --- a/test/lib/duck/resource_inspectors.go +++ b/test/lib/duck/resource_inspectors.go @@ -39,6 +39,9 @@ func GetAddressableURI(dynamicClient dynamic.Interface, obj *resources.MetaResou at := untyped.(*duckv1alpha1.AddressableType) + if at.Status.Address == nil { + return url.URL{}, fmt.Errorf("addressable does not have an Address: %+v", at) + } au := at.Status.Address.GetURL() if au.Host == "" { return url.URL{}, fmt.Errorf("addressable's URL does not have a Host: %+v", at) diff --git a/test/upgrade/prober/configuration.go b/test/upgrade/prober/configuration.go index 7f498a8afab..f98f5f24978 100644 --- a/test/upgrade/prober/configuration.go +++ b/test/upgrade/prober/configuration.go @@ -19,13 +19,11 @@ import ( "bytes" "fmt" "io/ioutil" - "knative.dev/eventing/pkg/reconciler/sugar" "path" "runtime" "text/template" "github.com/wavesoftware/go-ensure" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" testlib "knative.dev/eventing/test/lib" @@ -48,22 +46,13 @@ var ( ) func (p *prober) deployConfiguration() { - p.annotateNamespace() + p.deployBroker() p.deployConfigMap() p.deployTriggers() } -func (p *prober) annotateNamespace() { - ns, err := p.client.Kube.Kube.CoreV1().Namespaces(). - Get(p.client.Namespace, metav1.GetOptions{}) - ensure.NoError(err) - ns.Labels = map[string]string{ - sugar.DeprecatedInjectionLabelKey: sugar.InjectionEnabledLabelValue, - sugar.InjectionLabelKey: sugar.InjectionEnabledLabelValue, - } - _, err = p.client.Kube.Kube.CoreV1().Namespaces(). - Update(ns) - ensure.NoError(err) +func (p *prober) deployBroker() { + p.client.CreateBrokerV1Beta1OrFail(brokerName) } func (p *prober) fetchBrokerUrl() (*apis.URL, error) { @@ -85,28 +74,11 @@ func (p *prober) fetchBrokerUrl() (*apis.URL, error) { func (p *prober) deployConfigMap() { name := configName - testlib.WaitFor(fmt.Sprintf("configmap be deployed: %v", name), func() error { - p.log.Infof("Deploying config map: %v", name) - - brokerUrl, err := p.fetchBrokerUrl() - if err != nil { - return err - } - configData := p.compileTemplate(configFilename, brokerUrl) - watholaConfigMap := &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - }, - Data: map[string]string{ - configFilename: configData, - }, - } - _, err = p.client.Kube.Kube.CoreV1().ConfigMaps(p.config.Namespace).Create(watholaConfigMap) - if err != nil { - return err - } - return nil - }) + p.log.Infof("Deploying config map: \"%s/%s\"", p.config.Namespace, name) + brokerUrl, err := p.fetchBrokerUrl() + ensure.NoError(err) + configData := p.compileTemplate(configFilename, brokerUrl) + p.client.CreateConfigMapOrFail(name, p.config.Namespace, map[string]string{configFilename: configData}) } func (p *prober) deployTriggers() { @@ -117,25 +89,14 @@ func (p *prober) deployTriggers() { if p.config.Serving.Use { subscriberOption = resources.WithSubscriberKServiceRefForTrigger(forwarderName) } - trigger := resources.TriggerV1Beta1( - name, + p.client.CreateTriggerOrFailV1Beta1(name, resources.WithBrokerV1Beta1(brokerName), resources.WithAttributesTriggerFilterV1Beta1( eventingv1beta1.TriggerAnyFilter, fullType, map[string]interface{}{}, ), - subscriberOption, - ) - triggers := p.client.Eventing.EventingV1beta1().Triggers(p.config.Namespace) - p.log.Infof("Deploying trigger: %v", name) - // update trigger with the new reference - _, err := triggers.Create(trigger) - ensure.NoError(err) - testlib.WaitFor(fmt.Sprintf("trigger be ready: %v", name), func() error { - meta := resources.NewMetaResource(name, p.config.Namespace, testlib.TriggerTypeMeta) - return duck.WaitForResourceReady(p.client.Dynamic, meta) - }) + subscriberOption) } } diff --git a/test/upgrade/prober/prober.go b/test/upgrade/prober/prober.go index 79c7d6805f3..ea0a9f79b12 100644 --- a/test/upgrade/prober/prober.go +++ b/test/upgrade/prober/prober.go @@ -137,7 +137,7 @@ func (p *prober) deploy() { if p.config.Serving.Use { p.deployForwarder() } - ensure.NoError(testlib.AwaitForAll(p.log)) + p.client.WaitForAllTestResourcesReadyOrFail() p.deploySender() ensure.NoError(testlib.AwaitForAll(p.log)) @@ -151,8 +151,7 @@ func (p *prober) remove() { if p.config.Serving.Use { p.removeForwarder() } - p.removeReceiver() - p.removeConfiguration() + p.client.Tracker.Clean(true) } func newProber(log *zap.SugaredLogger, client *testlib.Client, config *Config) Prober {