diff --git a/cmd/mtchannel_broker/main.go b/cmd/mtchannel_broker/main.go index 0dc534bb1c0..f7a6232cef7 100644 --- a/cmd/mtchannel_broker/main.go +++ b/cmd/mtchannel_broker/main.go @@ -26,6 +26,7 @@ import ( "knative.dev/pkg/signals" "knative.dev/eventing/pkg/reconciler/mtbroker" + mttrigger "knative.dev/eventing/pkg/reconciler/mtbroker/trigger" ) func main() { @@ -37,5 +38,5 @@ func main() { cfg := sharedmain.ParseAndGetConfigOrDie() cfg.QPS = float32(*clientQPS) cfg.Burst = *clientBurst - sharedmain.MainWithConfig(signals.NewContext(), "mt-broker-controller", cfg, mtbroker.NewController) + sharedmain.MainWithConfig(signals.NewContext(), "mt-broker-controller", cfg, mtbroker.NewController, mttrigger.NewController) } diff --git a/pkg/apis/eventing/register.go b/pkg/apis/eventing/register.go index ead3498bd28..29ffcc907b0 100644 --- a/pkg/apis/eventing/register.go +++ b/pkg/apis/eventing/register.go @@ -59,6 +59,21 @@ const ( // BrokerChannelAddressStatusAnnotationKey is the broker status // annotation key used to specify the address of its channel. BrokerChannelAddressStatusAnnotationKey = "knative.dev/channelAddress" + + // BrokerChannelAPIVersionStatusAnnotationKey is the broker status + // annotation key used to specify the APIVersion of the channel for + // the triggers to subscribe to. + BrokerChannelAPIVersionStatusAnnotationKey = "knative.dev/channelAPIVersion" + + // BrokerChannelKindStatusAnnotationKey is the broker status + // annotation key used to specify the Kind of the channel for + // the triggers to subscribe to. + BrokerChannelKindStatusAnnotationKey = "knative.dev/channelKind" + + // BrokerChannelNameStatusAnnotationKey is the broker status + // annotation key used to specify the name of the channel for + // the triggers to subscribe to. + BrokerChannelNameStatusAnnotationKey = "knative.dev/channelName" ) var ( diff --git a/pkg/reconciler/mtbroker/broker.go b/pkg/reconciler/mtbroker/broker.go index fe56094a20a..1538cf136eb 100644 --- a/pkg/reconciler/mtbroker/broker.go +++ b/pkg/reconciler/mtbroker/broker.go @@ -20,8 +20,6 @@ import ( "context" "errors" "fmt" - "reflect" - "time" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -29,7 +27,6 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/dynamic" corev1listers "k8s.io/client-go/listers/core/v1" @@ -47,7 +44,6 @@ import ( "knative.dev/pkg/apis" duckapis "knative.dev/pkg/apis/duck" pkgduckv1 "knative.dev/pkg/apis/duck/v1" - "knative.dev/pkg/controller" "knative.dev/pkg/logging" pkgreconciler "knative.dev/pkg/reconciler" "knative.dev/pkg/resolver" @@ -79,7 +75,6 @@ type Reconciler struct { // Check that our Reconciler implements Interface var _ brokerreconciler.Interface = (*Reconciler)(nil) -var _ brokerreconciler.Finalizer = (*Reconciler)(nil) var brokerGVK = eventingv1.SchemeGroupVersion.WithKind("Broker") @@ -92,29 +87,6 @@ type ReconcilerArgs struct { } func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pkgreconciler.Event { - triggerChan, err := r.reconcileKind(ctx, b) - if err != nil { - logging.FromContext(ctx).Errorw("Problem reconciling broker", zap.Error(err)) - } - - if b.Status.IsReady() { - // So, at this point the Broker is ready and everything should be solid - // for the triggers to act upon, so reconcile them. - te := r.reconcileTriggers(ctx, b, triggerChan) - if te != nil { - logging.FromContext(ctx).Errorw("Problem reconciling triggers", zap.Error(te)) - return fmt.Errorf("failed to reconcile triggers: %v", te) - } - } else { - // Broker is not ready, but propagate it's status to my triggers. - if te := r.propagateBrokerStatusToTriggers(ctx, b.Namespace, b.Name, &b.Status); te != nil { - return fmt.Errorf("Trigger reconcile failed: %v", te) - } - } - return err -} - -func (r *Reconciler) reconcileKind(ctx context.Context, b *eventingv1.Broker) (*corev1.ObjectReference, pkgreconciler.Event) { logging.FromContext(ctx).Infow("Reconciling", zap.Any("Broker", b)) // 1. Trigger Channel is created for all events. Triggers will Subscribe to this Channel. @@ -122,34 +94,34 @@ func (r *Reconciler) reconcileKind(ctx context.Context, b *eventingv1.Broker) (* chanMan, err := r.getChannelTemplate(ctx, b) if err != nil { b.Status.MarkTriggerChannelFailed("ChannelTemplateFailed", "Error on setting up the ChannelTemplate: %s", err) - return nil, err + return err } logging.FromContext(ctx).Infow("Reconciling the trigger channel") c, err := resources.NewChannel("trigger", b, &chanMan.template, TriggerChannelLabels(b.Name)) if err != nil { logging.FromContext(ctx).Errorw(fmt.Sprintf("Failed to create Trigger Channel object: %s/%s", chanMan.ref.Namespace, chanMan.ref.Name), zap.Error(err)) - return nil, err + return err } triggerChan, err := r.reconcileChannel(ctx, chanMan.inf, chanMan.ref, c, b) if err != nil { logging.FromContext(ctx).Errorw("Problem reconciling the trigger channel", zap.Error(err)) b.Status.MarkTriggerChannelFailed("ChannelFailure", "%v", err) - return nil, fmt.Errorf("Failed to reconcile trigger channel: %v", err) + return fmt.Errorf("Failed to reconcile trigger channel: %v", err) } if triggerChan.Status.Address == nil { logging.FromContext(ctx).Debugw("Trigger Channel does not have an address", zap.Any("triggerChan", triggerChan)) b.Status.MarkTriggerChannelFailed("NoAddress", "Channel does not have an address.") // Ok to return nil for error here, once channel address becomes available, this will get requeued. - return &chanMan.ref, nil + return nil } if url := triggerChan.Status.Address.URL; url == nil || url.Host == "" { logging.FromContext(ctx).Debugw("Trigger Channel does not have an address", zap.Any("triggerChan", triggerChan)) b.Status.MarkTriggerChannelFailed("NoAddress", "Channel does not have an address.") // Ok to return nil for error here, once channel address becomes available, this will get requeued. - return &chanMan.ref, nil + return nil } // Attach the channel address as a status annotation. @@ -157,6 +129,9 @@ func (r *Reconciler) reconcileKind(ctx context.Context, b *eventingv1.Broker) (* b.Status.Annotations = make(map[string]string, 1) } b.Status.Annotations[eventing.BrokerChannelAddressStatusAnnotationKey] = triggerChan.Status.Address.URL.String() + b.Status.Annotations[eventing.BrokerChannelKindStatusAnnotationKey] = chanMan.ref.Kind + b.Status.Annotations[eventing.BrokerChannelAPIVersionStatusAnnotationKey] = chanMan.ref.APIVersion + b.Status.Annotations[eventing.BrokerChannelNameStatusAnnotationKey] = chanMan.ref.Name channelStatus := &duckv1.ChannelableStatus{AddressStatus: pkgduckv1.AddressStatus{Address: &pkgduckv1.Addressable{URL: triggerChan.Status.Address.URL}}} b.Status.PropagateTriggerChannelReadiness(channelStatus) @@ -165,7 +140,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, b *eventingv1.Broker) (* if err != nil { logging.FromContext(ctx).Errorw("Problem getting endpoints for filter", zap.String("namespace", system.Namespace()), zap.Error(err)) b.Status.MarkFilterFailed("ServiceFailure", "%v", err) - return nil, err + return err } b.Status.PropagateFilterAvailability(filterEndpoints) @@ -173,7 +148,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, b *eventingv1.Broker) (* if err != nil { logging.FromContext(ctx).Errorw("Problem getting endpoints for ingress", zap.String("namespace", system.Namespace()), zap.Error(err)) b.Status.MarkIngressFailed("ServiceFailure", "%v", err) - return nil, err + return err } b.Status.PropagateIngressAvailability(ingressEndpoints) @@ -187,7 +162,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, b *eventingv1.Broker) (* // So, at this point the Broker is ready and everything should be solid // for the triggers to act upon. - return &chanMan.ref, nil + return nil } type channelTemplate struct { @@ -255,13 +230,6 @@ func (r *Reconciler) getChannelTemplate(ctx context.Context, b *eventingv1.Broke }, nil } -func (r *Reconciler) FinalizeKind(ctx context.Context, b *eventingv1.Broker) pkgreconciler.Event { - if err := r.propagateBrokerStatusToTriggers(ctx, b.Namespace, b.Name, nil); err != nil { - return fmt.Errorf("Trigger reconcile failed: %v", err) - } - return nil -} - // reconcileChannel reconciles Broker's 'b' underlying channel. func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterface dynamic.ResourceInterface, channelObjRef corev1.ObjectReference, newChannel *unstructured.Unstructured, b *eventingv1.Broker) (*duckv1.Channelable, error) { lister, err := r.channelableTracker.ListerFor(channelObjRef) @@ -309,75 +277,3 @@ func TriggerChannelLabels(brokerName string) map[string]string { "eventing.knative.dev/brokerEverything": "true", } } - -// reconcileTriggers reconciles the Triggers that are pointed to this broker -func (r *Reconciler) reconcileTriggers(ctx context.Context, b *eventingv1.Broker, triggerChan *corev1.ObjectReference) error { - recorder := controller.GetEventRecorder(ctx) - triggers, err := r.triggerLister.Triggers(b.Namespace).List(labels.Everything()) - if err != nil { - return err - } - for _, t := range triggers { - if t.Spec.Broker == b.Name { - trigger := t.DeepCopy() - tErr := r.reconcileTrigger(ctx, b, trigger, triggerChan) - if tErr != nil { - logging.FromContext(ctx).Errorw("Reconciling trigger failed:", zap.String("name", t.Name), zap.Error(err)) - recorder.Eventf(trigger, corev1.EventTypeWarning, triggerReconcileFailed, "Trigger reconcile failed: %v", tErr) - } - trigger.Status.ObservedGeneration = t.Generation - if _, updateStatusErr := r.updateTriggerStatus(ctx, trigger); updateStatusErr != nil { - logging.FromContext(ctx).Errorw("Failed to update Trigger status", zap.Error(updateStatusErr)) - recorder.Eventf(trigger, corev1.EventTypeWarning, triggerUpdateStatusFailed, "Failed to update Trigger's status: %v", updateStatusErr) - } - } - } - return nil -} - -func (r *Reconciler) propagateBrokerStatusToTriggers(ctx context.Context, namespace, name string, bs *eventingv1.BrokerStatus) error { - recorder := controller.GetEventRecorder(ctx) - triggers, err := r.triggerLister.Triggers(namespace).List(labels.Everything()) - if err != nil { - return err - } - for _, t := range triggers { - if t.Spec.Broker == name { - // Don't modify informers copy - trigger := t.DeepCopy() - trigger.Status.InitializeConditions() - if bs == nil { - trigger.Status.MarkBrokerFailed("BrokerDoesNotExist", "Broker %q does not exist", name) - } else { - trigger.Status.PropagateBrokerCondition(bs.GetTopLevelCondition()) - } - if _, updateStatusErr := r.updateTriggerStatus(ctx, trigger); updateStatusErr != nil { - logging.FromContext(ctx).Errorw("Failed to update Trigger status", zap.Error(updateStatusErr)) - recorder.Eventf(trigger, corev1.EventTypeWarning, triggerUpdateStatusFailed, "Failed to update Trigger's status: %v", updateStatusErr) - return updateStatusErr - } - } - } - return nil -} - -// lifted from pkg/reconciler/reconcile_common for now... -// groomConditionsTransitionTime ensures that the LastTransitionTime only advances for resources -// where the condition has changed during reconciliation. This also ensures that all advanced -// conditions share the same timestamp. -func groomConditionsTransitionTime(resource, oldResource pkgduckv1.KRShaped) { - now := apis.VolatileTime{Inner: metav1.NewTime(time.Now())} - sts := resource.GetStatus() - for i := range sts.Conditions { - cond := &sts.Conditions[i] - - if oldCond := oldResource.GetStatus().GetCondition(cond.Type); oldCond != nil { - cond.LastTransitionTime = oldCond.LastTransitionTime - if reflect.DeepEqual(cond, oldCond) { - continue - } - } - - cond.LastTransitionTime = now - } -} diff --git a/pkg/reconciler/mtbroker/broker_test.go b/pkg/reconciler/mtbroker/broker_test.go index 2d986663473..cc81b5a6790 100644 --- a/pkg/reconciler/mtbroker/broker_test.go +++ b/pkg/reconciler/mtbroker/broker_test.go @@ -71,6 +71,10 @@ const ( triggerNameLong = "test-trigger-name-is-a-long-name" triggerUIDLong = "cafed00d-cafed00d-cafed00d-cafed00d-cafed00d" + triggerChannelAPIVersion = "messaging.knative.dev/v1" + triggerChannelKind = "InMemoryChannel" + triggerChannelName = "test-broker-kne-trigger" + subscriberURI = "http://example.com/subscriber/" subscriberKind = "Service" subscriberName = "subscriber-name" @@ -89,8 +93,6 @@ const ( currentGeneration = 1 outdatedGeneration = 0 - finalizerName = "brokers.eventing.knative.dev" - imcSpec = ` apiVersion: "messaging.knative.dev/v1" kind: "InMemoryChannel" @@ -126,10 +128,8 @@ var ( APIVersion: "eventing.knative.dev/v1", }, } - sinkDNS = "sink.mynamespace.svc." + utils.GetClusterDomainName() - sinkURI = "http://" + sinkDNS - finalizerUpdatedEvent = Eventf(corev1.EventTypeNormal, "FinalizerUpdate", `Updated "test-broker" finalizers`) - + sinkDNS = "sink.mynamespace.svc." + utils.GetClusterDomainName() + sinkURI = "http://" + sinkDNS brokerAddress = &apis.URL{ Scheme: "http", Host: fmt.Sprintf("%s.%s.svc.%s", ingressServiceName, systemNS, utils.GetClusterDomainName()), @@ -176,12 +176,8 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), }, WantEvents: []string{ - finalizerUpdatedEvent, Eventf(corev1.EventTypeWarning, "InternalError", "failed to find channelTemplate"), }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, brokerName), - }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewBroker(brokerName, testNS, WithBrokerClass(eventing.MTChannelBrokerClassValue), @@ -200,12 +196,8 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), }, WantEvents: []string{ - finalizerUpdatedEvent, Eventf(corev1.EventTypeWarning, "UpdateFailed", `Failed to update status for "test-broker": missing field(s): spec.config.name`), }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, brokerName), - }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewBroker(brokerName, testNS, WithBrokerClass(eventing.MTChannelBrokerClassValue), @@ -232,12 +224,8 @@ func TestReconcile(t *testing.T) { WithTriggerChannelFailed("ChannelTemplateFailed", `Error on setting up the ChannelTemplate: configmap "test-configmap" not found`)), }}, WantEvents: []string{ - finalizerUpdatedEvent, Eventf(corev1.EventTypeWarning, "InternalError", `configmap "test-configmap" not found`), }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, brokerName), - }, WantErr: true, }, { Name: "Trigger Channel.Create error", @@ -263,12 +251,8 @@ func TestReconcile(t *testing.T) { InduceFailure("create", "inmemorychannels"), }, WantEvents: []string{ - finalizerUpdatedEvent, Eventf(corev1.EventTypeWarning, "InternalError", "Failed to reconcile trigger channel: %v", "inducing failure for create inmemorychannels"), }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, brokerName), - }, WantErr: true, }, { Name: "Trigger Channel.Create no address", @@ -290,12 +274,6 @@ func TestReconcile(t *testing.T) { WithBrokerConfig(config()), WithTriggerChannelFailed("NoAddress", "Channel does not have an address.")), }}, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, brokerName), - }, - WantEvents: []string{ - finalizerUpdatedEvent, - }, }, { Name: "Trigger Channel.Create no host in the url", Key: testKey, @@ -314,12 +292,6 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions, WithTriggerChannelFailed("NoAddress", "Channel does not have an address.")), }}, - WantEvents: []string{ - finalizerUpdatedEvent, - }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, brokerName), - }, }, { Name: "nil config, not a configmap", Key: testKey, @@ -330,12 +302,8 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), }, WantEvents: []string{ - finalizerUpdatedEvent, Eventf(corev1.EventTypeWarning, "InternalError", `Broker.Spec.Config configuration not supported, only [kind: ConfigMap, apiVersion: v1]`), }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, brokerName), - }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewBroker(brokerName, testNS, WithBrokerClass(eventing.MTChannelBrokerClassValue), @@ -363,12 +331,6 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions, WithTriggerChannelFailed("NoAddress", "Channel does not have an address.")), }}, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, brokerName), - }, - WantEvents: []string{ - finalizerUpdatedEvent, - }, }, { Name: "Trigger Channel endpoints fails", Key: testKey, @@ -387,13 +349,12 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions, WithTriggerChannelReady(), WithChannelAddressAnnotation(triggerChannelURL), + WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), + WithChannelKindAnnotation(triggerChannelKind), + WithChannelNameAnnotation(triggerChannelName), WithFilterFailed("ServiceFailure", `endpoints "broker-filter" not found`)), }}, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, brokerName), - }, WantEvents: []string{ - finalizerUpdatedEvent, Eventf(corev1.EventTypeWarning, "InternalError", `endpoints "broker-filter" not found`), }, WantErr: true, @@ -420,14 +381,11 @@ func TestReconcile(t *testing.T) { WithBrokerConfig(config()), WithBrokerReady, WithBrokerAddressURI(brokerAddress), - WithChannelAddressAnnotation(triggerChannelURL)), + WithChannelAddressAnnotation(triggerChannelURL), + WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), + WithChannelKindAnnotation(triggerChannelKind), + WithChannelNameAnnotation(triggerChannelName)), }}, - WantEvents: []string{ - finalizerUpdatedEvent, - }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, brokerName), - }, }, { Name: "Successful Reconciliation, status update fails", Key: testKey, @@ -454,669 +412,15 @@ func TestReconcile(t *testing.T) { WithBrokerConfig(config()), WithBrokerReady, WithBrokerAddressURI(brokerAddress), - WithChannelAddressAnnotation(triggerChannelURL)), + WithChannelAddressAnnotation(triggerChannelURL), + WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), + WithChannelKindAnnotation(triggerChannelKind), + WithChannelNameAnnotation(triggerChannelName)), }}, WantEvents: []string{ - finalizerUpdatedEvent, Eventf(corev1.EventTypeWarning, "UpdateFailed", `Failed to update status for "test-broker": inducing failure for update brokers`), }, WantErr: true, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, brokerName), - }, - }, { - Name: "Successful Reconciliation, with single trigger", - Key: testKey, - Objects: []runtime.Object{ - NewBroker(brokerName, testNS, - WithBrokerClass(eventing.MTChannelBrokerClassValue), - WithBrokerConfig(config()), - WithInitBrokerConditions), - createChannel(testNS, true), - imcConfigMap(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI)), - }, - WantCreates: []runtime.Object{ - makeFilterSubscription(), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - WithTriggerBrokerReady(), - WithTriggerDependencyReady(), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerSubscribedUnknown("SubscriptionNotConfigured", "Subscription has not yet been reconciled."), - WithTriggerStatusSubscriberURI(subscriberURI)), - }, { - Object: NewBroker(brokerName, testNS, - WithBrokerClass(eventing.MTChannelBrokerClassValue), - WithBrokerConfig(config()), - WithBrokerReady, - WithBrokerAddressURI(brokerAddress), - WithChannelAddressAnnotation(triggerChannelURL)), - }}, - WantEvents: []string{ - finalizerUpdatedEvent, - }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, brokerName), - }, - }, { - Name: "Fail Reconciliation, with single trigger, trigger status updated", - Key: testKey, - Objects: []runtime.Object{ - NewBroker(brokerName, testNS, - WithBrokerClass(eventing.MTChannelBrokerClassValue), - WithInitBrokerConditions), - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - WithInitTriggerConditions, - WithTriggerBrokerReady(), - WithTriggerDependencyReady(), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerSubscribedUnknown("SubscriptionNotConfigured", "Subscription has not yet been reconciled."), - WithTriggerStatusSubscriberURI(subscriberURI)), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - WithInitTriggerConditions, - WithTriggerDependencyReady(), - WithTriggerSubscribedUnknown("SubscriptionNotConfigured", "Subscription has not yet been reconciled."), - WithTriggerBrokerFailed("ChannelTemplateFailed", "Error on setting up the ChannelTemplate: failed to find channelTemplate"), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerStatusSubscriberURI(subscriberURI)), - }, { - Object: NewBroker(brokerName, testNS, - WithBrokerClass(eventing.MTChannelBrokerClassValue), - WithInitBrokerConditions, - WithTriggerChannelFailed("ChannelTemplateFailed", "Error on setting up the ChannelTemplate: failed to find channelTemplate")), - }}, - WantEvents: []string{ - finalizerUpdatedEvent, - Eventf(corev1.EventTypeWarning, "InternalError", "failed to find channelTemplate"), - }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, brokerName), - }, - WantErr: true, - }, { - Name: "Broker being deleted, marks trigger as not ready due to broker missing", - Key: testKey, - Objects: []runtime.Object{ - NewBroker(brokerName, testNS, - WithBrokerClass(eventing.MTChannelBrokerClassValue), - WithBrokerConfig(config()), - WithInitBrokerConditions, - WithBrokerFinalizers("brokers.eventing.knative.dev"), - WithBrokerResourceVersion(""), - WithBrokerDeletionTimestamp), - imcConfigMap(), - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI)), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - WithInitTriggerConditions, - WithTriggerBrokerFailed("BrokerDoesNotExist", `Broker "test-broker" does not exist`)), - }}, - WantPatches: []clientgotesting.PatchActionImpl{ - patchRemoveFinalizers(testNS, brokerName), - }, - WantEvents: []string{ - finalizerUpdatedEvent, - }, - }, { - Name: "Broker being deleted, marks trigger as not ready due to broker missing, fails", - Key: testKey, - Objects: []runtime.Object{ - NewBroker(brokerName, testNS, - WithBrokerClass(eventing.MTChannelBrokerClassValue), - WithBrokerConfig(config()), - WithInitBrokerConditions, - WithBrokerFinalizers("brokers.eventing.knative.dev"), - WithBrokerResourceVersion(""), - WithBrokerDeletionTimestamp), - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI)), - imcConfigMap(), - }, - WithReactors: []clientgotesting.ReactionFunc{ - InduceFailure("update", "triggers"), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - WithInitTriggerConditions, - WithTriggerBrokerFailed("BrokerDoesNotExist", `Broker "test-broker" does not exist`)), - }}, - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "TriggerUpdateStatusFailed", `Failed to update Trigger's status: inducing failure for update triggers`), - Eventf(corev1.EventTypeWarning, "InternalError", "Trigger reconcile failed: inducing failure for update triggers"), - }, - WantErr: true, - }, { - Name: "Trigger being deleted", - Key: testKey, - Objects: allBrokerObjectsReadyPlus([]runtime.Object{ - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerDeleted, - WithTriggerSubscriberURI(subscriberURI))}...), - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerDeleted, - WithInitTriggerConditions, - WithTriggerSubscriberURI(subscriberURI)), - }}, - }, { - Name: "Trigger subscription create fails", - Key: testKey, - Objects: allBrokerObjectsReadyPlus([]runtime.Object{ - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI))}...), - WithReactors: []clientgotesting.ReactionFunc{ - InduceFailure("create", "subscriptions"), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - // The first reconciliation will initialize the status conditions. - WithInitTriggerConditions, - WithTriggerBrokerReady(), - WithTriggerStatusSubscriberURI(subscriberURI), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerNotSubscribed("NotSubscribed", "inducing failure for create subscriptions")), - }}, - WantCreates: []runtime.Object{ - makeFilterSubscription(), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "SubscriptionCreateFailed", "Create Trigger's subscription failed: inducing failure for create subscriptions"), - Eventf(corev1.EventTypeWarning, "TriggerReconcileFailed", "Trigger reconcile failed: inducing failure for create subscriptions"), - }, - }, { - Name: "Trigger subscription create fails, update status fails", - Key: testKey, - Objects: allBrokerObjectsReadyPlus([]runtime.Object{ - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI))}...), - WithReactors: []clientgotesting.ReactionFunc{ - InduceFailure("create", "subscriptions"), - InduceFailure("update", "triggers"), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - // The first reconciliation will initialize the status conditions. - WithInitTriggerConditions, - WithTriggerBrokerReady(), - WithTriggerStatusSubscriberURI(subscriberURI), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerNotSubscribed("NotSubscribed", "inducing failure for create subscriptions")), - }}, - WantCreates: []runtime.Object{ - makeFilterSubscription(), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "SubscriptionCreateFailed", "Create Trigger's subscription failed: inducing failure for create subscriptions"), - Eventf(corev1.EventTypeWarning, "TriggerReconcileFailed", "Trigger reconcile failed: inducing failure for create subscriptions"), - Eventf(corev1.EventTypeWarning, "TriggerUpdateStatusFailed", "Failed to update Trigger's status: inducing failure for update triggers"), - }, - }, { - Name: "Trigger subscription delete fails", - Key: testKey, - Objects: allBrokerObjectsReadyPlus([]runtime.Object{ - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI)), - makeDifferentReadySubscription()}...), - WithReactors: []clientgotesting.ReactionFunc{ - InduceFailure("delete", "subscriptions"), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - WithInitTriggerConditions, - WithTriggerBrokerReady(), - WithTriggerStatusSubscriberURI(subscriberURI), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerNotSubscribed("NotSubscribed", "inducing failure for delete subscriptions"))}, - }, - WantDeletes: []clientgotesting.DeleteActionImpl{{ - Name: subscriptionName, - }}, - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "SubscriptionDeleteFailed", "Delete Trigger's subscription failed: inducing failure for delete subscriptions"), - Eventf(corev1.EventTypeWarning, "TriggerReconcileFailed", "Trigger reconcile failed: inducing failure for delete subscriptions"), - }, - }, { - Name: "Trigger subscription create after delete fails", - Key: testKey, - Objects: allBrokerObjectsReadyPlus([]runtime.Object{ - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI)), - makeDifferentReadySubscription()}...), - WithReactors: []clientgotesting.ReactionFunc{ - InduceFailure("create", "subscriptions"), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - WithInitTriggerConditions, - WithTriggerBrokerReady(), - WithTriggerStatusSubscriberURI(subscriberURI), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerNotSubscribed("NotSubscribed", "inducing failure for create subscriptions")), - }}, - WantDeletes: []clientgotesting.DeleteActionImpl{{ - Name: subscriptionName, - }}, - WantCreates: []runtime.Object{ - makeFilterSubscription(), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "SubscriptionCreateFailed", "Create Trigger's subscription failed: inducing failure for create subscriptions"), - Eventf(corev1.EventTypeWarning, "TriggerReconcileFailed", "Trigger reconcile failed: inducing failure for create subscriptions"), - }, - }, { - Name: "Trigger subscription not owned by Trigger", - Key: testKey, - Objects: allBrokerObjectsReadyPlus([]runtime.Object{ - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI)), - makeFilterSubscriptionNotOwnedByTrigger()}...), - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerSubscriberURI(subscriberURI), - WithTriggerUID(triggerUID), - WithInitTriggerConditions, - WithTriggerBrokerReady(), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerNotSubscribed("NotSubscribed", `trigger "test-trigger" does not own subscription "test-broker-test-trigger-test-trigger-uid"`), - WithTriggerStatusSubscriberURI(subscriberURI)), - }}, - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "TriggerReconcileFailed", `Trigger reconcile failed: trigger "test-trigger" does not own subscription "test-broker-test-trigger-test-trigger-uid"`), - }, - }, { - Name: "Trigger subscription update works", - Key: testKey, - Objects: allBrokerObjectsReadyPlus([]runtime.Object{ - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI)), - makeDifferentReadySubscription()}...), - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - WithInitTriggerConditions, - WithTriggerBrokerReady(), - // The first reconciliation will initialize the status conditions. - WithInitTriggerConditions, - WithTriggerBrokerReady(), - WithTriggerSubscriptionNotConfigured(), - WithTriggerStatusSubscriberURI(subscriberURI), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerDependencyReady()), - }}, - WantDeletes: []clientgotesting.DeleteActionImpl{{ - Name: subscriptionName, - }}, - WantCreates: []runtime.Object{ - makeFilterSubscription(), - }, - }, { - Name: "Trigger has subscriber ref exists", - Key: testKey, - Objects: allBrokerObjectsReadyPlus([]runtime.Object{ - makeSubscriberAddressableAsUnstructured(), - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberRef(subscriberGVK, subscriberName, testNS), - WithInitTriggerConditions)}...), - WantErr: false, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberRef(subscriberGVK, subscriberName, testNS), - // The first reconciliation will initialize the status conditions. - WithInitTriggerConditions, - WithTriggerBrokerReady(), - WithTriggerSubscriptionNotConfigured(), - WithTriggerStatusSubscriberURI(subscriberURI), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerDependencyReady(), - ), - }}, - WantCreates: []runtime.Object{ - makeFilterSubscription(), - }, - }, { - Name: "Trigger has subscriber ref exists and URI", - Key: testKey, - Objects: allBrokerObjectsReadyPlus([]runtime.Object{ - makeSubscriberAddressableAsUnstructured(), - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberRefAndURIReference(subscriberGVK, subscriberName, testNS, subscriberURIReference), - WithInitTriggerConditions, - )}...), - WantErr: false, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberRefAndURIReference(subscriberGVK, subscriberName, testNS, subscriberURIReference), - // The first reconciliation will initialize the status conditions. - WithInitTriggerConditions, - WithTriggerBrokerReady(), - WithTriggerSubscriptionNotConfigured(), - WithTriggerStatusSubscriberURI(subscriberResolvedTargetURI), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerDependencyReady(), - ), - }}, - WantCreates: []runtime.Object{ - makeFilterSubscription(), - }, - }, { - Name: "Trigger has subscriber ref exists kubernetes Service", - Key: testKey, - Objects: allBrokerObjectsReadyPlus([]runtime.Object{ - makeSubscriberKubernetesServiceAsUnstructured(), - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberRef(k8sServiceGVK, subscriberName, testNS), - WithInitTriggerConditions, - )}...), - WantErr: false, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberRef(k8sServiceGVK, subscriberName, testNS), - // The first reconciliation will initialize the status conditions. - WithInitTriggerConditions, - WithTriggerBrokerReady(), - WithTriggerSubscriptionNotConfigured(), - WithTriggerStatusSubscriberURI(k8sServiceResolvedURI), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerDependencyReady(), - ), - }}, - WantCreates: []runtime.Object{ - makeFilterSubscription(), - }, - }, { - Name: "Trigger has subscriber ref doesn't exist", - Key: testKey, - Objects: allBrokerObjectsReadyPlus([]runtime.Object{ - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberRef(subscriberGVK, subscriberName, testNS), - WithInitTriggerConditions, - )}...), - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "TriggerReconcileFailed", `Trigger reconcile failed: failed to get ref &ObjectReference{Kind:Service,Namespace:test-namespace,Name:subscriber-name,UID:,APIVersion:serving.knative.dev/v1,ResourceVersion:,FieldPath:,}: services.serving.knative.dev "subscriber-name" not found`), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberRef(subscriberGVK, subscriberName, testNS), - // The first reconciliation will initialize the status conditions. - WithInitTriggerConditions, - WithTriggerBrokerReady(), - WithTriggerSubscriberResolvedFailed("Unable to get the Subscriber's URI", `failed to get ref &ObjectReference{Kind:Service,Namespace:test-namespace,Name:subscriber-name,UID:,APIVersion:serving.knative.dev/v1,ResourceVersion:,FieldPath:,}: services.serving.knative.dev "subscriber-name" not found`), - ), - }}, - }, { - Name: "Subscription not ready, trigger marked not ready", - Key: testKey, - Objects: allBrokerObjectsReadyPlus([]runtime.Object{ - makeFalseStatusSubscription(), - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - WithInitTriggerConditions, - )}...), - WantErr: false, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - // The first reconciliation will initialize the status conditions. - WithInitTriggerConditions, - WithTriggerBrokerReady(), - WithTriggerNotSubscribed("testInducedError", "test induced error"), - WithTriggerStatusSubscriberURI(subscriberURI), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerDependencyReady(), - ), - }}, - }, { - Name: "Subscription ready, trigger marked ready", - Key: testKey, - Objects: allBrokerObjectsReadyPlus([]runtime.Object{ - makeReadySubscription(), - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - WithInitTriggerConditions, - )}...), - WantErr: false, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - // The first reconciliation will initialize the status conditions. - WithInitTriggerConditions, - WithTriggerBrokerReady(), - WithTriggerSubscribed(), - WithTriggerStatusSubscriberURI(subscriberURI), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerDependencyReady(), - ), - }}, - }, { - Name: "Dependency doesn't exist", - Key: testKey, - Objects: allBrokerObjectsReadyPlus([]runtime.Object{ - makeReadySubscription(), - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - WithInitTriggerConditions, - WithDependencyAnnotation(dependencyAnnotation), - )}...), - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "TriggerReconcileFailed", "Trigger reconcile failed: propagating dependency readiness: getting the dependency: pingsources.sources.knative.dev \"test-ping-source\" not found"), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - // The first reconciliation will initialize the status conditions. - WithInitTriggerConditions, - WithDependencyAnnotation(dependencyAnnotation), - WithTriggerBrokerReady(), - WithTriggerSubscribed(), - WithTriggerStatusSubscriberURI(subscriberURI), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerDependencyFailed("DependencyDoesNotExist", "Dependency does not exist: pingsources.sources.knative.dev \"test-ping-source\" not found"), - ), - }}, - }, { - Name: "The status of Dependency is False", - Key: testKey, - Objects: allBrokerObjectsReadyPlus([]runtime.Object{ - makeReadySubscription(), - makeFalseStatusPingSource(), - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - WithInitTriggerConditions, - WithDependencyAnnotation(dependencyAnnotation), - )}...), - WantErr: false, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - // The first reconciliation will initialize the status conditions. - WithInitTriggerConditions, - WithDependencyAnnotation(dependencyAnnotation), - WithTriggerBrokerReady(), - WithTriggerSubscribed(), - WithTriggerStatusSubscriberURI(subscriberURI), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerDependencyFailed("NotFound", ""), - ), - }}, - }, { - Name: "The status of Dependency is Unknown", - Key: testKey, - Objects: allBrokerObjectsReadyPlus([]runtime.Object{ - makeReadySubscription(), - makeUnknownStatusCronJobSource(), - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - WithInitTriggerConditions, - WithDependencyAnnotation(dependencyAnnotation), - )}...), - WantErr: false, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - // The first reconciliation will initialize the status conditions. - WithInitTriggerConditions, - WithDependencyAnnotation(dependencyAnnotation), - WithTriggerBrokerReady(), - WithTriggerSubscribed(), - WithTriggerStatusSubscriberURI(subscriberURI), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerDependencyUnknown("", ""), - ), - }}, - }, - { - Name: "Dependency generation not equal", - Key: testKey, - Objects: allBrokerObjectsReadyPlus([]runtime.Object{ - makeReadySubscription(), - makeGenerationNotEqualPingSource(), - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - WithInitTriggerConditions, - WithDependencyAnnotation(dependencyAnnotation), - )}...), - WantErr: false, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - // The first reconciliation will initialize the status conditions. - WithInitTriggerConditions, - WithDependencyAnnotation(dependencyAnnotation), - WithTriggerBrokerReady(), - WithTriggerSubscribed(), - WithTriggerStatusSubscriberURI(subscriberURI), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerDependencyUnknown("GenerationNotEqual", fmt.Sprintf("The dependency's metadata.generation, %q, is not equal to its status.observedGeneration, %q.", currentGeneration, outdatedGeneration))), - }}, - }, - { - Name: "Dependency ready", - Key: testKey, - Objects: allBrokerObjectsReadyPlus([]runtime.Object{ - makeReadySubscription(), - makeReadyPingSource(), - NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - WithInitTriggerConditions, - WithDependencyAnnotation(dependencyAnnotation), - )}...), - WantErr: false, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerName, testNS, brokerName, - WithTriggerUID(triggerUID), - WithTriggerSubscriberURI(subscriberURI), - // The first reconciliation will initialize the status conditions. - WithInitTriggerConditions, - WithDependencyAnnotation(dependencyAnnotation), - WithTriggerBrokerReady(), - WithTriggerSubscribed(), - WithTriggerStatusSubscriberURI(subscriberURI), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerDependencyReady(), - ), - }}, - }, - - { - Name: "Trigger has deprecated named subscriber", - Key: testKey, - Objects: allBrokerObjectsReadyPlus([]runtime.Object{ - makeReadySubscriptionDeprecatedName(triggerNameLong, triggerUIDLong), - makeReadyPingSource(), - NewTrigger(triggerNameLong, testNS, brokerName, - WithTriggerUID(triggerUIDLong), - WithTriggerSubscriberURI(subscriberURI), - WithInitTriggerConditions, - WithDependencyAnnotation(dependencyAnnotation), - )}...), - WantErr: false, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, subscriptionDeleted, `Deprecated subscription removed: "%s/%s"`, testNS, makeReadySubscriptionDeprecatedName(triggerNameLong, triggerUIDLong).Name), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewTrigger(triggerNameLong, testNS, brokerName, - WithTriggerUID(triggerUIDLong), - WithTriggerSubscriberURI(subscriberURI), - // The first reconciliation will initialize the status conditions. - WithInitTriggerConditions, - WithDependencyAnnotation(dependencyAnnotation), - WithTriggerBrokerReady(), - WithTriggerSubscribedUnknown("SubscriptionNotConfigured", "Subscription has not yet been reconciled."), - WithTriggerStatusSubscriberURI(subscriberURI), - WithTriggerSubscriberResolvedSucceeded(), - WithTriggerDependencyReady(), - ), - }}, - WantCreates: []runtime.Object{ - makeReadySubscriptionWithCustomData(triggerNameLong, triggerUIDLong), - }, - WantDeletes: []clientgotesting.DeleteActionImpl{{ - Name: makeReadySubscriptionDeprecatedName(triggerNameLong, triggerUIDLong).Name, - }}, }, } @@ -1340,7 +644,10 @@ func allBrokerObjectsReadyPlus(objs ...runtime.Object) []runtime.Object { WithBrokerFinalizers("brokers.eventing.knative.dev"), WithBrokerResourceVersion(""), WithBrokerAddressURI(brokerAddress), - WithChannelAddressAnnotation(triggerChannelURL)), + WithChannelAddressAnnotation(triggerChannelURL), + WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), + WithChannelKindAnnotation(triggerChannelKind), + WithChannelNameAnnotation(triggerChannelName)), createChannel(testNS, true), imcConfigMap(), NewEndpoints(filterServiceName, systemNS, @@ -1465,24 +772,6 @@ func makeSubscriberKubernetesServiceAsUnstructured() *unstructured.Unstructured } } -func patchFinalizers(namespace, name string) clientgotesting.PatchActionImpl { - action := clientgotesting.PatchActionImpl{} - action.Name = name - action.Namespace = namespace - patch := `{"metadata":{"finalizers":["` + finalizerName + `"],"resourceVersion":""}}` - action.Patch = []byte(patch) - return action -} - -func patchRemoveFinalizers(namespace, name string) clientgotesting.PatchActionImpl { - action := clientgotesting.PatchActionImpl{} - action.Name = name - action.Namespace = namespace - patch := `{"metadata":{"finalizers":[],"resourceVersion":""}}` - action.Patch = []byte(patch) - return action -} - // FilterLabels generates the labels present on all resources representing the filter of the given // Broker. func FilterLabels() map[string]string { diff --git a/pkg/reconciler/mtbroker/controller.go b/pkg/reconciler/mtbroker/controller.go index 518b27f9a88..dc57b259438 100644 --- a/pkg/reconciler/mtbroker/controller.go +++ b/pkg/reconciler/mtbroker/controller.go @@ -19,7 +19,6 @@ package mtbroker import ( "context" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" "knative.dev/eventing/pkg/apis/eventing" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" @@ -97,28 +96,6 @@ func NewController( Handler: controller.HandleAll(impl.Enqueue), }) - // Reconcile Broker (which transitively reconciles the triggers), when Subscriptions - // that I own are changed. - subscriptionInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: controller.FilterControllerGK(eventingv1.Kind("Broker")), - Handler: controller.HandleAll(impl.EnqueueControllerOf), - }) - - // Reconcile trigger (by enqueuing the broker specified in the label) when subscriptions - // of triggers change. - subscriptionInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: pkgreconciler.LabelExistsFilterFunc(eventing.BrokerLabelKey), - Handler: controller.HandleAll(impl.EnqueueLabelOfNamespaceScopedResource("" /*any namespace*/, eventing.BrokerLabelKey)), - }) - - triggerInformer.Informer().AddEventHandler(controller.HandleAll( - func(obj interface{}) { - if trigger, ok := obj.(*eventingv1.Trigger); ok { - impl.EnqueueKey(types.NamespacedName{Namespace: trigger.Namespace, Name: trigger.Spec.Broker}) - } - }, - )) - // When the endpoints in our multi-tenant filter/ingress change, do a global resync. // During installation, we might reconcile Brokers before our shared filter/ingress is // ready, so when these endpoints change perform a global resync. diff --git a/pkg/reconciler/mtbroker/trigger/controller.go b/pkg/reconciler/mtbroker/trigger/controller.go new file mode 100644 index 00000000000..3ed2cfd8503 --- /dev/null +++ b/pkg/reconciler/mtbroker/trigger/controller.go @@ -0,0 +1,76 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mttrigger + +import ( + "context" + + "k8s.io/client-go/tools/cache" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + eventingclient "knative.dev/eventing/pkg/client/injection/client" + brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker" + triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger" + subscriptioninformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription" + triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger" + "knative.dev/eventing/pkg/duck" + "knative.dev/pkg/client/injection/ducks/duck/v1/conditions" + configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap" + "knative.dev/pkg/configmap" + "knative.dev/pkg/controller" + "knative.dev/pkg/injection/clients/dynamicclient" + "knative.dev/pkg/logging" + "knative.dev/pkg/resolver" +) + +// NewController initializes the controller and is called by the generated code +// Registers event handlers to enqueue events +func NewController( + ctx context.Context, + cmw configmap.Watcher, +) *controller.Impl { + logger := logging.FromContext(ctx) + triggerInformer := triggerinformer.Get(ctx) + brokerInformer := brokerinformer.Get(ctx) + subscriptionInformer := subscriptioninformer.Get(ctx) + configmapInformer := configmapinformer.Get(ctx) + + r := &Reconciler{ + eventingClientSet: eventingclient.Get(ctx), + dynamicClientSet: dynamicclient.Get(ctx), + subscriptionLister: subscriptionInformer.Lister(), + brokerLister: brokerInformer.Lister(), + triggerLister: triggerInformer.Lister(), + configmapLister: configmapInformer.Lister(), + } + impl := triggerreconciler.NewImpl(ctx, r) + r.impl = impl + + logger.Info("Setting up event handlers") + + r.kresourceTracker = duck.NewListableTracker(ctx, conditions.Get, impl.EnqueueKey, controller.GetTrackerLease(ctx)) + r.uriResolver = resolver.NewURIResolver(ctx, impl.EnqueueKey) + + triggerInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) + + // Reconcile Trigger when my Subscription changes + subscriptionInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.FilterControllerGK(eventingv1.Kind("Trigger")), + Handler: controller.HandleAll(impl.EnqueueControllerOf), + }) + + return impl +} diff --git a/pkg/reconciler/mtbroker/trigger/controller_test.go b/pkg/reconciler/mtbroker/trigger/controller_test.go new file mode 100644 index 00000000000..c9d73a88582 --- /dev/null +++ b/pkg/reconciler/mtbroker/trigger/controller_test.go @@ -0,0 +1,41 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mttrigger + +import ( + "testing" + + "knative.dev/pkg/configmap" + . "knative.dev/pkg/reconciler/testing" + + // Fake injection informers + _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake" + _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake" + _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake" + _ "knative.dev/pkg/client/injection/ducks/duck/v1/conditions/fake" + _ "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake" +) + +func TestNew(t *testing.T) { + ctx, _ := SetupFakeContext(t) + + c := NewController(ctx, configmap.NewStaticWatcher()) + + if c == nil { + t.Fatal("Expected NewController to return a non-nil value") + } +} diff --git a/pkg/reconciler/mtbroker/trigger.go b/pkg/reconciler/mtbroker/trigger/trigger.go similarity index 71% rename from pkg/reconciler/mtbroker/trigger.go rename to pkg/reconciler/mtbroker/trigger/trigger.go index d35136f3fba..956a4f3d438 100644 --- a/pkg/reconciler/mtbroker/trigger.go +++ b/pkg/reconciler/mtbroker/trigger/trigger.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package mtbroker +package mttrigger import ( "context" @@ -26,9 +26,16 @@ import ( "k8s.io/apimachinery/pkg/api/equality" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/dynamic" + corev1listers "k8s.io/client-go/listers/core/v1" + "knative.dev/eventing/pkg/apis/eventing" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + clientset "knative.dev/eventing/pkg/client/clientset/versioned" + eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" + messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1" + "knative.dev/eventing/pkg/duck" "knative.dev/eventing/pkg/reconciler/mtbroker/resources" "knative.dev/eventing/pkg/reconciler/names" "knative.dev/eventing/pkg/reconciler/sugar/trigger/path" @@ -37,9 +44,13 @@ import ( duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/controller" "knative.dev/pkg/logging" + pkgreconciler "knative.dev/pkg/reconciler" + "knative.dev/pkg/resolver" "knative.dev/pkg/system" ) +var brokerGVK = eventingv1.SchemeGroupVersion.WithKind("Broker") + const ( // Name of the corev1.Events emitted from the Trigger reconciliation process. triggerReconcileFailed = "TriggerReconcileFailed" @@ -50,21 +61,66 @@ const ( subscriptionDeleted = "SubscriptionDeleted" ) -func (r *Reconciler) reconcileTrigger(ctx context.Context, b *eventingv1.Broker, t *eventingv1.Trigger, brokerTrigger *corev1.ObjectReference) error { +type Reconciler struct { + eventingClientSet clientset.Interface + dynamicClientSet dynamic.Interface + + // listers index properties about resources + subscriptionLister messaginglisters.SubscriptionLister + brokerLister eventinglisters.BrokerLister + triggerLister eventinglisters.TriggerLister + configmapLister corev1listers.ConfigMapLister + + // Dynamic tracker to track KResources. In particular, it tracks the dependency between Triggers and Sources. + kresourceTracker duck.ListableTracker + + // Dynamic tracker to track AddressableTypes. In particular, it tracks Trigger subscribers. + uriResolver *resolver.URIResolver + impl *controller.Impl +} + +func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) pkgreconciler.Event { + logging.FromContext(ctx).Infow("Reconciling", zap.Any("Trigger", t)) t.Status.InitializeConditions() if t.DeletionTimestamp != nil { // Everything is cleaned up by the garbage collector. return nil } + // Start tracking the broker + r.trackBroker(ctx, t) + + b, err := r.brokerLister.Brokers(t.Namespace).Get(t.Spec.Broker) + if err != nil { + if apierrs.IsNotFound(err) { + logging.FromContext(ctx).Errorw(fmt.Sprintf("Trigger %s/%s has no broker %q", t.Namespace, t.Name, t.Spec.Broker)) + t.Status.MarkBrokerFailed("BrokerDoesNotExist", "Broker %q does not exist", t.Spec.Broker) + // Ok to return nil here. Once the Broker comes available, or Trigger changes, we get requeued. + return nil + } else { + t.Status.MarkBrokerFailed("FailedToGetBroker", "Failed to get broker %q : %s", t.Spec.Broker, err) + return err + } + } + // If it's not my brokerclass, ignore + if b.Annotations[eventing.BrokerClassKey] != eventing.MTChannelBrokerClassValue { + logging.FromContext(ctx).Infow("Ignoring trigger %s/%s", t.Namespace, t.Name) + return nil + } t.Status.PropagateBrokerCondition(b.Status.GetTopLevelCondition()) - if brokerTrigger == nil { - // Should not happen because Broker is ready to go if we get here - return errors.New("failed to find Broker's Trigger channel") + // If Broker is not ready, we're done, but once it becomes ready, we'll get requeued. + if !b.Status.IsReady() { + logging.FromContext(ctx).Errorw("Broker is not ready", zap.Any("Broker", b)) + return nil } + brokerTrigger, err := getBrokerChannelRef(ctx, b) + if err != nil { + t.Status.MarkBrokerFailed("MissingBrokerChannel", "Failed to get broker %q annotations: %s", t.Spec.Broker, err) + return fmt.Errorf("failed to find Broker's Trigger channel: %s", err) + } if t.Spec.Subscriber.Ref != nil { // To call URIFromDestination(dest apisv1alpha1.Destination, parent interface{}), dest.Ref must have a Namespace // We will use the Namespace of Trigger as the Namespace of dest.Ref @@ -96,6 +152,22 @@ func (r *Reconciler) reconcileTrigger(ctx context.Context, b *eventingv1.Broker, return nil } +func (r *Reconciler) trackBroker(ctx context.Context, t *eventingv1.Trigger) error { + trackKResource := r.kresourceTracker.TrackInNamespace(t) + brokerObjRef := corev1.ObjectReference{ + Kind: brokerGVK.Kind, + APIVersion: brokerGVK.GroupVersion().String(), + Name: t.Spec.Broker, + Namespace: t.Namespace, + } + + if err := trackKResource(brokerObjRef); err != nil { + return fmt.Errorf("Failed to track broker %q : %s", t.Spec.Broker, err) + } + logging.FromContext(ctx).Infow("Tracking:", zap.Any("Broker", brokerObjRef)) + return nil +} + // subscribeToBrokerChannel subscribes service 'svc' to the Broker's channels. func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *eventingv1.Broker, t *eventingv1.Trigger, brokerTrigger *corev1.ObjectReference) (*messagingv1.Subscription, error) { recorder := controller.GetEventRecorder(ctx) @@ -130,7 +202,6 @@ func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *eventingv1 logging.FromContext(ctx).Info("Creating subscription") sub, err = r.eventingClientSet.MessagingV1().Subscriptions(t.Namespace).Create(expected) if err != nil { - recorder.Eventf(t, corev1.EventTypeWarning, subscriptionCreateFailed, "Create Trigger's subscription failed: %v", err) return nil, err } return sub, nil @@ -176,26 +247,6 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, t *eventingv1.Tr return newSub, nil } -func (r *Reconciler) updateTriggerStatus(ctx context.Context, desired *eventingv1.Trigger) (*eventingv1.Trigger, error) { - trigger, err := r.triggerLister.Triggers(desired.Namespace).Get(desired.Name) - if err != nil { - return nil, err - } - - // Do the postprocessing for unnecessary trigger status changes. - groomConditionsTransitionTime(desired, trigger) - - if equality.Semantic.DeepEqual(trigger.Status, desired.Status) { - return trigger, nil - } - - // Don't modify the informers copy. - existing := trigger.DeepCopy() - existing.Status = desired.Status - - return r.eventingClientSet.EventingV1().Triggers(desired.Namespace).UpdateStatus(existing) -} - func (r *Reconciler) checkDependencyAnnotation(ctx context.Context, t *eventingv1.Trigger, b *eventingv1.Broker) error { if dependencyAnnotation, ok := t.GetAnnotations()[eventingv1.DependencyAnnotation]; ok { dependencyObjRef, err := eventingv1.GetObjRefFromDependencyAnnotation(dependencyAnnotation) @@ -246,3 +297,18 @@ func (r *Reconciler) propagateDependencyReadiness(ctx context.Context, t *eventi t.Status.PropagateDependencyStatus(dependency) return nil } + +func getBrokerChannelRef(ctx context.Context, b *eventingv1.Broker) (*corev1.ObjectReference, error) { + if b.Status.Annotations != nil { + ref := &corev1.ObjectReference{ + Kind: b.Status.Annotations[eventing.BrokerChannelKindStatusAnnotationKey], + APIVersion: b.Status.Annotations[eventing.BrokerChannelAPIVersionStatusAnnotationKey], + Name: b.Status.Annotations[eventing.BrokerChannelNameStatusAnnotationKey], + Namespace: b.Namespace, + } + if ref.Kind != "" && ref.APIVersion != "" && ref.Name != "" && ref.Namespace != "" { + return ref, nil + } + } + return nil, errors.New("Broker.Status.Annotations nil or missing values") +} diff --git a/pkg/reconciler/mtbroker/trigger/trigger_test.go b/pkg/reconciler/mtbroker/trigger/trigger_test.go new file mode 100644 index 00000000000..b319ba51fd2 --- /dev/null +++ b/pkg/reconciler/mtbroker/trigger/trigger_test.go @@ -0,0 +1,1022 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mttrigger + +import ( + "context" + "fmt" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + + clientgotesting "k8s.io/client-go/testing" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/pkg/apis/eventing" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + sourcesv1alpha2 "knative.dev/eventing/pkg/apis/sources/v1alpha2" + fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake" + "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable" + "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger" + "knative.dev/eventing/pkg/duck" + "knative.dev/eventing/pkg/reconciler/mtbroker/resources" + "knative.dev/eventing/pkg/utils" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + v1addr "knative.dev/pkg/client/injection/ducks/duck/v1/addressable" + "knative.dev/pkg/client/injection/ducks/duck/v1/conditions" + v1a1addr "knative.dev/pkg/client/injection/ducks/duck/v1alpha1/addressable" + v1b1addr "knative.dev/pkg/client/injection/ducks/duck/v1beta1/addressable" + "knative.dev/pkg/configmap" + "knative.dev/pkg/controller" + fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake" + logtesting "knative.dev/pkg/logging/testing" + "knative.dev/pkg/resolver" + + _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake" + rtv1alpha1 "knative.dev/eventing/pkg/reconciler/testing" + . "knative.dev/eventing/pkg/reconciler/testing/v1" + _ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake" + . "knative.dev/pkg/reconciler/testing" +) + +const ( + systemNS = "knative-testing" + testNS = "test-namespace" + brokerName = "test-broker" + + configMapName = "test-configmap" + + triggerName = "test-trigger" + triggerUID = "test-trigger-uid" + triggerNameLong = "test-trigger-name-is-a-long-name" + triggerUIDLong = "cafed00d-cafed00d-cafed00d-cafed00d-cafed00d" + + triggerChannelAPIVersion = "messaging.knative.dev/v1" + triggerChannelKind = "InMemoryChannel" + triggerChannelName = "test-broker-kne-trigger" + + subscriberURI = "http://example.com/subscriber/" + subscriberKind = "Service" + subscriberName = "subscriber-name" + subscriberGroup = "serving.knative.dev" + subscriberVersion = "v1" + + pingSourceName = "test-ping-source" + testSchedule = "*/2 * * * *" + testData = "data" + sinkName = "testsink" + dependencyAnnotation = "{\"kind\":\"PingSource\",\"name\":\"test-ping-source\",\"apiVersion\":\"sources.knative.dev/v1alpha2\"}" + subscriberURIReference = "foo" + subscriberResolvedTargetURI = "http://example.com/subscriber/foo" + + k8sServiceResolvedURI = "http://subscriber-name.test-namespace.svc.cluster.local/" + currentGeneration = 1 + outdatedGeneration = 0 + + imcSpec = ` +apiVersion: "messaging.knative.dev/v1" +kind: "InMemoryChannel" +` +) + +var ( + testKey = fmt.Sprintf("%s/%s", testNS, triggerName) + + triggerChannelHostname = fmt.Sprintf("foo.bar.svc.%s", utils.GetClusterDomainName()) + triggerChannelURL = fmt.Sprintf("http://%s", triggerChannelHostname) + + filterServiceName = "broker-filter" + ingressServiceName = "broker-ingress" + + subscriptionName = fmt.Sprintf("%s-%s-%s", brokerName, triggerName, triggerUID) + + subscriberAPIVersion = fmt.Sprintf("%s/%s", subscriberGroup, subscriberVersion) + subscriberGVK = metav1.GroupVersionKind{ + Group: subscriberGroup, + Version: subscriberVersion, + Kind: subscriberKind, + } + k8sServiceGVK = metav1.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "Service", + } + brokerDestv1 = duckv1.Destination{ + Ref: &duckv1.KReference{ + Name: sinkName, + Kind: "Broker", + APIVersion: "eventing.knative.dev/v1", + }, + } + sinkDNS = "sink.mynamespace.svc." + utils.GetClusterDomainName() + sinkURI = "http://" + sinkDNS + + brokerAddress = &apis.URL{ + Scheme: "http", + Host: fmt.Sprintf("%s.%s.svc.%s", ingressServiceName, systemNS, utils.GetClusterDomainName()), + Path: fmt.Sprintf("/%s/%s", testNS, brokerName), + } +) + +func init() { + // Add types to scheme + _ = eventingv1.AddToScheme(scheme.Scheme) + _ = duckv1.AddToScheme(scheme.Scheme) +} + +func TestReconcile(t *testing.T) { + table := TableTest{ + { + Name: "bad workqueue key", + // Make sure Reconcile handles bad keys. + Key: "too/many/parts", + }, { + Name: "key not found", + // Make sure Reconcile handles good keys that don't exist. + Key: "foo/not-found", + }, { + Name: "Trigger not found", + Key: testKey, + }, { + Name: "Trigger is being deleted", + Key: testKey, + Objects: []runtime.Object{NewTrigger(triggerName, testNS, brokerName, WithTriggerDeleted)}, + }, { + Name: "Broker does not exist", + Key: testKey, + Objects: []runtime.Object{ + NewTrigger(triggerName, testNS, brokerName, + WithInitTriggerConditions, + WithTriggerSubscriberURI(subscriberURI)), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerSubscriberURI(subscriberURI), + WithInitTriggerConditions, + WithTriggerBrokerFailed("BrokerDoesNotExist", `Broker "test-broker" does not exist`)), + }}, + }, { + Name: "Not my broker class - no status updates", + Key: testKey, + Objects: []runtime.Object{ + NewBroker(brokerName, testNS, + WithBrokerClass("not-my-broker"), + WithBrokerConfig(config()), + WithInitBrokerConditions), + NewTrigger(triggerName, testNS, brokerName, + WithInitTriggerConditions, + WithTriggerSubscriberURI(subscriberURI)), + }, + }, { + Name: "Broker not reconciled yet", + Key: testKey, + Objects: []runtime.Object{ + NewBroker(brokerName, testNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config())), + NewTrigger(triggerName, testNS, brokerName, + WithInitTriggerConditions, + WithTriggerSubscriberURI(subscriberURI), + WithTriggerBrokerNotConfigured()), + }, + }, { + Name: "Broker not ready yet", + Key: testKey, + Objects: []runtime.Object{ + NewBroker(brokerName, testNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config()), + WithInitBrokerConditions, + WithFilterFailed("nofilter", "NoFilter")), + NewTrigger(triggerName, testNS, brokerName, + WithInitTriggerConditions, + WithTriggerSubscriberURI(subscriberURI), + WithTriggerBrokerFailed("nofilter", "NoFilter")), + }, + }, { + Name: "Creates subscription", + Key: testKey, + Objects: []runtime.Object{ + NewBroker(brokerName, testNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config()), + WithInitBrokerConditions, + WithBrokerReady, + WithChannelAddressAnnotation(triggerChannelURL), + WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), + WithChannelKindAnnotation(triggerChannelKind), + WithChannelNameAnnotation(triggerChannelName)), + NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI)), + }, + WantCreates: []runtime.Object{ + makeFilterSubscription(), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + WithTriggerBrokerReady(), + WithTriggerDependencyReady(), + WithTriggerSubscriberResolvedSucceeded(), + WithTriggerSubscribedUnknown("SubscriptionNotConfigured", "Subscription has not yet been reconciled."), + WithTriggerStatusSubscriberURI(subscriberURI)), + }}, + }, { + Name: "Subscription Create fails", + Key: testKey, + Objects: []runtime.Object{ + ReadyBroker(), + NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + WithInitTriggerConditions, + WithTriggerBrokerReady(), + WithTriggerSubscriberResolvedSucceeded()), + }, + WantCreates: []runtime.Object{ + makeFilterSubscription(), + }, + WithReactors: []clientgotesting.ReactionFunc{ + InduceFailure("create", "subscriptions"), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + WithInitTriggerConditions, + WithTriggerBrokerReady(), + WithTriggerSubscriberResolvedSucceeded(), + WithTriggerStatusSubscriberURI(subscriberURI), + WithTriggerSubscriberURI(subscriberURI), + WithTriggerNotSubscribed("NotSubscribed", "inducing failure for create subscriptions")), + }}, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, "InternalError", "inducing failure for create subscriptions"), + }, + WantErr: true, + }, { + Name: "Trigger subscription create fails, update status fails", + Key: testKey, + Objects: []runtime.Object{ + ReadyBroker(), + NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI)), + }, + WithReactors: []clientgotesting.ReactionFunc{ + InduceFailure("create", "subscriptions"), + InduceFailure("update", "triggers"), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + // The first reconciliation will initialize the status conditions. + WithInitTriggerConditions, + WithTriggerBrokerReady(), + WithTriggerStatusSubscriberURI(subscriberURI), + WithTriggerSubscriberResolvedSucceeded(), + WithTriggerNotSubscribed("NotSubscribed", "inducing failure for create subscriptions")), + }}, + WantCreates: []runtime.Object{ + makeFilterSubscription(), + }, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, "UpdateFailed", `Failed to update status for "test-trigger": inducing failure for update triggers`), + }, + WantErr: true, + }, { + Name: "Trigger subscription not owned by Trigger", + Key: testKey, + Objects: allBrokerObjectsReadyPlus([]runtime.Object{ + NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI)), + makeFilterSubscriptionNotOwnedByTrigger()}...), + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerSubscriberURI(subscriberURI), + WithTriggerUID(triggerUID), + WithInitTriggerConditions, + WithTriggerBrokerReady(), + WithTriggerSubscriberResolvedSucceeded(), + WithTriggerNotSubscribed("NotSubscribed", `trigger "test-trigger" does not own subscription "test-broker-test-trigger-test-trigger-uid"`), + WithTriggerStatusSubscriberURI(subscriberURI)), + }}, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, "InternalError", `trigger "test-trigger" does not own subscription "test-broker-test-trigger-test-trigger-uid"`), + }, + WantErr: true, + }, { + Name: "Trigger subscription update works", + Key: testKey, + Objects: allBrokerObjectsReadyPlus([]runtime.Object{ + NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI)), + makeDifferentReadySubscription()}...), + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + WithInitTriggerConditions, + WithTriggerBrokerReady(), + // The first reconciliation will initialize the status conditions. + WithInitTriggerConditions, + WithTriggerBrokerReady(), + WithTriggerSubscriptionNotConfigured(), + WithTriggerStatusSubscriberURI(subscriberURI), + WithTriggerSubscriberResolvedSucceeded(), + WithTriggerDependencyReady()), + }}, + WantDeletes: []clientgotesting.DeleteActionImpl{{ + Name: subscriptionName, + }}, + WantCreates: []runtime.Object{ + makeFilterSubscription(), + }, + }, { + Name: "Trigger has subscriber ref exists", + Key: testKey, + Objects: allBrokerObjectsReadyPlus([]runtime.Object{ + makeSubscriberAddressableAsUnstructured(), + NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberRef(subscriberGVK, subscriberName, testNS), + WithInitTriggerConditions)}...), + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberRef(subscriberGVK, subscriberName, testNS), + // The first reconciliation will initialize the status conditions. + WithInitTriggerConditions, + WithTriggerBrokerReady(), + WithTriggerSubscriptionNotConfigured(), + WithTriggerStatusSubscriberURI(subscriberURI), + WithTriggerSubscriberResolvedSucceeded(), + WithTriggerDependencyReady(), + ), + }}, + WantCreates: []runtime.Object{ + makeFilterSubscription(), + }, + }, { + Name: "Trigger has subscriber ref exists and URI", + Key: testKey, + Objects: allBrokerObjectsReadyPlus([]runtime.Object{ + makeSubscriberAddressableAsUnstructured(), + NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberRefAndURIReference(subscriberGVK, subscriberName, testNS, subscriberURIReference), + WithInitTriggerConditions, + )}...), + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberRefAndURIReference(subscriberGVK, subscriberName, testNS, subscriberURIReference), + // The first reconciliation will initialize the status conditions. + WithInitTriggerConditions, + WithTriggerBrokerReady(), + WithTriggerSubscriptionNotConfigured(), + WithTriggerStatusSubscriberURI(subscriberResolvedTargetURI), + WithTriggerSubscriberResolvedSucceeded(), + WithTriggerDependencyReady(), + ), + }}, + WantCreates: []runtime.Object{ + makeFilterSubscription(), + }, + }, { + Name: "Trigger has subscriber ref exists kubernetes Service", + Key: testKey, + Objects: allBrokerObjectsReadyPlus([]runtime.Object{ + makeSubscriberKubernetesServiceAsUnstructured(), + NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberRef(k8sServiceGVK, subscriberName, testNS), + WithInitTriggerConditions, + )}...), + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberRef(k8sServiceGVK, subscriberName, testNS), + // The first reconciliation will initialize the status conditions. + WithInitTriggerConditions, + WithTriggerBrokerReady(), + WithTriggerSubscriptionNotConfigured(), + WithTriggerStatusSubscriberURI(k8sServiceResolvedURI), + WithTriggerSubscriberResolvedSucceeded(), + WithTriggerDependencyReady(), + ), + }}, + WantCreates: []runtime.Object{ + makeFilterSubscription(), + }, + }, { + Name: "Trigger has subscriber ref doesn't exist", + Key: testKey, + Objects: allBrokerObjectsReadyPlus([]runtime.Object{ + NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberRef(subscriberGVK, subscriberName, testNS), + WithInitTriggerConditions, + )}...), + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, "InternalError", `failed to get ref &ObjectReference{Kind:Service,Namespace:test-namespace,Name:subscriber-name,UID:,APIVersion:serving.knative.dev/v1,ResourceVersion:,FieldPath:,}: services.serving.knative.dev "subscriber-name" not found`), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberRef(subscriberGVK, subscriberName, testNS), + // The first reconciliation will initialize the status conditions. + WithInitTriggerConditions, + WithTriggerBrokerReady(), + WithTriggerSubscriberResolvedFailed("Unable to get the Subscriber's URI", `failed to get ref &ObjectReference{Kind:Service,Namespace:test-namespace,Name:subscriber-name,UID:,APIVersion:serving.knative.dev/v1,ResourceVersion:,FieldPath:,}: services.serving.knative.dev "subscriber-name" not found`), + ), + }}, + WantErr: true, + }, { + Name: "Subscription not ready, trigger marked not ready", + Key: testKey, + Objects: allBrokerObjectsReadyPlus([]runtime.Object{ + makeFalseStatusSubscription(), + NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + WithInitTriggerConditions, + )}...), + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + // The first reconciliation will initialize the status conditions. + WithInitTriggerConditions, + WithTriggerBrokerReady(), + WithTriggerNotSubscribed("testInducedError", "test induced error"), + WithTriggerStatusSubscriberURI(subscriberURI), + WithTriggerSubscriberResolvedSucceeded(), + WithTriggerDependencyReady(), + ), + }}, + }, { + Name: "Subscription ready, trigger marked ready", + Key: testKey, + Objects: allBrokerObjectsReadyPlus([]runtime.Object{ + makeReadySubscription(), + NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + WithInitTriggerConditions, + )}...), + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + // The first reconciliation will initialize the status conditions. + WithInitTriggerConditions, + WithTriggerBrokerReady(), + WithTriggerSubscribed(), + WithTriggerStatusSubscriberURI(subscriberURI), + WithTriggerSubscriberResolvedSucceeded(), + WithTriggerDependencyReady(), + ), + }}, + }, { + Name: "Dependency doesn't exist", + Key: testKey, + Objects: allBrokerObjectsReadyPlus([]runtime.Object{ + makeReadySubscription(), + NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + WithInitTriggerConditions, + WithDependencyAnnotation(dependencyAnnotation), + )}...), + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, "InternalError", "propagating dependency readiness: getting the dependency: pingsources.sources.knative.dev \"test-ping-source\" not found"), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + // The first reconciliation will initialize the status conditions. + WithInitTriggerConditions, + WithDependencyAnnotation(dependencyAnnotation), + WithTriggerBrokerReady(), + WithTriggerSubscribed(), + WithTriggerStatusSubscriberURI(subscriberURI), + WithTriggerSubscriberResolvedSucceeded(), + WithTriggerDependencyFailed("DependencyDoesNotExist", "Dependency does not exist: pingsources.sources.knative.dev \"test-ping-source\" not found"), + ), + }}, + WantErr: true, + }, { + Name: "The status of Dependency is False", + Key: testKey, + Objects: allBrokerObjectsReadyPlus([]runtime.Object{ + makeReadySubscription(), + makeFalseStatusPingSource(), + NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + WithInitTriggerConditions, + WithDependencyAnnotation(dependencyAnnotation), + )}...), + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + // The first reconciliation will initialize the status conditions. + WithInitTriggerConditions, + WithDependencyAnnotation(dependencyAnnotation), + WithTriggerBrokerReady(), + WithTriggerSubscribed(), + WithTriggerStatusSubscriberURI(subscriberURI), + WithTriggerSubscriberResolvedSucceeded(), + WithTriggerDependencyFailed("NotFound", ""), + ), + }}, + }, { + Name: "The status of Dependency is Unknown", + Key: testKey, + Objects: allBrokerObjectsReadyPlus([]runtime.Object{ + makeReadySubscription(), + makeUnknownStatusCronJobSource(), + NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + WithInitTriggerConditions, + WithDependencyAnnotation(dependencyAnnotation), + )}...), + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + // The first reconciliation will initialize the status conditions. + WithInitTriggerConditions, + WithDependencyAnnotation(dependencyAnnotation), + WithTriggerBrokerReady(), + WithTriggerSubscribed(), + WithTriggerStatusSubscriberURI(subscriberURI), + WithTriggerSubscriberResolvedSucceeded(), + WithTriggerDependencyUnknown("", ""), + ), + }}, + }, + { + Name: "Dependency generation not equal", + Key: testKey, + Objects: allBrokerObjectsReadyPlus([]runtime.Object{ + makeReadySubscription(), + makeGenerationNotEqualPingSource(), + NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + WithInitTriggerConditions, + WithDependencyAnnotation(dependencyAnnotation), + )}...), + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + // The first reconciliation will initialize the status conditions. + WithInitTriggerConditions, + WithDependencyAnnotation(dependencyAnnotation), + WithTriggerBrokerReady(), + WithTriggerSubscribed(), + WithTriggerStatusSubscriberURI(subscriberURI), + WithTriggerSubscriberResolvedSucceeded(), + WithTriggerDependencyUnknown("GenerationNotEqual", fmt.Sprintf("The dependency's metadata.generation, %q, is not equal to its status.observedGeneration, %q.", currentGeneration, outdatedGeneration))), + }}, + }, + { + Name: "Dependency ready", + Key: testKey, + Objects: allBrokerObjectsReadyPlus([]runtime.Object{ + makeReadySubscription(), + makeReadyPingSource(), + NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + WithInitTriggerConditions, + WithDependencyAnnotation(dependencyAnnotation), + )}...), + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + // The first reconciliation will initialize the status conditions. + WithInitTriggerConditions, + WithDependencyAnnotation(dependencyAnnotation), + WithTriggerBrokerReady(), + WithTriggerSubscribed(), + WithTriggerStatusSubscriberURI(subscriberURI), + WithTriggerSubscriberResolvedSucceeded(), + WithTriggerDependencyReady(), + ), + }}, + }, + } + + logger := logtesting.TestLogger(t) + table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler { + ctx = channelable.WithDuck(ctx) + ctx = v1a1addr.WithDuck(ctx) + ctx = v1b1addr.WithDuck(ctx) + ctx = v1addr.WithDuck(ctx) + ctx = conditions.WithDuck(ctx) + r := &Reconciler{ + eventingClientSet: fakeeventingclient.Get(ctx), + dynamicClientSet: fakedynamicclient.Get(ctx), + subscriptionLister: listers.GetSubscriptionLister(), + triggerLister: listers.GetTriggerLister(), + + brokerLister: listers.GetBrokerLister(), + configmapLister: listers.GetConfigMapLister(), + kresourceTracker: duck.NewListableTracker(ctx, conditions.Get, func(types.NamespacedName) {}, 0), + uriResolver: resolver.NewURIResolver(ctx, func(types.NamespacedName) {}), + } + return trigger.NewReconciler(ctx, logger, + fakeeventingclient.Get(ctx), listers.GetTriggerLister(), + controller.GetEventRecorder(ctx), + r) + + }, + false, + logger, + )) +} + +func config() *duckv1.KReference { + return &duckv1.KReference{ + Name: configMapName, + Namespace: testNS, + Kind: "ConfigMap", + APIVersion: "v1", + } +} + +func imcConfigMap() *corev1.ConfigMap { + return NewConfigMap(configMapName, testNS, + WithConfigMapData(map[string]string{"channelTemplateSpec": imcSpec})) +} + +func createChannel(namespace string, ready bool) *unstructured.Unstructured { + name := fmt.Sprintf("%s-kne-trigger", brokerName) + labels := map[string]interface{}{ + eventing.BrokerLabelKey: brokerName, + "eventing.knative.dev/brokerEverything": "true", + } + annotations := map[string]interface{}{ + "eventing.knative.dev/scope": "cluster", + } + if ready { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "messaging.knative.dev/v1", + "kind": "InMemoryChannel", + "metadata": map[string]interface{}{ + "creationTimestamp": nil, + "namespace": namespace, + "name": name, + "ownerReferences": []interface{}{ + map[string]interface{}{ + "apiVersion": "eventing.knative.dev/v1", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Broker", + "name": brokerName, + "uid": "", + }, + }, + "labels": labels, + "annotations": annotations, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "url": triggerChannelURL, + }, + }, + }, + } + } + + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "messaging.knative.dev/v1", + "kind": "InMemoryChannel", + "metadata": map[string]interface{}{ + "creationTimestamp": nil, + "namespace": namespace, + "name": name, + "ownerReferences": []interface{}{ + map[string]interface{}{ + "apiVersion": "eventing.knative.dev/v1", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Broker", + "name": brokerName, + "uid": "", + }, + }, + "labels": labels, + "annotations": annotations, + }, + }, + } +} + +func createChannelNoHostInUrl(namespace string) *unstructured.Unstructured { + name := fmt.Sprintf("%s-kne-trigger", brokerName) + labels := map[string]interface{}{ + eventing.BrokerLabelKey: brokerName, + "eventing.knative.dev/brokerEverything": "true", + } + annotations := map[string]interface{}{ + "eventing.knative.dev/scope": "cluster", + } + + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "messaging.knative.dev/v1", + "kind": "InMemoryChannel", + "metadata": map[string]interface{}{ + "creationTimestamp": nil, + "namespace": namespace, + "name": name, + "ownerReferences": []interface{}{ + map[string]interface{}{ + "apiVersion": "eventing.knative.dev/v1", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Broker", + "name": brokerName, + "uid": "", + }, + }, + "labels": labels, + "annotations": annotations, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "url": "http://", + }, + }, + }, + } +} + +func createTriggerChannelRef() *corev1.ObjectReference { + return &corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1", + Kind: "InMemoryChannel", + Namespace: testNS, + Name: fmt.Sprintf("%s-kne-trigger", brokerName), + } +} + +func makeServiceURI() *apis.URL { + return &apis.URL{ + Scheme: "http", + Host: fmt.Sprintf("broker-filter.%s.svc.%s", systemNS, utils.GetClusterDomainName()), + Path: fmt.Sprintf("/triggers/%s/%s/%s", testNS, triggerName, triggerUID), + } +} +func makeFilterSubscription() *messagingv1.Subscription { + return resources.NewSubscription(makeTrigger(), createTriggerChannelRef(), makeBrokerRef(), makeServiceURI(), makeEmptyDelivery()) +} + +func makeTrigger() *eventingv1.Trigger { + return &eventingv1.Trigger{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "eventing.knative.dev/v1", + Kind: "Trigger", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, + Name: triggerName, + UID: triggerUID, + }, + Spec: eventingv1.TriggerSpec{ + Broker: brokerName, + Filter: &eventingv1.TriggerFilter{ + Attributes: map[string]string{"Source": "Any", "Type": "Any"}, + }, + Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{ + Name: subscriberName, + Namespace: testNS, + Kind: subscriberKind, + APIVersion: subscriberAPIVersion, + }, + }, + }, + } +} + +func makeBrokerRef() *corev1.ObjectReference { + return &corev1.ObjectReference{ + APIVersion: "eventing.knative.dev/v1", + Kind: "Broker", + Namespace: testNS, + Name: brokerName, + } +} +func makeEmptyDelivery() *eventingduckv1.DeliverySpec { + return nil +} + +func allBrokerObjectsReadyPlus(objs ...runtime.Object) []runtime.Object { + brokerObjs := []runtime.Object{ + NewBroker(brokerName, testNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config()), + WithInitBrokerConditions, + WithBrokerReady, + WithBrokerResourceVersion(""), + WithBrokerAddressURI(brokerAddress), + WithChannelAddressAnnotation(triggerChannelURL), + WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), + WithChannelKindAnnotation(triggerChannelKind), + WithChannelNameAnnotation(triggerChannelName)), + createChannel(testNS, true), + imcConfigMap(), + NewEndpoints(filterServiceName, systemNS, + WithEndpointsLabels(FilterLabels()), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpoints(ingressServiceName, systemNS, + WithEndpointsLabels(IngressLabels()), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + } + return append(brokerObjs[:], objs...) +} + +// Just so we can test subscription updates +func makeDifferentReadySubscription() *messagingv1.Subscription { + s := makeFilterSubscription() + s.Spec.Subscriber.URI = apis.HTTP("different.example.com") + s.Status = *eventingv1.TestHelper.ReadySubscriptionStatus() + return s +} + +func makeFilterSubscriptionNotOwnedByTrigger() *messagingv1.Subscription { + sub := makeFilterSubscription() + sub.OwnerReferences = []metav1.OwnerReference{} + return sub +} + +func makeReadySubscription() *messagingv1.Subscription { + s := makeFilterSubscription() + s.Status = *eventingv1.TestHelper.ReadySubscriptionStatus() + return s +} + +func makeReadySubscriptionDeprecatedName(triggerName, triggerUID string) *messagingv1.Subscription { + s := makeFilterSubscription() + t := NewTrigger(triggerName, testNS, brokerName) + t.UID = types.UID(triggerUID) + s.Name = utils.GenerateFixedName(t, fmt.Sprintf("%s-%s", brokerName, triggerName)) + s.Status = *eventingv1.TestHelper.ReadySubscriptionStatus() + return s +} + +func makeReadySubscriptionWithCustomData(triggerName, triggerUID string) *messagingv1.Subscription { + t := makeTrigger() + t.Name = triggerName + t.UID = types.UID(triggerUID) + + uri := makeServiceURI() + uri.Path = fmt.Sprintf("/triggers/%s/%s/%s", testNS, triggerName, triggerUID) + + return resources.NewSubscription(t, createTriggerChannelRef(), makeBrokerRef(), uri, makeEmptyDelivery()) +} + +func makeSubscriberAddressableAsUnstructured() *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": subscriberAPIVersion, + "kind": subscriberKind, + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": subscriberName, + }, + "status": map[string]interface{}{ + "address": map[string]interface{}{ + "url": subscriberURI, + }, + }, + }, + } +} + +func makeFalseStatusSubscription() *messagingv1.Subscription { + s := makeFilterSubscription() + s.Status.MarkReferencesNotResolved("testInducedError", "test induced error") + return s +} + +func makeFalseStatusPingSource() *sourcesv1alpha2.PingSource { + return rtv1alpha1.NewPingSourceV1Alpha2(pingSourceName, testNS, rtv1alpha1.WithPingSourceV1A2SinkNotFound) +} + +func makeUnknownStatusCronJobSource() *sourcesv1alpha2.PingSource { + cjs := rtv1alpha1.NewPingSourceV1Alpha2(pingSourceName, testNS) + cjs.Status.InitializeConditions() + return cjs +} + +func makeGenerationNotEqualPingSource() *sourcesv1alpha2.PingSource { + c := makeFalseStatusPingSource() + c.Generation = currentGeneration + c.Status.ObservedGeneration = outdatedGeneration + return c +} + +func makeReadyPingSource() *sourcesv1alpha2.PingSource { + u, _ := apis.ParseURL(sinkURI) + return rtv1alpha1.NewPingSourceV1Alpha2(pingSourceName, testNS, + rtv1alpha1.WithPingSourceV1A2Spec(sourcesv1alpha2.PingSourceSpec{ + Schedule: testSchedule, + JsonData: testData, + SourceSpec: duckv1.SourceSpec{ + Sink: brokerDestv1, + }, + }), + rtv1alpha1.WithInitPingSourceV1A2Conditions, + rtv1alpha1.WithValidPingSourceV1A2Schedule, + rtv1alpha1.WithValidPingSourceV1A2Resources, + rtv1alpha1.WithPingSourceV1A2Deployed, + rtv1alpha1.WithPingSourceV1A2CloudEventAttributes, + rtv1alpha1.WithPingSourceV1A2Sink(u), + ) +} +func makeSubscriberKubernetesServiceAsUnstructured() *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Service", + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": subscriberName, + }, + }, + } +} + +// FilterLabels generates the labels present on all resources representing the filter of the given +// Broker. +func FilterLabels() map[string]string { + return map[string]string{ + "eventing.knative.dev/brokerRole": "filter", + } +} + +func IngressLabels() map[string]string { + return map[string]string{ + "eventing.knative.dev/brokerRole": "ingress", + } +} + +// Create Ready Broker with proper annotations. +func ReadyBroker() *eventingv1.Broker { + return NewBroker(brokerName, testNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config()), + WithInitBrokerConditions, + WithBrokerReady, + WithChannelAddressAnnotation(triggerChannelURL), + WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), + WithChannelKindAnnotation(triggerChannelKind), + WithChannelNameAnnotation(triggerChannelName)) +} diff --git a/pkg/reconciler/testing/v1/broker.go b/pkg/reconciler/testing/v1/broker.go index e9ee8bf287d..b4f609a7feb 100644 --- a/pkg/reconciler/testing/v1/broker.go +++ b/pkg/reconciler/testing/v1/broker.go @@ -184,3 +184,30 @@ func WithChannelAddressAnnotation(address string) BrokerOption { b.Status.Annotations[eventing.BrokerChannelAddressStatusAnnotationKey] = address } } + +func WithChannelAPIVersionAnnotation(apiVersion string) BrokerOption { + return func(b *v1.Broker) { + if b.Status.Annotations == nil { + b.Status.Annotations = make(map[string]string, 1) + } + b.Status.Annotations[eventing.BrokerChannelAPIVersionStatusAnnotationKey] = apiVersion + } +} + +func WithChannelKindAnnotation(kind string) BrokerOption { + return func(b *v1.Broker) { + if b.Status.Annotations == nil { + b.Status.Annotations = make(map[string]string, 1) + } + b.Status.Annotations[eventing.BrokerChannelKindStatusAnnotationKey] = kind + } +} + +func WithChannelNameAnnotation(name string) BrokerOption { + return func(b *v1.Broker) { + if b.Status.Annotations == nil { + b.Status.Annotations = make(map[string]string, 1) + } + b.Status.Annotations[eventing.BrokerChannelNameStatusAnnotationKey] = name + } +} diff --git a/pkg/reconciler/testing/v1/trigger.go b/pkg/reconciler/testing/v1/trigger.go index 219fa903bd5..2a055b4c60c 100644 --- a/pkg/reconciler/testing/v1/trigger.go +++ b/pkg/reconciler/testing/v1/trigger.go @@ -115,6 +115,13 @@ func WithTriggerBrokerFailed(reason, message string) TriggerOption { } } +// WithTriggerBrokerNotConfigured marks the Broker as not having been reconciled. +func WithTriggerBrokerNotConfigured() TriggerOption { + return func(t *v1.Trigger) { + t.Status.MarkBrokerNotConfigured() + } +} + // WithTriggerBrokerUnknown marks the Broker as unknown func WithTriggerBrokerUnknown(reason, message string) TriggerOption { return func(t *v1.Trigger) { @@ -207,7 +214,6 @@ func WithTriggerSubscriberResolvedUnknown(reason, message string) TriggerOption } } -// TODO: this can be a runtime object func WithTriggerDeleted(t *v1.Trigger) { deleteTime := metav1.NewTime(time.Unix(1e9, 0)) t.ObjectMeta.SetDeletionTimestamp(&deleteTime) diff --git a/test/config/chaosduck.yaml b/test/config/chaosduck.yaml index e542df74ae3..0757644cf87 100644 --- a/test/config/chaosduck.yaml +++ b/test/config/chaosduck.yaml @@ -78,8 +78,6 @@ spec: image: ko://knative.dev/pkg/leaderelection/chaosduck args: [ - # TODO(https://github.com/knative/eventing/issues/3591): Enable once MT Broker chaos issues are fixed. - "-disable=mt-broker-controller", # TODO(https://github.com/knative/eventing/issues/3590): Enable once IMC chaos issues are fixed. "-disable=imc-controller", "-disable=imc-dispatcher", ] diff --git a/test/config/config-logging.yaml b/test/config/config-logging.yaml new file mode 100644 index 00000000000..79f035d1773 --- /dev/null +++ b/test/config/config-logging.yaml @@ -0,0 +1,46 @@ +# Copyright 2020 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-logging + namespace: knative-eventing + labels: + eventing.knative.dev/release: devel + +data: + zap-logger-config: | + { + "level": "debug", + "development": false, + "outputPaths": ["stdout"], + "errorOutputPaths": ["stderr"], + "encoding": "json", + "encoderConfig": { + "timeKey": "ts", + "levelKey": "level", + "nameKey": "logger", + "callerKey": "caller", + "messageKey": "msg", + "stacktraceKey": "stacktrace", + "lineEnding": "", + "levelEncoder": "", + "timeEncoder": "iso8601", + "durationEncoder": "", + "callerEncoder": "" + } + } + # Log level overrides + loglevel.mt-broker-controller: "debug" diff --git a/test/e2e-common.sh b/test/e2e-common.sh index 7a007291e86..17e9c0e686b 100755 --- a/test/e2e-common.sh +++ b/test/e2e-common.sh @@ -196,8 +196,7 @@ function install_mt_broker() { find ${TMP_MT_CHANNEL_BASED_BROKER_CONFIG_DIR} -type f -name "*.yaml" -exec sed -i "s/namespace: ${KNATIVE_DEFAULT_NAMESPACE}/namespace: ${TEST_EVENTING_NAMESPACE}/g" {} + ko apply --strict -f ${TMP_MT_CHANNEL_BASED_BROKER_CONFIG_DIR} || return 1 - # TODO(https://github.com/knative/eventing/issues/3591): Enable once MT Broker chaos issues are fixed. - # scale_controlplane mt-broker-controller + scale_controlplane mt-broker-controller wait_until_pods_running ${TEST_EVENTING_NAMESPACE} || fail_test "Knative Eventing with MT Broker did not come up" } @@ -216,6 +215,11 @@ function install_sugar() { } function unleash_duck() { + echo "enable debug logging" + cat test/config/config-logging.yaml | \ + sed "s/namespace: ${KNATIVE_DEFAULT_NAMESPACE}/namespace: ${TEST_EVENTING_NAMESPACE}/g" | \ + ko apply --strict -f - || return $? + echo "unleash the duck" cat test/config/chaosduck.yaml | \ sed "s/namespace: ${KNATIVE_DEFAULT_NAMESPACE}/namespace: ${TEST_EVENTING_NAMESPACE}/g" | \