Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions pkg/defaultchannel/channel_defaulter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ const (
// channel CRD.
ConfigMapName = "default-ch-webhook"

// channelDefaulterKey is the key in the ConfigMap to get the name of the default
// ChannelDefaulterKey is the key in the ConfigMap to get the name of the default
// Channel CRD.
channelDefaulterKey = "default-ch-config"
ChannelDefaulterKey = "default-ch-config"
)

// Configuration is the data structure serialized to YAML in the config map. When a Channel needs to be
// Config is the data structure serialized to YAML in the config map. When a Channel needs to be
// defaulted, the Channel's namespace will be used as a key into NamespaceDefaults, if there is
// something present, then that is used. If not, then the ClusterDefault is used.
type Config struct {
Expand Down Expand Up @@ -77,9 +77,9 @@ func (cd *ChannelDefaulter) UpdateConfigMap(cm *corev1.ConfigMap) {
cd.logger.Info("UpdateConfigMap on a nil map")
return
}
defaultChannelConfig, present := cm.Data[channelDefaulterKey]
defaultChannelConfig, present := cm.Data[ChannelDefaulterKey]
if !present {
cd.logger.Info("ConfigMap is missing key", zap.String("key", channelDefaulterKey), zap.Any("configMap", cm))
cd.logger.Info("ConfigMap is missing key", zap.String("key", ChannelDefaulterKey), zap.Any("configMap", cm))
return
}

Expand Down
18 changes: 9 additions & 9 deletions pkg/defaultchannel/channel_defaulter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestChannelDefaulter_UpdateConfigMap(t *testing.T) {
"key missing in update": {
initialConfig: &corev1.ConfigMap{
Data: map[string]string{
channelDefaulterKey: configYaml,
ChannelDefaulterKey: configYaml,
},
},
expectedAfterInitial: config.ClusterDefault,
Expand All @@ -129,55 +129,55 @@ func TestChannelDefaulter_UpdateConfigMap(t *testing.T) {
"bad yaml is ignored": {
initialConfig: &corev1.ConfigMap{
Data: map[string]string{
channelDefaulterKey: configYaml,
ChannelDefaulterKey: configYaml,
},
},
expectedAfterInitial: config.ClusterDefault,
updatedConfig: &corev1.ConfigMap{
Data: map[string]string{
channelDefaulterKey: "foo -> bar",
ChannelDefaulterKey: "foo -> bar",
},
},
expectedAfterUpdate: config.ClusterDefault,
},
"empty config is accepted": {
initialConfig: &corev1.ConfigMap{
Data: map[string]string{
channelDefaulterKey: configYaml,
ChannelDefaulterKey: configYaml,
},
},
expectedAfterInitial: config.ClusterDefault,
updatedConfig: &corev1.ConfigMap{
Data: map[string]string{
channelDefaulterKey: "{}",
ChannelDefaulterKey: "{}",
},
},
expectedAfterUpdate: nil,
},
"empty string is ignored": {
initialConfig: &corev1.ConfigMap{
Data: map[string]string{
channelDefaulterKey: configYaml,
ChannelDefaulterKey: configYaml,
},
},
expectedAfterInitial: config.ClusterDefault,
updatedConfig: &corev1.ConfigMap{
Data: map[string]string{
channelDefaulterKey: "",
ChannelDefaulterKey: "",
},
},
expectedAfterUpdate: config.ClusterDefault,
},
"update to same channel": {
initialConfig: &corev1.ConfigMap{
Data: map[string]string{
channelDefaulterKey: configYaml,
ChannelDefaulterKey: configYaml,
},
},
expectedAfterInitial: config.ClusterDefault,
updatedConfig: &corev1.ConfigMap{
Data: map[string]string{
channelDefaulterKey: configYaml,
ChannelDefaulterKey: configYaml,
},
},
expectedAfterUpdate: config.ClusterDefault,
Expand Down
1 change: 1 addition & 0 deletions test/base/resources/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
const (
InMemoryChannelKind string = "InMemoryChannel"

ChannelKind string = "Channel"
SequenceKind string = "Sequence"
ChoiceKind string = "Choice"
)
Expand Down
14 changes: 12 additions & 2 deletions test/common/creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var coreAPIVersion = corev1.SchemeGroupVersion.Version
var rbacAPIGroup = rbacv1.SchemeGroupVersion.Group
var rbacAPIVersion = rbacv1.SchemeGroupVersion.Version

// CreateChannelOrFail will create a Channel Resource in Eventing.
// CreateChannelOrFail will create a typed Channel Resource in Eventing or fail the test if there is an error.
func (client *Client) CreateChannelOrFail(name string, channelTypeMeta *metav1.TypeMeta) {
namespace := client.Namespace
metaResource := resources.NewMetaResource(name, namespace, channelTypeMeta)
Expand All @@ -47,13 +47,23 @@ func (client *Client) CreateChannelOrFail(name string, channelTypeMeta *metav1.T
client.Tracker.Add(gvr.Group, gvr.Version, gvr.Resource, namespace, name)
}

// CreateChannelsOrFail will create a list of Channel Resources in Eventing.
// CreateChannelsOrFail will create a list of typed Channel Resources in Eventing or fail the test if there is an error.
func (client *Client) CreateChannelsOrFail(names []string, channelTypeMeta *metav1.TypeMeta) {
for _, name := range names {
client.CreateChannelOrFail(name, channelTypeMeta)
}
}

// CreateChannelWithDefaultOrFail will create a default Channel Resource in Eventing or fail the test if there is an error.
func (client *Client) CreateChannelWithDefaultOrFail(channel *messagingv1alpha1.Channel) {
channels := client.Eventing.MessagingV1alpha1().Channels(client.Namespace)
_, err := channels.Create(channel)
if err != nil {
client.T.Fatalf("Failed to create channel %q: %v", channel.Name, err)
}
client.Tracker.AddObj(channel)
}

// CreateSubscriptionOrFail will create a Subscription or fail the test if there is an error.
func (client *Client) CreateSubscriptionOrFail(
name, channelName string,
Expand Down
14 changes: 7 additions & 7 deletions test/common/typemeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,13 @@ func SourcesTypeMeta(kind string) *metav1.TypeMeta {
}
}

// InMemoryChannelTypeMeta is the TypeMeta ref for InMemoryChannel.
var InMemoryChannelTypeMeta = MessagingTypeMeta(resources.InMemoryChannelKind)
// GetChannelTypeMeta gets the actual typemeta of the typed channel.
func GetChannelTypeMeta(channelKind string) *metav1.TypeMeta {
return MessagingTypeMeta(channelKind)
}

// ChannelTypeMeta is the TypeMeta ref for Channel.
var ChannelTypeMeta = MessagingTypeMeta(resources.ChannelKind)

// SequenceTypeMeta is the TypeMeta ref for Sequence.
var SequenceTypeMeta = MessagingTypeMeta(resources.SequenceKind)
Expand All @@ -71,8 +76,3 @@ func MessagingTypeMeta(kind string) *metav1.TypeMeta {
APIVersion: resources.MessagingAPIVersion,
}
}

// GetChannelTypeMeta gets the actual typemeta of the channel.
func GetChannelTypeMeta(channelName string) *metav1.TypeMeta {
return MessagingTypeMeta(channelName)
}
35 changes: 35 additions & 0 deletions test/e2e/channel_defaulter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// +build e2e

/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
"testing"

"knative.dev/eventing/test/e2e/helpers"
)

// TestChannelClusterDefaulter tests a cluster defaulted channel can be created with the template specified through configmap.
func TestChannelClusterDefaulter(t *testing.T) {
helpers.ChannelClusterDefaulterTestHelper(t, channelTestRunner)
}

// TestChannelNamespaceDefaulter tests a namespace defaulted channel can be created with the template specified through configmap.
func TestChannelNamespaceDefaulter(t *testing.T) {
helpers.ChannelNamespaceDefaulterTestHelper(t, channelTestRunner)
}
180 changes: 180 additions & 0 deletions test/e2e/helpers/channel_defaulter_test_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package helpers

import (
"fmt"
"testing"

"github.com/ghodss/yaml"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1alpha1"
"knative.dev/eventing/pkg/defaultchannel"
eventingtesting "knative.dev/eventing/pkg/reconciler/testing"
"knative.dev/eventing/test/base"
"knative.dev/eventing/test/base/resources"
"knative.dev/eventing/test/common"
)

const (
// configMapName is the name of the ConfigMap that contains the configuration for the default
// channel CRD.
configMapName = defaultchannel.ConfigMapName

// channelDefaulterKey is the key in the ConfigMap to get the name of the default
// Channel CRD.
channelDefaulterKey = defaultchannel.ChannelDefaulterKey
)

// ChannelClusterDefaulterTestHelper is the helper function for channel_defaulter_test
func ChannelClusterDefaulterTestHelper(t *testing.T, channelTestRunner common.ChannelTestRunner) {
channelTestRunner.RunTests(t, common.FeatureBasic, func(st *testing.T, channel string) {
// these tests cannot be run in parallel as they have cluster-wide impact
client := common.Setup(st, false)
defer common.TearDown(client)

if err := updateDefaultChannelCM(client, func(conf *defaultchannel.Config) {
setClusterDefaultChannel(conf, channel)
}); err != nil {
st.Fatalf("Failed to update the defaultchannel configmap: %v", err)
}

defaultChannelTestHelper(st, client, channel)
})
}

// ChannelNamespaceDefaulterTestHelper is the helper function for channel_defaulter_test
func ChannelNamespaceDefaulterTestHelper(t *testing.T, channelTestRunner common.ChannelTestRunner) {
channelTestRunner.RunTests(t, common.FeatureBasic, func(st *testing.T, channel string) {
// we cannot run these tests in parallel as the updateDefaultChannelCM function is not thread-safe
// TODO(Fredy-Z): make updateDefaultChannelCM thread-safe and run in parallel if the tests are taking too long to finish
client := common.Setup(st, false)
defer common.TearDown(client)

if err := updateDefaultChannelCM(client, func(conf *defaultchannel.Config) {
setNamespaceDefaultChannel(conf, client.Namespace, channel)
}); err != nil {
st.Fatalf("Failed to update the defaultchannel configmap: %v", err)
}

defaultChannelTestHelper(st, client, channel)
})
}

func defaultChannelTestHelper(t *testing.T, client *common.Client, expectedChannelKind string) {
channelName := "e2e-defaulter-channel"
senderName := "e2e-defaulter-sender"
subscriptionName := "e2e-defaulter-subscription"
loggerPodName := "e2e-defaulter-logger-pod"

// create channel
client.CreateChannelWithDefaultOrFail(eventingtesting.NewChannel(channelName, client.Namespace))

// create logger service as the subscriber
pod := resources.EventLoggerPod(loggerPodName)
client.CreatePodOrFail(pod, common.WithService(loggerPodName))

// create subscription to subscribe the channel, and forward the received events to the logger service
client.CreateSubscriptionOrFail(
subscriptionName,
channelName,
common.ChannelTypeMeta,
resources.WithSubscriberForSubscription(loggerPodName),
)

// wait for all test resources to be ready, so that we can start sending events
if err := client.WaitForAllTestResourcesReady(); err != nil {
t.Fatalf("Failed to get all test resources ready: %v", err)
}

// check if the defaultchannel creates exactly one underlying channel given the spec
metaResourceList := resources.NewMetaResourceList(client.Namespace, common.GetChannelTypeMeta(expectedChannelKind))
objs, err := base.GetGenericObjectList(client.Dynamic, metaResourceList, &eventingduck.SubscribableType{})
if err != nil {
t.Fatalf("Failed to list the underlying channels: %v", err)
}
if len(objs) != 1 {
t.Fatalf("The defaultchannel is expected to create 1 underlying channel, but got %d", len(objs))
}

// send fake CloudEvent to the channel
body := fmt.Sprintf("TestSingleEvent %s", uuid.NewUUID())
event := &resources.CloudEvent{
Source: senderName,
Type: resources.CloudEventDefaultType,
Data: fmt.Sprintf(`{"msg":%q}`, body),
Encoding: resources.CloudEventDefaultEncoding,
}

if err := client.SendFakeEventToAddressable(senderName, channelName, common.ChannelTypeMeta, event); err != nil {
t.Fatalf("Failed to send fake CloudEvent to the channel %q", channelName)
}

// verify the logger service receives the event
if err := client.CheckLog(loggerPodName, common.CheckerContains(body)); err != nil {
t.Fatalf("String %q not found in logs of logger pod %q: %v", body, loggerPodName, err)
}
}

// updateDefaultChannelCM will update the default channel configmap
func updateDefaultChannelCM(client *common.Client, updateConfig func(config *defaultchannel.Config)) error {
systemNamespace := resources.SystemNamespace
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to do all these programmatic reading and updating of the configMap? and then call the defaulter UpdateConfigMap? Can't you just create a ConfigMap object in memory and set the appropriate value to a key in the Data field?
No need to do it now, but maybe when you add the thread-safety thing...

Copy link
Copy Markdown
Contributor Author

@chizhg chizhg Sep 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I can do that, but I think we still cannot get rid of the yaml.Unmarshal and yaml.Marshal stuff, since it's not safe to directly change yaml, even in test.

cmInterface := client.Kube.Kube.CoreV1().ConfigMaps(systemNamespace)
// get the defaultchannel configmap
configMap, err := cmInterface.Get(configMapName, metav1.GetOptions{})
if err != nil {
return err
}
// get the defaultchannel config value
defaultChannelConfig, hasDefault := configMap.Data[channelDefaulterKey]
config := &defaultchannel.Config{}
if hasDefault {
if err := yaml.Unmarshal([]byte(defaultChannelConfig), config); err != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

look at the channelDefaulter.UpdateConfigMap... we are converting the yaml to json, and then doing some update there... I think there was a problem with the casing of the keys otherwise...
BTW, is that something we can call, instead of duplicating the logic here? If not, no worries...

Copy link
Copy Markdown
Contributor Author

@chizhg chizhg Sep 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll have to update the configmap ref first, then call the UpdateConfigMap method, and seems we don't have existed code doing the former.
And I feel the channelDefaulter is more for internal implementation, the only interesting thing to end user is the configmap itself.
So I didn't change anything here..

return err
}
}

// update the defaultchannel config
updateConfig(config)
configBytes, err := yaml.Marshal(*config)
if err != nil {
return err
}
// update the defaultchannel configmap
configMap.Data[channelDefaulterKey] = string(configBytes)
_, err = cmInterface.Update(configMap)
return nil
}

// setClusterDefaultChannel will set the default channel for cluster-wide
func setClusterDefaultChannel(config *defaultchannel.Config, channel string) {
config.ClusterDefault.TypeMeta = *common.GetChannelTypeMeta(channel)
}

// setNamespaceDefaultChannel will set the default channel for namespace-wide
func setNamespaceDefaultChannel(config *defaultchannel.Config, namespace, channel string) {
namespaceDefaults := config.NamespaceDefaults
if spec, exists := namespaceDefaults[namespace]; exists {
spec.TypeMeta = *common.GetChannelTypeMeta(channel)
} else {
spec = &eventingduck.ChannelTemplateSpec{
TypeMeta: *common.GetChannelTypeMeta(channel),
}
namespaceDefaults[namespace] = spec
}
}
2 changes: 1 addition & 1 deletion test/performance/latency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
)

func TestLatencyForInMemoryBrokerTrigger(t *testing.T) {
testLatencyForBrokerTrigger(t, common.InMemoryChannelTypeMeta)
testLatencyForBrokerTrigger(t, common.GetChannelTypeMeta(resources.InMemoryChannelKind))
}

func testLatencyForBrokerTrigger(t *testing.T, channelTypeMeta *metav1.TypeMeta) {
Expand Down