Skip to content
204 changes: 19 additions & 185 deletions pkg/controller/eventing/inmemory/channel/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,25 @@ package channel
import (
"context"

eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/controller"
ccpcontroller "github.com/knative/eventing/pkg/controller/eventing/inmemory/clusterchannelprovisioner"
"github.com/knative/eventing/pkg/sidecar/configmap"
"github.com/knative/eventing/pkg/sidecar/fanout"
"github.com/knative/eventing/pkg/sidecar/multichannelfanout"
"github.com/knative/eventing/pkg/system"
istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/controller"
ccpcontroller "github.com/knative/eventing/pkg/controller/eventing/inmemory/clusterchannelprovisioner"
util "github.com/knative/eventing/pkg/provisioners"
"github.com/knative/eventing/pkg/sidecar/configmap"
"github.com/knative/eventing/pkg/sidecar/fanout"
"github.com/knative/eventing/pkg/sidecar/multichannelfanout"
)

const (
portName = "http"
portNumber = 80
finalizerName = controllerAgentName
)

Expand Down Expand Up @@ -100,7 +95,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
// regardless of the error.
}

if updateStatusErr := r.updateChannel(ctx, c); updateStatusErr != nil {
if updateStatusErr := util.UpdateChannel(ctx, r.client, c); updateStatusErr != nil {
logger.Info("Error updating Channel Status", zap.Error(updateStatusErr))
return reconcile.Result{}, updateStatusErr
}
Expand Down Expand Up @@ -136,200 +131,39 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel)
if c.DeletionTimestamp != nil {
// K8s garbage collection will delete the K8s service and VirtualService for this channel.
// We use a finalizer to ensure the channel config has been synced.
r.removeFinalizer(c)
util.RemoveFinalizer(c, finalizerName)
return nil
}

r.addFinalizer(c)
util.AddFinalizer(c, finalizerName)

if svc, err := r.createK8sService(ctx, c); err != nil {
svc, err := util.CreateK8sService(ctx, r.client, c)
if err != nil {
logger.Info("Error creating the Channel's K8s Service", zap.Error(err))
return err
} else {
c.Status.SetAddress(controller.ServiceHostName(svc.Name, svc.Namespace))
}

if err := r.createVirtualService(ctx, c); err != nil {
logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err))
return err
}

c.Status.MarkProvisioned()
return nil
}

func (r *reconciler) addFinalizer(c *eventingv1alpha1.Channel) {
finalizers := sets.NewString(c.Finalizers...)
finalizers.Insert(finalizerName)
c.Finalizers = finalizers.List()
}

func (r *reconciler) removeFinalizer(c *eventingv1alpha1.Channel) {
finalizers := sets.NewString(c.Finalizers...)
finalizers.Delete(finalizerName)
c.Finalizers = finalizers.List()
}

func (r *reconciler) getK8sService(ctx context.Context, c *eventingv1alpha1.Channel) (*corev1.Service, error) {
svcKey := types.NamespacedName{
Namespace: c.Namespace,
Name: controller.ChannelServiceName(c.Name),
}
svc := &corev1.Service{}
err := r.client.Get(ctx, svcKey, svc)
return svc, err
}

func (r *reconciler) createK8sService(ctx context.Context, c *eventingv1alpha1.Channel) (*corev1.Service, error) {
svc, err := r.getK8sService(ctx, c)

if errors.IsNotFound(err) {
svc = newK8sService(c)
err = r.client.Create(ctx, svc)
}

// If an error occurred in either Get or Create, we need to reconcile again.
if err != nil {
return nil, err
}

// Check if this Channel is the owner of the K8s service.
if !metav1.IsControlledBy(svc, c) {
r.logger.Warn("Channel's K8s Service is not owned by the Channel", zap.Any("channel", c), zap.Any("service", svc))
logger.Warn("Channel's K8s Service is not owned by the Channel", zap.Any("channel", c), zap.Any("service", svc))
}
return svc, nil
}

func (r *reconciler) getVirtualService(ctx context.Context, c *eventingv1alpha1.Channel) (*istiov1alpha3.VirtualService, error) {
vsk := client.ObjectKey{
Namespace: c.Namespace,
Name: controller.ChannelVirtualServiceName(c.ObjectMeta.Name),
}
vs := &istiov1alpha3.VirtualService{}
err := r.client.Get(ctx, vsk, vs)
return vs, err
}

func (r *reconciler) createVirtualService(ctx context.Context, c *eventingv1alpha1.Channel) error {
virtualService, err := r.getVirtualService(ctx, c)
c.Status.SetAddress(controller.ServiceHostName(svc.Name, svc.Namespace))

// If the resource doesn't exist, we'll create it
if errors.IsNotFound(err) {
virtualService = newVirtualService(c)
err = r.client.Create(ctx, virtualService)
}
virtualService, err := util.CreateVirtualService(ctx, r.client, c)

// If an error occurs during Get/Create, we'll requeue the item so we can
// attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
if err != nil {
logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err))
return err
}

// If the Virtual Service is not controlled by this Channel, we should log a warning, but don't
// consider it an error.
if !metav1.IsControlledBy(virtualService, c) {
r.logger.Warn("VirtualService not owned by Channel", zap.Any("channel", c), zap.Any("virtualService", virtualService))
}
return nil
}

// newK8sService creates a new Service for a Channel resource. It also sets the appropriate
// OwnerReferences on the resource so handleObject can discover the Channel resource that 'owns' it.
// As well as being garbage collected when the Channel is deleted.
func newK8sService(c *eventingv1alpha1.Channel) *corev1.Service {
labels := map[string]string{
"channel": c.Name,
"provisioner": c.Spec.Provisioner.Name,
}
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: controller.ChannelServiceName(c.ObjectMeta.Name),
Namespace: c.Namespace,
Labels: labels,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(c, schema.GroupVersionKind{
Group: eventingv1alpha1.SchemeGroupVersion.Group,
Version: eventingv1alpha1.SchemeGroupVersion.Version,
Kind: "Channel",
}),
},
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: portName,
Port: portNumber,
},
},
},
}
}

// newVirtualService creates a new VirtualService for a Channel resource. It also sets the
// appropriate OwnerReferences on the resource so handleObject can discover the Channel resource
// that 'owns' it. As well as being garbage collected when the Channel is deleted.
func newVirtualService(channel *eventingv1alpha1.Channel) *istiov1alpha3.VirtualService {
labels := map[string]string{
"channel": channel.Name,
"provisioner": channel.Spec.Provisioner.Name,
}
destinationHost := controller.ServiceHostName(controller.ClusterBusDispatcherServiceName(channel.Spec.Provisioner.Name), system.Namespace)
return &istiov1alpha3.VirtualService{
ObjectMeta: metav1.ObjectMeta{
Name: controller.ChannelVirtualServiceName(channel.Name),
Namespace: channel.Namespace,
Labels: labels,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(channel, schema.GroupVersionKind{
Group: eventingv1alpha1.SchemeGroupVersion.Group,
Version: eventingv1alpha1.SchemeGroupVersion.Version,
Kind: "Channel",
}),
},
},
Spec: istiov1alpha3.VirtualServiceSpec{
Hosts: []string{
controller.ServiceHostName(controller.ChannelServiceName(channel.Name), channel.Namespace),
controller.ChannelHostName(channel.Name, channel.Namespace),
},
Http: []istiov1alpha3.HTTPRoute{{
Rewrite: &istiov1alpha3.HTTPRewrite{
Authority: controller.ChannelHostName(channel.Name, channel.Namespace),
},
Route: []istiov1alpha3.DestinationWeight{{
Destination: istiov1alpha3.Destination{
Host: destinationHost,
Port: istiov1alpha3.PortSelector{
Number: portNumber,
},
}},
}},
},
},
logger.Warn("VirtualService not owned by Channel", zap.Any("channel", c), zap.Any("virtualService", virtualService))
}
}

func (r *reconciler) updateChannel(ctx context.Context, u *eventingv1alpha1.Channel) error {
o := &eventingv1alpha1.Channel{}
if err := r.client.Get(ctx, client.ObjectKey{Namespace: u.Namespace, Name: u.Name}, o); err != nil {
r.logger.Info("Error getting Channel for status update", zap.Error(err), zap.Any("updatedChannel", u))
return err
}

updated := false
if !equality.Semantic.DeepEqual(o.Finalizers, u.Finalizers) {
updated = true
o.SetFinalizers(u.Finalizers)
}
if !equality.Semantic.DeepEqual(o.Status, u.Status) {
updated = true
o.Status = u.Status
}

if updated {
return r.client.Update(ctx, o)
}
c.Status.MarkProvisioned()
return nil
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/controller/eventing/inmemory/channel/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
controllertesting "github.com/knative/eventing/pkg/controller/testing"
util "github.com/knative/eventing/pkg/provisioners"
"github.com/knative/eventing/pkg/sidecar/configmap"
"github.com/knative/eventing/pkg/sidecar/fanout"
"github.com/knative/eventing/pkg/sidecar/multichannelfanout"
Expand Down Expand Up @@ -611,8 +612,8 @@ func makeK8sService() *corev1.Service {
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: portName,
Port: portNumber,
Name: util.PortName,
Port: util.PortNumber,
},
},
},
Expand Down Expand Up @@ -662,7 +663,7 @@ func makeVirtualService() *istiov1alpha3.VirtualService {
Destination: istiov1alpha3.Destination{
Host: "in-memory-channel-clusterbus.knative-eventing.svc.cluster.local",
Port: istiov1alpha3.PortSelector{
Number: portNumber,
Number: util.PortNumber,
},
}},
}},
Expand Down
Loading