diff --git a/pkg/controller/flow/reconcile.go b/pkg/controller/flow/reconcile.go index 106b0b7b753..e542588f5db 100644 --- a/pkg/controller/flow/reconcile.go +++ b/pkg/controller/flow/reconcile.go @@ -25,6 +25,7 @@ import ( channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" feedsv1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" v1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1" + "github.com/knative/eventing/pkg/controller/flow/resources" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" @@ -222,19 +223,7 @@ func (r *reconciler) reconcileChannel(flow *v1alpha1.Flow) (*channelsv1alpha1.Ch } func (r *reconciler) createChannel(flow *v1alpha1.Flow) (*channelsv1alpha1.Channel, error) { - channelName := flow.Name - channel := &channelsv1alpha1.Channel{ - ObjectMeta: metav1.ObjectMeta{ - Name: channelName, - Namespace: flow.Namespace, - OwnerReferences: []metav1.OwnerReference{ - *r.NewControllerRef(flow), - }, - }, - Spec: channelsv1alpha1.ChannelSpec{ - ClusterBus: defaultBusName, - }, - } + channel := resources.MakeChannel(defaultBusName, flow) if err := r.client.Create(context.TODO(), channel); err != nil { return nil, err } @@ -263,20 +252,7 @@ func (r *reconciler) reconcileSubscription(channelName string, target string, fl } func (r *reconciler) createSubscription(channelName string, target string, flow *v1alpha1.Flow) (*channelsv1alpha1.Subscription, error) { - subscriptionName := flow.Name - subscription := &channelsv1alpha1.Subscription{ - ObjectMeta: metav1.ObjectMeta{ - Name: subscriptionName, - Namespace: flow.Namespace, - OwnerReferences: []metav1.OwnerReference{ - *r.NewControllerRef(flow), - }, - }, - Spec: channelsv1alpha1.SubscriptionSpec{ - Channel: channelName, - Subscriber: target, - }, - } + subscription := resources.MakeSubscription(channelName, target, flow) if err := r.client.Create(context.TODO(), subscription); err != nil { return nil, err } @@ -307,46 +283,9 @@ func (r *reconciler) reconcileFeed(channelDNS string, flow *v1alpha1.Flow) (*fee } func (r *reconciler) createFeed(channelDNS string, flow *v1alpha1.Flow) (*feedsv1alpha1.Feed, error) { - feedName := flow.Name - feed := &feedsv1alpha1.Feed{ - ObjectMeta: metav1.ObjectMeta{ - Name: feedName, - Namespace: flow.Namespace, - OwnerReferences: []metav1.OwnerReference{ - *r.NewControllerRef(flow), - }, - }, - Spec: feedsv1alpha1.FeedSpec{ - Action: feedsv1alpha1.FeedAction{DNSName: channelDNS}, - Trigger: feedsv1alpha1.EventTrigger{ - EventType: flow.Spec.Trigger.EventType, - Resource: flow.Spec.Trigger.Resource, - Service: flow.Spec.Trigger.Service, - }, - }, - } - if flow.Spec.ServiceAccountName != "" { - feed.Spec.ServiceAccountName = flow.Spec.ServiceAccountName - } - - if flow.Spec.Trigger.Parameters != nil { - feed.Spec.Trigger.Parameters = flow.Spec.Trigger.Parameters - } - if flow.Spec.Trigger.ParametersFrom != nil { - feed.Spec.Trigger.ParametersFrom = flow.Spec.Trigger.ParametersFrom - } - + feed := resources.MakeFeed(channelDNS, flow) if err := r.client.Create(context.TODO(), feed); err != nil { return nil, err } return feed, nil } - -func (r *reconciler) NewControllerRef(flow *v1alpha1.Flow) *metav1.OwnerReference { - blockOwnerDeletion := false - isController := true - revRef := metav1.NewControllerRef(flow, flowControllerKind) - revRef.BlockOwnerDeletion = &blockOwnerDeletion - revRef.Controller = &isController - return revRef -} diff --git a/pkg/controller/flow/resources/channel.go b/pkg/controller/flow/resources/channel.go new file mode 100644 index 00000000000..7a6479df378 --- /dev/null +++ b/pkg/controller/flow/resources/channel.go @@ -0,0 +1,42 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + "github.com/knative/eventing/pkg/apis/flows/v1alpha1" + "github.com/knative/eventing/pkg/controller" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func MakeChannel(defaultBusName string, flow *v1alpha1.Flow) *channelsv1alpha1.Channel { + channelName := flow.Name + channel := &channelsv1alpha1.Channel{ + ObjectMeta: metav1.ObjectMeta{ + Name: channelName, + Namespace: flow.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *controller.NewControllerRef(flow), + }, + }, + Spec: channelsv1alpha1.ChannelSpec{ + ClusterBus: defaultBusName, + }, + } + return channel +} diff --git a/pkg/controller/flow/resources/feed.go b/pkg/controller/flow/resources/feed.go new file mode 100644 index 00000000000..a6e764424f1 --- /dev/null +++ b/pkg/controller/flow/resources/feed.go @@ -0,0 +1,56 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + feedsv1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" + + "github.com/knative/eventing/pkg/apis/flows/v1alpha1" + "github.com/knative/eventing/pkg/controller" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func MakeFeed(channelDNS string, flow *v1alpha1.Flow) *feedsv1alpha1.Feed { + feed := &feedsv1alpha1.Feed{ + ObjectMeta: metav1.ObjectMeta{ + Name: flow.Name, + Namespace: flow.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *controller.NewControllerRef(flow), + }, + }, + Spec: feedsv1alpha1.FeedSpec{ + Action: feedsv1alpha1.FeedAction{DNSName: channelDNS}, + Trigger: feedsv1alpha1.EventTrigger{ + EventType: flow.Spec.Trigger.EventType, + Resource: flow.Spec.Trigger.Resource, + Service: flow.Spec.Trigger.Service, + }, + }, + } + if flow.Spec.ServiceAccountName != "" { + feed.Spec.ServiceAccountName = flow.Spec.ServiceAccountName + } + + if flow.Spec.Trigger.Parameters != nil { + feed.Spec.Trigger.Parameters = flow.Spec.Trigger.Parameters + } + if flow.Spec.Trigger.ParametersFrom != nil { + feed.Spec.Trigger.ParametersFrom = flow.Spec.Trigger.ParametersFrom + } + return feed +} diff --git a/pkg/controller/flow/resources/subscription.go b/pkg/controller/flow/resources/subscription.go new file mode 100644 index 00000000000..9885f46f78b --- /dev/null +++ b/pkg/controller/flow/resources/subscription.go @@ -0,0 +1,43 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + "github.com/knative/eventing/pkg/apis/flows/v1alpha1" + "github.com/knative/eventing/pkg/controller" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func MakeSubscription(channelName string, target string, flow *v1alpha1.Flow) *channelsv1alpha1.Subscription { + subscriptionName := flow.Name + subscription := &channelsv1alpha1.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: subscriptionName, + Namespace: flow.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *controller.NewControllerRef(flow), + }, + }, + Spec: channelsv1alpha1.SubscriptionSpec{ + Channel: channelName, + Subscriber: target, + }, + } + return subscription +} diff --git a/pkg/controller/owner_references.go b/pkg/controller/owner_references.go new file mode 100644 index 00000000000..9ade5292daf --- /dev/null +++ b/pkg/controller/owner_references.go @@ -0,0 +1,65 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + + channelsv1alpha "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + feedsv1alpha "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" + flowsv1alpha "github.com/knative/eventing/pkg/apis/flows/v1alpha1" +) + +func kind(obj metav1.Object) schema.GroupVersionKind { + switch obj.(type) { + // Channels + case *channelsv1alpha.Bus: + return channelsv1alpha.SchemeGroupVersion.WithKind("Bus") + case *channelsv1alpha.Channel: + return channelsv1alpha.SchemeGroupVersion.WithKind("Channel") + case *channelsv1alpha.ClusterBus: + return channelsv1alpha.SchemeGroupVersion.WithKind("ClusterBus") + + // Feeds + case *feedsv1alpha.ClusterEventType: + return feedsv1alpha.SchemeGroupVersion.WithKind("ClusterEventType") + case *feedsv1alpha.ClusterEventSource: + return feedsv1alpha.SchemeGroupVersion.WithKind("ClusterEventSource") + case *feedsv1alpha.EventType: + return feedsv1alpha.SchemeGroupVersion.WithKind("EventType") + case *feedsv1alpha.EventSource: + return feedsv1alpha.SchemeGroupVersion.WithKind("EventSource") + + // Flows + case *flowsv1alpha.Flow: + return flowsv1alpha.SchemeGroupVersion.WithKind("Flow") + + default: + panic(fmt.Sprintf("Unsupported object type %T", obj)) + } +} + +// NewControllerRef creates an OwnerReference pointing to the given Resource. +func NewControllerRef(obj metav1.Object) *metav1.OwnerReference { + blockOwnerDeletion := false + ref := metav1.NewControllerRef(obj, kind(obj)) + ref.BlockOwnerDeletion = &blockOwnerDeletion + return ref +}