Skip to content
This repository was archived by the owner on Nov 28, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/post-install/v0.15.0/storage-version-migration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ spec:
metadata:
labels:
app: "storage-version-migration"
annotations:
sidecar.istio.io/inject: "false"
spec:
serviceAccountName: knative-eventing-post-install-job
restartPolicy: OnFailure
Expand Down
8 changes: 5 additions & 3 deletions config/pre-install/v0.16.0/storage-version-migration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@
apiVersion: batch/v1
kind: Job
metadata:
name: storage-version-migration-v016
name: storage-version-migration-eventing
namespace: knative-eventing
labels:
app: "storage-version-migration"
app: "storage-version-migration-eventing"
eventing.knative.dev/release: devel
spec:
ttlSecondsAfterFinished: 600
backoffLimit: 10
template:
metadata:
labels:
app: "storage-version-migration"
app: "storage-version-migration-eventing"
annotations:
sidecar.istio.io/inject: "false"
spec:
serviceAccountName: knative-eventing-pre-install-job
restartPolicy: OnFailure
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/flows/v1/parallel_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (ps *ParallelStatus) InitializeConditions() {
// PropagateSubscriptionStatuses sets the ParallelConditionSubscriptionsReady based on
// the status of the incoming subscriptions.
func (ps *ParallelStatus) PropagateSubscriptionStatuses(filterSubscriptions []*messagingv1.Subscription, subscriptions []*messagingv1.Subscription) {
if ps.BranchStatuses == nil {
if ps.BranchStatuses == nil || len(subscriptions) != len(ps.BranchStatuses) {
ps.BranchStatuses = make([]ParallelBranchStatus, len(subscriptions))
}
allReady := true
Expand Down Expand Up @@ -136,7 +136,7 @@ func (ps *ParallelStatus) PropagateSubscriptionStatuses(filterSubscriptions []*m
// PropagateChannelStatuses sets the ChannelStatuses and ParallelConditionChannelsReady based on the
// status of the incoming channels.
func (ps *ParallelStatus) PropagateChannelStatuses(ingressChannel *duckv1.Channelable, channels []*duckv1.Channelable) {
if ps.BranchStatuses == nil {
if ps.BranchStatuses == nil || len(channels) != len(ps.BranchStatuses) {
ps.BranchStatuses = make([]ParallelBranchStatus, len(channels))
}
allReady := true
Expand Down
31 changes: 31 additions & 0 deletions pkg/apis/flows/v1/parallel_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,37 @@ func TestParallelPropagateChannelStatuses(t *testing.T) {
}
}

func TestParallelPropagateChannelStatusUpdated(t *testing.T) {
inChannel := getChannelable(true)
initialChannels := []*eventingduckv1.Channelable{getChannelable(true)}
afterChannels := []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}
ps := ParallelStatus{}
ps.PropagateChannelStatuses(inChannel, initialChannels)
if len(ps.BranchStatuses) != 1 {
t.Errorf("unexpected branchstatuses want 1 got %d", len(ps.BranchStatuses))
}
ps.PropagateChannelStatuses(inChannel, afterChannels)
if len(ps.BranchStatuses) != 2 {
t.Errorf("unexpected branchstatuses want 2 got %d", len(ps.BranchStatuses))
}
}

func TestParallelPropagateSubscriptionStatusUpdated(t *testing.T) {
initialFsubs := []*messagingv1.Subscription{getSubscription("fsub0", true)}
initialSubs := []*messagingv1.Subscription{getSubscription("sub0", true)}
afterFsubs := []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}
afterSubs := []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}
ps := ParallelStatus{}
ps.PropagateSubscriptionStatuses(initialFsubs, initialSubs)
if len(ps.BranchStatuses) != 1 {
t.Errorf("unexpected branchstatuses want 1 got %d", len(ps.BranchStatuses))
}
ps.PropagateSubscriptionStatuses(afterFsubs, afterSubs)
if len(ps.BranchStatuses) != 2 {
t.Errorf("unexpected branchstatuses want 2 got %d", len(ps.BranchStatuses))
}
}

func TestParallelReady(t *testing.T) {
tests := []struct {
name string
Expand Down
6 changes: 5 additions & 1 deletion pkg/mtbroker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,12 @@ func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target str
err = fmt.Errorf("failed to dispatch message: %w", err)
}

_ = h.reporter.ReportEventDispatchTime(reporterArgs, resp.StatusCode, dispatchTime)
sc := 0
if resp != nil {
sc = resp.StatusCode
}

_ = h.reporter.ReportEventDispatchTime(reporterArgs, sc, dispatchTime)
return resp, err
}

Expand Down
43 changes: 43 additions & 0 deletions test/e2e-conformance-tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#!/usr/bin/env bash

# Copyright 2020 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This script runs the end-to-end tests against eventing built from source.

# If you already have the *_OVERRIDE environment variables set, call
# this script with the --run-tests arguments and it will use the cluster
# and run the tests.

# Calling this script without arguments will create a new cluster in
# project $PROJECT_ID, start Knative eventing system, run the tests and
# delete the cluster.

export GO111MODULE=on

source "$(dirname "$0")/e2e-common.sh"

# Script entry point.

initialize $@ --skip-istio-addon


install_mt_broker || fail_test "Could not install MT Channel Based Broker"

install_sugar || fail_test "Could not install Sugar Controller"

echo "Running tests with Multi Tenant Channel Based Broker"
go_test_e2e -timeout=30m -parallel=12 ./test/conformance -brokerclass=MTChannelBasedBroker -channels=messaging.knative.dev/v1beta1:Channel,messaging.knative.dev/v1beta1:InMemoryChannel,messaging.knative.dev/v1:Channel,messaging.knative.dev/v1:InMemoryChannel -sources=sources.knative.dev/v1alpha2:ApiServerSource,sources.knative.dev/v1alpha2:ContainerSource,sources.knative.dev/v1alpha2:PingSource || fail_test

success
2 changes: 1 addition & 1 deletion test/e2e-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ install_mt_broker || fail_test "Could not install MT Channel Based Broker"
install_sugar || fail_test "Could not install Sugar Controller"

echo "Running tests with Multi Tenant Channel Based Broker"
go_test_e2e -timeout=30m -parallel=12 ./test/e2e ./test/conformance -brokerclass=MTChannelBasedBroker -channels=messaging.knative.dev/v1beta1:Channel,messaging.knative.dev/v1beta1:InMemoryChannel,messaging.knative.dev/v1:Channel,messaging.knative.dev/v1:InMemoryChannel -sources=sources.knative.dev/v1alpha2:ApiServerSource,sources.knative.dev/v1alpha2:ContainerSource,sources.knative.dev/v1alpha2:PingSource || fail_test
go_test_e2e -timeout=30m -parallel=12 ./test/e2e -brokerclass=MTChannelBasedBroker -channels=messaging.knative.dev/v1beta1:Channel,messaging.knative.dev/v1beta1:InMemoryChannel,messaging.knative.dev/v1:Channel,messaging.knative.dev/v1:InMemoryChannel -sources=sources.knative.dev/v1alpha2:ApiServerSource,sources.knative.dev/v1alpha2:ContainerSource,sources.knative.dev/v1alpha2:PingSource || fail_test

success
3 changes: 3 additions & 0 deletions test/lib/duck/resource_inspectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func GetAddressableURI(dynamicClient dynamic.Interface, obj *resources.MetaResou

at := untyped.(*duckv1alpha1.AddressableType)

if at.Status.Address == nil {
return url.URL{}, fmt.Errorf("addressable does not have an Address: %+v", at)
}
au := at.Status.Address.GetURL()
if au.Host == "" {
return url.URL{}, fmt.Errorf("addressable's URL does not have a Host: %+v", at)
Expand Down
59 changes: 10 additions & 49 deletions test/upgrade/prober/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ import (
"bytes"
"fmt"
"io/ioutil"
"knative.dev/eventing/pkg/reconciler/sugar"
"path"
"runtime"
"text/template"

"github.com/wavesoftware/go-ensure"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1"
testlib "knative.dev/eventing/test/lib"
Expand All @@ -48,22 +46,13 @@ var (
)

func (p *prober) deployConfiguration() {
p.annotateNamespace()
p.deployBroker()
p.deployConfigMap()
p.deployTriggers()
}

func (p *prober) annotateNamespace() {
ns, err := p.client.Kube.Kube.CoreV1().Namespaces().
Get(p.client.Namespace, metav1.GetOptions{})
ensure.NoError(err)
ns.Labels = map[string]string{
sugar.DeprecatedInjectionLabelKey: sugar.InjectionEnabledLabelValue,
sugar.InjectionLabelKey: sugar.InjectionEnabledLabelValue,
}
_, err = p.client.Kube.Kube.CoreV1().Namespaces().
Update(ns)
ensure.NoError(err)
func (p *prober) deployBroker() {
p.client.CreateBrokerV1Beta1OrFail(brokerName)
}

func (p *prober) fetchBrokerUrl() (*apis.URL, error) {
Expand All @@ -85,28 +74,11 @@ func (p *prober) fetchBrokerUrl() (*apis.URL, error) {

func (p *prober) deployConfigMap() {
name := configName
testlib.WaitFor(fmt.Sprintf("configmap be deployed: %v", name), func() error {
p.log.Infof("Deploying config map: %v", name)

brokerUrl, err := p.fetchBrokerUrl()
if err != nil {
return err
}
configData := p.compileTemplate(configFilename, brokerUrl)
watholaConfigMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Data: map[string]string{
configFilename: configData,
},
}
_, err = p.client.Kube.Kube.CoreV1().ConfigMaps(p.config.Namespace).Create(watholaConfigMap)
if err != nil {
return err
}
return nil
})
p.log.Infof("Deploying config map: \"%s/%s\"", p.config.Namespace, name)
brokerUrl, err := p.fetchBrokerUrl()
ensure.NoError(err)
configData := p.compileTemplate(configFilename, brokerUrl)
p.client.CreateConfigMapOrFail(name, p.config.Namespace, map[string]string{configFilename: configData})
}

func (p *prober) deployTriggers() {
Expand All @@ -117,25 +89,14 @@ func (p *prober) deployTriggers() {
if p.config.Serving.Use {
subscriberOption = resources.WithSubscriberKServiceRefForTrigger(forwarderName)
}
trigger := resources.TriggerV1Beta1(
name,
p.client.CreateTriggerOrFailV1Beta1(name,
resources.WithBrokerV1Beta1(brokerName),
resources.WithAttributesTriggerFilterV1Beta1(
eventingv1beta1.TriggerAnyFilter,
fullType,
map[string]interface{}{},
),
subscriberOption,
)
triggers := p.client.Eventing.EventingV1beta1().Triggers(p.config.Namespace)
p.log.Infof("Deploying trigger: %v", name)
// update trigger with the new reference
_, err := triggers.Create(trigger)
ensure.NoError(err)
testlib.WaitFor(fmt.Sprintf("trigger be ready: %v", name), func() error {
meta := resources.NewMetaResource(name, p.config.Namespace, testlib.TriggerTypeMeta)
return duck.WaitForResourceReady(p.client.Dynamic, meta)
})
subscriberOption)
}
}

Expand Down
5 changes: 2 additions & 3 deletions test/upgrade/prober/prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (p *prober) deploy() {
if p.config.Serving.Use {
p.deployForwarder()
}
ensure.NoError(testlib.AwaitForAll(p.log))
p.client.WaitForAllTestResourcesReadyOrFail()

p.deploySender()
ensure.NoError(testlib.AwaitForAll(p.log))
Expand All @@ -151,8 +151,7 @@ func (p *prober) remove() {
if p.config.Serving.Use {
p.removeForwarder()
}
p.removeReceiver()
p.removeConfiguration()
p.client.Tracker.Clean(true)
}

func newProber(log *zap.SugaredLogger, client *testlib.Client, config *Config) Prober {
Expand Down