diff --git a/pkg/reconciler/broker/trigger/trigger.go b/pkg/reconciler/broker/trigger/trigger.go index 1178aba40ab..a27b08ed7d0 100644 --- a/pkg/reconciler/broker/trigger/trigger.go +++ b/pkg/reconciler/broker/trigger/trigger.go @@ -163,7 +163,13 @@ func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *eventingv1 Name: b.Name, Namespace: b.Namespace, } - expected := resources.NewSubscription(t, brokerTrigger, brokerObjRef, uri, b.Spec.Delivery) + + delivery := t.Spec.Delivery + if delivery == nil { + delivery = b.Spec.Delivery + } + + expected := resources.NewSubscription(t, brokerTrigger, brokerObjRef, uri, delivery) sub, err := r.subscriptionLister.Subscriptions(t.Namespace).Get(expected.Name) // If the resource doesn't exist, we'll create it. diff --git a/pkg/reconciler/broker/trigger/trigger_test.go b/pkg/reconciler/broker/trigger/trigger_test.go index 06cd7ac0cad..e5a61222eda 100644 --- a/pkg/reconciler/broker/trigger/trigger_test.go +++ b/pkg/reconciler/broker/trigger/trigger_test.go @@ -22,14 +22,12 @@ import ( "testing" cloudevents "github.com/cloudevents/sdk-go/v2" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" - clientgotesting "k8s.io/client-go/testing" eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" v1 "knative.dev/eventing/pkg/apis/duck/v1" @@ -53,6 +51,7 @@ import ( fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake" logtesting "knative.dev/pkg/logging/testing" "knative.dev/pkg/network" + "knative.dev/pkg/ptr" "knative.dev/pkg/resolver" _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake" @@ -246,6 +245,70 @@ func TestReconcile(t *testing.T) { WithTriggerSubscribedUnknown("SubscriptionNotConfigured", "Subscription has not yet been reconciled."), WithTriggerStatusSubscriberURI(subscriberURI)), }}, + }, { + Name: "Creates subscription with retry from trigger", + Key: testKey, + Objects: []runtime.Object{ + NewBroker(brokerName, testNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config()), + WithInitBrokerConditions, + WithBrokerReady, + WithChannelAddressAnnotation(triggerChannelURL), + WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), + WithChannelKindAnnotation(triggerChannelKind), + WithChannelNameAnnotation(triggerChannelName)), + NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + WithTriggerRetry(5, nil, nil)), + }, + WantCreates: []runtime.Object{ + resources.NewSubscription(makeTrigger(testNS), createTriggerChannelRef(), makeBrokerRef(), makeServiceURI(), makeDelivery(nil, "", ptr.Int32(5), nil, nil)), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + WithTriggerRetry(5, nil, nil), + WithTriggerBrokerReady(), + WithTriggerDependencyReady(), + WithTriggerSubscriberResolvedSucceeded(), + WithTriggerSubscribedUnknown("SubscriptionNotConfigured", "Subscription has not yet been reconciled."), + WithTriggerStatusSubscriberURI(subscriberURI)), + }}, + }, { + Name: "Creates subscription with dlq from trigger", + Key: testKey, + Objects: []runtime.Object{ + NewBroker(brokerName, testNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config()), + WithInitBrokerConditions, + WithBrokerReady, + WithChannelAddressAnnotation(triggerChannelURL), + WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), + WithChannelKindAnnotation(triggerChannelKind), + WithChannelNameAnnotation(triggerChannelName)), + NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + WithTriggerDeadLeaderSink(nil, "http://example.com")), + }, + WantCreates: []runtime.Object{ + resources.NewSubscription(makeTrigger(testNS), createTriggerChannelRef(), makeBrokerRef(), makeServiceURI(), makeDelivery(nil, "http://example.com", nil, nil, nil)), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewTrigger(triggerName, testNS, brokerName, + WithTriggerUID(triggerUID), + WithTriggerSubscriberURI(subscriberURI), + WithTriggerDeadLeaderSink(nil, "http://example.com"), + WithTriggerBrokerReady(), + WithTriggerDependencyReady(), + WithTriggerSubscriberResolvedSucceeded(), + WithTriggerSubscribedUnknown("SubscriptionNotConfigured", "Subscription has not yet been reconciled."), + WithTriggerStatusSubscriberURI(subscriberURI)), + }}, }, { Name: "Subscription Create fails", Key: testKey, @@ -951,10 +1014,30 @@ func makeBrokerRef() *corev1.ObjectReference { Name: brokerName, } } + func makeEmptyDelivery() *eventingduckv1.DeliverySpec { return nil } +func makeDelivery(ref *duckv1.KReference, uri string, retry *int32, backoffPolicy *v1.BackoffPolicyType, backoffDelay *string) *eventingduckv1.DeliverySpec { + ds := &v1.DeliverySpec{ + Retry: retry, + BackoffPolicy: backoffPolicy, + BackoffDelay: backoffDelay, + } + if ref != nil || uri != "" { + var u *apis.URL + if uri != "" { + u, _ = apis.ParseURL(uri) + } + ds.DeadLetterSink = &duckv1.Destination{ + Ref: ref, + URI: u, + } + } + return ds +} + func allBrokerObjectsReadyPlus(objs ...runtime.Object) []runtime.Object { brokerObjs := []runtime.Object{ NewBroker(brokerName, testNS, diff --git a/pkg/reconciler/testing/v1/trigger.go b/pkg/reconciler/testing/v1/trigger.go index 2a055b4c60c..73e5b8539bc 100644 --- a/pkg/reconciler/testing/v1/trigger.go +++ b/pkg/reconciler/testing/v1/trigger.go @@ -23,9 +23,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + eventingv1 "knative.dev/eventing/pkg/apis/duck/v1" v1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/ptr" ) // TriggerOption enables further configuration of a Trigger. @@ -56,6 +58,33 @@ func WithTriggerSubscriberURI(rawurl string) TriggerOption { } } +func WithTriggerDeadLeaderSink(ref *duckv1.KReference, uri string) TriggerOption { + return func(t *v1.Trigger) { + if t.Spec.Delivery == nil { + t.Spec.Delivery = new(eventingv1.DeliverySpec) + } + var u *apis.URL + if uri != "" { + u, _ = apis.ParseURL(uri) + } + t.Spec.Delivery.DeadLetterSink = &duckv1.Destination{ + Ref: ref, + URI: u, + } + } +} + +func WithTriggerRetry(count int32, backoffPolicy *eventingv1.BackoffPolicyType, backoffDelay *string) TriggerOption { + return func(t *v1.Trigger) { + if t.Spec.Delivery == nil { + t.Spec.Delivery = new(eventingv1.DeliverySpec) + } + t.Spec.Delivery.Retry = ptr.Int32(count) + t.Spec.Delivery.BackoffPolicy = backoffPolicy + t.Spec.Delivery.BackoffDelay = backoffDelay + } +} + func WithTriggerSubscriberRef(gvk metav1.GroupVersionKind, name, namespace string) TriggerOption { return func(t *v1.Trigger) { t.Spec.Subscriber = duckv1.Destination{