diff --git a/config/400-flow-controller-config.yaml b/config/400-flow-controller-config.yaml new file mode 100644 index 00000000000..a3b3e56e0e4 --- /dev/null +++ b/config/400-flow-controller-config.yaml @@ -0,0 +1,21 @@ +# Copyright 2018 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: v1 +kind: ConfigMap +metadata: + name: flow-controller-config + namespace: knative-eventing +data: + # Configuration for the flow controller + default-cluster-bus: stub diff --git a/pkg/controller/flow/provider.go b/pkg/controller/flow/provider.go index 23bf71edce6..a09e931e4a5 100644 --- a/pkg/controller/flow/provider.go +++ b/pkg/controller/flow/provider.go @@ -30,7 +30,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/source" ) -const controllerAgentName = "flow-controller" +const ( + // controllerAgentName is the string used by this controller to identify + // itself when creating events. + controllerAgentName = "flow-controller" +) type reconciler struct { client client.Client diff --git a/pkg/controller/flow/reconcile.go b/pkg/controller/flow/reconcile.go index d24dd2027b9..34547f63db3 100644 --- a/pkg/controller/flow/reconcile.go +++ b/pkg/controller/flow/reconcile.go @@ -26,6 +26,7 @@ import ( feedsv1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" v1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1" "github.com/knative/eventing/pkg/controller/flow/resources" + "github.com/knative/eventing/pkg/system" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" @@ -223,7 +224,12 @@ func (r *reconciler) reconcileChannel(flow *v1alpha1.Flow) (*channelsv1alpha1.Ch } func (r *reconciler) createChannel(flow *v1alpha1.Flow) (*channelsv1alpha1.Channel, error) { - channel := resources.MakeChannel(defaultBusName, flow) + clusterBusName, err := r.getDefaultClusterBusName() + if err != nil { + return nil, err + } + + channel := resources.MakeChannel(clusterBusName, flow) if err := r.client.Create(context.TODO(), channel); err != nil { return nil, err } @@ -289,3 +295,50 @@ func (r *reconciler) createFeed(channelDNS string, flow *v1alpha1.Flow) (*feedsv } return feed, nil } + +func (r *reconciler) NewControllerRef(flow *v1alpha1.Flow) *metav1.OwnerReference { + blockOwnerDeletion := false + isController := true + revRef := metav1.NewControllerRef(flow, flowControllerKind) + revRef.BlockOwnerDeletion = &blockOwnerDeletion + revRef.Controller = &isController + return revRef +} + +const ( + // controllerConfigMapName is the name of the configmap in the eventing + // namespace that holds the configuration for this controller. + controllerConfigMapName = "flow-controller-config" + + // defaultClusterBusConfigMapKey is the name of the key in this controller's + // ConfigMap that contains the name of the default cluster bus for the flow + // controller to use. + defaultClusterBusConfigMapKey = "default-cluster-bus" + + // fallbackClusterBusName is the name of the cluster bus that will be used + // for flows if the controller's configmap does not exist or does not + // contain the 'default-cluster-bus' key. + fallbackClusterBusName = "stub" +) + +// getDefaultClusterBusName returns the value of the 'default-cluster-bus' key in +// the knative-system/flow-controller-config configmap or an error. If the +// 'default-cluster-bus' key is not set, it returns the default value "stub". +func (r *reconciler) getDefaultClusterBusName() (string, error) { + configMapKey := client.ObjectKey{ + Namespace: system.Namespace, + Name: controllerConfigMapName, + } + + configMap := &corev1.ConfigMap{} + if err := r.client.Get(context.TODO(), configMapKey, configMap); err != nil { + // return the fallback value if there's an error loading the configmap + return fallbackClusterBusName, nil + } + + if value, ok := configMap.Data[defaultClusterBusConfigMapKey]; ok { + return value, nil + } + + return fallbackClusterBusName, nil // return the fallback value +} diff --git a/pkg/controller/flow/reconcile_test.go b/pkg/controller/flow/reconcile_test.go index e500fb0fc32..dffbc6dff56 100644 --- a/pkg/controller/flow/reconcile_test.go +++ b/pkg/controller/flow/reconcile_test.go @@ -24,6 +24,7 @@ import ( feedsv1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" flowsv1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1" controllertesting "github.com/knative/eventing/pkg/controller/testing" + "github.com/knative/eventing/pkg/system" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -59,6 +60,24 @@ func init() { var testCases = []controllertesting.TestCase{ { Name: "new flow: adds status, action target resolved", + InitialState: []runtime.Object{ + getNewFlow(), + getFlowControllerConfigMap(), + }, + ReconcileKey: "test/test-flow", + WantResult: reconcile.Result{}, + WantPresent: []runtime.Object{ + getActionTargetResolvedFlow(), + func() *channelsv1alpha1.Channel { + c := getNewChannel() + c.Spec.ClusterBus = "special-bus" + return c + }(), + getNewSubscription(), + }, + }, + { + Name: "new flow: adds status, action target resolved, no flow controller config map, use default 'stub' bus", InitialState: []runtime.Object{ getNewFlow(), }, @@ -169,6 +188,15 @@ func getNewFeed() *feedsv1alpha1.Feed { } } +func getFlowControllerConfigMap() *corev1.ConfigMap { + return &corev1.ConfigMap{ + ObjectMeta: om(system.Namespace, controllerConfigMapName), + Data: map[string]string{ + defaultClusterBusConfigMapKey: "special-bus", + }, + } +} + func flowType() metav1.TypeMeta { return metav1.TypeMeta{ APIVersion: flowsv1alpha1.SchemeGroupVersion.String(), @@ -197,6 +225,13 @@ func subscriptionType() metav1.TypeMeta { } } +func configMapType() metav1.TypeMeta { + return metav1.TypeMeta{ + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "ConfigMap", + } +} + func om(namespace, name string) metav1.ObjectMeta { return metav1.ObjectMeta{ Namespace: namespace,