From 725ead4b9611c2b3ed540d8a428557a12d4ddb74 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Mon, 14 Sep 2020 12:12:54 +0200 Subject: [PATCH 1/2] Propagate the entire delivery spec to the Channel The subscription reconciler didn't propagate the entire `deliverySpec` to the channel, only the `deadLetterSink` was propagated. Signed-off-by: Pierangelo Di Pilato --- pkg/reconciler/subscription/subscription.go | 39 ++++-- .../subscription/subscription_test.go | 118 ++++++++++++++++-- 2 files changed, 134 insertions(+), 23 deletions(-) diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index db87759bb62..0cdc69a82cc 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -524,6 +524,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(ctx context.Context, c SubscriberURI: sub.Status.PhysicalSubscription.SubscriberURI, ReplyURI: sub.Status.PhysicalSubscription.ReplyURI, DeadLetterSinkURI: sub.Status.PhysicalSubscription.DeadLetterSinkURI, + Delivery: deliverySpec(sub), }}, } return @@ -536,6 +537,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(ctx context.Context, c channel.Spec.Subscribable.Subscribers[i].SubscriberURI = sub.Status.PhysicalSubscription.SubscriberURI channel.Spec.Subscribable.Subscribers[i].ReplyURI = sub.Status.PhysicalSubscription.ReplyURI channel.Spec.Subscribable.Subscribers[i].DeadLetterSinkURI = sub.Status.PhysicalSubscription.DeadLetterSinkURI + channel.Spec.Subscribable.Subscribers[i].Delivery = deliverySpec(sub) return } } @@ -548,6 +550,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(ctx context.Context, c SubscriberURI: sub.Status.PhysicalSubscription.SubscriberURI, ReplyURI: sub.Status.PhysicalSubscription.ReplyURI, DeadLetterSinkURI: sub.Status.PhysicalSubscription.DeadLetterSinkURI, + Delivery: deliverySpec(sub), }) } @@ -558,15 +561,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Beta1(ctx context.Context, ch channel.Spec.Subscribers[i].Generation = sub.Generation channel.Spec.Subscribers[i].SubscriberURI = sub.Status.PhysicalSubscription.SubscriberURI channel.Spec.Subscribers[i].ReplyURI = sub.Status.PhysicalSubscription.ReplyURI - // Only set the deadletter sink if it's not nil. Otherwise we'll just end up patching - // empty delivery in there. - if sub.Status.PhysicalSubscription.DeadLetterSinkURI != nil { - channel.Spec.Subscribers[i].Delivery = &eventingduckv1beta1.DeliverySpec{ - DeadLetterSink: &duckv1.Destination{ - URI: sub.Status.PhysicalSubscription.DeadLetterSinkURI, - }, - } - } + channel.Spec.Subscribers[i].Delivery = deliverySpec(sub) return } } @@ -576,17 +571,37 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Beta1(ctx context.Context, ch Generation: sub.Generation, SubscriberURI: sub.Status.PhysicalSubscription.SubscriberURI, ReplyURI: sub.Status.PhysicalSubscription.ReplyURI, + Delivery: deliverySpec(sub), } + + // Must not have been found. Add it. + channel.Spec.Subscribers = append(channel.Spec.Subscribers, toAdd) +} + +func deliverySpec(sub *v1.Subscription) *eventingduckv1beta1.DeliverySpec { + + var delivery *eventingduckv1beta1.DeliverySpec + // Only set the deadletter sink if it's not nil. Otherwise we'll just end up patching // empty delivery in there. if sub.Status.PhysicalSubscription.DeadLetterSinkURI != nil { - toAdd.Delivery = &eventingduckv1beta1.DeliverySpec{ + delivery = &eventingduckv1beta1.DeliverySpec{ DeadLetterSink: &duckv1.Destination{ URI: sub.Status.PhysicalSubscription.DeadLetterSinkURI, }, } } + if sub.Spec.Delivery != nil && (sub.Spec.Delivery.BackoffDelay != nil || sub.Spec.Delivery.Retry != nil || sub.Spec.Delivery.BackoffPolicy != nil) { + if delivery == nil { + delivery = &eventingduckv1beta1.DeliverySpec{} + } + delivery.BackoffPolicy = backoffPolicyV1Beta1(sub.Spec.Delivery.BackoffPolicy) + delivery.Retry = sub.Spec.Delivery.Retry + delivery.BackoffDelay = sub.Spec.Delivery.BackoffDelay + } + return delivery +} - // Must not have been found. Add it. - channel.Spec.Subscribers = append(channel.Spec.Subscribers, toAdd) +func backoffPolicyV1Beta1(policyType *eventingduckv1.BackoffPolicyType) *eventingduckv1beta1.BackoffPolicyType { + return (*eventingduckv1beta1.BackoffPolicyType)(policyType) } diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index 7b962a2b19a..5921de34724 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -22,9 +22,11 @@ import ( "fmt" "testing" + "k8s.io/utils/pointer" + "knative.dev/pkg/injection/clients/dynamicclient" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" eventingclient "knative.dev/eventing/pkg/client/injection/client" - "knative.dev/pkg/injection/clients/dynamicclient" corev1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" @@ -33,14 +35,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" clientgotesting "k8s.io/client-go/testing" - eventingduck "knative.dev/eventing/pkg/apis/duck/v1" - eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" - messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" - "knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelable" - "knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelablecombined" - "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription" - "knative.dev/eventing/pkg/duck" - "knative.dev/eventing/pkg/utils" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" duckv1alpha1 "knative.dev/pkg/apis/duck/v1alpha1" @@ -50,10 +44,20 @@ import ( logtesting "knative.dev/pkg/logging/testing" "knative.dev/pkg/resolver" + eventingduck "knative.dev/eventing/pkg/apis/duck/v1" + eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + "knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelable" + "knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelablecombined" + "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription" + "knative.dev/eventing/pkg/duck" + "knative.dev/eventing/pkg/utils" + + . "knative.dev/pkg/reconciler/testing" + _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/inmemorychannel/fake" . "knative.dev/eventing/pkg/reconciler/testing/v1" - . "knative.dev/pkg/reconciler/testing" ) const ( @@ -134,6 +138,8 @@ func init() { } func TestAllCases(t *testing.T) { + linear := eventingduck.BackoffPolicyLinear + table := TableTest{ { Name: "bad workqueue key", @@ -1001,7 +1007,91 @@ func TestAllCases(t *testing.T) { }), patchFinalizers(testNS, "a-"+subscriptionName), }, - }, { + }, + { + Name: "v1 imc+two subscribers for a channel - update delivery - full delivery spec", + Objects: []runtime.Object{ + NewSubscription("a-"+subscriptionName, testNS, + WithSubscriptionUID("a-"+subscriptionUID), + WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), + WithSubscriptionDeliverySpec(&eventingduck.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: subscriberGVK.Group + "/" + subscriberGVK.Version, + Kind: subscriberGVK.Kind, + Name: dlcName, + Namespace: testNS, + }, + }, + Retry: pointer.Int32Ptr(10), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("PT1S"), + }), + ), + NewUnstructured(subscriberGVK, dlcName, testNS, + WithUnstructuredAddressable(dlcDNS), + ), + NewInMemoryChannel(channelName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelSubscribers(nil), + WithInMemoryChannelAddress(channelDNS), + WithInMemoryChannelReadySubscriber("a-"+subscriptionUID), + WithInMemoryChannelReadySubscriber("b-"+subscriptionUID), + ), + NewService(serviceName, testNS), + }, + Key: testNS + "/" + "a-" + subscriptionName, + WantErr: false, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", "a-"+subscriptionName), + Eventf(corev1.EventTypeNormal, "SubscriberSync", "Subscription was synchronized to channel %q", channelName), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewSubscription("a-"+subscriptionName, testNS, + WithSubscriptionUID("a-"+subscriptionUID), + WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), + // The first reconciliation will initialize the status conditions. + WithInitSubscriptionConditions, + MarkReferencesResolved, + MarkAddedToChannel, + WithSubscriptionPhysicalSubscriptionSubscriber(serviceURIWithPath), + WithSubscriptionDeliverySpec(&eventingduck.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: subscriberGVK.Group + "/" + subscriberGVK.Version, + Kind: subscriberGVK.Kind, + Name: dlcName, + Namespace: testNS, + }, + }, + Retry: pointer.Int32Ptr(10), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("PT1S"), + }), + WithSubscriptionDeadLetterSinkURI(dlcURI), + ), + }}, + WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + { + UID: "a-" + subscriptionUID, + SubscriberURI: serviceURIWithPath, + Delivery: &eventingduck.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + URI: apis.HTTP("dlc.mynamespace.svc.cluster.local"), + }, + Retry: pointer.Int32Ptr(10), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("PT1S"), + }, + }, + }), + patchFinalizers(testNS, "a-"+subscriptionName), + }, + }, + { Name: "v1 imc+deleted - channel patch succeeded", Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, @@ -1130,6 +1220,12 @@ func TestAllCases(t *testing.T) { }, false, logger)) } +func WithSubscriptionDeliverySpec(d *eventingduck.DeliverySpec) SubscriptionOption { + return func(v *messagingv1.Subscription) { + v.Spec.Delivery = d + } +} + func patchSubscribers(namespace, name string, subscribers []eventingduck.SubscriberSpec) clientgotesting.PatchActionImpl { action := clientgotesting.PatchActionImpl{} action.Name = name From 42ccc3ee23b34ca6e5b3b2d842e606f74e5158a8 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Mon, 14 Sep 2020 12:44:52 +0200 Subject: [PATCH 2/2] Apply suggestion Signed-off-by: Pierangelo Di Pilato --- pkg/reconciler/subscription/subscription.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index 0cdc69a82cc..f81d496628d 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -595,13 +595,9 @@ func deliverySpec(sub *v1.Subscription) *eventingduckv1beta1.DeliverySpec { if delivery == nil { delivery = &eventingduckv1beta1.DeliverySpec{} } - delivery.BackoffPolicy = backoffPolicyV1Beta1(sub.Spec.Delivery.BackoffPolicy) + delivery.BackoffPolicy = (*eventingduckv1beta1.BackoffPolicyType)(sub.Spec.Delivery.BackoffPolicy) delivery.Retry = sub.Spec.Delivery.Retry delivery.BackoffDelay = sub.Spec.Delivery.BackoffDelay } return delivery } - -func backoffPolicyV1Beta1(policyType *eventingduckv1.BackoffPolicyType) *eventingduckv1beta1.BackoffPolicyType { - return (*eventingduckv1beta1.BackoffPolicyType)(policyType) -}