From ec5f434ea4c3fbc6da0ecf094740989bf1af9b53 Mon Sep 17 00:00:00 2001 From: Sabari Kumar Murugesan Date: Wed, 24 Oct 2018 14:03:02 -0700 Subject: [PATCH 1/7] Add channel utils --- .../eventing/inmemory/channel/reconcile.go | 206 ++---------------- .../inmemory/channel/reconcile_test.go | 18 +- pkg/provisioners/channel_util.go | 205 +++++++++++++++++ 3 files changed, 228 insertions(+), 201 deletions(-) create mode 100644 pkg/provisioners/channel_util.go diff --git a/pkg/controller/eventing/inmemory/channel/reconcile.go b/pkg/controller/eventing/inmemory/channel/reconcile.go index 439361e8978..9a9de170fad 100644 --- a/pkg/controller/eventing/inmemory/channel/reconcile.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile.go @@ -19,30 +19,25 @@ package channel import ( "context" - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/controller" - cpcontroller "github.com/knative/eventing/pkg/controller/eventing/inmemory/clusterprovisioner" - "github.com/knative/eventing/pkg/sidecar/configmap" - "github.com/knative/eventing/pkg/sidecar/fanout" - "github.com/knative/eventing/pkg/sidecar/multichannelfanout" - "github.com/knative/eventing/pkg/system" - istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/controller" + cpcontroller "github.com/knative/eventing/pkg/controller/eventing/inmemory/clusterprovisioner" + util "github.com/knative/eventing/pkg/provisioners" + "github.com/knative/eventing/pkg/sidecar/configmap" + "github.com/knative/eventing/pkg/sidecar/fanout" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" ) const ( - portName = "http" - portNumber = 80 finalizerName = controllerAgentName ) @@ -100,7 +95,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err // regardless of the error. } - if updateStatusErr := r.updateChannel(ctx, c); updateStatusErr != nil { + if updateStatusErr := util.UpdateChannel(ctx, r.client, c); updateStatusErr != nil { logger.Info("Error updating Channel Status", zap.Error(updateStatusErr)) return reconcile.Result{}, updateStatusErr } @@ -136,21 +131,21 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) if c.DeletionTimestamp != nil { // K8s garbage collection will delete the K8s service and VirtualService for this channel. // We use a finalizer to ensure the channel config has been synced. - r.removeFinalizer(c) + util.RemoveFinalizer(c, finalizerName) return nil } - r.addFinalizer(c) + util.AddFinalizer(c, finalizerName) c.Status.SetSubscribable(c.Namespace, c.Name) - if svc, err := r.createK8sService(ctx, c); err != nil { + if svc, err := util.CreateK8sService(ctx, r.client, c); err != nil { logger.Info("Error creating the Channel's K8s Service", zap.Error(err)) return err } else { c.Status.SetSinkable(controller.ServiceHostName(svc.Name, svc.Namespace)) } - if err := r.createVirtualService(ctx, c); err != nil { + if err := util.CreateVirtualService(ctx, r.client, c); err != nil { logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err)) return err } @@ -159,181 +154,6 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) return nil } -func (r *reconciler) addFinalizer(c *eventingv1alpha1.Channel) { - finalizers := sets.NewString(c.Finalizers...) - finalizers.Insert(finalizerName) - c.Finalizers = finalizers.List() -} - -func (r *reconciler) removeFinalizer(c *eventingv1alpha1.Channel) { - finalizers := sets.NewString(c.Finalizers...) - finalizers.Delete(finalizerName) - c.Finalizers = finalizers.List() -} - -func (r *reconciler) getK8sService(ctx context.Context, c *eventingv1alpha1.Channel) (*corev1.Service, error) { - svcKey := types.NamespacedName{ - Namespace: c.Namespace, - Name: controller.ChannelServiceName(c.Name), - } - svc := &corev1.Service{} - err := r.client.Get(ctx, svcKey, svc) - return svc, err -} - -func (r *reconciler) createK8sService(ctx context.Context, c *eventingv1alpha1.Channel) (*corev1.Service, error) { - svc, err := r.getK8sService(ctx, c) - - if errors.IsNotFound(err) { - svc = newK8sService(c) - err = r.client.Create(ctx, svc) - } - - // If an error occurred in either Get or Create, we need to reconcile again. - if err != nil { - return nil, err - } - - // Check if this Channel is the owner of the K8s service. - if !metav1.IsControlledBy(svc, c) { - r.logger.Warn("Channel's K8s Service is not owned by the Channel", zap.Any("channel", c), zap.Any("service", svc)) - } - return svc, nil -} - -func (r *reconciler) getVirtualService(ctx context.Context, c *eventingv1alpha1.Channel) (*istiov1alpha3.VirtualService, error) { - vsk := client.ObjectKey{ - Namespace: c.Namespace, - Name: controller.ChannelVirtualServiceName(c.ObjectMeta.Name), - } - vs := &istiov1alpha3.VirtualService{} - err := r.client.Get(ctx, vsk, vs) - return vs, err -} - -func (r *reconciler) createVirtualService(ctx context.Context, c *eventingv1alpha1.Channel) error { - virtualService, err := r.getVirtualService(ctx, c) - - // If the resource doesn't exist, we'll create it - if errors.IsNotFound(err) { - virtualService = newVirtualService(c) - err = r.client.Create(ctx, virtualService) - } - - // If an error occurs during Get/Create, we'll requeue the item so we can - // attempt processing again later. This could have been caused by a - // temporary network failure, or any other transient reason. - if err != nil { - return err - } - - // If the Virtual Service is not controlled by this Channel, we should log a warning, but don't - // consider it an error. - if !metav1.IsControlledBy(virtualService, c) { - r.logger.Warn("VirtualService not owned by Channel", zap.Any("channel", c), zap.Any("virtualService", virtualService)) - } - return nil -} - -// newK8sService creates a new Service for a Channel resource. It also sets the appropriate -// OwnerReferences on the resource so handleObject can discover the Channel resource that 'owns' it. -// As well as being garbage collected when the Channel is deleted. -func newK8sService(c *eventingv1alpha1.Channel) *corev1.Service { - labels := map[string]string{ - "channel": c.Name, - "provisioner": c.Spec.Provisioner.Ref.Name, - } - return &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: controller.ChannelServiceName(c.ObjectMeta.Name), - Namespace: c.Namespace, - Labels: labels, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(c, schema.GroupVersionKind{ - Group: eventingv1alpha1.SchemeGroupVersion.Group, - Version: eventingv1alpha1.SchemeGroupVersion.Version, - Kind: "Channel", - }), - }, - }, - Spec: corev1.ServiceSpec{ - Ports: []corev1.ServicePort{ - { - Name: portName, - Port: portNumber, - }, - }, - }, - } -} - -// newVirtualService creates a new VirtualService for a Channel resource. It also sets the -// appropriate OwnerReferences on the resource so handleObject can discover the Channel resource -// that 'owns' it. As well as being garbage collected when the Channel is deleted. -func newVirtualService(channel *eventingv1alpha1.Channel) *istiov1alpha3.VirtualService { - labels := map[string]string{ - "channel": channel.Name, - "provisioner": channel.Spec.Provisioner.Ref.Name, - } - destinationHost := controller.ServiceHostName(controller.ClusterBusDispatcherServiceName(channel.Spec.Provisioner.Ref.Name), system.Namespace) - return &istiov1alpha3.VirtualService{ - ObjectMeta: metav1.ObjectMeta{ - Name: controller.ChannelVirtualServiceName(channel.Name), - Namespace: channel.Namespace, - Labels: labels, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(channel, schema.GroupVersionKind{ - Group: eventingv1alpha1.SchemeGroupVersion.Group, - Version: eventingv1alpha1.SchemeGroupVersion.Version, - Kind: "Channel", - }), - }, - }, - Spec: istiov1alpha3.VirtualServiceSpec{ - Hosts: []string{ - controller.ServiceHostName(controller.ChannelServiceName(channel.Name), channel.Namespace), - controller.ChannelHostName(channel.Name, channel.Namespace), - }, - Http: []istiov1alpha3.HTTPRoute{{ - Rewrite: &istiov1alpha3.HTTPRewrite{ - Authority: controller.ChannelHostName(channel.Name, channel.Namespace), - }, - Route: []istiov1alpha3.DestinationWeight{{ - Destination: istiov1alpha3.Destination{ - Host: destinationHost, - Port: istiov1alpha3.PortSelector{ - Number: portNumber, - }, - }}, - }}, - }, - }, - } -} - -func (r *reconciler) updateChannel(ctx context.Context, u *eventingv1alpha1.Channel) error { - o := &eventingv1alpha1.Channel{} - if err := r.client.Get(ctx, client.ObjectKey{Namespace: u.Namespace, Name: u.Name}, o); err != nil { - r.logger.Info("Error getting Channel for status update", zap.Error(err), zap.Any("updatedChannel", u)) - return err - } - - updated := false - if !equality.Semantic.DeepEqual(o.Finalizers, u.Finalizers) { - updated = true - o.SetFinalizers(u.Finalizers) - } - if !equality.Semantic.DeepEqual(o.Status, u.Status) { - updated = true - o.Status = u.Status - } - - if updated { - return r.client.Update(ctx, o) - } - return nil -} - func (r *reconciler) syncChannelConfig(ctx context.Context) error { channels, err := r.listAllChannels(ctx) if err != nil { diff --git a/pkg/controller/eventing/inmemory/channel/reconcile_test.go b/pkg/controller/eventing/inmemory/channel/reconcile_test.go index 13466bad4ae..5fdb19b4d94 100644 --- a/pkg/controller/eventing/inmemory/channel/reconcile_test.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile_test.go @@ -24,11 +24,6 @@ import ( "testing" "github.com/google/go-cmp/cmp" - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - controllertesting "github.com/knative/eventing/pkg/controller/testing" - "github.com/knative/eventing/pkg/sidecar/configmap" - "github.com/knative/eventing/pkg/sidecar/fanout" - "github.com/knative/eventing/pkg/sidecar/multichannelfanout" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "go.uber.org/zap" @@ -40,6 +35,13 @@ import ( "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + controllertesting "github.com/knative/eventing/pkg/controller/testing" + util "github.com/knative/eventing/pkg/provisioners" + "github.com/knative/eventing/pkg/sidecar/configmap" + "github.com/knative/eventing/pkg/sidecar/fanout" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" ) const ( @@ -591,8 +593,8 @@ func makeK8sService() *corev1.Service { Spec: corev1.ServiceSpec{ Ports: []corev1.ServicePort{ { - Name: portName, - Port: portNumber, + Name: util.PortName, + Port: util.PortNumber, }, }, }, @@ -642,7 +644,7 @@ func makeVirtualService() *istiov1alpha3.VirtualService { Destination: istiov1alpha3.Destination{ Host: "in-memory-channel-clusterbus.knative-eventing.svc.cluster.local", Port: istiov1alpha3.PortSelector{ - Number: portNumber, + Number: util.PortNumber, }, }}, }}, diff --git a/pkg/provisioners/channel_util.go b/pkg/provisioners/channel_util.go new file mode 100644 index 00000000000..e32c3368b85 --- /dev/null +++ b/pkg/provisioners/channel_util.go @@ -0,0 +1,205 @@ +package provisioners + +import ( + "context" + + istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" + "github.com/knative/pkg/logging" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "sigs.k8s.io/controller-runtime/pkg/client" + controllerRuntime "sigs.k8s.io/controller-runtime/pkg/client" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/controller" + "github.com/knative/eventing/pkg/system" + "k8s.io/apimachinery/pkg/api/equality" +) + +const ( + PortName = "http" + PortNumber = 80 +) + +func AddFinalizer(c *eventingv1alpha1.Channel, finalizerName string) { + finalizers := sets.NewString(c.Finalizers...) + finalizers.Insert(finalizerName) + c.Finalizers = finalizers.List() +} + +func RemoveFinalizer(c *eventingv1alpha1.Channel, finalizerName string) { + finalizers := sets.NewString(c.Finalizers...) + finalizers.Delete(finalizerName) + c.Finalizers = finalizers.List() +} + +func getK8sService(ctx context.Context, client client.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) { + svcKey := types.NamespacedName{ + Namespace: c.Namespace, + Name: controller.ChannelServiceName(c.Name), + } + svc := &corev1.Service{} + err := client.Get(ctx, svcKey, svc) + return svc, err +} + +func CreateK8sService(ctx context.Context, client client.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) { + svc, err := getK8sService(ctx, client, c) + + if errors.IsNotFound(err) { + svc = newK8sService(c) + err = client.Create(ctx, svc) + } + + // If an error occurred in either Get or Create, we need to reconcile again. + if err != nil { + return nil, err + } + + // Check if this Channel is the owner of the K8s service. + if !metav1.IsControlledBy(svc, c) { + logger := logging.FromContext(ctx) + logger.Warn("Channel's K8s Service is not owned by the Channel", zap.Any("channel", c), zap.Any("service", svc)) + } + return svc, nil +} + +func getVirtualService(ctx context.Context, client client.Client, c *eventingv1alpha1.Channel) (*istiov1alpha3.VirtualService, error) { + vsk := controllerRuntime.ObjectKey{ + Namespace: c.Namespace, + Name: controller.ChannelVirtualServiceName(c.ObjectMeta.Name), + } + vs := &istiov1alpha3.VirtualService{} + err := client.Get(ctx, vsk, vs) + return vs, err +} + +func CreateVirtualService(ctx context.Context, client client.Client, c *eventingv1alpha1.Channel) error { + virtualService, err := getVirtualService(ctx, client, c) + + // If the resource doesn't exist, we'll create it + if errors.IsNotFound(err) { + virtualService = newVirtualService(c) + err = client.Create(ctx, virtualService) + } + + // If an error occurs during Get/Create, we'll requeue the item so we can + // attempt processing again later. This could have been caused by a + // temporary network failure, or any other transient reason. + if err != nil { + return err + } + + // If the Virtual Service is not controlled by this Channel, we should log a warning, but don't + // consider it an error. + if !metav1.IsControlledBy(virtualService, c) { + logger := logging.FromContext(ctx) + logger.Warn("VirtualService not owned by Channel", zap.Any("channel", c), zap.Any("virtualService", virtualService)) + } + return nil +} + +func UpdateChannel(ctx context.Context, client client.Client, u *eventingv1alpha1.Channel) error { + channel := &eventingv1alpha1.Channel{} + err := client.Get(ctx, controllerRuntime.ObjectKey{Namespace: u.Namespace, Name: u.Name}, channel) + if err != nil { + return err + } + + updated := false + if !equality.Semantic.DeepEqual(channel.Finalizers, u.Finalizers) { + channel.SetFinalizers(u.ObjectMeta.Finalizers) + updated = true + } + + if !equality.Semantic.DeepEqual(channel.Status, u.Status) { + channel.Status = u.Status + updated = true + } + + if updated == false { + return nil + } + return client.Update(ctx, channel) +} + +// newK8sService creates a new Service for a Channel resource. It also sets the appropriate +// OwnerReferences on the resource so handleObject can discover the Channel resource that 'owns' it. +// As well as being garbage collected when the Channel is deleted. +func newK8sService(c *eventingv1alpha1.Channel) *corev1.Service { + labels := map[string]string{ + "channel": c.Name, + "provisioner": c.Spec.Provisioner.Ref.Name, + } + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: controller.ChannelServiceName(c.ObjectMeta.Name), + Namespace: c.Namespace, + Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(c, schema.GroupVersionKind{ + Group: eventingv1alpha1.SchemeGroupVersion.Group, + Version: eventingv1alpha1.SchemeGroupVersion.Version, + Kind: "Channel", + }), + }, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: PortName, + Port: PortNumber, + }, + }, + }, + } +} + +// newVirtualService creates a new VirtualService for a Channel resource. It also sets the +// appropriate OwnerReferences on the resource so handleObject can discover the Channel resource +// that 'owns' it. As well as being garbage collected when the Channel is deleted. +func newVirtualService(channel *eventingv1alpha1.Channel) *istiov1alpha3.VirtualService { + labels := map[string]string{ + "channel": channel.Name, + "provisioner": channel.Spec.Provisioner.Ref.Name, + } + destinationHost := controller.ServiceHostName(controller.ClusterBusDispatcherServiceName(channel.Spec.Provisioner.Ref.Name), system.Namespace) + return &istiov1alpha3.VirtualService{ + ObjectMeta: metav1.ObjectMeta{ + Name: controller.ChannelVirtualServiceName(channel.Name), + Namespace: channel.Namespace, + Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(channel, schema.GroupVersionKind{ + Group: eventingv1alpha1.SchemeGroupVersion.Group, + Version: eventingv1alpha1.SchemeGroupVersion.Version, + Kind: "Channel", + }), + }, + }, + Spec: istiov1alpha3.VirtualServiceSpec{ + Hosts: []string{ + controller.ServiceHostName(controller.ChannelServiceName(channel.Name), channel.Namespace), + controller.ChannelHostName(channel.Name, channel.Namespace), + }, + Http: []istiov1alpha3.HTTPRoute{{ + Rewrite: &istiov1alpha3.HTTPRewrite{ + Authority: controller.ChannelHostName(channel.Name, channel.Namespace), + }, + Route: []istiov1alpha3.DestinationWeight{{ + Destination: istiov1alpha3.Destination{ + Host: destinationHost, + Port: istiov1alpha3.PortSelector{ + Number: PortNumber, + }, + }}, + }}, + }, + }, + } +} From 17f26d3d7d2b858cbd49d8caebca35b87c5b9a99 Mon Sep 17 00:00:00 2001 From: Sabari Kumar Murugesan Date: Wed, 24 Oct 2018 16:51:20 -0700 Subject: [PATCH 2/7] Add Provisioner util and fix comments --- .../inmemory/clusterprovisioner/reconcile.go | 94 +--------------- .../clusterprovisioner/reconcile_test.go | 9 +- pkg/provisioners/channel_util.go | 23 ++-- pkg/provisioners/provisioner_util.go | 100 ++++++++++++++++++ 4 files changed, 121 insertions(+), 105 deletions(-) create mode 100644 pkg/provisioners/provisioner_util.go diff --git a/pkg/controller/eventing/inmemory/clusterprovisioner/reconcile.go b/pkg/controller/eventing/inmemory/clusterprovisioner/reconcile.go index a11cb53ca2f..bd88e92c84c 100644 --- a/pkg/controller/eventing/inmemory/clusterprovisioner/reconcile.go +++ b/pkg/controller/eventing/inmemory/clusterprovisioner/reconcile.go @@ -19,20 +19,14 @@ package clusterprovisioner import ( "context" - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/controller" - "github.com/knative/eventing/pkg/system" "go.uber.org/zap" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + util "github.com/knative/eventing/pkg/provisioners" ) const ( @@ -95,7 +89,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err // regardless of the error. } - if updateStatusErr := r.updateClusterProvisionerStatus(ctx, cp); updateStatusErr != nil { + if updateStatusErr := util.UpdateClusterProvisionerStatus(ctx, r.client, cp); updateStatusErr != nil { logger.Info("Error updating ClusterProvisioner Status", zap.Error(updateStatusErr)) return reconcile.Result{}, updateStatusErr } @@ -132,7 +126,7 @@ func (r *reconciler) reconcile(ctx context.Context, cp *eventingv1alpha1.Cluster return nil } - if err := r.createDispatcherService(ctx, cp); err != nil { + if err := util.CreateDispatcherService(ctx, r.client, cp); err != nil { logger.Info("Error creating the ClusterProvisioner's K8s Service", zap.Error(err)) return err } @@ -140,81 +134,3 @@ func (r *reconciler) reconcile(ctx context.Context, cp *eventingv1alpha1.Cluster cp.Status.MarkReady() return nil } - -func (r *reconciler) createDispatcherService(ctx context.Context, cp *eventingv1alpha1.ClusterProvisioner) error { - svcName := controller.ClusterBusDispatcherServiceName(cp.Name) - svcKey := types.NamespacedName{ - Namespace: system.Namespace, - Name: svcName, - } - svc := &corev1.Service{} - err := r.client.Get(ctx, svcKey, svc) - - if errors.IsNotFound(err) { - svc = newDispatcherService(cp) - err = r.client.Create(ctx, svc) - } - - // If an error occurred in either Get or Create, we need to reconcile again. - if err != nil { - return err - } - - // Check if this ClusterProvisioner is the owner of the K8s service. - if !metav1.IsControlledBy(svc, cp) { - r.logger.Warn("ClusterProvisioner's K8s Service is not owned by the ClusterProvisioner", zap.Any("clusterProvisioner", cp), zap.Any("service", svc)) - } - return nil -} - -func (r *reconciler) updateClusterProvisionerStatus(ctx context.Context, u *eventingv1alpha1.ClusterProvisioner) error { - o := &eventingv1alpha1.ClusterProvisioner{} - if err := r.client.Get(ctx, client.ObjectKey{Namespace: u.Namespace, Name: u.Name}, o); err != nil { - r.logger.Info("Error getting ClusterProvisioner for status update", zap.Error(err), zap.Any("updatedClusterProvisioner", u)) - return err - } - - if !equality.Semantic.DeepEqual(o.Status, u.Status) { - o.Status = u.Status - return r.client.Update(ctx, o) - } - return nil -} - -// newDispatcherService creates a new Service for a ClusterBus resource. It also sets -// the appropriate OwnerReferences on the resource so handleObject can discover -// the ClusterBus resource that 'owns' it. -func newDispatcherService(cp *eventingv1alpha1.ClusterProvisioner) *corev1.Service { - labels := dispatcherLabels(cp.Name) - return &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: controller.ClusterBusDispatcherServiceName(cp.Name), - Namespace: system.Namespace, - Labels: labels, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(cp, schema.GroupVersionKind{ - Group: eventingv1alpha1.SchemeGroupVersion.Group, - Version: eventingv1alpha1.SchemeGroupVersion.Version, - Kind: "ClusterProvisioner", - }), - }, - }, - Spec: corev1.ServiceSpec{ - Selector: labels, - Ports: []corev1.ServicePort{ - { - Name: "http", - Port: 80, - TargetPort: intstr.FromInt(8080), - }, - }, - }, - } -} - -func dispatcherLabels(cpName string) map[string]string { - return map[string]string{ - "clusterProvisioner": cpName, - "role": "dispatcher", - } -} diff --git a/pkg/controller/eventing/inmemory/clusterprovisioner/reconcile_test.go b/pkg/controller/eventing/inmemory/clusterprovisioner/reconcile_test.go index f0bd62dbdce..ebdc5f506cf 100644 --- a/pkg/controller/eventing/inmemory/clusterprovisioner/reconcile_test.go +++ b/pkg/controller/eventing/inmemory/clusterprovisioner/reconcile_test.go @@ -22,8 +22,6 @@ import ( "fmt" "testing" - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/system" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -35,7 +33,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" controllertesting "github.com/knative/eventing/pkg/controller/testing" + util "github.com/knative/eventing/pkg/provisioners" + "github.com/knative/eventing/pkg/system" ) const ( @@ -322,10 +323,10 @@ func makeK8sService() *corev1.Service { BlockOwnerDeletion: &truePointer, }, }, - Labels: dispatcherLabels(Name), + Labels: util.DispatcherLabels(Name), }, Spec: corev1.ServiceSpec{ - Selector: dispatcherLabels(Name), + Selector: util.DispatcherLabels(Name), Ports: []corev1.ServicePort{ { Name: "http", diff --git a/pkg/provisioners/channel_util.go b/pkg/provisioners/channel_util.go index e32c3368b85..ade4fc66acd 100644 --- a/pkg/provisioners/channel_util.go +++ b/pkg/provisioners/channel_util.go @@ -12,8 +12,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" - "sigs.k8s.io/controller-runtime/pkg/client" - controllerRuntime "sigs.k8s.io/controller-runtime/pkg/client" + runtimeRuntime "sigs.k8s.io/controller-runtime/pkg/client" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/controller" @@ -38,7 +37,7 @@ func RemoveFinalizer(c *eventingv1alpha1.Channel, finalizerName string) { c.Finalizers = finalizers.List() } -func getK8sService(ctx context.Context, client client.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) { +func getK8sService(ctx context.Context, client runtimeRuntime.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) { svcKey := types.NamespacedName{ Namespace: c.Namespace, Name: controller.ChannelServiceName(c.Name), @@ -48,7 +47,7 @@ func getK8sService(ctx context.Context, client client.Client, c *eventingv1alpha return svc, err } -func CreateK8sService(ctx context.Context, client client.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) { +func CreateK8sService(ctx context.Context, client runtimeRuntime.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) { svc, err := getK8sService(ctx, client, c) if errors.IsNotFound(err) { @@ -69,8 +68,8 @@ func CreateK8sService(ctx context.Context, client client.Client, c *eventingv1al return svc, nil } -func getVirtualService(ctx context.Context, client client.Client, c *eventingv1alpha1.Channel) (*istiov1alpha3.VirtualService, error) { - vsk := controllerRuntime.ObjectKey{ +func getVirtualService(ctx context.Context, client runtimeRuntime.Client, c *eventingv1alpha1.Channel) (*istiov1alpha3.VirtualService, error) { + vsk := runtimeRuntime.ObjectKey{ Namespace: c.Namespace, Name: controller.ChannelVirtualServiceName(c.ObjectMeta.Name), } @@ -79,7 +78,7 @@ func getVirtualService(ctx context.Context, client client.Client, c *eventingv1a return vs, err } -func CreateVirtualService(ctx context.Context, client client.Client, c *eventingv1alpha1.Channel) error { +func CreateVirtualService(ctx context.Context, client runtimeRuntime.Client, c *eventingv1alpha1.Channel) error { virtualService, err := getVirtualService(ctx, client, c) // If the resource doesn't exist, we'll create it @@ -104,9 +103,9 @@ func CreateVirtualService(ctx context.Context, client client.Client, c *eventing return nil } -func UpdateChannel(ctx context.Context, client client.Client, u *eventingv1alpha1.Channel) error { +func UpdateChannel(ctx context.Context, client runtimeRuntime.Client, u *eventingv1alpha1.Channel) error { channel := &eventingv1alpha1.Channel{} - err := client.Get(ctx, controllerRuntime.ObjectKey{Namespace: u.Namespace, Name: u.Name}, channel) + err := client.Get(ctx, runtimeRuntime.ObjectKey{Namespace: u.Namespace, Name: u.Name}, channel) if err != nil { return err } @@ -122,10 +121,10 @@ func UpdateChannel(ctx context.Context, client client.Client, u *eventingv1alpha updated = true } - if updated == false { - return nil + if updated { + return client.Update(ctx, channel) } - return client.Update(ctx, channel) + return nil } // newK8sService creates a new Service for a Channel resource. It also sets the appropriate diff --git a/pkg/provisioners/provisioner_util.go b/pkg/provisioners/provisioner_util.go new file mode 100644 index 00000000000..f50f9a8a383 --- /dev/null +++ b/pkg/provisioners/provisioner_util.go @@ -0,0 +1,100 @@ +package provisioners + +import ( + "context" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/controller" + "github.com/knative/eventing/pkg/system" + "github.com/knative/pkg/logging" +) + +func CreateDispatcherService(ctx context.Context, client runtimeClient.Client, cp *eventingv1alpha1.ClusterProvisioner) error { + svcName := controller.ClusterBusDispatcherServiceName(cp.Name) + svcKey := types.NamespacedName{ + Namespace: system.Namespace, + Name: svcName, + } + svc := &corev1.Service{} + err := client.Get(ctx, svcKey, svc) + + if errors.IsNotFound(err) { + svc = newDispatcherService(cp) + err = client.Create(ctx, svc) + } + + // If an error occurred in either Get or Create, we need to reconcile again. + if err != nil { + return err + } + + // Check if this ClusterProvisioner is the owner of the K8s service. + if !metav1.IsControlledBy(svc, cp) { + logger := logging.FromContext(ctx) + logger.Warn("ClusterProvisioner's K8s Service is not owned by the ClusterProvisioner", zap.Any("clusterProvisioner", cp), zap.Any("service", svc)) + } + return nil +} + +func UpdateClusterProvisionerStatus(ctx context.Context, client runtimeClient.Client, u *eventingv1alpha1.ClusterProvisioner) error { + o := &eventingv1alpha1.ClusterProvisioner{} + if err := client.Get(ctx, runtimeClient.ObjectKey{Namespace: u.Namespace, Name: u.Name}, o); err != nil { + logger := logging.FromContext(ctx) + logger.Info("Error getting ClusterProvisioner for status update", zap.Error(err), zap.Any("updatedClusterProvisioner", u)) + return err + } + + if !equality.Semantic.DeepEqual(o.Status, u.Status) { + o.Status = u.Status + return client.Update(ctx, o) + } + return nil +} + +// newDispatcherService creates a new Service for a ClusterBus resource. It also sets +// the appropriate OwnerReferences on the resource so handleObject can discover +// the ClusterBus resource that 'owns' it. +func newDispatcherService(cp *eventingv1alpha1.ClusterProvisioner) *corev1.Service { + labels := DispatcherLabels(cp.Name) + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: controller.ClusterBusDispatcherServiceName(cp.Name), + Namespace: system.Namespace, + Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(cp, schema.GroupVersionKind{ + Group: eventingv1alpha1.SchemeGroupVersion.Group, + Version: eventingv1alpha1.SchemeGroupVersion.Version, + Kind: "ClusterProvisioner", + }), + }, + }, + Spec: corev1.ServiceSpec{ + Selector: labels, + Ports: []corev1.ServicePort{ + { + Name: "http", + Port: 80, + TargetPort: intstr.FromInt(8080), + }, + }, + }, + } +} + +func DispatcherLabels(cpName string) map[string]string { + return map[string]string{ + "clusterProvisioner": cpName, + "role": "dispatcher", + } +} From 896cc3a02808a9ccd3dc6bcda06403cd59695a9b Mon Sep 17 00:00:00 2001 From: Sabari Kumar Murugesan Date: Thu, 25 Oct 2018 15:27:12 -0700 Subject: [PATCH 3/7] Add unit tests --- .../eventing/inmemory/channel/reconcile.go | 24 ++- .../inmemory/clusterprovisioner/reconcile.go | 10 +- pkg/provisioners/channel_util.go | 33 +-- pkg/provisioners/channel_util_test.go | 201 ++++++++++++++++++ pkg/provisioners/provisioner_util.go | 11 +- pkg/provisioners/provisioner_util_test.go | 120 +++++++++++ 6 files changed, 363 insertions(+), 36 deletions(-) create mode 100644 pkg/provisioners/channel_util_test.go create mode 100644 pkg/provisioners/provisioner_util_test.go diff --git a/pkg/controller/eventing/inmemory/channel/reconcile.go b/pkg/controller/eventing/inmemory/channel/reconcile.go index 9a9de170fad..b355d5ad3f5 100644 --- a/pkg/controller/eventing/inmemory/channel/reconcile.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile.go @@ -19,6 +19,7 @@ package channel import ( "context" + "github.com/knative/pkg/logging" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -138,18 +139,33 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) util.AddFinalizer(c, finalizerName) c.Status.SetSubscribable(c.Namespace, c.Name) - if svc, err := util.CreateK8sService(ctx, r.client, c); err != nil { + svc, err := util.CreateK8sService(ctx, r.client, c) + if err != nil { logger.Info("Error creating the Channel's K8s Service", zap.Error(err)) return err - } else { - c.Status.SetSinkable(controller.ServiceHostName(svc.Name, svc.Namespace)) } - if err := util.CreateVirtualService(ctx, r.client, c); err != nil { + // Check if this Channel is the owner of the K8s service. + if !metav1.IsControlledBy(svc, c) { + logger.Warn("Channel's K8s Service is not owned by the Channel", zap.Any("channel", c), zap.Any("service", svc)) + } + + c.Status.SetSinkable(controller.ServiceHostName(svc.Name, svc.Namespace)) + + virtualService, err := util.CreateVirtualService(ctx, r.client, c) + + if err != nil { logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err)) return err } + // If the Virtual Service is not controlled by this Channel, we should log a warning, but don't + // consider it an error. + if !metav1.IsControlledBy(virtualService, c) { + logger := logging.FromContext(ctx) + logger.Warn("VirtualService not owned by Channel", zap.Any("channel", c), zap.Any("virtualService", virtualService)) + } + c.Status.MarkProvisioned() return nil } diff --git a/pkg/controller/eventing/inmemory/clusterprovisioner/reconcile.go b/pkg/controller/eventing/inmemory/clusterprovisioner/reconcile.go index bd88e92c84c..8b009798cee 100644 --- a/pkg/controller/eventing/inmemory/clusterprovisioner/reconcile.go +++ b/pkg/controller/eventing/inmemory/clusterprovisioner/reconcile.go @@ -21,6 +21,7 @@ import ( "go.uber.org/zap" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -126,11 +127,18 @@ func (r *reconciler) reconcile(ctx context.Context, cp *eventingv1alpha1.Cluster return nil } - if err := util.CreateDispatcherService(ctx, r.client, cp); err != nil { + svc, err := util.CreateDispatcherService(ctx, r.client, cp) + + if err != nil { logger.Info("Error creating the ClusterProvisioner's K8s Service", zap.Error(err)) return err } + // Check if this ClusterProvisioner is the owner of the K8s service. + if !metav1.IsControlledBy(svc, cp) { + logger.Warn("ClusterProvisioner's K8s Service is not owned by the ClusterProvisioner", zap.Any("clusterProvisioner", cp), zap.Any("service", svc)) + } + cp.Status.MarkReady() return nil } diff --git a/pkg/provisioners/channel_util.go b/pkg/provisioners/channel_util.go index ade4fc66acd..38d751e6a64 100644 --- a/pkg/provisioners/channel_util.go +++ b/pkg/provisioners/channel_util.go @@ -4,15 +4,13 @@ import ( "context" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" - "github.com/knative/pkg/logging" - "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" - runtimeRuntime "sigs.k8s.io/controller-runtime/pkg/client" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/controller" @@ -37,7 +35,7 @@ func RemoveFinalizer(c *eventingv1alpha1.Channel, finalizerName string) { c.Finalizers = finalizers.List() } -func getK8sService(ctx context.Context, client runtimeRuntime.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) { +func getK8sService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) { svcKey := types.NamespacedName{ Namespace: c.Namespace, Name: controller.ChannelServiceName(c.Name), @@ -47,7 +45,7 @@ func getK8sService(ctx context.Context, client runtimeRuntime.Client, c *eventin return svc, err } -func CreateK8sService(ctx context.Context, client runtimeRuntime.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) { +func CreateK8sService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) { svc, err := getK8sService(ctx, client, c) if errors.IsNotFound(err) { @@ -60,16 +58,11 @@ func CreateK8sService(ctx context.Context, client runtimeRuntime.Client, c *even return nil, err } - // Check if this Channel is the owner of the K8s service. - if !metav1.IsControlledBy(svc, c) { - logger := logging.FromContext(ctx) - logger.Warn("Channel's K8s Service is not owned by the Channel", zap.Any("channel", c), zap.Any("service", svc)) - } return svc, nil } -func getVirtualService(ctx context.Context, client runtimeRuntime.Client, c *eventingv1alpha1.Channel) (*istiov1alpha3.VirtualService, error) { - vsk := runtimeRuntime.ObjectKey{ +func getVirtualService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*istiov1alpha3.VirtualService, error) { + vsk := runtimeClient.ObjectKey{ Namespace: c.Namespace, Name: controller.ChannelVirtualServiceName(c.ObjectMeta.Name), } @@ -78,7 +71,7 @@ func getVirtualService(ctx context.Context, client runtimeRuntime.Client, c *eve return vs, err } -func CreateVirtualService(ctx context.Context, client runtimeRuntime.Client, c *eventingv1alpha1.Channel) error { +func CreateVirtualService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*istiov1alpha3.VirtualService, error) { virtualService, err := getVirtualService(ctx, client, c) // If the resource doesn't exist, we'll create it @@ -91,21 +84,15 @@ func CreateVirtualService(ctx context.Context, client runtimeRuntime.Client, c * // attempt processing again later. This could have been caused by a // temporary network failure, or any other transient reason. if err != nil { - return err + return nil, err } - // If the Virtual Service is not controlled by this Channel, we should log a warning, but don't - // consider it an error. - if !metav1.IsControlledBy(virtualService, c) { - logger := logging.FromContext(ctx) - logger.Warn("VirtualService not owned by Channel", zap.Any("channel", c), zap.Any("virtualService", virtualService)) - } - return nil + return virtualService, nil } -func UpdateChannel(ctx context.Context, client runtimeRuntime.Client, u *eventingv1alpha1.Channel) error { +func UpdateChannel(ctx context.Context, client runtimeClient.Client, u *eventingv1alpha1.Channel) error { channel := &eventingv1alpha1.Channel{} - err := client.Get(ctx, runtimeRuntime.ObjectKey{Namespace: u.Namespace, Name: u.Name}, channel) + err := client.Get(ctx, runtimeClient.ObjectKey{Namespace: u.Namespace, Name: u.Name}, channel) if err != nil { return err } diff --git a/pkg/provisioners/channel_util_test.go b/pkg/provisioners/channel_util_test.go new file mode 100644 index 00000000000..504a42c39c8 --- /dev/null +++ b/pkg/provisioners/channel_util_test.go @@ -0,0 +1,201 @@ +package provisioners + +import ( + "context" + "fmt" + "testing" + + "github.com/knative/pkg/apis" + istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/scheme" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" +) + +const ( + channelName = "test-channel" + testNS = "test-namespace" + channelClusterProvisionerName = "channel-cluster-provisioner" +) + +var ( + truePointer = true +) + +func init() { + // Add types to scheme. + istiov1alpha3.AddToScheme(scheme.Scheme) + eventingv1alpha1.AddToScheme(scheme.Scheme) +} + +func TestCreateK8sService(t *testing.T) { + want := makeK8sService() + client := fake.NewFakeClient() + got, _ := CreateK8sService(context.TODO(), client, getNewChannel()) + + ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) + if diff := cmp.Diff(want, got, ignore); diff != "" { + t.Errorf("Service (-want, +got) = %v", diff) + } +} + +func TestCreateK8sService_Existing(t *testing.T) { + want := makeK8sService() + client := fake.NewFakeClient(want) + got, _ := CreateK8sService(context.TODO(), client, getNewChannel()) + + ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) + if diff := cmp.Diff(want, got, ignore); diff != "" { + t.Errorf("Service (-want, +got) = %v", diff) + } +} + +func TestCreateVirtualService(t *testing.T) { + want := makeVirtualService() + client := fake.NewFakeClient() + got, _ := CreateVirtualService(context.TODO(), client, getNewChannel()) + + ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) + if diff := cmp.Diff(want, got, ignore); diff != "" { + t.Errorf("VirtualService (-want, +got) = %v", diff) + } +} + +func TestCreateVirtualService_Existing(t *testing.T) { + want := makeVirtualService() + client := fake.NewFakeClient(want) + got, _ := CreateVirtualService(context.TODO(), client, getNewChannel()) + + ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) + if diff := cmp.Diff(want, got, ignore); diff != "" { + t.Errorf("VirtualService (-want, +got) = %v", diff) + } +} + +func TestUpdateChannel(t *testing.T) { + oldChannel := getNewChannel() + client := fake.NewFakeClient(oldChannel) + + want := getNewChannel() + AddFinalizer(want, "test-finalizer") + want.Status.SetSinkable("test-domain") + UpdateChannel(context.TODO(), client, want) + + got := &eventingv1alpha1.Channel{} + client.Get(context.TODO(), runtimeClient.ObjectKey{Namespace: testNS, Name: channelName}, got) + + ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) + if diff := cmp.Diff(want, got, ignore); diff != "" { + t.Errorf("Channel (-want, +got) = %v", diff) + } +} + +func getNewChannel() *eventingv1alpha1.Channel { + channel := &eventingv1alpha1.Channel{ + TypeMeta: channelType(), + ObjectMeta: om(testNS, channelName), + Spec: eventingv1alpha1.ChannelSpec{ + Provisioner: &eventingv1alpha1.ProvisionerReference{ + Ref: &corev1.ObjectReference{ + Name: channelClusterProvisionerName, + Kind: "ClusterProvisioner", + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + }, + }, + }, + } + // selflink is not filled in when we create the object, so clear it + channel.ObjectMeta.SelfLink = "" + return channel +} + +func channelType() metav1.TypeMeta { + return metav1.TypeMeta{ + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Kind: "Channel", + } +} +func om(namespace, name string) metav1.ObjectMeta { + return metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + SelfLink: fmt.Sprintf("/apis/eventing/v1alpha1/namespaces/%s/object/%s", namespace, name), + } +} + +func makeK8sService() *corev1.Service { + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-channel", channelName), + Namespace: testNS, + Labels: map[string]string{ + "channel": channelName, + "provisioner": channelClusterProvisionerName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Kind: "Channel", + Name: channelName, + Controller: &truePointer, + BlockOwnerDeletion: &truePointer, + }, + }, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: PortName, + Port: PortNumber, + }, + }, + }, + } +} + +func makeVirtualService() *istiov1alpha3.VirtualService { + return &istiov1alpha3.VirtualService{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-channel", channelName), + Namespace: testNS, + Labels: map[string]string{ + "channel": channelName, + "provisioner": channelClusterProvisionerName, + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Kind: "Channel", + Name: channelName, + Controller: &truePointer, + BlockOwnerDeletion: &truePointer, + }, + }, + }, + Spec: istiov1alpha3.VirtualServiceSpec{ + Hosts: []string{ + fmt.Sprintf("%s-channel.%s.svc.cluster.local", channelName, testNS), + fmt.Sprintf("%s.%s.channels.cluster.local", channelName, testNS), + }, + Http: []istiov1alpha3.HTTPRoute{{ + Rewrite: &istiov1alpha3.HTTPRewrite{ + Authority: fmt.Sprintf("%s.%s.channels.cluster.local", channelName, testNS), + }, + Route: []istiov1alpha3.DestinationWeight{{ + Destination: istiov1alpha3.Destination{ + Host: "channel-cluster-provisioner-clusterbus.knative-eventing.svc.cluster.local", + Port: istiov1alpha3.PortSelector{ + Number: PortNumber, + }, + }}, + }}, + }, + }, + } +} diff --git a/pkg/provisioners/provisioner_util.go b/pkg/provisioners/provisioner_util.go index f50f9a8a383..4aec45740ec 100644 --- a/pkg/provisioners/provisioner_util.go +++ b/pkg/provisioners/provisioner_util.go @@ -19,7 +19,7 @@ import ( "github.com/knative/pkg/logging" ) -func CreateDispatcherService(ctx context.Context, client runtimeClient.Client, cp *eventingv1alpha1.ClusterProvisioner) error { +func CreateDispatcherService(ctx context.Context, client runtimeClient.Client, cp *eventingv1alpha1.ClusterProvisioner) (*corev1.Service, error) { svcName := controller.ClusterBusDispatcherServiceName(cp.Name) svcKey := types.NamespacedName{ Namespace: system.Namespace, @@ -35,15 +35,10 @@ func CreateDispatcherService(ctx context.Context, client runtimeClient.Client, c // If an error occurred in either Get or Create, we need to reconcile again. if err != nil { - return err + return nil, err } - // Check if this ClusterProvisioner is the owner of the K8s service. - if !metav1.IsControlledBy(svc, cp) { - logger := logging.FromContext(ctx) - logger.Warn("ClusterProvisioner's K8s Service is not owned by the ClusterProvisioner", zap.Any("clusterProvisioner", cp), zap.Any("service", svc)) - } - return nil + return svc, nil } func UpdateClusterProvisionerStatus(ctx context.Context, client runtimeClient.Client, u *eventingv1alpha1.ClusterProvisioner) error { diff --git a/pkg/provisioners/provisioner_util_test.go b/pkg/provisioners/provisioner_util_test.go new file mode 100644 index 00000000000..628a14bed5b --- /dev/null +++ b/pkg/provisioners/provisioner_util_test.go @@ -0,0 +1,120 @@ +package provisioners + +import ( + "context" + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/knative/eventing/pkg/system" + "github.com/knative/pkg/apis" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/knative/eventing/pkg/apis/eventing" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" +) + +const ( + clusterProvisionerName = "kafka" +) + +func TestCreateDispatcherService(t *testing.T) { + want := makeDispatcherService() + client := fake.NewFakeClient() + got, _ := CreateDispatcherService(context.TODO(), client, getNewClusterProvisioner()) + + ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) + if diff := cmp.Diff(want, got, ignore); diff != "" { + t.Errorf("Service (-want, +got) = %v", diff) + } +} + +func TestCreateDispatcherService_Existing(t *testing.T) { + want := makeDispatcherService() + client := fake.NewFakeClient(want) + got, _ := CreateDispatcherService(context.TODO(), client, getNewClusterProvisioner()) + + ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) + if diff := cmp.Diff(want, got, ignore); diff != "" { + t.Errorf("Service (-want, +got) = %v", diff) + } +} + +func TestUpdateClusterProvisioner(t *testing.T) { + cp := getNewClusterProvisioner() + client := fake.NewFakeClient(cp) + + // Update more than just Status + cp.Status.MarkReady() + cp.ObjectMeta.Annotations = map[string]string{"test-annotation": "testing"} + UpdateClusterProvisionerStatus(context.TODO(), client, cp) + + got := &eventingv1alpha1.ClusterProvisioner{} + client.Get(context.TODO(), runtimeClient.ObjectKey{Namespace: testNS, Name: clusterProvisionerName}, got) + + // Only status should be updated + want := getNewClusterProvisioner() + want.Status.MarkReady() + + ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) + if diff := cmp.Diff(want, got, ignore); diff != "" { + t.Errorf("ClusterProvisioner (-want, +got) = %v", diff) + } +} + +func getNewClusterProvisioner() *eventingv1alpha1.ClusterProvisioner { + clusterProvisioner := &eventingv1alpha1.ClusterProvisioner{ + TypeMeta: ClusterProvisonerType(), + ObjectMeta: om(testNS, clusterProvisionerName), + Spec: eventingv1alpha1.ClusterProvisionerSpec{ + Reconciles: metav1.GroupKind{ + Kind: "Channel", + Group: eventing.GroupName, + }, + }, + } + // selflink is not filled in when we create the object, so clear it + clusterProvisioner.ObjectMeta.SelfLink = "" + return clusterProvisioner +} + +func ClusterProvisonerType() metav1.TypeMeta { + return metav1.TypeMeta{ + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Kind: "ClusterProvisioner", + } +} + +func makeDispatcherService() *corev1.Service { + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: system.Namespace, + Name: fmt.Sprintf("%s-clusterbus", clusterProvisionerName), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Kind: "ClusterProvisioner", + Name: clusterProvisionerName, + Controller: &truePointer, + BlockOwnerDeletion: &truePointer, + }, + }, + Labels: DispatcherLabels(clusterProvisionerName), + }, + Spec: corev1.ServiceSpec{ + Selector: DispatcherLabels(clusterProvisionerName), + Ports: []corev1.ServicePort{ + { + Name: "http", + Port: 80, + TargetPort: intstr.FromInt(8080), + }, + }, + }, + } +} From e41e09a923b61d44fb0bf3c77c7fe168ad25e889 Mon Sep 17 00:00:00 2001 From: Sabari Kumar Murugesan Date: Thu, 25 Oct 2018 15:32:31 -0700 Subject: [PATCH 4/7] Fix logging --- pkg/controller/eventing/inmemory/channel/reconcile.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/controller/eventing/inmemory/channel/reconcile.go b/pkg/controller/eventing/inmemory/channel/reconcile.go index 5d22e1e76e5..a34a3dcb1cd 100644 --- a/pkg/controller/eventing/inmemory/channel/reconcile.go +++ b/pkg/controller/eventing/inmemory/channel/reconcile.go @@ -19,7 +19,6 @@ package channel import ( "context" - "github.com/knative/pkg/logging" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -161,7 +160,6 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) // If the Virtual Service is not controlled by this Channel, we should log a warning, but don't // consider it an error. if !metav1.IsControlledBy(virtualService, c) { - logger := logging.FromContext(ctx) logger.Warn("VirtualService not owned by Channel", zap.Any("channel", c), zap.Any("virtualService", virtualService)) } From e28d777f3be902c251ca4473458dd5b7931d32ec Mon Sep 17 00:00:00 2001 From: Sabari Kumar Murugesan Date: Mon, 29 Oct 2018 12:33:50 -0700 Subject: [PATCH 5/7] Fix after pr 562 --- .../clusterchannelprovisioner/reconcile.go | 9 ++-- pkg/provisioners/channel_util.go | 6 +-- pkg/provisioners/channel_util_test.go | 21 ++++---- pkg/provisioners/provisioner_util.go | 32 +++++------ pkg/provisioners/provisioner_util_test.go | 54 +++++++++---------- 5 files changed, 57 insertions(+), 65 deletions(-) diff --git a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go index 614b7dd51f0..eb7420627bd 100644 --- a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go +++ b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go @@ -19,8 +19,6 @@ package clusterchannelprovisioner import ( "context" - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - util "github.com/knative/eventing/pkg/provisioners" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -28,6 +26,9 @@ import ( "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + util "github.com/knative/eventing/pkg/provisioners" ) const ( @@ -133,9 +134,9 @@ func (r *reconciler) reconcile(ctx context.Context, ccp *eventingv1alpha1.Cluste return err } - // Check if this ClusterProvisioner is the owner of the K8s service. + // Check if this ClusterChannelProvisioner is the owner of the K8s service. if !metav1.IsControlledBy(svc, ccp) { - logger.Warn("ClusterChannelProvisioner's K8s Service is not owned by the ClusterProvisioner", zap.Any("clusterProvisioner", cp), zap.Any("service", svc)) + logger.Warn("ClusterChannelProvisioner's K8s Service is not owned by the ClusterChannelProvisioner", zap.Any("clusterChannelProvisioner", ccp), zap.Any("service", svc)) } ccp.Status.MarkReady() diff --git a/pkg/provisioners/channel_util.go b/pkg/provisioners/channel_util.go index 38d751e6a64..165ed49cea4 100644 --- a/pkg/provisioners/channel_util.go +++ b/pkg/provisioners/channel_util.go @@ -120,7 +120,7 @@ func UpdateChannel(ctx context.Context, client runtimeClient.Client, u *eventing func newK8sService(c *eventingv1alpha1.Channel) *corev1.Service { labels := map[string]string{ "channel": c.Name, - "provisioner": c.Spec.Provisioner.Ref.Name, + "provisioner": c.Spec.Provisioner.Name, } return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -152,9 +152,9 @@ func newK8sService(c *eventingv1alpha1.Channel) *corev1.Service { func newVirtualService(channel *eventingv1alpha1.Channel) *istiov1alpha3.VirtualService { labels := map[string]string{ "channel": channel.Name, - "provisioner": channel.Spec.Provisioner.Ref.Name, + "provisioner": channel.Spec.Provisioner.Name, } - destinationHost := controller.ServiceHostName(controller.ClusterBusDispatcherServiceName(channel.Spec.Provisioner.Ref.Name), system.Namespace) + destinationHost := controller.ServiceHostName(controller.ClusterBusDispatcherServiceName(channel.Spec.Provisioner.Name), system.Namespace) return &istiov1alpha3.VirtualService{ ObjectMeta: metav1.ObjectMeta{ Name: controller.ChannelVirtualServiceName(channel.Name), diff --git a/pkg/provisioners/channel_util_test.go b/pkg/provisioners/channel_util_test.go index 504a42c39c8..1b6b8ab8023 100644 --- a/pkg/provisioners/channel_util_test.go +++ b/pkg/provisioners/channel_util_test.go @@ -19,9 +19,8 @@ import ( ) const ( - channelName = "test-channel" - testNS = "test-namespace" - channelClusterProvisionerName = "channel-cluster-provisioner" + channelName = "test-channel" + testNS = "test-namespace" ) var ( @@ -101,12 +100,10 @@ func getNewChannel() *eventingv1alpha1.Channel { TypeMeta: channelType(), ObjectMeta: om(testNS, channelName), Spec: eventingv1alpha1.ChannelSpec{ - Provisioner: &eventingv1alpha1.ProvisionerReference{ - Ref: &corev1.ObjectReference{ - Name: channelClusterProvisionerName, - Kind: "ClusterProvisioner", - APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), - }, + Provisioner: &corev1.ObjectReference{ + Name: clusterChannelProvisionerName, + Kind: "ClusterProvisioner", + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), }, }, } @@ -136,7 +133,7 @@ func makeK8sService() *corev1.Service { Namespace: testNS, Labels: map[string]string{ "channel": channelName, - "provisioner": channelClusterProvisionerName, + "provisioner": clusterChannelProvisionerName, }, OwnerReferences: []metav1.OwnerReference{ { @@ -166,7 +163,7 @@ func makeVirtualService() *istiov1alpha3.VirtualService { Namespace: testNS, Labels: map[string]string{ "channel": channelName, - "provisioner": channelClusterProvisionerName, + "provisioner": clusterChannelProvisionerName, }, OwnerReferences: []metav1.OwnerReference{ { @@ -189,7 +186,7 @@ func makeVirtualService() *istiov1alpha3.VirtualService { }, Route: []istiov1alpha3.DestinationWeight{{ Destination: istiov1alpha3.Destination{ - Host: "channel-cluster-provisioner-clusterbus.knative-eventing.svc.cluster.local", + Host: fmt.Sprintf("%s-clusterbus.knative-eventing.svc.cluster.local", clusterChannelProvisionerName), Port: istiov1alpha3.PortSelector{ Number: PortNumber, }, diff --git a/pkg/provisioners/provisioner_util.go b/pkg/provisioners/provisioner_util.go index 4aec45740ec..787a1c58155 100644 --- a/pkg/provisioners/provisioner_util.go +++ b/pkg/provisioners/provisioner_util.go @@ -19,8 +19,8 @@ import ( "github.com/knative/pkg/logging" ) -func CreateDispatcherService(ctx context.Context, client runtimeClient.Client, cp *eventingv1alpha1.ClusterProvisioner) (*corev1.Service, error) { - svcName := controller.ClusterBusDispatcherServiceName(cp.Name) +func CreateDispatcherService(ctx context.Context, client runtimeClient.Client, ccp *eventingv1alpha1.ClusterChannelProvisioner) (*corev1.Service, error) { + svcName := controller.ClusterBusDispatcherServiceName(ccp.Name) svcKey := types.NamespacedName{ Namespace: system.Namespace, Name: svcName, @@ -29,7 +29,7 @@ func CreateDispatcherService(ctx context.Context, client runtimeClient.Client, c err := client.Get(ctx, svcKey, svc) if errors.IsNotFound(err) { - svc = newDispatcherService(cp) + svc = newDispatcherService(ccp) err = client.Create(ctx, svc) } @@ -41,11 +41,11 @@ func CreateDispatcherService(ctx context.Context, client runtimeClient.Client, c return svc, nil } -func UpdateClusterProvisionerStatus(ctx context.Context, client runtimeClient.Client, u *eventingv1alpha1.ClusterProvisioner) error { - o := &eventingv1alpha1.ClusterProvisioner{} +func UpdateClusterChannelProvisionerStatus(ctx context.Context, client runtimeClient.Client, u *eventingv1alpha1.ClusterChannelProvisioner) error { + o := &eventingv1alpha1.ClusterChannelProvisioner{} if err := client.Get(ctx, runtimeClient.ObjectKey{Namespace: u.Namespace, Name: u.Name}, o); err != nil { logger := logging.FromContext(ctx) - logger.Info("Error getting ClusterProvisioner for status update", zap.Error(err), zap.Any("updatedClusterProvisioner", u)) + logger.Info("Error getting ClusterChannelProvisioner for status update", zap.Error(err), zap.Any("updatedClusterChannelProvisioner", u)) return err } @@ -56,21 +56,21 @@ func UpdateClusterProvisionerStatus(ctx context.Context, client runtimeClient.Cl return nil } -// newDispatcherService creates a new Service for a ClusterBus resource. It also sets +// newDispatcherService creates a new Service for a ClusterChannelProvisioner resource. It also sets // the appropriate OwnerReferences on the resource so handleObject can discover -// the ClusterBus resource that 'owns' it. -func newDispatcherService(cp *eventingv1alpha1.ClusterProvisioner) *corev1.Service { - labels := DispatcherLabels(cp.Name) +// the ClusterChannelProvisioner resource that 'owns' it. +func newDispatcherService(ccp *eventingv1alpha1.ClusterChannelProvisioner) *corev1.Service { + labels := DispatcherLabels(ccp.Name) return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: controller.ClusterBusDispatcherServiceName(cp.Name), + Name: controller.ClusterBusDispatcherServiceName(ccp.Name), Namespace: system.Namespace, Labels: labels, OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(cp, schema.GroupVersionKind{ + *metav1.NewControllerRef(ccp, schema.GroupVersionKind{ Group: eventingv1alpha1.SchemeGroupVersion.Group, Version: eventingv1alpha1.SchemeGroupVersion.Version, - Kind: "ClusterProvisioner", + Kind: "ClusterChannelProvisioner", }), }, }, @@ -87,9 +87,9 @@ func newDispatcherService(cp *eventingv1alpha1.ClusterProvisioner) *corev1.Servi } } -func DispatcherLabels(cpName string) map[string]string { +func DispatcherLabels(ccpName string) map[string]string { return map[string]string{ - "clusterProvisioner": cpName, - "role": "dispatcher", + "clusterChannelProvisioner": ccpName, + "role": "dispatcher", } } diff --git a/pkg/provisioners/provisioner_util_test.go b/pkg/provisioners/provisioner_util_test.go index 628a14bed5b..7f656253ac2 100644 --- a/pkg/provisioners/provisioner_util_test.go +++ b/pkg/provisioners/provisioner_util_test.go @@ -15,18 +15,17 @@ import ( runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - "github.com/knative/eventing/pkg/apis/eventing" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" ) const ( - clusterProvisionerName = "kafka" + clusterChannelProvisionerName = "kafka" ) func TestCreateDispatcherService(t *testing.T) { want := makeDispatcherService() client := fake.NewFakeClient() - got, _ := CreateDispatcherService(context.TODO(), client, getNewClusterProvisioner()) + got, _ := CreateDispatcherService(context.TODO(), client, getNewClusterChannelProvisioner()) ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) if diff := cmp.Diff(want, got, ignore); diff != "" { @@ -37,7 +36,7 @@ func TestCreateDispatcherService(t *testing.T) { func TestCreateDispatcherService_Existing(t *testing.T) { want := makeDispatcherService() client := fake.NewFakeClient(want) - got, _ := CreateDispatcherService(context.TODO(), client, getNewClusterProvisioner()) + got, _ := CreateDispatcherService(context.TODO(), client, getNewClusterChannelProvisioner()) ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) if diff := cmp.Diff(want, got, ignore); diff != "" { @@ -45,20 +44,20 @@ func TestCreateDispatcherService_Existing(t *testing.T) { } } -func TestUpdateClusterProvisioner(t *testing.T) { - cp := getNewClusterProvisioner() - client := fake.NewFakeClient(cp) +func TestUpdateClusterChannelProvisioner(t *testing.T) { + ccp := getNewClusterChannelProvisioner() + client := fake.NewFakeClient(ccp) // Update more than just Status - cp.Status.MarkReady() - cp.ObjectMeta.Annotations = map[string]string{"test-annotation": "testing"} - UpdateClusterProvisionerStatus(context.TODO(), client, cp) + ccp.Status.MarkReady() + ccp.ObjectMeta.Annotations = map[string]string{"test-annotation": "testing"} + UpdateClusterChannelProvisionerStatus(context.TODO(), client, ccp) - got := &eventingv1alpha1.ClusterProvisioner{} - client.Get(context.TODO(), runtimeClient.ObjectKey{Namespace: testNS, Name: clusterProvisionerName}, got) + got := &eventingv1alpha1.ClusterChannelProvisioner{} + client.Get(context.TODO(), runtimeClient.ObjectKey{Namespace: testNS, Name: clusterChannelProvisionerName}, got) // Only status should be updated - want := getNewClusterProvisioner() + want := getNewClusterChannelProvisioner() want.Status.MarkReady() ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) @@ -67,26 +66,21 @@ func TestUpdateClusterProvisioner(t *testing.T) { } } -func getNewClusterProvisioner() *eventingv1alpha1.ClusterProvisioner { - clusterProvisioner := &eventingv1alpha1.ClusterProvisioner{ +func getNewClusterChannelProvisioner() *eventingv1alpha1.ClusterChannelProvisioner { + clusterChannelProvisioner := &eventingv1alpha1.ClusterChannelProvisioner{ TypeMeta: ClusterProvisonerType(), - ObjectMeta: om(testNS, clusterProvisionerName), - Spec: eventingv1alpha1.ClusterProvisionerSpec{ - Reconciles: metav1.GroupKind{ - Kind: "Channel", - Group: eventing.GroupName, - }, - }, + ObjectMeta: om(testNS, clusterChannelProvisionerName), + Spec: eventingv1alpha1.ClusterChannelProvisionerSpec{}, } // selflink is not filled in when we create the object, so clear it - clusterProvisioner.ObjectMeta.SelfLink = "" - return clusterProvisioner + clusterChannelProvisioner.ObjectMeta.SelfLink = "" + return clusterChannelProvisioner } func ClusterProvisonerType() metav1.TypeMeta { return metav1.TypeMeta{ APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), - Kind: "ClusterProvisioner", + Kind: "ClusterChannelProvisioner", } } @@ -94,20 +88,20 @@ func makeDispatcherService() *corev1.Service { return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Namespace: system.Namespace, - Name: fmt.Sprintf("%s-clusterbus", clusterProvisionerName), + Name: fmt.Sprintf("%s-clusterbus", clusterChannelProvisionerName), OwnerReferences: []metav1.OwnerReference{ { APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), - Kind: "ClusterProvisioner", - Name: clusterProvisionerName, + Kind: "ClusterChannelProvisioner", + Name: clusterChannelProvisionerName, Controller: &truePointer, BlockOwnerDeletion: &truePointer, }, }, - Labels: DispatcherLabels(clusterProvisionerName), + Labels: DispatcherLabels(clusterChannelProvisionerName), }, Spec: corev1.ServiceSpec{ - Selector: DispatcherLabels(clusterProvisionerName), + Selector: DispatcherLabels(clusterChannelProvisionerName), Ports: []corev1.ServicePort{ { Name: "http", From c969f922fa9a46fef28b518a201cc9764bc1c27f Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Tue, 6 Nov 2018 04:00:58 +0100 Subject: [PATCH 6/7] renaming --- pkg/provisioners/channel_util_test.go | 9 ++++----- pkg/provisioners/provisioner_util_test.go | 9 ++++----- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/pkg/provisioners/channel_util_test.go b/pkg/provisioners/channel_util_test.go index 1b6b8ab8023..5c3c1c7973f 100644 --- a/pkg/provisioners/channel_util_test.go +++ b/pkg/provisioners/channel_util_test.go @@ -5,6 +5,9 @@ import ( "fmt" "testing" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/pkg/apis" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" corev1 "k8s.io/api/core/v1" @@ -12,10 +15,6 @@ import ( "k8s.io/client-go/kubernetes/scheme" runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" ) const ( @@ -102,7 +101,7 @@ func getNewChannel() *eventingv1alpha1.Channel { Spec: eventingv1alpha1.ChannelSpec{ Provisioner: &corev1.ObjectReference{ Name: clusterChannelProvisionerName, - Kind: "ClusterProvisioner", + Kind: "ClusterChannelProvisioner", APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), }, }, diff --git a/pkg/provisioners/provisioner_util_test.go b/pkg/provisioners/provisioner_util_test.go index 7f656253ac2..6861182a2a3 100644 --- a/pkg/provisioners/provisioner_util_test.go +++ b/pkg/provisioners/provisioner_util_test.go @@ -7,6 +7,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/system" "github.com/knative/pkg/apis" corev1 "k8s.io/api/core/v1" @@ -14,8 +15,6 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" ) const ( @@ -62,13 +61,13 @@ func TestUpdateClusterChannelProvisioner(t *testing.T) { ignore := cmpopts.IgnoreTypes(apis.VolatileTime{}) if diff := cmp.Diff(want, got, ignore); diff != "" { - t.Errorf("ClusterProvisioner (-want, +got) = %v", diff) + t.Errorf("ClusterChannelProvisioner (-want, +got) = %v", diff) } } func getNewClusterChannelProvisioner() *eventingv1alpha1.ClusterChannelProvisioner { clusterChannelProvisioner := &eventingv1alpha1.ClusterChannelProvisioner{ - TypeMeta: ClusterProvisonerType(), + TypeMeta: ClusterChannelProvisionerType(), ObjectMeta: om(testNS, clusterChannelProvisionerName), Spec: eventingv1alpha1.ClusterChannelProvisionerSpec{}, } @@ -77,7 +76,7 @@ func getNewClusterChannelProvisioner() *eventingv1alpha1.ClusterChannelProvision return clusterChannelProvisioner } -func ClusterProvisonerType() metav1.TypeMeta { +func ClusterChannelProvisionerType() metav1.TypeMeta { return metav1.TypeMeta{ APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), Kind: "ClusterChannelProvisioner", From 6c1571c360fb71a847b4b410a13047196c4e6ea6 Mon Sep 17 00:00:00 2001 From: Sabari Kumar Murugesan Date: Wed, 7 Nov 2018 00:36:41 -0800 Subject: [PATCH 7/7] Fix import ordering and test --- pkg/provisioners/channel_util_test.go | 5 +++-- pkg/provisioners/provisioner_util_test.go | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/provisioners/channel_util_test.go b/pkg/provisioners/channel_util_test.go index 5c3c1c7973f..0503ee778ec 100644 --- a/pkg/provisioners/channel_util_test.go +++ b/pkg/provisioners/channel_util_test.go @@ -7,7 +7,6 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/pkg/apis" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" corev1 "k8s.io/api/core/v1" @@ -15,6 +14,8 @@ import ( "k8s.io/client-go/kubernetes/scheme" runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" ) const ( @@ -82,7 +83,7 @@ func TestUpdateChannel(t *testing.T) { want := getNewChannel() AddFinalizer(want, "test-finalizer") - want.Status.SetSinkable("test-domain") + want.Status.SetAddress("test-domain") UpdateChannel(context.TODO(), client, want) got := &eventingv1alpha1.Channel{} diff --git a/pkg/provisioners/provisioner_util_test.go b/pkg/provisioners/provisioner_util_test.go index 6861182a2a3..b93e7b780be 100644 --- a/pkg/provisioners/provisioner_util_test.go +++ b/pkg/provisioners/provisioner_util_test.go @@ -7,14 +7,15 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/system" "github.com/knative/pkg/apis" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/system" ) const (