Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions pkg/reconciler/mtbroker/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ package mttrigger
import (
"context"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
subscriptioninformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription"
brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker"
triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger"
"knative.dev/eventing/pkg/duck"
"knative.dev/pkg/client/injection/ducks/duck/v1/conditions"
Expand All @@ -33,6 +37,7 @@ import (
"knative.dev/pkg/controller"
"knative.dev/pkg/injection/clients/dynamicclient"
"knative.dev/pkg/logging"
pkgreconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/resolver"
)

Expand Down Expand Up @@ -66,6 +71,28 @@ func NewController(

triggerInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also filter the trigger changes based on which broker it points to.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@n3wscott can you explain more what you mean? doesn't the label selector filter by the broker it points to? Or am I misunderstanding.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This broker impl only needs triggers for brokers. We filter broker changes for triggers but we should do the other side too?


// Filter Brokers and enqueue associated Triggers
brokerFilter := pkgreconciler.AnnotationFilterFunc(brokerreconciler.ClassAnnotationKey, eventing.MTChannelBrokerClassValue, false /*allowUnset*/)
brokerInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: brokerFilter,
Handler: controller.HandleAll(func(obj interface{}) {

if broker, ok := obj.(*eventingv1.Broker); ok {

selector := labels.SelectorFromSet(map[string]string{eventing.BrokerLabelKey: broker.Name})
triggers, err := triggerInformer.Lister().Triggers(broker.Namespace).List(selector)
if err != nil {
logger.Warn("Failed to list triggers", zap.Any("broker", broker), zap.Error(err))
return
}

for _, trigger := range triggers {
impl.Enqueue(trigger)
}
}
}),
})

// Reconcile Trigger when my Subscription changes
subscriptionInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterControllerGK(eventingv1.Kind("Trigger")),
Expand Down
18 changes: 0 additions & 18 deletions pkg/reconciler/mtbroker/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) p
// Everything is cleaned up by the garbage collector.
return nil
}
// Start tracking the broker
r.trackBroker(ctx, t)

b, err := r.brokerLister.Brokers(t.Namespace).Get(t.Spec.Broker)
if err != nil {
Expand Down Expand Up @@ -152,22 +150,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) p
return nil
}

func (r *Reconciler) trackBroker(ctx context.Context, t *eventingv1.Trigger) error {
trackKResource := r.kresourceTracker.TrackInNamespace(t)
brokerObjRef := corev1.ObjectReference{
Kind: brokerGVK.Kind,
APIVersion: brokerGVK.GroupVersion().String(),
Name: t.Spec.Broker,
Namespace: t.Namespace,
}

if err := trackKResource(brokerObjRef); err != nil {
return fmt.Errorf("Failed to track broker %q : %s", t.Spec.Broker, err)
}
logging.FromContext(ctx).Infow("Tracking:", zap.Any("Broker", brokerObjRef))
return nil
}

// subscribeToBrokerChannel subscribes service 'svc' to the Broker's channels.
func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *eventingv1.Broker, t *eventingv1.Trigger, brokerTrigger *corev1.ObjectReference) (*messagingv1.Subscription, error) {
recorder := controller.GetEventRecorder(ctx)
Expand Down