From 588ef6705c20e77a9f5a40bebe045179426beaf3 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Thu, 17 Jun 2021 10:12:29 +0200 Subject: [PATCH 1/3] Support KReference.Group in Subscription.Spec.Channel Signed-off-by: Francesco Guardiani --- pkg/reconciler/subscription/subscription.go | 12 +++ .../subscription/subscription_test.go | 89 ++++++++++++++++- pkg/reconciler/testing/v1/subscription.go | 10 ++ .../features/kreference_group/sub_channel.go | 98 +++++++++++++++++++ test/experimental/kreference_group_test.go | 1 + 5 files changed, 209 insertions(+), 1 deletion(-) create mode 100644 test/experimental/features/kreference_group/sub_channel.go diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index 529047b4419..ea7ad69dc0b 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -310,6 +310,18 @@ func (r *Reconciler) getSubStatus(subscription *v1.Subscription, channel *eventi } func (r *Reconciler) trackAndFetchChannel(ctx context.Context, sub *v1.Subscription, ref duckv1.KReference) (runtime.Object, pkgreconciler.Event) { + // Resolve the group + if feature.FromContext(ctx).IsEnabled(feature.KReferenceGroup) { + newRef, err := r.kreferenceResolver.ResolveGroup(&ref) + if err != nil { + logging.FromContext(ctx).Warnw("Failed to resolve Channel reference", + zap.Error(err), + zap.Any("ref", ref)) + return nil, err + } + ref = *newRef + } + // Track the channel using the channelableTracker. // We don't need the explicitly set a channelInformer, as this will dynamically generate one for us. // This code needs to be called before checking the existence of the `channel`, in order to make sure the diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index 7f7c0ce3b90..20a8aead90b 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -261,7 +261,7 @@ func TestAllCases(t *testing.T) { Ready: "True", }}), ), - // IMC CRD + // Subscriber CRD eventingtesting.NewCustomResourceDefinition("subscribers.messaging.knative.dev", eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{ Name: "v1beta1", @@ -288,6 +288,93 @@ func TestAllCases(t *testing.T) { MarkSubscriptionReady, ), }}, + }, {}, { + Name: "subscription goes ready with Channel.Group and Subscriber.Ref.Group", + Ctx: feature.ToContext(context.TODO(), feature.Flags{ + feature.KReferenceGroup: feature.Enabled, + }), + Objects: []runtime.Object{ + NewSubscription(subscriptionName, testNS, + WithSubscriptionUID(subscriptionUID), + WithSubscriptionChannelUsingGroup(imcV1GVK, channelName), + WithSubscriptionSubscriberRefUsingGroup(subscriberGVK, subscriberName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), + WithInitSubscriptionConditions, + WithSubscriptionFinalizers(finalizerName), + MarkReferencesResolved, + MarkAddedToChannel, + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), + WithSubscriptionPhysicalSubscriptionReply(replyURI), + ), + // Subscriber + NewUnstructured(subscriberGVK, subscriberName, testNS, + WithUnstructuredAddressable(subscriberDNS), + ), + // Reply + NewInMemoryChannel(replyName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelAddress(replyDNS), + ), + // Channel + NewInMemoryChannel(channelName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelReady(channelDNS), + WithInMemoryChannelSubscribers([]eventingduck.SubscriberSpec{{ + UID: subscriptionUID, + Generation: 0, + SubscriberURI: subscriberURI, + ReplyURI: replyURI, + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + Generation: 1, + SubscriberURI: apis.HTTP("call2"), + ReplyURI: apis.HTTP("sink2"), + }}), + WithInMemoryChannelStatusSubscribers([]eventingduck.SubscriberStatus{{ + UID: subscriptionUID, + ObservedGeneration: 0, + Ready: "True", + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + ObservedGeneration: 1, + Ready: "True", + }}), + ), + eventingtesting.NewCustomResourceDefinition("subscribers.messaging.knative.dev", + eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{ + Name: "v1beta1", + Storage: false, + }, { + Name: "v1", + Storage: true, + }}), + ), + eventingtesting.NewCustomResourceDefinition("inmemorychannels.messaging.knative.dev", + eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{ + Name: "v1beta1", + Storage: false, + }, { + Name: "v1", + Storage: true, + }}), + ), + }, + Key: testNS + "/" + subscriptionName, + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewSubscription(subscriptionName, testNS, + WithSubscriptionUID(subscriptionUID), + WithSubscriptionChannelUsingGroup(imcV1GVK, channelName), + WithSubscriptionSubscriberRefUsingGroup(subscriberGVK, subscriberName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), + WithInitSubscriptionConditions, + WithSubscriptionFinalizers(finalizerName), + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), + WithSubscriptionPhysicalSubscriptionReply(replyURI), + // - Status Update - + MarkSubscriptionReady, + ), + }}, }, { Name: "channel does not exist", Objects: []runtime.Object{ diff --git a/pkg/reconciler/testing/v1/subscription.go b/pkg/reconciler/testing/v1/subscription.go index 2d665b254f9..6c1cf70ca2e 100644 --- a/pkg/reconciler/testing/v1/subscription.go +++ b/pkg/reconciler/testing/v1/subscription.go @@ -125,6 +125,16 @@ func WithSubscriptionChannel(gvk metav1.GroupVersionKind, name string) Subscript } } +func WithSubscriptionChannelUsingGroup(gvk metav1.GroupVersionKind, name string) SubscriptionOption { + return func(s *v1.Subscription) { + s.Spec.Channel = duckv1.KReference{ + Group: gvk.Group, + Kind: gvk.Kind, + Name: name, + } + } +} + func WithSubscriptionSubscriberRef(gvk metav1.GroupVersionKind, name, namespace string) SubscriptionOption { return func(s *v1.Subscription) { s.Spec.Subscriber = &duckv1.Destination{ diff --git a/test/experimental/features/kreference_group/sub_channel.go b/test/experimental/features/kreference_group/sub_channel.go new file mode 100644 index 00000000000..03d22dc6273 --- /dev/null +++ b/test/experimental/features/kreference_group/sub_channel.go @@ -0,0 +1,98 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kreference_group + +import ( + "context" + + cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + eventingclient "knative.dev/eventing/pkg/client/injection/client" + "knative.dev/eventing/test/rekt/resources/channel" + "knative.dev/eventing/test/rekt/resources/subscription" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/reconciler-test/pkg/feature" +) + +// SubscriptionWithChannelGroup tests a scenario where the flow is source -> channel -> sink +// where the channel -> sink subscription uses the KReference.Group field for Subscription.Spec.Channel +func SubscriptionWithChannelGroup() *feature.Feature { + f := feature.NewFeature() + + channelGVK := channel.GVK() + channelGVR := channel.GVR() + channelGroup := channelGVK.GroupKind().Group + _, channelKind := channelGVK.ToAPIVersionAndKind() + + channelName := feature.MakeRandomK8sName("channel") + subName := feature.MakeRandomK8sName("sub") + sinkName := feature.MakeRandomK8sName("sink") + sourceName := feature.MakeRandomK8sName("source") + + ev := cetest.FullEvent() + + f.Setup("install sink", eventshub.Install( + sinkName, + eventshub.StartReceiver, + )) + + f.Setup("Install channel", channel.Install(channelName)) + f.Setup("channel is ready", channel.IsReady(channelName)) + + f.Setup("Install channel -> sink subscription", func(ctx context.Context, t feature.T) { + namespace := environment.FromContext(ctx).Namespace() + _, err := eventingclient.Get(ctx).MessagingV1().Subscriptions(namespace).Create(ctx, + &messagingv1.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: subName, + Namespace: namespace, + }, + Spec: messagingv1.SubscriptionSpec{ + Channel: duckv1.KReference{ + Group: channelGroup, + Kind: channelKind, + Name: channelName, + }, + Subscriber: &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "v1", + Kind: "Service", + Name: sinkName, + }, + }, + }, + }, metav1.CreateOptions{}) + require.NoError(t, err) + }) + + f.Setup("subscription channel -> sink is ready", subscription.IsReady(subName)) + + f.Setup("install source", eventshub.Install( + sourceName, + eventshub.StartSenderToResource(channelGVR, channelName), + eventshub.InputEvent(ev), + )) + + f.Assert("receive event", assert.OnStore(sinkName).MatchEvent(cetest.HasId(ev.ID())).Exact(1)) + + return f +} diff --git a/test/experimental/kreference_group_test.go b/test/experimental/kreference_group_test.go index 99254616243..e394ce6198e 100644 --- a/test/experimental/kreference_group_test.go +++ b/test/experimental/kreference_group_test.go @@ -40,4 +40,5 @@ func TestChannelToChannel(t *testing.T) { ) env.Test(ctx, t, kreference_group.ChannelToChannel()) + env.Test(ctx, t, kreference_group.SubscriptionWithChannelGroup()) } From 7fd4006912a5b696e1951254009e9d8653ffbd2a Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Thu, 17 Jun 2021 10:13:16 +0200 Subject: [PATCH 2/3] Support KReference.Group in Subscription.Spec.Channel Signed-off-by: Francesco Guardiani --- config/core/resources/subscription.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/config/core/resources/subscription.yaml b/config/core/resources/subscription.yaml index ce90004a2b1..19f55802428 100644 --- a/config/core/resources/subscription.yaml +++ b/config/core/resources/subscription.yaml @@ -47,6 +47,7 @@ spec: name: description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' type: string + x-kubernetes-preserve-unknown-fields: true # This is necessary to enable the experimental feature delivery: description: Delivery configuration type: object From 0ce949d766c6208ca3f3a46cdef9b82a4445f27d Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 21 Jun 2021 17:32:04 +0200 Subject: [PATCH 3/3] More tests Signed-off-by: Francesco Guardiani --- .../subscription/subscription_test.go | 168 +++++++++++++++++- pkg/reconciler/testing/v1/subscription.go | 25 +++ 2 files changed, 192 insertions(+), 1 deletion(-) diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index 20a8aead90b..c4c85b1e176 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -288,7 +288,86 @@ func TestAllCases(t *testing.T) { MarkSubscriptionReady, ), }}, - }, {}, { + }, { + Name: "subscription goes ready with both api version and group", + Ctx: feature.ToContext(context.TODO(), feature.Flags{ + feature.KReferenceGroup: feature.Enabled, + }), + Objects: []runtime.Object{ + NewSubscription(subscriptionName, testNS, + WithSubscriptionUID(subscriptionUID), + WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionSubscriberRefUsingApiVersionAndGroup(subscriberGVK, subscriberName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), + WithInitSubscriptionConditions, + WithSubscriptionFinalizers(finalizerName), + MarkReferencesResolved, + MarkAddedToChannel, + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), + WithSubscriptionPhysicalSubscriptionReply(replyURI), + ), + // Subscriber + NewUnstructured(subscriberGVK, subscriberName, testNS, + WithUnstructuredAddressable(subscriberDNS), + ), + // Reply + NewInMemoryChannel(replyName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelAddress(replyDNS), + ), + // Channel + NewInMemoryChannel(channelName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelReady(channelDNS), + WithInMemoryChannelSubscribers([]eventingduck.SubscriberSpec{{ + UID: subscriptionUID, + Generation: 0, + SubscriberURI: subscriberURI, + ReplyURI: replyURI, + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + Generation: 1, + SubscriberURI: apis.HTTP("call2"), + ReplyURI: apis.HTTP("sink2"), + }}), + WithInMemoryChannelStatusSubscribers([]eventingduck.SubscriberStatus{{ + UID: subscriptionUID, + ObservedGeneration: 0, + Ready: "True", + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + ObservedGeneration: 1, + Ready: "True", + }}), + ), + // IMC CRD + eventingtesting.NewCustomResourceDefinition("subscribers.messaging.knative.dev", + eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{ + Name: "v1beta1", + Storage: false, + }, { + Name: "v1", + Storage: true, + }}), + ), + }, + Key: testNS + "/" + subscriptionName, + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewSubscription(subscriptionName, testNS, + WithSubscriptionUID(subscriptionUID), + WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionSubscriberRefUsingApiVersionAndGroup(subscriberGVK, subscriberName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), + WithInitSubscriptionConditions, + WithSubscriptionFinalizers(finalizerName), + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), + WithSubscriptionPhysicalSubscriptionReply(replyURI), + // - Status Update - + MarkSubscriptionReady, + ), + }}, + }, { Name: "subscription goes ready with Channel.Group and Subscriber.Ref.Group", Ctx: feature.ToContext(context.TODO(), feature.Flags{ feature.KReferenceGroup: feature.Enabled, @@ -375,6 +454,93 @@ func TestAllCases(t *testing.T) { MarkSubscriptionReady, ), }}, + }, { + Name: "subscription goes ready with Channel.Group and Channel.APIVersion", + Ctx: feature.ToContext(context.TODO(), feature.Flags{ + feature.KReferenceGroup: feature.Enabled, + }), + Objects: []runtime.Object{ + NewSubscription(subscriptionName, testNS, + WithSubscriptionUID(subscriptionUID), + WithSubscriptionChannelUsingApiVersionAndGroup(imcV1GVK, channelName), + WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), + WithInitSubscriptionConditions, + WithSubscriptionFinalizers(finalizerName), + MarkReferencesResolved, + MarkAddedToChannel, + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), + WithSubscriptionPhysicalSubscriptionReply(replyURI), + ), + // Subscriber + NewUnstructured(subscriberGVK, subscriberName, testNS, + WithUnstructuredAddressable(subscriberDNS), + ), + // Reply + NewInMemoryChannel(replyName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelAddress(replyDNS), + ), + // Channel + NewInMemoryChannel(channelName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelReady(channelDNS), + WithInMemoryChannelSubscribers([]eventingduck.SubscriberSpec{{ + UID: subscriptionUID, + Generation: 0, + SubscriberURI: subscriberURI, + ReplyURI: replyURI, + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + Generation: 1, + SubscriberURI: apis.HTTP("call2"), + ReplyURI: apis.HTTP("sink2"), + }}), + WithInMemoryChannelStatusSubscribers([]eventingduck.SubscriberStatus{{ + UID: subscriptionUID, + ObservedGeneration: 0, + Ready: "True", + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + ObservedGeneration: 1, + Ready: "True", + }}), + ), + eventingtesting.NewCustomResourceDefinition("subscribers.messaging.knative.dev", + eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{ + Name: "v1beta1", + Storage: false, + }, { + Name: "v1", + Storage: true, + }}), + ), + eventingtesting.NewCustomResourceDefinition("inmemorychannels.messaging.knative.dev", + eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{ + Name: "v1beta1", + Storage: false, + }, { + Name: "v1", + Storage: true, + }}), + ), + }, + Key: testNS + "/" + subscriptionName, + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewSubscription(subscriptionName, testNS, + WithSubscriptionUID(subscriptionUID), + WithSubscriptionChannelUsingApiVersionAndGroup(imcV1GVK, channelName), + WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), + WithInitSubscriptionConditions, + WithSubscriptionFinalizers(finalizerName), + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), + WithSubscriptionPhysicalSubscriptionReply(replyURI), + // - Status Update - + MarkSubscriptionReady, + ), + }}, }, { Name: "channel does not exist", Objects: []runtime.Object{ diff --git a/pkg/reconciler/testing/v1/subscription.go b/pkg/reconciler/testing/v1/subscription.go index 6c1cf70ca2e..b42e759672c 100644 --- a/pkg/reconciler/testing/v1/subscription.go +++ b/pkg/reconciler/testing/v1/subscription.go @@ -135,6 +135,17 @@ func WithSubscriptionChannelUsingGroup(gvk metav1.GroupVersionKind, name string) } } +func WithSubscriptionChannelUsingApiVersionAndGroup(gvk metav1.GroupVersionKind, name string) SubscriptionOption { + return func(s *v1.Subscription) { + s.Spec.Channel = duckv1.KReference{ + APIVersion: apiVersion(gvk), + Group: gvk.Group, + Kind: gvk.Kind, + Name: name, + } + } +} + func WithSubscriptionSubscriberRef(gvk metav1.GroupVersionKind, name, namespace string) SubscriptionOption { return func(s *v1.Subscription) { s.Spec.Subscriber = &duckv1.Destination{ @@ -161,6 +172,20 @@ func WithSubscriptionSubscriberRefUsingGroup(gvk metav1.GroupVersionKind, name, } } +func WithSubscriptionSubscriberRefUsingApiVersionAndGroup(gvk metav1.GroupVersionKind, name, namespace string) SubscriptionOption { + return func(s *v1.Subscription) { + s.Spec.Subscriber = &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: apiVersion(gvk), + Group: gvk.Group, + Kind: gvk.Kind, + Name: name, + Namespace: namespace, + }, + } + } +} + func WithSubscriptionDeliveryRef(gvk metav1.GroupVersionKind, name, namespace string) SubscriptionOption { return func(s *v1.Subscription) { s.Spec.Delivery = &eventingduckv1.DeliverySpec{