Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/apis/eventing/v1alpha1/broker_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ func (bs *BrokerStatus) PropagateFilterDeploymentAvailability(d *appsv1.Deployme
}
}

func (bs *BrokerStatus) MarkIngressReady() {
brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngress)
}

func (bs *BrokerStatus) MarkFilterReady() {
brokerCondSet.Manage(bs).MarkTrue(BrokerConditionFilter)
}

// SetAddress makes this Broker addressable by setting the hostname. It also
// sets the BrokerConditionAddressable to true.
func (bs *BrokerStatus) SetAddress(url *apis.URL) {
Expand Down
134 changes: 29 additions & 105 deletions pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,15 @@ import (
"fmt"

"go.uber.org/zap"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/dynamic"
appsv1listers "k8s.io/client-go/listers/apps/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"knative.dev/pkg/apis"
"knative.dev/pkg/kmeta"

duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
"knative.dev/eventing/pkg/apis/eventing"
Expand All @@ -45,7 +42,7 @@ import (
"knative.dev/eventing/pkg/logging"
"knative.dev/eventing/pkg/reconciler"
"knative.dev/eventing/pkg/reconciler/broker/resources"
"knative.dev/eventing/pkg/reconciler/names"
"knative.dev/eventing/pkg/reconciler/internal/service"
duckapis "knative.dev/pkg/apis/duck"
pkgreconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/resolver"
Expand All @@ -62,11 +59,11 @@ type Reconciler struct {

// listers index properties about resources
brokerLister eventinglisters.BrokerLister
serviceLister corev1listers.ServiceLister
deploymentLister appsv1listers.DeploymentLister
subscriptionLister messaginglisters.SubscriptionLister
triggerLister eventinglisters.TriggerLister

serviceReconciler service.Reconciler

channelableTracker duck.ListableTracker

ingressImage string
Expand Down Expand Up @@ -112,12 +109,12 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *v1alpha1.Broker) pkgr
return nil
}
}
filterSvc, err := r.reconcileKind(ctx, b)
filterSvcURL, err := r.reconcileKind(ctx, b)

if b.Status.IsReady() {
// So, at this point the Broker is ready and everything should be solid
// for the triggers to act upon, so reconcile them.
te := r.reconcileTriggers(ctx, b, filterSvc)
te := r.reconcileTriggers(ctx, b, filterSvcURL)
if te != nil {
logging.FromContext(ctx).Error("Problem reconciling triggers", zap.Error(te))
return fmt.Errorf("failed to reconcile triggers: %v", te)
Expand All @@ -131,7 +128,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *v1alpha1.Broker) pkgr
return err
}

func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) (*corev1.Service, pkgreconciler.Event) {
func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) (*apis.URL, pkgreconciler.Event) {
logging.FromContext(ctx).Debug("Reconciling", zap.Any("Broker", b))
b.Status.InitializeConditions()
b.Status.ObservedGeneration = b.Generation
Expand Down Expand Up @@ -183,42 +180,34 @@ func (r *Reconciler) reconcileKind(ctx context.Context, b *v1alpha1.Broker) (*co
b.Status.TriggerChannel = &chanMan.ref
b.Status.PropagateTriggerChannelReadiness(&triggerChan.Status)

filterDeployment, err := r.reconcileFilterDeployment(ctx, b)
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling filter Deployment", zap.Error(err))
b.Status.MarkFilterFailed("DeploymentFailure", "%v", err)
return nil, err
}
filterSvc, err := r.reconcileFilterService(ctx, b)
filterStatus, err := r.reconcileFilterService(ctx, b)
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling filter Service", zap.Error(err))
b.Status.MarkFilterFailed("ServiceFailure", "%v", err)
return nil, err
}
b.Status.PropagateFilterDeploymentAvailability(filterDeployment)

ingressDeployment, err := r.reconcileIngressDeployment(ctx, b, triggerChan)
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling ingress Deployment", zap.Error(err))
b.Status.MarkIngressFailed("DeploymentFailure", "%v", err)
return nil, err
if filterStatus.IsReady {
b.Status.MarkFilterReady()
} else {
b.Status.MarkFilterFailed(filterStatus.Reason, filterStatus.Message)
}

svc, err := r.reconcileIngressService(ctx, b)
ingressStatus, err := r.reconcileIngressService(ctx, b, triggerChan)
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling ingress Service", zap.Error(err))
b.Status.MarkIngressFailed("ServiceFailure", "%v", err)
return nil, err
}
b.Status.PropagateIngressDeploymentAvailability(ingressDeployment)
b.Status.SetAddress(&apis.URL{
Scheme: "http",
Host: names.ServiceHostName(svc.Name, svc.Namespace),
})
if ingressStatus.IsReady {
b.Status.MarkIngressReady()
} else {
b.Status.MarkIngressFailed(ingressStatus.Reason, ingressStatus.Message)
}
b.Status.SetAddress(ingressStatus.URL)

// So, at this point the Broker is ready and everything should be solid
// for the triggers to act upon.
return filterSvc, nil
return filterStatus.URL, nil
}

type channelTemplate struct {
Expand Down Expand Up @@ -298,20 +287,14 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, b *v1alpha1.Broker) pkgre
return newReconciledNormal(b.Namespace, b.Name)
}

// reconcileFilterDeployment reconciles Broker's 'b' filter deployment.
func (r *Reconciler) reconcileFilterDeployment(ctx context.Context, b *v1alpha1.Broker) (*v1.Deployment, error) {
expected := resources.MakeFilterDeployment(&resources.FilterArgs{
// reconcileFilterService reconciles Broker's 'b' filter service.
func (r *Reconciler) reconcileFilterService(ctx context.Context, b *v1alpha1.Broker) (*service.Status, error) {
svcArgs := resources.MakeFilterServiceArgs(&resources.FilterArgs{
Broker: b,
Image: r.filterImage,
ServiceAccountName: r.filterServiceAccountName,
})
return r.reconcileDeployment(ctx, expected)
}

// reconcileFilterService reconciles Broker's 'b' filter service.
func (r *Reconciler) reconcileFilterService(ctx context.Context, b *v1alpha1.Broker) (*corev1.Service, error) {
expected := resources.MakeFilterService(b)
return r.reconcileService(ctx, expected)
return r.serviceReconciler.Reconcile(ctx, *kmeta.NewControllerRef(b), *svcArgs)
}

// reconcileChannel reconciles Broker's 'b' underlying channel.
Expand Down Expand Up @@ -362,78 +345,19 @@ func TriggerChannelLabels(brokerName string) map[string]string {
}
}

// reconcileDeployment reconciles the K8s Deployment 'd'.
func (r *Reconciler) reconcileDeployment(ctx context.Context, d *v1.Deployment) (*v1.Deployment, error) {
current, err := r.deploymentLister.Deployments(d.Namespace).Get(d.Name)
if apierrs.IsNotFound(err) {
current, err = r.KubeClientSet.AppsV1().Deployments(d.Namespace).Create(d)
if err != nil {
return nil, err
}
return current, nil
} else if err != nil {
return nil, err
}

if !equality.Semantic.DeepDerivative(d.Spec, current.Spec) {
// Don't modify the informers copy.
desired := current.DeepCopy()
desired.Spec = d.Spec
current, err = r.KubeClientSet.AppsV1().Deployments(current.Namespace).Update(desired)
if err != nil {
return nil, err
}
}
return current, nil
}

// reconcileService reconciles the K8s Service 'svc'.
func (r *Reconciler) reconcileService(ctx context.Context, svc *corev1.Service) (*corev1.Service, error) {
current, err := r.serviceLister.Services(svc.Namespace).Get(svc.Name)
if apierrs.IsNotFound(err) {
current, err = r.KubeClientSet.CoreV1().Services(svc.Namespace).Create(svc)
if err != nil {
return nil, err
}
return current, nil
} else if err != nil {
return nil, err
}

// spec.clusterIP is immutable and is set on existing services. If we don't set this to the same value, we will
// encounter an error while updating.
svc.Spec.ClusterIP = current.Spec.ClusterIP
if !equality.Semantic.DeepDerivative(svc.Spec, current.Spec) {
// Don't modify the informers copy.
desired := current.DeepCopy()
desired.Spec = svc.Spec
current, err = r.KubeClientSet.CoreV1().Services(current.Namespace).Update(desired)
if err != nil {
return nil, err
}
}
return current, nil
}

// reconcileIngressDeploymentCRD reconciles the Ingress Deployment for a CRD backed channel.
func (r *Reconciler) reconcileIngressDeployment(ctx context.Context, b *v1alpha1.Broker, c *duckv1alpha1.Channelable) (*v1.Deployment, error) {
expected := resources.MakeIngressDeployment(&resources.IngressArgs{
func (r *Reconciler) reconcileIngressService(ctx context.Context, b *v1alpha1.Broker, c *duckv1alpha1.Channelable) (*service.Status, error) {
svcArgs := resources.MakeIngressServiceArgs(&resources.IngressArgs{
Broker: b,
Image: r.ingressImage,
ServiceAccountName: r.ingressServiceAccountName,
ChannelAddress: c.Status.Address.GetURL().Host,
})
return r.reconcileDeployment(ctx, expected)
}

// reconcileIngressService reconciles the Ingress Service.
func (r *Reconciler) reconcileIngressService(ctx context.Context, b *v1alpha1.Broker) (*corev1.Service, error) {
expected := resources.MakeIngressService(b)
return r.reconcileService(ctx, expected)
return r.serviceReconciler.Reconcile(ctx, *kmeta.NewControllerRef(b), *svcArgs)
}

// reconcileTriggers reconciles the Triggers that are pointed to this broker
func (r *Reconciler) reconcileTriggers(ctx context.Context, b *v1alpha1.Broker, filterSvc *corev1.Service) error {
func (r *Reconciler) reconcileTriggers(ctx context.Context, b *v1alpha1.Broker, filterSvcURL *apis.URL) error {

// TODO: Figure out the labels stuff... If webhook does it, we can filter like this...
// Find all the Triggers that have been labeled as belonging to me
Expand All @@ -447,7 +371,7 @@ func (r *Reconciler) reconcileTriggers(ctx context.Context, b *v1alpha1.Broker,
for _, t := range triggers {
if t.Spec.Broker == b.Name {
trigger := t.DeepCopy()
tErr := r.reconcileTrigger(ctx, b, trigger, filterSvc)
tErr := r.reconcileTrigger(ctx, b, trigger, filterSvcURL)
if tErr != nil {
r.Logger.Error("Reconciling trigger failed:", zap.String("name", t.Name), zap.Error(err))
r.Recorder.Eventf(trigger, corev1.EventTypeWarning, triggerReconcileFailed, "Trigger reconcile failed: %v", tErr)
Expand Down
Loading