diff --git a/pkg/controller/eventing/inmemory/channel/reconcile.go b/pkg/controller/eventing/inmemory/channel/reconcile.go
index 405312dbcc9..b8f56034219 100644
--- a/pkg/controller/eventing/inmemory/channel/reconcile.go
+++ b/pkg/controller/eventing/inmemory/channel/reconcile.go
@@ -19,30 +19,25 @@ package channel
 import (
     "context"
 
-    eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
-    "github.com/knative/eventing/pkg/controller"
-    ccpcontroller "github.com/knative/eventing/pkg/controller/eventing/inmemory/clusterchannelprovisioner"
-    "github.com/knative/eventing/pkg/sidecar/configmap"
-    "github.com/knative/eventing/pkg/sidecar/fanout"
-    "github.com/knative/eventing/pkg/sidecar/multichannelfanout"
-    "github.com/knative/eventing/pkg/system"
-    istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
     "go.uber.org/zap"
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/equality"
     "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/apimachinery/pkg/runtime/schema"
-    "k8s.io/apimachinery/pkg/types"
-    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/client-go/tools/record"
     "sigs.k8s.io/controller-runtime/pkg/client"
     "sigs.k8s.io/controller-runtime/pkg/reconcile"
+
+    eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
+    "github.com/knative/eventing/pkg/controller"
+    ccpcontroller "github.com/knative/eventing/pkg/controller/eventing/inmemory/clusterchannelprovisioner"
+    util "github.com/knative/eventing/pkg/provisioners"
+    "github.com/knative/eventing/pkg/sidecar/configmap"
+    "github.com/knative/eventing/pkg/sidecar/fanout"
+    "github.com/knative/eventing/pkg/sidecar/multichannelfanout"
 )
 
 const (
-    portName   = "http"
-    portNumber = 80
     finalizerName = controllerAgentName
 )
@@ -100,7 +95,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
         // regardless of the error.
     }
 
-    if updateStatusErr := r.updateChannel(ctx, c); updateStatusErr != nil {
+    if updateStatusErr := util.UpdateChannel(ctx, r.client, c); updateStatusErr != nil {
         logger.Info("Error updating Channel Status", zap.Error(updateStatusErr))
         return reconcile.Result{}, updateStatusErr
     }
@@ -136,200 +131,39 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel)
     if c.DeletionTimestamp != nil {
         // K8s garbage collection will delete the K8s service and VirtualService for this channel.
         // We use a finalizer to ensure the channel config has been synced.
-        r.removeFinalizer(c)
+        util.RemoveFinalizer(c, finalizerName)
         return nil
     }
 
-    r.addFinalizer(c)
+    util.AddFinalizer(c, finalizerName)
 
-    if svc, err := r.createK8sService(ctx, c); err != nil {
+    svc, err := util.CreateK8sService(ctx, r.client, c)
+    if err != nil {
         logger.Info("Error creating the Channel's K8s Service", zap.Error(err))
         return err
-    } else {
-        c.Status.SetAddress(controller.ServiceHostName(svc.Name, svc.Namespace))
-    }
-
-    if err := r.createVirtualService(ctx, c); err != nil {
-        logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err))
-        return err
-    }
-
-    c.Status.MarkProvisioned()
-    return nil
-}
-
-func (r *reconciler) addFinalizer(c *eventingv1alpha1.Channel) {
-    finalizers := sets.NewString(c.Finalizers...)
-    finalizers.Insert(finalizerName)
-    c.Finalizers = finalizers.List()
-}
-
-func (r *reconciler) removeFinalizer(c *eventingv1alpha1.Channel) {
-    finalizers := sets.NewString(c.Finalizers...)
-    finalizers.Delete(finalizerName)
-    c.Finalizers = finalizers.List()
-}
-
-func (r *reconciler) getK8sService(ctx context.Context, c *eventingv1alpha1.Channel) (*corev1.Service, error) {
-    svcKey := types.NamespacedName{
-        Namespace: c.Namespace,
-        Name: controller.ChannelServiceName(c.Name),
-    }
-    svc := &corev1.Service{}
-    err := r.client.Get(ctx, svcKey, svc)
-    return svc, err
-}
-
-func (r *reconciler) createK8sService(ctx context.Context, c *eventingv1alpha1.Channel) (*corev1.Service, error) {
-    svc, err := r.getK8sService(ctx, c)
-
-    if errors.IsNotFound(err) {
-        svc = newK8sService(c)
-        err = r.client.Create(ctx, svc)
-    }
-
-    // If an error occurred in either Get or Create, we need to reconcile again.
-    if err != nil {
-        return nil, err
-    }
     }
 
     // Check if this Channel is the owner of the K8s service.
     if !metav1.IsControlledBy(svc, c) {
-        r.logger.Warn("Channel's K8s Service is not owned by the Channel", zap.Any("channel", c), zap.Any("service", svc))
+        logger.Warn("Channel's K8s Service is not owned by the Channel", zap.Any("channel", c), zap.Any("service", svc))
     }
-    return svc, nil
-}
 
-func (r *reconciler) getVirtualService(ctx context.Context, c *eventingv1alpha1.Channel) (*istiov1alpha3.VirtualService, error) {
-    vsk := client.ObjectKey{
-        Namespace: c.Namespace,
-        Name: controller.ChannelVirtualServiceName(c.ObjectMeta.Name),
-    }
-    vs := &istiov1alpha3.VirtualService{}
-    err := r.client.Get(ctx, vsk, vs)
-    return vs, err
-}
-
-func (r *reconciler) createVirtualService(ctx context.Context, c *eventingv1alpha1.Channel) error {
-    virtualService, err := r.getVirtualService(ctx, c)
+    c.Status.SetAddress(controller.ServiceHostName(svc.Name, svc.Namespace))
 
-    // If the resource doesn't exist, we'll create it
-    if errors.IsNotFound(err) {
-        virtualService = newVirtualService(c)
-        err = r.client.Create(ctx, virtualService)
-    }
+    virtualService, err := util.CreateVirtualService(ctx, r.client, c)
 
-    // If an error occurs during Get/Create, we'll requeue the item so we can
-    // attempt processing again later. This could have been caused by a
-    // temporary network failure, or any other transient reason.
     if err != nil {
+        logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err))
         return err
     }
 
     // If the Virtual Service is not controlled by this Channel, we should log a warning, but don't
     // consider it an error.
     if !metav1.IsControlledBy(virtualService, c) {
-        r.logger.Warn("VirtualService not owned by Channel", zap.Any("channel", c), zap.Any("virtualService", virtualService))
-    }
-    return nil
-}
-
-// newK8sService creates a new Service for a Channel resource. It also sets the appropriate
-// OwnerReferences on the resource so handleObject can discover the Channel resource that 'owns' it.
-// As well as being garbage collected when the Channel is deleted.
-func newK8sService(c *eventingv1alpha1.Channel) *corev1.Service {
-    labels := map[string]string{
-        "channel": c.Name,
-        "provisioner": c.Spec.Provisioner.Name,
-    }
-    return &corev1.Service{
-        ObjectMeta: metav1.ObjectMeta{
-            Name: controller.ChannelServiceName(c.ObjectMeta.Name),
-            Namespace: c.Namespace,
-            Labels: labels,
-            OwnerReferences: []metav1.OwnerReference{
-                *metav1.NewControllerRef(c, schema.GroupVersionKind{
-                    Group: eventingv1alpha1.SchemeGroupVersion.Group,
-                    Version: eventingv1alpha1.SchemeGroupVersion.Version,
-                    Kind: "Channel",
-                }),
-            },
-        },
-        Spec: corev1.ServiceSpec{
-            Ports: []corev1.ServicePort{
-                {
-                    Name: portName,
-                    Port: portNumber,
-                },
-            },
-        },
-    }
-}
-
-// newVirtualService creates a new VirtualService for a Channel resource. It also sets the
-// appropriate OwnerReferences on the resource so handleObject can discover the Channel resource
-// that 'owns' it. As well as being garbage collected when the Channel is deleted.
-func newVirtualService(channel *eventingv1alpha1.Channel) *istiov1alpha3.VirtualService {
-    labels := map[string]string{
-        "channel": channel.Name,
-        "provisioner": channel.Spec.Provisioner.Name,
-    }
-    destinationHost := controller.ServiceHostName(controller.ClusterBusDispatcherServiceName(channel.Spec.Provisioner.Name), system.Namespace)
-    return &istiov1alpha3.VirtualService{
-        ObjectMeta: metav1.ObjectMeta{
-            Name: controller.ChannelVirtualServiceName(channel.Name),
-            Namespace: channel.Namespace,
-            Labels: labels,
-            OwnerReferences: []metav1.OwnerReference{
-                *metav1.NewControllerRef(channel, schema.GroupVersionKind{
-                    Group: eventingv1alpha1.SchemeGroupVersion.Group,
-                    Version: eventingv1alpha1.SchemeGroupVersion.Version,
-                    Kind: "Channel",
-                }),
-            },
-        },
-        Spec: istiov1alpha3.VirtualServiceSpec{
-            Hosts: []string{
-                controller.ServiceHostName(controller.ChannelServiceName(channel.Name), channel.Namespace),
-                controller.ChannelHostName(channel.Name, channel.Namespace),
-            },
-            Http: []istiov1alpha3.HTTPRoute{{
-                Rewrite: &istiov1alpha3.HTTPRewrite{
-                    Authority: controller.ChannelHostName(channel.Name, channel.Namespace),
-                },
-                Route: []istiov1alpha3.DestinationWeight{{
-                    Destination: istiov1alpha3.Destination{
-                        Host: destinationHost,
-                        Port: istiov1alpha3.PortSelector{
-                            Number: portNumber,
-                        },
-                    }},
-                }},
-            },
-        },
+    logger.Warn("VirtualService not owned by Channel", zap.Any("channel", c), zap.Any("virtualService", virtualService))
     }
-}
 
-func (r *reconciler) updateChannel(ctx context.Context, u *eventingv1alpha1.Channel) error {
-    o := &eventingv1alpha1.Channel{}
-    if err := r.client.Get(ctx, client.ObjectKey{Namespace: u.Namespace, Name: u.Name}, o); err != nil {
-        r.logger.Info("Error getting Channel for status update", zap.Error(err), zap.Any("updatedChannel", u))
-        return err
-    }
-
-    updated := false
-    if !equality.Semantic.DeepEqual(o.Finalizers, u.Finalizers) {
-        updated = true
-        o.SetFinalizers(u.Finalizers)
-    }
-    if !equality.Semantic.DeepEqual(o.Status, u.Status) {
-        updated = true
-        o.Status = u.Status
-    }
-
-    if updated {
-        return r.client.Update(ctx, o)
-    }
+    c.Status.MarkProvisioned()
     return nil
 }
diff --git a/pkg/controller/eventing/inmemory/channel/reconcile_test.go b/pkg/controller/eventing/inmemory/channel/reconcile_test.go
index 16199c19bc5..7542a4df80e 100644
--- a/pkg/controller/eventing/inmemory/channel/reconcile_test.go
+++ b/pkg/controller/eventing/inmemory/channel/reconcile_test.go
@@ -27,6 +27,7 @@ import (
     eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1"
     eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
     controllertesting "github.com/knative/eventing/pkg/controller/testing"
+    util "github.com/knative/eventing/pkg/provisioners"
     "github.com/knative/eventing/pkg/sidecar/configmap"
     "github.com/knative/eventing/pkg/sidecar/fanout"
     "github.com/knative/eventing/pkg/sidecar/multichannelfanout"
@@ -611,8 +612,8 @@ func makeK8sService() *corev1.Service {
         Spec: corev1.ServiceSpec{
             Ports: []corev1.ServicePort{
                 {
-                    Name: portName,
-                    Port: portNumber,
+                    Name: util.PortName,
+                    Port: util.PortNumber,
                 },
             },
         },
@@ -662,7 +663,7 @@ func makeVirtualService() *istiov1alpha3.VirtualService {
                     Destination: istiov1alpha3.Destination{
                         Host: "in-memory-channel-clusterbus.knative-eventing.svc.cluster.local",
                         Port: istiov1alpha3.PortSelector{
-                            Number: portNumber,
+                            Number: util.PortNumber,
                         },
                     }},
                 }},
diff --git a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go
index 10a8dac8600..eb7420627bd 100644
--- a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go
+++ b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go
@@ -19,20 +19,16 @@ package clusterchannelprovisioner
 import (
     "context"
 
-    eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
-    "github.com/knative/eventing/pkg/controller"
-    "github.com/knative/eventing/pkg/system"
     "go.uber.org/zap"
     corev1 "k8s.io/api/core/v1"
-    "k8s.io/apimachinery/pkg/api/equality"
    "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/apimachinery/pkg/runtime/schema"
-    "k8s.io/apimachinery/pkg/types"
-    "k8s.io/apimachinery/pkg/util/intstr"
     "k8s.io/client-go/tools/record"
     "sigs.k8s.io/controller-runtime/pkg/client"
     "sigs.k8s.io/controller-runtime/pkg/reconcile"
+
+    eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
+    util "github.com/knative/eventing/pkg/provisioners"
 )
 
 const (
@@ -95,7 +91,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
         // regardless of the error.
     }
 
-    if updateStatusErr := r.updateClusterChannelProvisionerStatus(ctx, ccp); updateStatusErr != nil {
+    if updateStatusErr := util.UpdateClusterChannelProvisionerStatus(ctx, r.client, ccp); updateStatusErr != nil {
         logger.Info("Error updating ClusterChannelProvisioner Status", zap.Error(updateStatusErr))
         return reconcile.Result{}, updateStatusErr
     }
@@ -131,89 +127,18 @@ func (r *reconciler) reconcile(ctx context.Context, ccp *eventingv1alpha1.Cluste
         return nil
     }
 
-    if err := r.createDispatcherService(ctx, ccp); err != nil {
-        logger.Info("Error creating the ClusterChannelProvisioner's K8s Service", zap.Error(err))
-        return err
-    }
-
-    ccp.Status.MarkReady()
-    return nil
-}
-
-func (r *reconciler) createDispatcherService(ctx context.Context, ccp *eventingv1alpha1.ClusterChannelProvisioner) error {
-    svcName := controller.ClusterBusDispatcherServiceName(ccp.Name)
-    svcKey := types.NamespacedName{
-        Namespace: system.Namespace,
-        Name: svcName,
-    }
-    svc := &corev1.Service{}
-    err := r.client.Get(ctx, svcKey, svc)
-
-    if errors.IsNotFound(err) {
-        svc = newDispatcherService(ccp)
-        err = r.client.Create(ctx, svc)
-    }
+    svc, err := util.CreateDispatcherService(ctx, r.client, ccp)
 
-    // If an error occurred in either Get or Create, we need to reconcile again.
     if err != nil {
+        logger.Info("Error creating the ClusterChannelProvisioner's K8s Service", zap.Error(err))
         return err
     }
 
     // Check if this ClusterChannelProvisioner is the owner of the K8s service.
     if !metav1.IsControlledBy(svc, ccp) {
-        r.logger.Warn("ClusterChannelProvisioner's K8s Service is not owned by the ClusterChannelProvisioner", zap.Any("clusterChannelProvisioner", ccp), zap.Any("service", svc))
+        logger.Warn("ClusterChannelProvisioner's K8s Service is not owned by the ClusterChannelProvisioner", zap.Any("clusterChannelProvisioner", ccp), zap.Any("service", svc))
     }
-    return nil
-}
 
-func (r *reconciler) updateClusterChannelProvisionerStatus(ctx context.Context, u *eventingv1alpha1.ClusterChannelProvisioner) error {
-    o := &eventingv1alpha1.ClusterChannelProvisioner{}
-    if err := r.client.Get(ctx, client.ObjectKey{Namespace: u.Namespace, Name: u.Name}, o); err != nil {
-        r.logger.Info("Error getting ClusterChannelProvisioner for status update", zap.Error(err), zap.Any("updatedClusterChannelProvisioner", u))
-        return err
-    }
-
-    if !equality.Semantic.DeepEqual(o.Status, u.Status) {
-        o.Status = u.Status
-        return r.client.Update(ctx, o)
-    }
+    ccp.Status.MarkReady()
     return nil
 }
-
-// newDispatcherService creates a new Service for a ClusterBus resource. It also sets
-// the appropriate OwnerReferences on the resource so handleObject can discover
-// the ClusterBus resource that 'owns' it.
-func newDispatcherService(ccp *eventingv1alpha1.ClusterChannelProvisioner) *corev1.Service {
-    labels := dispatcherLabels(ccp.Name)
-    return &corev1.Service{
-        ObjectMeta: metav1.ObjectMeta{
-            Name: controller.ClusterBusDispatcherServiceName(ccp.Name),
-            Namespace: system.Namespace,
-            Labels: labels,
-            OwnerReferences: []metav1.OwnerReference{
-                *metav1.NewControllerRef(ccp, schema.GroupVersionKind{
-                    Group: eventingv1alpha1.SchemeGroupVersion.Group,
-                    Version: eventingv1alpha1.SchemeGroupVersion.Version,
-                    Kind: "ClusterChannelProvisioner",
-                }),
-            },
-        },
-        Spec: corev1.ServiceSpec{
-            Selector: labels,
-            Ports: []corev1.ServicePort{
-                {
-                    Name: "http",
-                    Port: 80,
-                    TargetPort: intstr.FromInt(8080),
-                },
-            },
-        },
-    }
-}
-
-func dispatcherLabels(ccpName string) map[string]string {
-    return map[string]string{
-        "clusterChannelProvisioner": ccpName,
-        "role": "dispatcher",
-    }
-}
diff --git a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go
index b4f9945cc2f..49f0f709c8f 100644
--- a/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go
+++ b/pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile_test.go
@@ -22,8 +22,6 @@ import (
     "fmt"
     "testing"
 
-    eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
-    "github.com/knative/eventing/pkg/system"
     duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
     "go.uber.org/zap"
     corev1 "k8s.io/api/core/v1"
@@ -35,7 +33,10 @@ import (
     "sigs.k8s.io/controller-runtime/pkg/client"
     "sigs.k8s.io/controller-runtime/pkg/client/fake"
 
+    eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
     controllertesting "github.com/knative/eventing/pkg/controller/testing"
+    util "github.com/knative/eventing/pkg/provisioners"
+    "github.com/knative/eventing/pkg/system"
 )
 
 const (
@@ -290,10 +291,10 @@ func makeK8sService() *corev1.Service {
                     BlockOwnerDeletion: &truePointer,
                 },
             },
-            Labels: dispatcherLabels(Name),
+            Labels: util.DispatcherLabels(Name),
         },
         Spec: corev1.ServiceSpec{
-            Selector: dispatcherLabels(Name),
+            Selector: util.DispatcherLabels(Name),
             Ports: []corev1.ServicePort{
                 {
                     Name: "http",
diff --git a/pkg/provisioners/channel_util.go b/pkg/provisioners/channel_util.go
new file mode 100644
index 00000000000..165ed49cea4
--- /dev/null
+++ b/pkg/provisioners/channel_util.go
@@ -0,0 +1,191 @@
+package provisioners
+
+import (
+    "context"
+
+    istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
+    corev1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/api/errors"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/runtime/schema"
+    "k8s.io/apimachinery/pkg/types"
+    "k8s.io/apimachinery/pkg/util/sets"
+    runtimeClient "sigs.k8s.io/controller-runtime/pkg/client"
+
+    eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
+    "github.com/knative/eventing/pkg/controller"
+    "github.com/knative/eventing/pkg/system"
+    "k8s.io/apimachinery/pkg/api/equality"
+)
+
+const (
+    PortName   = "http"
+    PortNumber = 80
+)
+
+func AddFinalizer(c *eventingv1alpha1.Channel, finalizerName string) {
+    finalizers := sets.NewString(c.Finalizers...)
+    finalizers.Insert(finalizerName)
+    c.Finalizers = finalizers.List()
+}
+
+func RemoveFinalizer(c *eventingv1alpha1.Channel, finalizerName string) {
+    finalizers := sets.NewString(c.Finalizers...)
+    finalizers.Delete(finalizerName)
+    c.Finalizers = finalizers.List()
+}
+
+func getK8sService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) {
+    svcKey := types.NamespacedName{
+        Namespace: c.Namespace,
+        Name: controller.ChannelServiceName(c.Name),
+    }
+    svc := &corev1.Service{}
+    err := client.Get(ctx, svcKey, svc)
+    return svc, err
+}
+
+func CreateK8sService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*corev1.Service, error) {
+    svc, err := getK8sService(ctx, client, c)
+
+    if errors.IsNotFound(err) {
+        svc = newK8sService(c)
+        err = client.Create(ctx, svc)
+    }
+
+    // If an error occurred in either Get or Create, we need to reconcile again.
+    if err != nil {
+        return nil, err
+    }
+
+    return svc, nil
+}
+
+func getVirtualService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*istiov1alpha3.VirtualService, error) {
+    vsk := runtimeClient.ObjectKey{
+        Namespace: c.Namespace,
+        Name: controller.ChannelVirtualServiceName(c.ObjectMeta.Name),
+    }
+    vs := &istiov1alpha3.VirtualService{}
+    err := client.Get(ctx, vsk, vs)
+    return vs, err
+}
+
+func CreateVirtualService(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) (*istiov1alpha3.VirtualService, error) {
+    virtualService, err := getVirtualService(ctx, client, c)
+
+    // If the resource doesn't exist, we'll create it
+    if errors.IsNotFound(err) {
+        virtualService = newVirtualService(c)
+        err = client.Create(ctx, virtualService)
+    }
+
+    // If an error occurs during Get/Create, we'll requeue the item so we can
+    // attempt processing again later. This could have been caused by a
+    // temporary network failure, or any other transient reason.
+    if err != nil {
+        return nil, err
+    }
+
+    return virtualService, nil
+}
+
+func UpdateChannel(ctx context.Context, client runtimeClient.Client, u *eventingv1alpha1.Channel) error {
+    channel := &eventingv1alpha1.Channel{}
+    err := client.Get(ctx, runtimeClient.ObjectKey{Namespace: u.Namespace, Name: u.Name}, channel)
+    if err != nil {
+        return err
+    }
+
+    updated := false
+    if !equality.Semantic.DeepEqual(channel.Finalizers, u.Finalizers) {
+        channel.SetFinalizers(u.ObjectMeta.Finalizers)
+        updated = true
+    }
+
+    if !equality.Semantic.DeepEqual(channel.Status, u.Status) {
+        channel.Status = u.Status
+        updated = true
+    }
+
+    if updated {
+        return client.Update(ctx, channel)
+    }
+    return nil
+}
+
+// newK8sService creates a new Service for a Channel resource. It also sets the appropriate
+// OwnerReferences on the resource so handleObject can discover the Channel resource that 'owns' it.
+// As well as being garbage collected when the Channel is deleted.
+func newK8sService(c *eventingv1alpha1.Channel) *corev1.Service {
+    labels := map[string]string{
+        "channel": c.Name,
+        "provisioner": c.Spec.Provisioner.Name,
+    }
+    return &corev1.Service{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: controller.ChannelServiceName(c.ObjectMeta.Name),
+            Namespace: c.Namespace,
+            Labels: labels,
+            OwnerReferences: []metav1.OwnerReference{
+                *metav1.NewControllerRef(c, schema.GroupVersionKind{
+                    Group: eventingv1alpha1.SchemeGroupVersion.Group,
+                    Version: eventingv1alpha1.SchemeGroupVersion.Version,
+                    Kind: "Channel",
+                }),
+            },
+        },
+        Spec: corev1.ServiceSpec{
+            Ports: []corev1.ServicePort{
+                {
+                    Name: PortName,
+                    Port: PortNumber,
+                },
+            },
+        },
+    }
+}
+
+// newVirtualService creates a new VirtualService for a Channel resource. It also sets the
+// appropriate OwnerReferences on the resource so handleObject can discover the Channel resource
+// that 'owns' it. As well as being garbage collected when the Channel is deleted.
+func newVirtualService(channel *eventingv1alpha1.Channel) *istiov1alpha3.VirtualService {
+    labels := map[string]string{
+        "channel": channel.Name,
+        "provisioner": channel.Spec.Provisioner.Name,
+    }
+    destinationHost := controller.ServiceHostName(controller.ClusterBusDispatcherServiceName(channel.Spec.Provisioner.Name), system.Namespace)
+    return &istiov1alpha3.VirtualService{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: controller.ChannelVirtualServiceName(channel.Name),
+            Namespace: channel.Namespace,
+            Labels: labels,
+            OwnerReferences: []metav1.OwnerReference{
+                *metav1.NewControllerRef(channel, schema.GroupVersionKind{
+                    Group: eventingv1alpha1.SchemeGroupVersion.Group,
+                    Version: eventingv1alpha1.SchemeGroupVersion.Version,
+                    Kind: "Channel",
+                }),
+            },
+        },
+        Spec: istiov1alpha3.VirtualServiceSpec{
+            Hosts: []string{
+                controller.ServiceHostName(controller.ChannelServiceName(channel.Name), channel.Namespace),
+                controller.ChannelHostName(channel.Name, channel.Namespace),
+            },
+            Http: []istiov1alpha3.HTTPRoute{{
+                Rewrite: &istiov1alpha3.HTTPRewrite{
+                    Authority: controller.ChannelHostName(channel.Name, channel.Namespace),
+                },
+                Route: []istiov1alpha3.DestinationWeight{{
+                    Destination: istiov1alpha3.Destination{
+                        Host: destinationHost,
+                        Port: istiov1alpha3.PortSelector{
+                            Number: PortNumber,
+                        },
+                    }},
+                }},
+            },
+        },
+    }
+}
diff --git a/pkg/provisioners/channel_util_test.go b/pkg/provisioners/channel_util_test.go
new file mode 100644
index 00000000000..0503ee778ec
--- /dev/null
+++ b/pkg/provisioners/channel_util_test.go
@@ -0,0 +1,198 @@
+package provisioners
+
+import (
+    "context"
+    "fmt"
+    "testing"
+
+    "github.com/google/go-cmp/cmp"
+    "github.com/google/go-cmp/cmp/cmpopts"
+    "github.com/knative/pkg/apis"
+    istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
+    corev1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/client-go/kubernetes/scheme"
+    runtimeClient "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/controller-runtime/pkg/client/fake"
+
+    eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
+)
+
+const (
+    channelName = "test-channel"
+    testNS      = "test-namespace"
+)
+
+var (
+    truePointer = true
+)
+
+func init() {
+    // Add types to scheme.
+    istiov1alpha3.AddToScheme(scheme.Scheme)
+    eventingv1alpha1.AddToScheme(scheme.Scheme)
+}
+
+func TestCreateK8sService(t *testing.T) {
+    want := makeK8sService()
+    client := fake.NewFakeClient()
+    got, _ := CreateK8sService(context.TODO(), client, getNewChannel())
+
+    ignore := cmpopts.IgnoreTypes(apis.VolatileTime{})
+    if diff := cmp.Diff(want, got, ignore); diff != "" {
+        t.Errorf("Service (-want, +got) = %v", diff)
+    }
+}
+
+func TestCreateK8sService_Existing(t *testing.T) {
+    want := makeK8sService()
+    client := fake.NewFakeClient(want)
+    got, _ := CreateK8sService(context.TODO(), client, getNewChannel())
+
+    ignore := cmpopts.IgnoreTypes(apis.VolatileTime{})
+    if diff := cmp.Diff(want, got, ignore); diff != "" {
+        t.Errorf("Service (-want, +got) = %v", diff)
+    }
+}
+
+func TestCreateVirtualService(t *testing.T) {
+    want := makeVirtualService()
+    client := fake.NewFakeClient()
+    got, _ := CreateVirtualService(context.TODO(), client, getNewChannel())
+
+    ignore := cmpopts.IgnoreTypes(apis.VolatileTime{})
+    if diff := cmp.Diff(want, got, ignore); diff != "" {
+        t.Errorf("VirtualService (-want, +got) = %v", diff)
+    }
+}
+
+func TestCreateVirtualService_Existing(t *testing.T) {
+    want := makeVirtualService()
+    client := fake.NewFakeClient(want)
+    got, _ := CreateVirtualService(context.TODO(), client, getNewChannel())
+
+    ignore := cmpopts.IgnoreTypes(apis.VolatileTime{})
+    if diff := cmp.Diff(want, got, ignore); diff != "" {
+        t.Errorf("VirtualService (-want, +got) = %v", diff)
+    }
+}
+
+func TestUpdateChannel(t *testing.T) {
+    oldChannel := getNewChannel()
+    client := fake.NewFakeClient(oldChannel)
+
+    want := getNewChannel()
+    AddFinalizer(want, "test-finalizer")
+    want.Status.SetAddress("test-domain")
+    UpdateChannel(context.TODO(), client, want)
+
+    got := &eventingv1alpha1.Channel{}
+    client.Get(context.TODO(), runtimeClient.ObjectKey{Namespace: testNS, Name: channelName}, got)
+
+    ignore := cmpopts.IgnoreTypes(apis.VolatileTime{})
+    if diff := cmp.Diff(want, got, ignore); diff != "" {
+        t.Errorf("Channel (-want, +got) = %v", diff)
+    }
+}
+
+func getNewChannel() *eventingv1alpha1.Channel {
+    channel := &eventingv1alpha1.Channel{
+        TypeMeta:   channelType(),
+        ObjectMeta: om(testNS, channelName),
+        Spec: eventingv1alpha1.ChannelSpec{
+            Provisioner: &corev1.ObjectReference{
+                Name:       clusterChannelProvisionerName,
+                Kind:       "ClusterChannelProvisioner",
+                APIVersion: eventingv1alpha1.SchemeGroupVersion.String(),
+            },
+        },
+    }
+    // selflink is not filled in when we create the object, so clear it
+    channel.ObjectMeta.SelfLink = ""
+    return channel
+}
+
+func channelType() metav1.TypeMeta {
+    return metav1.TypeMeta{
+        APIVersion: eventingv1alpha1.SchemeGroupVersion.String(),
+        Kind:       "Channel",
+    }
+}
+func om(namespace, name string) metav1.ObjectMeta {
+    return metav1.ObjectMeta{
+        Namespace: namespace,
+        Name:      name,
+        SelfLink:  fmt.Sprintf("/apis/eventing/v1alpha1/namespaces/%s/object/%s", namespace, name),
+    }
+}
+
+func makeK8sService() *corev1.Service {
+    return &corev1.Service{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      fmt.Sprintf("%s-channel", channelName),
+            Namespace: testNS,
+            Labels: map[string]string{
+                "channel":     channelName,
+                "provisioner": clusterChannelProvisionerName,
+            },
+            OwnerReferences: []metav1.OwnerReference{
+                {
+                    APIVersion:         eventingv1alpha1.SchemeGroupVersion.String(),
+                    Kind:               "Channel",
+                    Name:               channelName,
+                    Controller:         &truePointer,
+                    BlockOwnerDeletion: &truePointer,
+                },
+            },
+        },
+        Spec: corev1.ServiceSpec{
+            Ports: []corev1.ServicePort{
+                {
+                    Name: PortName,
+                    Port: PortNumber,
+                },
+            },
+        },
+    }
+}
+
+func makeVirtualService() *istiov1alpha3.VirtualService {
+    return &istiov1alpha3.VirtualService{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      fmt.Sprintf("%s-channel", channelName),
+            Namespace: testNS,
+            Labels: map[string]string{
+                "channel":     channelName,
+                "provisioner": clusterChannelProvisionerName,
+            },
+            OwnerReferences: []metav1.OwnerReference{
+                {
+                    APIVersion:         eventingv1alpha1.SchemeGroupVersion.String(),
+                    Kind:               "Channel",
+                    Name:               channelName,
+                    Controller:         &truePointer,
+                    BlockOwnerDeletion: &truePointer,
+                },
+            },
+        },
+        Spec: istiov1alpha3.VirtualServiceSpec{
+            Hosts: []string{
+                fmt.Sprintf("%s-channel.%s.svc.cluster.local", channelName, testNS),
+                fmt.Sprintf("%s.%s.channels.cluster.local", channelName, testNS),
+            },
+            Http: []istiov1alpha3.HTTPRoute{{
+                Rewrite: &istiov1alpha3.HTTPRewrite{
+                    Authority: fmt.Sprintf("%s.%s.channels.cluster.local", channelName, testNS),
+                },
+                Route: []istiov1alpha3.DestinationWeight{{
+                    Destination: istiov1alpha3.Destination{
+                        Host: fmt.Sprintf("%s-clusterbus.knative-eventing.svc.cluster.local", clusterChannelProvisionerName),
+                        Port: istiov1alpha3.PortSelector{
+                            Number: PortNumber,
+                        },
+                    }},
+                }},
+            },
+        },
+    }
+}
diff --git a/pkg/provisioners/provisioner_util.go b/pkg/provisioners/provisioner_util.go
new file mode 100644
index 00000000000..787a1c58155
--- /dev/null
+++ b/pkg/provisioners/provisioner_util.go
@@ -0,0 +1,95 @@
+package provisioners
+
+import (
+    "context"
+
+    "go.uber.org/zap"
+    corev1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/api/equality"
+    "k8s.io/apimachinery/pkg/api/errors"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/runtime/schema"
+    "k8s.io/apimachinery/pkg/types"
+    "k8s.io/apimachinery/pkg/util/intstr"
+    runtimeClient "sigs.k8s.io/controller-runtime/pkg/client"
+
+    eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
+    "github.com/knative/eventing/pkg/controller"
+    "github.com/knative/eventing/pkg/system"
+    "github.com/knative/pkg/logging"
+)
+
+func CreateDispatcherService(ctx context.Context, client runtimeClient.Client, ccp *eventingv1alpha1.ClusterChannelProvisioner) (*corev1.Service, error) {
+    svcName := controller.ClusterBusDispatcherServiceName(ccp.Name)
+    svcKey := types.NamespacedName{
+        Namespace: system.Namespace,
+        Name:      svcName,
+    }
+    svc := &corev1.Service{}
+    err := client.Get(ctx, svcKey, svc)
+
+    if errors.IsNotFound(err) {
+        svc = newDispatcherService(ccp)
+        err = client.Create(ctx, svc)
+    }
+
+    // If an error occurred in either Get or Create, we need to reconcile again.
+    if err != nil {
+        return nil, err
+    }
+
+    return svc, nil
+}
+
+func UpdateClusterChannelProvisionerStatus(ctx context.Context, client runtimeClient.Client, u *eventingv1alpha1.ClusterChannelProvisioner) error {
+    o := &eventingv1alpha1.ClusterChannelProvisioner{}
+    if err := client.Get(ctx, runtimeClient.ObjectKey{Namespace: u.Namespace, Name: u.Name}, o); err != nil {
+        logger := logging.FromContext(ctx)
+        logger.Info("Error getting ClusterChannelProvisioner for status update", zap.Error(err), zap.Any("updatedClusterChannelProvisioner", u))
+        return err
+    }
+
+    if !equality.Semantic.DeepEqual(o.Status, u.Status) {
+        o.Status = u.Status
+        return client.Update(ctx, o)
+    }
+    return nil
+}
+
+// newDispatcherService creates a new Service for a ClusterChannelProvisioner resource. It also sets
+// the appropriate OwnerReferences on the resource so handleObject can discover
+// the ClusterChannelProvisioner resource that 'owns' it.
+func newDispatcherService(ccp *eventingv1alpha1.ClusterChannelProvisioner) *corev1.Service {
+    labels := DispatcherLabels(ccp.Name)
+    return &corev1.Service{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      controller.ClusterBusDispatcherServiceName(ccp.Name),
+            Namespace: system.Namespace,
+            Labels:    labels,
+            OwnerReferences: []metav1.OwnerReference{
+                *metav1.NewControllerRef(ccp, schema.GroupVersionKind{
+                    Group:   eventingv1alpha1.SchemeGroupVersion.Group,
+                    Version: eventingv1alpha1.SchemeGroupVersion.Version,
+                    Kind:    "ClusterChannelProvisioner",
+                }),
+            },
+        },
+        Spec: corev1.ServiceSpec{
+            Selector: labels,
+            Ports: []corev1.ServicePort{
+                {
+                    Name:       "http",
+                    Port:       80,
+                    TargetPort: intstr.FromInt(8080),
+                },
+            },
+        },
+    }
+}
+
+func DispatcherLabels(ccpName string) map[string]string {
+    return map[string]string{
+        "clusterChannelProvisioner": ccpName,
+        "role": "dispatcher",
+    }
+}
diff --git a/pkg/provisioners/provisioner_util_test.go b/pkg/provisioners/provisioner_util_test.go
new file mode 100644
index 00000000000..b93e7b780be
--- /dev/null
+++ b/pkg/provisioners/provisioner_util_test.go
@@ -0,0 +1,114 @@
+package provisioners
+
+import (
+    "context"
+    "fmt"
+    "testing"
+
+    "github.com/google/go-cmp/cmp"
+    "github.com/google/go-cmp/cmp/cmpopts"
+    "github.com/knative/pkg/apis"
+    corev1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/util/intstr"
+    runtimeClient "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/controller-runtime/pkg/client/fake"
+
+    eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
+    "github.com/knative/eventing/pkg/system"
+)
+
+const (
+    clusterChannelProvisionerName = "kafka"
+)
+
+func TestCreateDispatcherService(t *testing.T) {
+    want := makeDispatcherService()
+    client := fake.NewFakeClient()
+    got, _ := CreateDispatcherService(context.TODO(), client, getNewClusterChannelProvisioner())
+
+    ignore := cmpopts.IgnoreTypes(apis.VolatileTime{})
+    if diff := cmp.Diff(want, got, ignore); diff != "" {
+        t.Errorf("Service (-want, +got) = %v", diff)
+    }
+}
+
+func TestCreateDispatcherService_Existing(t *testing.T) {
+    want := makeDispatcherService()
+    client := fake.NewFakeClient(want)
+    got, _ := CreateDispatcherService(context.TODO(), client, getNewClusterChannelProvisioner())
+
+    ignore := cmpopts.IgnoreTypes(apis.VolatileTime{})
+    if diff := cmp.Diff(want, got, ignore); diff != "" {
+        t.Errorf("Service (-want, +got) = %v", diff)
+    }
+}
+
+func TestUpdateClusterChannelProvisioner(t *testing.T) {
+    ccp := getNewClusterChannelProvisioner()
+    client := fake.NewFakeClient(ccp)
+
+    // Update more than just Status
+    ccp.Status.MarkReady()
+    ccp.ObjectMeta.Annotations = map[string]string{"test-annotation": "testing"}
+    UpdateClusterChannelProvisionerStatus(context.TODO(), client, ccp)
+
+    got := &eventingv1alpha1.ClusterChannelProvisioner{}
+    client.Get(context.TODO(), runtimeClient.ObjectKey{Namespace: testNS, Name: clusterChannelProvisionerName}, got)
+
+    // Only status should be updated
+    want := getNewClusterChannelProvisioner()
+    want.Status.MarkReady()
+
+    ignore := cmpopts.IgnoreTypes(apis.VolatileTime{})
+    if diff := cmp.Diff(want, got, ignore); diff != "" {
+        t.Errorf("ClusterChannelProvisioner (-want, +got) = %v", diff)
+    }
+}
+
+func getNewClusterChannelProvisioner() *eventingv1alpha1.ClusterChannelProvisioner {
+    clusterChannelProvisioner := &eventingv1alpha1.ClusterChannelProvisioner{
+        TypeMeta:   ClusterChannelProvisionerType(),
+        ObjectMeta: om(testNS, clusterChannelProvisionerName),
+        Spec:       eventingv1alpha1.ClusterChannelProvisionerSpec{},
+    }
+    // selflink is not filled in when we create the object, so clear it
+    clusterChannelProvisioner.ObjectMeta.SelfLink = ""
+    return clusterChannelProvisioner
+}
+
+func ClusterChannelProvisionerType() metav1.TypeMeta {
+    return metav1.TypeMeta{
+        APIVersion: eventingv1alpha1.SchemeGroupVersion.String(),
+        Kind:       "ClusterChannelProvisioner",
+    }
+}
+
+func makeDispatcherService() *corev1.Service {
+    return &corev1.Service{
+        ObjectMeta: metav1.ObjectMeta{
+            Namespace: system.Namespace,
+            Name:      fmt.Sprintf("%s-clusterbus", clusterChannelProvisionerName),
+            OwnerReferences: []metav1.OwnerReference{
+                {
+                    APIVersion:         eventingv1alpha1.SchemeGroupVersion.String(),
+                    Kind:               "ClusterChannelProvisioner",
+                    Name:               clusterChannelProvisionerName,
+                    Controller:         &truePointer,
+                    BlockOwnerDeletion: &truePointer,
+                },
+            },
+            Labels: DispatcherLabels(clusterChannelProvisionerName),
+        },
+        Spec: corev1.ServiceSpec{
+            Selector: DispatcherLabels(clusterChannelProvisionerName),
+            Ports: []corev1.ServicePort{
+                {
+                    Name:       "http",
+                    Port:       80,
+                    TargetPort: intstr.FromInt(8080),
+                },
+            },
+        },
+    }
+}
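
Usage sketch (not part of the diff): the helpers extracted into pkg/provisioners above are meant to be shared by other channel provisioners, not just the in-memory one. The sketch below shows how a hypothetical provisioner's reconcile loop could consume them; the package name, finalizer value, and error handling are assumptions, while the helper signatures (AddFinalizer, RemoveFinalizer, CreateK8sService, CreateVirtualService, UpdateChannel) are taken from channel_util.go in this change.

package samplechannel // hypothetical provisioner package; the name is an assumption

import (
    "context"

    runtimeClient "sigs.k8s.io/controller-runtime/pkg/client"

    eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
    "github.com/knative/eventing/pkg/controller"
    util "github.com/knative/eventing/pkg/provisioners"
)

// finalizerName is a placeholder; a real provisioner would use its controller agent name.
const finalizerName = "sample-channel-controller"

// reconcileChannel sketches a provisioner reconcile loop built on the shared helpers.
func reconcileChannel(ctx context.Context, client runtimeClient.Client, c *eventingv1alpha1.Channel) error {
    if c.DeletionTimestamp != nil {
        // The Service and VirtualService are garbage collected via OwnerReferences;
        // the finalizer only guards provisioner-specific cleanup.
        util.RemoveFinalizer(c, finalizerName)
        return nil
    }
    util.AddFinalizer(c, finalizerName)

    // Create (or fetch) the Channel's K8s Service and expose its address on the status.
    svc, err := util.CreateK8sService(ctx, client, c)
    if err != nil {
        return err
    }
    c.Status.SetAddress(controller.ServiceHostName(svc.Name, svc.Namespace))

    // Create (or fetch) the VirtualService that routes traffic to the dispatcher.
    if _, err := util.CreateVirtualService(ctx, client, c); err != nil {
        return err
    }

    c.Status.MarkProvisioned()
    // Persist the finalizer and status changes made above.
    return util.UpdateChannel(ctx, client, c)
}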