From 27e8ea4dc387c19694ae83e14ed7bcf0e48a9048 Mon Sep 17 00:00:00 2001 From: Sabari Kumar Murugesan Date: Fri, 9 Nov 2018 01:23:02 -0800 Subject: [PATCH] Control plane services for Kafka Channel Provisioner Co-authored-by: Matthias Wessendorf --- .../provisioners/kafka/kafka-provisioner.yaml | 6 ++ .../kafka/controller/channel/provider.go | 20 ++++- .../kafka/controller/channel/reconcile.go | 83 +++++++++---------- .../controller/channel/reconcile_test.go | 54 ++++++++++++ pkg/provisioners/kafka/controller/provider.go | 12 ++- .../kafka/controller/reconcile.go | 23 ++++- pkg/provisioners/kafka/main.go | 15 ++-- 7 files changed, 157 insertions(+), 56 deletions(-) diff --git a/config/provisioners/kafka/kafka-provisioner.yaml b/config/provisioners/kafka/kafka-provisioner.yaml index 746e39ab341..e19833d5001 100644 --- a/config/provisioners/kafka/kafka-provisioner.yaml +++ b/config/provisioners/kafka/kafka-provisioner.yaml @@ -34,6 +34,12 @@ rules: - apiGroups: ["eventing.knative.dev"] resources: ["clusterchannelprovisioners", "channels"] verbs: ["get", "watch", "list", "update"] +- apiGroups: [""] + resources: ["services"] + verbs: ["get", "watch", "list", "create"] +- apiGroups: ["networking.istio.io"] + resources: ["virtualservices"] + verbs: ["get", "watch", "list", "create"] --- apiVersion: rbac.authorization.k8s.io/v1beta1 diff --git a/pkg/provisioners/kafka/controller/channel/provider.go b/pkg/provisioners/kafka/controller/channel/provider.go index b386576ee9f..be24497f1cb 100644 --- a/pkg/provisioners/kafka/controller/channel/provider.go +++ b/pkg/provisioners/kafka/controller/channel/provider.go @@ -18,7 +18,9 @@ package channel import ( "github.com/Shopify/sarama" + istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -27,7 +29,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" - "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" common "github.com/knative/eventing/pkg/provisioners/kafka/controller" ) @@ -65,7 +67,21 @@ func ProvideController(mgr manager.Manager, config *common.KafkaProvisionerConfi } // Watch Channel events and enqueue Channel object key. - if err := c.Watch(&source.Kind{Type: &v1alpha1.Channel{}}, &handler.EnqueueRequestForObject{}); err != nil { + if err := c.Watch(&source.Kind{Type: &eventingv1alpha1.Channel{}}, &handler.EnqueueRequestForObject{}); err != nil { + return nil, err + } + + // Watch the K8s Services that are owned by Channels. + err = c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{OwnerType: &eventingv1alpha1.Channel{}, IsController: true}) + if err != nil { + logger.Error("unable to watch K8s Services.", zap.Error(err)) + return nil, err + } + + // Watch the VirtualServices that are owned by Channels. + err = c.Watch(&source.Kind{Type: &istiov1alpha3.VirtualService{}}, &handler.EnqueueRequestForOwner{OwnerType: &eventingv1alpha1.Channel{}, IsController: true}) + if err != nil { + logger.Error("unable to watch VirtualServices.", zap.Error(err)) return nil, err } diff --git a/pkg/provisioners/kafka/controller/channel/reconcile.go b/pkg/provisioners/kafka/controller/channel/reconcile.go index dc5248dfcdf..2dd03cf53c5 100644 --- a/pkg/provisioners/kafka/controller/channel/reconcile.go +++ b/pkg/provisioners/kafka/controller/channel/reconcile.go @@ -22,15 +22,16 @@ import ( "fmt" "github.com/Shopify/sarama" - "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "go.uber.org/zap" - "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/util/sets" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + eventingController "github.com/knative/eventing/pkg/controller" + util "github.com/knative/eventing/pkg/provisioners" "github.com/knative/eventing/pkg/provisioners/kafka/controller" ) @@ -41,7 +42,7 @@ const ( ) type channelArgs struct { - NumPartitions int32 `json:"NumPartitions,omitempty"` + NumPartitions int32 } // Reconcile compares the actual state with the desired, and attempts to @@ -88,13 +89,13 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err if clusterChannelProvisioner.Status.IsReady() { // Reconcile this copy of the Channel and then write back any status // updates regardless of whether the reconcile error out. - err = r.reconcile(newChannel) + err = r.reconcile(ctx, newChannel) } else { newChannel.Status.MarkNotProvisioned("NotProvisioned", "ClusterChannelProvisioner %s is not ready", clusterChannelProvisioner.Name) err = fmt.Errorf("ClusterChannelProvisioner %s is not ready", clusterChannelProvisioner.Name) } - if updateChannelErr := r.updateChannel(ctx, newChannel); updateChannelErr != nil { + if updateChannelErr := util.UpdateChannel(ctx, r.client, newChannel); updateChannelErr != nil { r.logger.Info("failed to update channel status", zap.Error(updateChannelErr)) return reconcile.Result{}, updateChannelErr } @@ -103,7 +104,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err return reconcile.Result{}, err } -func (r *reconciler) reconcile(channel *v1alpha1.Channel) error { +func (r *reconciler) reconcile(ctx context.Context, channel *v1alpha1.Channel) error { // We don't currently initialize r.kafkaClusterAdmin, hence we end up creating the cluster admin client every time. // This is because of an issue with Shopify/sarama. See https://github.com/Shopify/sarama/issues/1162. @@ -131,16 +132,44 @@ func (r *reconciler) reconcile(channel *v1alpha1.Channel) error { if err := r.deprovisionChannel(channel, kafkaClusterAdmin); err != nil { return err } - r.removeFinalizer(channel) + util.RemoveFinalizer(channel, finalizerName) return nil } - r.addFinalizer(channel) + util.AddFinalizer(channel, finalizerName) if err := r.provisionChannel(channel, kafkaClusterAdmin); err != nil { channel.Status.MarkNotProvisioned("NotProvisioned", "error while provisioning: %s", err) return err } + + svc, err := util.CreateK8sService(ctx, r.client, channel) + + if err != nil { + r.logger.Info("error creating the Channel's K8s Service", zap.Error(err)) + return err + } + + // Check if this Channel is the owner of the K8s service. + if !metav1.IsControlledBy(svc, channel) { + r.logger.Warn("Channel's K8s Service is not owned by the Channel", zap.Any("channel", channel), zap.Any("service", svc)) + } + + channel.Status.SetAddress(eventingController.ServiceHostName(svc.Name, svc.Namespace)) + + virtualService, err := util.CreateVirtualService(ctx, r.client, channel) + + if err != nil { + r.logger.Info("error creating the Virtual Service for the Channel", zap.Error(err)) + return err + } + + // If the Virtual Service is not controlled by this Channel, we should log a warning, but don't + // consider it an error. + if !metav1.IsControlledBy(virtualService, channel) { + r.logger.Warn("VirtualService not owned by Channel", zap.Any("channel", channel), zap.Any("virtualService", virtualService)) + } + channel.Status.MarkProvisioned() // close the connection @@ -207,42 +236,6 @@ func (r *reconciler) getClusterChannelProvisioner() (*v1alpha1.ClusterChannelPro return clusterChannelProvisioner, nil } -func (r *reconciler) updateChannel(ctx context.Context, u *v1alpha1.Channel) error { - channel := &v1alpha1.Channel{} - err := r.client.Get(ctx, client.ObjectKey{Namespace: u.Namespace, Name: u.Name}, channel) - if err != nil { - return err - } - - updated := false - if !equality.Semantic.DeepEqual(channel.Finalizers, u.Finalizers) { - channel.SetFinalizers(u.ObjectMeta.Finalizers) - updated = true - } - - if !equality.Semantic.DeepEqual(channel.Status, u.Status) { - channel.Status = u.Status - updated = true - } - - if updated == false { - return nil - } - return r.client.Update(ctx, channel) -} - -func (r *reconciler) addFinalizer(channel *v1alpha1.Channel) { - finalizers := sets.NewString(channel.Finalizers...) - finalizers.Insert(finalizerName) - channel.Finalizers = finalizers.List() -} - -func (r *reconciler) removeFinalizer(channel *v1alpha1.Channel) { - finalizers := sets.NewString(channel.Finalizers...) - finalizers.Delete(finalizerName) - channel.Finalizers = finalizers.List() -} - func createKafkaAdminClient(config *controller.KafkaProvisionerConfig) (sarama.ClusterAdmin, error) { saramaConf := sarama.NewConfig() saramaConf.Version = sarama.V1_1_0_0 diff --git a/pkg/provisioners/kafka/controller/channel/reconcile_test.go b/pkg/provisioners/kafka/controller/channel/reconcile_test.go index 6becf1e1cbe..fd2422f87ab 100644 --- a/pkg/provisioners/kafka/controller/channel/reconcile_test.go +++ b/pkg/provisioners/kafka/controller/channel/reconcile_test.go @@ -25,6 +25,7 @@ import ( "github.com/Shopify/sarama" "github.com/google/go-cmp/cmp" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -36,6 +37,7 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" controllertesting "github.com/knative/eventing/pkg/controller/testing" "github.com/knative/eventing/pkg/provisioners" + util "github.com/knative/eventing/pkg/provisioners" "github.com/knative/eventing/pkg/provisioners/kafka/controller" ) @@ -43,16 +45,20 @@ const ( channelName = "test-channel" clusterChannelProvisionerName = "kafka-channel" testNS = "test-namespace" + testUID = "test-uid" argumentNumPartitions = "NumPartitions" ) var ( + truePointer = true + deletedTs = metav1.Now().Rfc3339Copy() ) func init() { // Add types to scheme eventingv1alpha1.AddToScheme(scheme.Scheme) + istiov1alpha3.AddToScheme(scheme.Scheme) } var mockFetchError = controllertesting.Mocks{ @@ -124,6 +130,7 @@ var testCases = []controllertesting.TestCase{ InitialState: []runtime.Object{ getNewClusterChannelProvisioner(clusterChannelProvisionerName, true), getNewChannel(channelName, clusterChannelProvisionerName), + makeVirtualService(), }, ReconcileKey: fmt.Sprintf("%s/%s", testNS, channelName), WantResult: reconcile.Result{}, @@ -425,6 +432,7 @@ func getNewChannelWithArgs(name string, args map[string]interface{}) *eventingv1 func getNewChannelProvisionedStatus(name, provisioner string) *eventingv1alpha1.Channel { c := getNewChannel(name, provisioner) c.Status.InitializeConditions() + c.Status.SetAddress(fmt.Sprintf("%s-channel.%s.svc.cluster.local", c.Name, c.Namespace)) c.Status.MarkProvisioned() c.Finalizers = []string{finalizerName} return c @@ -478,6 +486,52 @@ func getNewClusterChannelProvisioner(name string, isReady bool) *eventingv1alpha return clusterChannelProvisioner } +func makeVirtualService() *istiov1alpha3.VirtualService { + return &istiov1alpha3.VirtualService{ + TypeMeta: metav1.TypeMeta{ + APIVersion: istiov1alpha3.SchemeGroupVersion.String(), + Kind: "VirtualService", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-channel", testNS), + Namespace: testNS, + Labels: map[string]string{ + "channel": channelName, + "provisioner": clusterChannelProvisionerName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Kind: "Channel", + Name: channelName, + UID: testUID, + Controller: &truePointer, + BlockOwnerDeletion: &truePointer, + }, + }, + }, + Spec: istiov1alpha3.VirtualServiceSpec{ + Hosts: []string{ + fmt.Sprintf("%s-channel.%s.svc.cluster.local", channelName, testNS), + fmt.Sprintf("%s.%s.channels.cluster.local", channelName, testNS), + }, + Http: []istiov1alpha3.HTTPRoute{{ + Rewrite: &istiov1alpha3.HTTPRewrite{ + Authority: fmt.Sprintf("%s.%s.channels.cluster.local", channelName, testNS), + }, + Route: []istiov1alpha3.DestinationWeight{{ + Destination: istiov1alpha3.Destination{ + Host: "kafka-provisioner.knative-eventing.svc.cluster.local", + Port: istiov1alpha3.PortSelector{ + Number: util.PortNumber, + }, + }}, + }}, + }, + }, + } +} + func om(namespace, name string) metav1.ObjectMeta { return metav1.ObjectMeta{ Namespace: namespace, diff --git a/pkg/provisioners/kafka/controller/provider.go b/pkg/provisioners/kafka/controller/provider.go index 59c0ec94697..24885ee585e 100644 --- a/pkg/provisioners/kafka/controller/provider.go +++ b/pkg/provisioners/kafka/controller/provider.go @@ -17,8 +17,8 @@ limitations under the License. package controller import ( - "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -26,6 +26,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" ) const ( @@ -63,6 +66,13 @@ func ProvideController(mgr manager.Manager, config *KafkaProvisionerConfig, logg return nil, err } + // Watch the K8s Services that are owned by ClusterChannelProvisioners. + err = c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{OwnerType: &eventingv1alpha1.ClusterChannelProvisioner{}, IsController: true}) + if err != nil { + logger.Error("unable to watch K8s Services.", zap.Error(err)) + return nil, err + } + return c, nil } diff --git a/pkg/provisioners/kafka/controller/reconcile.go b/pkg/provisioners/kafka/controller/reconcile.go index dd5d84b47a0..ff7be5c9fda 100644 --- a/pkg/provisioners/kafka/controller/reconcile.go +++ b/pkg/provisioners/kafka/controller/reconcile.go @@ -23,6 +23,8 @@ import ( "go.uber.org/zap" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" @@ -63,7 +65,12 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err // Reconcile this copy of the Provisioner and then write back any status // updates regardless of whether the reconcile error out. - err = r.reconcile(newProvisioner) + err = r.reconcile(ctx, newProvisioner) + if err != nil { + r.logger.Info("error reconciling ClusterProvisioner", zap.Error(err)) + // Note that we do not return the error here, because we want to update the Status + // regardless of the error. + } if updateStatusErr := util.UpdateClusterChannelProvisionerStatus(ctx, r.client, newProvisioner); updateStatusErr != nil { r.logger.Info("error updating ClusterChannelProvisioner Status", zap.Error(updateStatusErr)) return reconcile.Result{}, updateStatusErr @@ -73,7 +80,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err return reconcile.Result{}, err } -func (r *reconciler) reconcile(provisioner *v1alpha1.ClusterChannelProvisioner) error { +func (r *reconciler) reconcile(ctx context.Context, provisioner *v1alpha1.ClusterChannelProvisioner) error { // See if the provisioner has been deleted accessor, err := meta.Accessor(provisioner) if err != nil { @@ -87,6 +94,18 @@ func (r *reconciler) reconcile(provisioner *v1alpha1.ClusterChannelProvisioner) } provisioner.Status.InitializeConditions() + + svc, err := util.CreateDispatcherService(ctx, r.client, provisioner) + if err != nil { + r.logger.Info("error creating the ClusterProvisioner's K8s Service", zap.Error(err)) + return err + } + + // Check if this ClusterChannelProvisioner is the owner of the K8s service. + if !metav1.IsControlledBy(svc, provisioner) { + r.logger.Warn("ClusterChannelProvisioner's K8s Service is not owned by the ClusterChannelProvisioner", zap.Any("clusterChannelProvisioner", provisioner), zap.Any("service", svc)) + } + // Update Status as Ready provisioner.Status.MarkReady() diff --git a/pkg/provisioners/kafka/main.go b/pkg/provisioners/kafka/main.go index 65b97003b68..eb779abf630 100644 --- a/pkg/provisioners/kafka/main.go +++ b/pkg/provisioners/kafka/main.go @@ -6,11 +6,7 @@ import ( "os" "strings" - "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/provisioners" - provisionerController "github.com/knative/eventing/pkg/provisioners/kafka/controller" - "github.com/knative/eventing/pkg/provisioners/kafka/controller/channel" - "github.com/knative/pkg/configmap" + istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "go.uber.org/zap" "k8s.io/apimachinery/pkg/runtime" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" @@ -19,6 +15,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" "sigs.k8s.io/controller-runtime/pkg/runtime/signals" + + eventingv1alpha "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/provisioners" + provisionerController "github.com/knative/eventing/pkg/provisioners/kafka/controller" + "github.com/knative/eventing/pkg/provisioners/kafka/controller/channel" + "github.com/knative/pkg/configmap" ) const ( @@ -47,7 +49,8 @@ func main() { // Add custom types to this array to get them into the manager's scheme. schemeFuncs := []SchemeFunc{ - v1alpha1.AddToScheme, + eventingv1alpha.AddToScheme, + istiov1alpha3.AddToScheme, } for _, schemeFunc := range schemeFuncs { schemeFunc(mgr.GetScheme())