diff --git a/config/core/resources/subscription.yaml b/config/core/resources/subscription.yaml index ce90004a2b1..19f55802428 100644 --- a/config/core/resources/subscription.yaml +++ b/config/core/resources/subscription.yaml @@ -47,6 +47,7 @@ spec: name: description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' type: string + x-kubernetes-preserve-unknown-fields: true # This is necessary to enable the experimental feature delivery: description: Delivery configuration type: object diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index 529047b4419..ea7ad69dc0b 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -310,6 +310,18 @@ func (r *Reconciler) getSubStatus(subscription *v1.Subscription, channel *eventi } func (r *Reconciler) trackAndFetchChannel(ctx context.Context, sub *v1.Subscription, ref duckv1.KReference) (runtime.Object, pkgreconciler.Event) { + // Resolve the group + if feature.FromContext(ctx).IsEnabled(feature.KReferenceGroup) { + newRef, err := r.kreferenceResolver.ResolveGroup(&ref) + if err != nil { + logging.FromContext(ctx).Warnw("Failed to resolve Channel reference", + zap.Error(err), + zap.Any("ref", ref)) + return nil, err + } + ref = *newRef + } + // Track the channel using the channelableTracker. // We don't need the explicitly set a channelInformer, as this will dynamically generate one for us. // This code needs to be called before checking the existence of the `channel`, in order to make sure the diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index 7f7c0ce3b90..c4c85b1e176 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -261,6 +261,85 @@ func TestAllCases(t *testing.T) { Ready: "True", }}), ), + // Subscriber CRD + eventingtesting.NewCustomResourceDefinition("subscribers.messaging.knative.dev", + eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{ + Name: "v1beta1", + Storage: false, + }, { + Name: "v1", + Storage: true, + }}), + ), + }, + Key: testNS + "/" + subscriptionName, + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewSubscription(subscriptionName, testNS, + WithSubscriptionUID(subscriptionUID), + WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionSubscriberRefUsingGroup(subscriberGVK, subscriberName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), + WithInitSubscriptionConditions, + WithSubscriptionFinalizers(finalizerName), + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), + WithSubscriptionPhysicalSubscriptionReply(replyURI), + // - Status Update - + MarkSubscriptionReady, + ), + }}, + }, { + Name: "subscription goes ready with both api version and group", + Ctx: feature.ToContext(context.TODO(), feature.Flags{ + feature.KReferenceGroup: feature.Enabled, + }), + Objects: []runtime.Object{ + NewSubscription(subscriptionName, testNS, + WithSubscriptionUID(subscriptionUID), + WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionSubscriberRefUsingApiVersionAndGroup(subscriberGVK, subscriberName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), + WithInitSubscriptionConditions, + WithSubscriptionFinalizers(finalizerName), + MarkReferencesResolved, + MarkAddedToChannel, + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), + WithSubscriptionPhysicalSubscriptionReply(replyURI), + ), + // Subscriber + NewUnstructured(subscriberGVK, subscriberName, testNS, + WithUnstructuredAddressable(subscriberDNS), + ), + // Reply + NewInMemoryChannel(replyName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelAddress(replyDNS), + ), + // Channel + NewInMemoryChannel(channelName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelReady(channelDNS), + WithInMemoryChannelSubscribers([]eventingduck.SubscriberSpec{{ + UID: subscriptionUID, + Generation: 0, + SubscriberURI: subscriberURI, + ReplyURI: replyURI, + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + Generation: 1, + SubscriberURI: apis.HTTP("call2"), + ReplyURI: apis.HTTP("sink2"), + }}), + WithInMemoryChannelStatusSubscribers([]eventingduck.SubscriberStatus{{ + UID: subscriptionUID, + ObservedGeneration: 0, + Ready: "True", + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + ObservedGeneration: 1, + Ready: "True", + }}), + ), // IMC CRD eventingtesting.NewCustomResourceDefinition("subscribers.messaging.knative.dev", eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{ @@ -278,10 +357,184 @@ func TestAllCases(t *testing.T) { Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionSubscriberRefUsingApiVersionAndGroup(subscriberGVK, subscriberName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), + WithInitSubscriptionConditions, + WithSubscriptionFinalizers(finalizerName), + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), + WithSubscriptionPhysicalSubscriptionReply(replyURI), + // - Status Update - + MarkSubscriptionReady, + ), + }}, + }, { + Name: "subscription goes ready with Channel.Group and Subscriber.Ref.Group", + Ctx: feature.ToContext(context.TODO(), feature.Flags{ + feature.KReferenceGroup: feature.Enabled, + }), + Objects: []runtime.Object{ + NewSubscription(subscriptionName, testNS, + WithSubscriptionUID(subscriptionUID), + WithSubscriptionChannelUsingGroup(imcV1GVK, channelName), WithSubscriptionSubscriberRefUsingGroup(subscriberGVK, subscriberName, testNS), WithSubscriptionReply(imcV1GVK, replyName, testNS), WithInitSubscriptionConditions, WithSubscriptionFinalizers(finalizerName), + MarkReferencesResolved, + MarkAddedToChannel, + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), + WithSubscriptionPhysicalSubscriptionReply(replyURI), + ), + // Subscriber + NewUnstructured(subscriberGVK, subscriberName, testNS, + WithUnstructuredAddressable(subscriberDNS), + ), + // Reply + NewInMemoryChannel(replyName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelAddress(replyDNS), + ), + // Channel + NewInMemoryChannel(channelName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelReady(channelDNS), + WithInMemoryChannelSubscribers([]eventingduck.SubscriberSpec{{ + UID: subscriptionUID, + Generation: 0, + SubscriberURI: subscriberURI, + ReplyURI: replyURI, + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + Generation: 1, + SubscriberURI: apis.HTTP("call2"), + ReplyURI: apis.HTTP("sink2"), + }}), + WithInMemoryChannelStatusSubscribers([]eventingduck.SubscriberStatus{{ + UID: subscriptionUID, + ObservedGeneration: 0, + Ready: "True", + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + ObservedGeneration: 1, + Ready: "True", + }}), + ), + eventingtesting.NewCustomResourceDefinition("subscribers.messaging.knative.dev", + eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{ + Name: "v1beta1", + Storage: false, + }, { + Name: "v1", + Storage: true, + }}), + ), + eventingtesting.NewCustomResourceDefinition("inmemorychannels.messaging.knative.dev", + eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{ + Name: "v1beta1", + Storage: false, + }, { + Name: "v1", + Storage: true, + }}), + ), + }, + Key: testNS + "/" + subscriptionName, + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewSubscription(subscriptionName, testNS, + WithSubscriptionUID(subscriptionUID), + WithSubscriptionChannelUsingGroup(imcV1GVK, channelName), + WithSubscriptionSubscriberRefUsingGroup(subscriberGVK, subscriberName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), + WithInitSubscriptionConditions, + WithSubscriptionFinalizers(finalizerName), + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), + WithSubscriptionPhysicalSubscriptionReply(replyURI), + // - Status Update - + MarkSubscriptionReady, + ), + }}, + }, { + Name: "subscription goes ready with Channel.Group and Channel.APIVersion", + Ctx: feature.ToContext(context.TODO(), feature.Flags{ + feature.KReferenceGroup: feature.Enabled, + }), + Objects: []runtime.Object{ + NewSubscription(subscriptionName, testNS, + WithSubscriptionUID(subscriptionUID), + WithSubscriptionChannelUsingApiVersionAndGroup(imcV1GVK, channelName), + WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), + WithInitSubscriptionConditions, + WithSubscriptionFinalizers(finalizerName), + MarkReferencesResolved, + MarkAddedToChannel, + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), + WithSubscriptionPhysicalSubscriptionReply(replyURI), + ), + // Subscriber + NewUnstructured(subscriberGVK, subscriberName, testNS, + WithUnstructuredAddressable(subscriberDNS), + ), + // Reply + NewInMemoryChannel(replyName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelAddress(replyDNS), + ), + // Channel + NewInMemoryChannel(channelName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelReady(channelDNS), + WithInMemoryChannelSubscribers([]eventingduck.SubscriberSpec{{ + UID: subscriptionUID, + Generation: 0, + SubscriberURI: subscriberURI, + ReplyURI: replyURI, + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + Generation: 1, + SubscriberURI: apis.HTTP("call2"), + ReplyURI: apis.HTTP("sink2"), + }}), + WithInMemoryChannelStatusSubscribers([]eventingduck.SubscriberStatus{{ + UID: subscriptionUID, + ObservedGeneration: 0, + Ready: "True", + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + ObservedGeneration: 1, + Ready: "True", + }}), + ), + eventingtesting.NewCustomResourceDefinition("subscribers.messaging.knative.dev", + eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{ + Name: "v1beta1", + Storage: false, + }, { + Name: "v1", + Storage: true, + }}), + ), + eventingtesting.NewCustomResourceDefinition("inmemorychannels.messaging.knative.dev", + eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{ + Name: "v1beta1", + Storage: false, + }, { + Name: "v1", + Storage: true, + }}), + ), + }, + Key: testNS + "/" + subscriptionName, + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewSubscription(subscriptionName, testNS, + WithSubscriptionUID(subscriptionUID), + WithSubscriptionChannelUsingApiVersionAndGroup(imcV1GVK, channelName), + WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), + WithInitSubscriptionConditions, + WithSubscriptionFinalizers(finalizerName), WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), WithSubscriptionPhysicalSubscriptionReply(replyURI), // - Status Update - diff --git a/pkg/reconciler/testing/v1/subscription.go b/pkg/reconciler/testing/v1/subscription.go index 2d665b254f9..b42e759672c 100644 --- a/pkg/reconciler/testing/v1/subscription.go +++ b/pkg/reconciler/testing/v1/subscription.go @@ -125,6 +125,27 @@ func WithSubscriptionChannel(gvk metav1.GroupVersionKind, name string) Subscript } } +func WithSubscriptionChannelUsingGroup(gvk metav1.GroupVersionKind, name string) SubscriptionOption { + return func(s *v1.Subscription) { + s.Spec.Channel = duckv1.KReference{ + Group: gvk.Group, + Kind: gvk.Kind, + Name: name, + } + } +} + +func WithSubscriptionChannelUsingApiVersionAndGroup(gvk metav1.GroupVersionKind, name string) SubscriptionOption { + return func(s *v1.Subscription) { + s.Spec.Channel = duckv1.KReference{ + APIVersion: apiVersion(gvk), + Group: gvk.Group, + Kind: gvk.Kind, + Name: name, + } + } +} + func WithSubscriptionSubscriberRef(gvk metav1.GroupVersionKind, name, namespace string) SubscriptionOption { return func(s *v1.Subscription) { s.Spec.Subscriber = &duckv1.Destination{ @@ -151,6 +172,20 @@ func WithSubscriptionSubscriberRefUsingGroup(gvk metav1.GroupVersionKind, name, } } +func WithSubscriptionSubscriberRefUsingApiVersionAndGroup(gvk metav1.GroupVersionKind, name, namespace string) SubscriptionOption { + return func(s *v1.Subscription) { + s.Spec.Subscriber = &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: apiVersion(gvk), + Group: gvk.Group, + Kind: gvk.Kind, + Name: name, + Namespace: namespace, + }, + } + } +} + func WithSubscriptionDeliveryRef(gvk metav1.GroupVersionKind, name, namespace string) SubscriptionOption { return func(s *v1.Subscription) { s.Spec.Delivery = &eventingduckv1.DeliverySpec{ diff --git a/test/experimental/features/kreference_group/sub_channel.go b/test/experimental/features/kreference_group/sub_channel.go new file mode 100644 index 00000000000..03d22dc6273 --- /dev/null +++ b/test/experimental/features/kreference_group/sub_channel.go @@ -0,0 +1,98 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kreference_group + +import ( + "context" + + cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + eventingclient "knative.dev/eventing/pkg/client/injection/client" + "knative.dev/eventing/test/rekt/resources/channel" + "knative.dev/eventing/test/rekt/resources/subscription" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/reconciler-test/pkg/feature" +) + +// SubscriptionWithChannelGroup tests a scenario where the flow is source -> channel -> sink +// where the channel -> sink subscription uses the KReference.Group field for Subscription.Spec.Channel +func SubscriptionWithChannelGroup() *feature.Feature { + f := feature.NewFeature() + + channelGVK := channel.GVK() + channelGVR := channel.GVR() + channelGroup := channelGVK.GroupKind().Group + _, channelKind := channelGVK.ToAPIVersionAndKind() + + channelName := feature.MakeRandomK8sName("channel") + subName := feature.MakeRandomK8sName("sub") + sinkName := feature.MakeRandomK8sName("sink") + sourceName := feature.MakeRandomK8sName("source") + + ev := cetest.FullEvent() + + f.Setup("install sink", eventshub.Install( + sinkName, + eventshub.StartReceiver, + )) + + f.Setup("Install channel", channel.Install(channelName)) + f.Setup("channel is ready", channel.IsReady(channelName)) + + f.Setup("Install channel -> sink subscription", func(ctx context.Context, t feature.T) { + namespace := environment.FromContext(ctx).Namespace() + _, err := eventingclient.Get(ctx).MessagingV1().Subscriptions(namespace).Create(ctx, + &messagingv1.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: subName, + Namespace: namespace, + }, + Spec: messagingv1.SubscriptionSpec{ + Channel: duckv1.KReference{ + Group: channelGroup, + Kind: channelKind, + Name: channelName, + }, + Subscriber: &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "v1", + Kind: "Service", + Name: sinkName, + }, + }, + }, + }, metav1.CreateOptions{}) + require.NoError(t, err) + }) + + f.Setup("subscription channel -> sink is ready", subscription.IsReady(subName)) + + f.Setup("install source", eventshub.Install( + sourceName, + eventshub.StartSenderToResource(channelGVR, channelName), + eventshub.InputEvent(ev), + )) + + f.Assert("receive event", assert.OnStore(sinkName).MatchEvent(cetest.HasId(ev.ID())).Exact(1)) + + return f +} diff --git a/test/experimental/kreference_group_test.go b/test/experimental/kreference_group_test.go index 99254616243..e394ce6198e 100644 --- a/test/experimental/kreference_group_test.go +++ b/test/experimental/kreference_group_test.go @@ -40,4 +40,5 @@ func TestChannelToChannel(t *testing.T) { ) env.Test(ctx, t, kreference_group.ChannelToChannel()) + env.Test(ctx, t, kreference_group.SubscriptionWithChannelGroup()) }