From e69c6f852733b1de8c812bc417fe13a83b1908d0 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Mon, 14 Sep 2020 13:26:02 +0200 Subject: [PATCH] Propagate the entire delivery spec to the Channel (#4042) * Propagate the entire delivery spec to the Channel The subscription reconciler didn't propagate the entire `deliverySpec` to the channel, only the `deadLetterSink` was propagated. Signed-off-by: Pierangelo Di Pilato * Apply suggestion Signed-off-by: Pierangelo Di Pilato --- pkg/reconciler/subscription/subscription.go | 37 ++++-- .../subscription/subscription_test.go | 111 ++++++++++++++---- 2 files changed, 115 insertions(+), 33 deletions(-) diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index d9e94ea25fa..9fa77fa9f9a 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -524,6 +524,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(ctx context.Context, c SubscriberURI: sub.Status.PhysicalSubscription.SubscriberURI, ReplyURI: sub.Status.PhysicalSubscription.ReplyURI, DeadLetterSinkURI: sub.Status.PhysicalSubscription.DeadLetterSinkURI, + Delivery: deliverySpec(sub), }}, } return @@ -536,6 +537,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(ctx context.Context, c channel.Spec.Subscribable.Subscribers[i].SubscriberURI = sub.Status.PhysicalSubscription.SubscriberURI channel.Spec.Subscribable.Subscribers[i].ReplyURI = sub.Status.PhysicalSubscription.ReplyURI channel.Spec.Subscribable.Subscribers[i].DeadLetterSinkURI = sub.Status.PhysicalSubscription.DeadLetterSinkURI + channel.Spec.Subscribable.Subscribers[i].Delivery = deliverySpec(sub) return } } @@ -548,6 +550,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(ctx context.Context, c SubscriberURI: sub.Status.PhysicalSubscription.SubscriberURI, ReplyURI: sub.Status.PhysicalSubscription.ReplyURI, DeadLetterSinkURI: sub.Status.PhysicalSubscription.DeadLetterSinkURI, + Delivery: deliverySpec(sub), }) } @@ -558,15 +561,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Beta1(ctx context.Context, ch channel.Spec.Subscribers[i].Generation = sub.Generation channel.Spec.Subscribers[i].SubscriberURI = sub.Status.PhysicalSubscription.SubscriberURI channel.Spec.Subscribers[i].ReplyURI = sub.Status.PhysicalSubscription.ReplyURI - // Only set the deadletter sink if it's not nil. Otherwise we'll just end up patching - // empty delivery in there. - if sub.Status.PhysicalSubscription.DeadLetterSinkURI != nil { - channel.Spec.Subscribers[i].Delivery = &eventingduckv1beta1.DeliverySpec{ - DeadLetterSink: &duckv1.Destination{ - URI: sub.Status.PhysicalSubscription.DeadLetterSinkURI, - }, - } - } + channel.Spec.Subscribers[i].Delivery = deliverySpec(sub) return } } @@ -576,17 +571,33 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Beta1(ctx context.Context, ch Generation: sub.Generation, SubscriberURI: sub.Status.PhysicalSubscription.SubscriberURI, ReplyURI: sub.Status.PhysicalSubscription.ReplyURI, + Delivery: deliverySpec(sub), } + + // Must not have been found. Add it. + channel.Spec.Subscribers = append(channel.Spec.Subscribers, toAdd) +} + +func deliverySpec(sub *v1beta1.Subscription) *eventingduckv1beta1.DeliverySpec { + + var delivery *eventingduckv1beta1.DeliverySpec + // Only set the deadletter sink if it's not nil. Otherwise we'll just end up patching // empty delivery in there. if sub.Status.PhysicalSubscription.DeadLetterSinkURI != nil { - toAdd.Delivery = &eventingduckv1beta1.DeliverySpec{ + delivery = &eventingduckv1beta1.DeliverySpec{ DeadLetterSink: &duckv1.Destination{ URI: sub.Status.PhysicalSubscription.DeadLetterSinkURI, }, } } - - // Must not have been found. Add it. - channel.Spec.Subscribers = append(channel.Spec.Subscribers, toAdd) + if sub.Spec.Delivery != nil && (sub.Spec.Delivery.BackoffDelay != nil || sub.Spec.Delivery.Retry != nil || sub.Spec.Delivery.BackoffPolicy != nil) { + if delivery == nil { + delivery = &eventingduckv1beta1.DeliverySpec{} + } + delivery.BackoffPolicy = (*eventingduckv1beta1.BackoffPolicyType)(sub.Spec.Delivery.BackoffPolicy) + delivery.Retry = sub.Spec.Delivery.Retry + delivery.BackoffDelay = sub.Spec.Delivery.BackoffDelay + } + return delivery } diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index b785e2b9203..2341121a1b6 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -22,9 +22,12 @@ import ( "fmt" "testing" + "k8s.io/utils/pointer" + "knative.dev/pkg/injection/clients/dynamicclient" + eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" + eventingclient "knative.dev/eventing/pkg/client/injection/client" - "knative.dev/pkg/injection/clients/dynamicclient" corev1 "k8s.io/api/core/v1" apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" @@ -33,14 +36,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" clientgotesting "k8s.io/client-go/testing" - eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" - eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1" - messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" - "knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelable" - "knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelablecombined" - "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1beta1/subscription" - "knative.dev/eventing/pkg/duck" - "knative.dev/eventing/pkg/utils" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" duckv1alpha1 "knative.dev/pkg/apis/duck/v1alpha1" @@ -50,10 +45,22 @@ import ( logtesting "knative.dev/pkg/logging/testing" "knative.dev/pkg/resolver" + eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" + eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1" + messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" + "knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelable" + "knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelablecombined" + "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1beta1/subscription" + "knative.dev/eventing/pkg/duck" + "knative.dev/eventing/pkg/utils" + + . "knative.dev/pkg/reconciler/testing" + + _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel/fake" + _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/inmemorychannel/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1beta1/channel/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1beta1/inmemorychannel/fake" . "knative.dev/eventing/pkg/reconciler/testing/v1beta1" - . "knative.dev/pkg/reconciler/testing" ) const ( @@ -67,6 +74,8 @@ const ( subscriptionName = "testsubscription" testNS = "testnamespace" subscriptionGeneration = 1 + + finalizerName = "subscriptions.messaging.knative.dev" ) // subscriptions have: channel -> SUB -> subscriber -viaSub-> reply @@ -147,6 +156,8 @@ func init() { } func TestAllCases(t *testing.T) { + linear := eventingduck.BackoffPolicyLinear + table := TableTest{ { Name: "bad workqueue key", @@ -1222,6 +1233,39 @@ func TestAllCases(t *testing.T) { WithChannelableReadySubscriber("a-"+subscriptionUID), WithChannelableReadySubscriber("b-"+subscriptionUID), ), + }, + }, + { + Name: "v1 imc+two subscribers for a channel - update delivery - full delivery spec", + Objects: []runtime.Object{ + NewSubscription("a-"+subscriptionName, testNS, + WithSubscriptionUID("a-"+subscriptionUID), + WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), + WithSubscriptionDeliverySpec(&eventingduck.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: subscriberGVK.Group + "/" + subscriberGVK.Version, + Kind: subscriberGVK.Kind, + Name: dlcName, + Namespace: testNS, + }, + }, + Retry: pointer.Int32Ptr(10), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("PT1S"), + }), + ), + NewUnstructured(subscriberGVK, dlcName, testNS, + WithUnstructuredAddressable(dlcDNS), + ), + NewInMemoryChannel(channelName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelSubscribers(nil), + WithInMemoryChannelAddress(channelDNS), + WithInMemoryChannelReadySubscriber("a-"+subscriptionUID), + WithInMemoryChannelReadySubscriber("b-"+subscriptionUID), + ), NewService(serviceName, testNS), }, Key: testNS + "/" + "a-" + subscriptionName, @@ -1233,24 +1277,49 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription("a-"+subscriptionName, testNS, WithSubscriptionUID("a-"+subscriptionUID), - WithSubscriptionChannel(channelableV1Alpha1GVK, channelName), + WithSubscriptionChannel(imcV1Beta1GVK, channelName), WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), // The first reconciliation will initialize the status conditions. WithInitSubscriptionConditions, MarkReferencesResolved, MarkAddedToChannel, WithSubscriptionPhysicalSubscriptionSubscriber(serviceURIWithPath), + WithSubscriptionDeliverySpec(&eventingduck.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: subscriberGVK.Group + "/" + subscriberGVK.Version, + Kind: subscriberGVK.Kind, + Name: dlcName, + Namespace: testNS, + }, + }, + Retry: pointer.Int32Ptr(10), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("PT1S"), + }), + WithSubscriptionDeadLetterSinkURI(dlcURI), ), }}, WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribersV1Alpha1(testNS, channelName, []eventingduckv1alpha1.SubscriberSpec{ - {UID: "b-" + subscriptionUID}, - {UID: "a-" + subscriptionUID, SubscriberURI: serviceURIWithPath}, + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + { + UID: "a-" + subscriptionUID, + SubscriberURI: serviceURIWithPath, + Delivery: &eventingduck.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + URI: apis.HTTP("dlc.mynamespace.svc.cluster.local"), + }, + Retry: pointer.Int32Ptr(10), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("PT1S"), + }, + }, }), patchFinalizers(testNS, "a-"+subscriptionName), }, - }, { - Name: "v1beta1 imc+deleted - channel patch succeeded", + }, + { + Name: "v1 imc+deleted - channel patch succeeded", Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), @@ -1440,6 +1509,12 @@ func patchSubscribersV1Alpha1(namespace, name string, subscribers []eventingduck return action } +func WithSubscriptionDeliverySpec(d *eventingduck.DeliverySpec) SubscriptionOption { + return func(v *messagingv1beta1.Subscription) { + v.Spec.Delivery = d + } +} + func patchSubscribers(namespace, name string, subscribers []eventingduck.SubscriberSpec) clientgotesting.PatchActionImpl { action := clientgotesting.PatchActionImpl{} action.Name = name @@ -1470,10 +1545,6 @@ func patchSubscribers(namespace, name string, subscribers []eventingduck.Subscri return action } -const ( - finalizerName = "subscriptions.messaging.knative.dev" -) - func patchFinalizers(namespace, name string) clientgotesting.PatchActionImpl { action := clientgotesting.PatchActionImpl{} action.Name = name