diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go index 5bb6e370b75..4c2a4a8035b 100644 --- a/cmd/webhook/main.go +++ b/cmd/webhook/main.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -91,9 +92,12 @@ func NewDefaultingAdmissionController(ctx context.Context, cmw configmap.Watcher channelStore := channeldefaultconfig.NewStore(logging.FromContext(ctx).Named("channel-config-store")) channelStore.WatchConfigs(cmw) + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store")) + featureStore.WatchConfigs(cmw) + // Decorate contexts with the current state of the config. ctxFunc := func(ctx context.Context) context.Context { - return channelStore.ToContext(store.ToContext(ctx)) + return featureStore.ToContext(channelStore.ToContext(store.ToContext(ctx))) } return defaulting.NewAdmissionController(ctx, @@ -125,9 +129,13 @@ func NewValidationAdmissionController(ctx context.Context, cmw configmap.Watcher pingstore := pingdefaultconfig.NewStore(logging.FromContext(ctx).Named("ping-config-store")) pingstore.WatchConfigs(cmw) + + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store")) + featureStore.WatchConfigs(cmw) + // Decorate contexts with the current state of the config. ctxFunc := func(ctx context.Context) context.Context { - return channelStore.ToContext(pingstore.ToContext(store.ToContext(ctx))) + return featureStore.ToContext(channelStore.ToContext(pingstore.ToContext(store.ToContext(ctx)))) } return validation.NewAdmissionController(ctx, @@ -201,9 +209,12 @@ func NewConversionController(ctx context.Context, cmw configmap.Watcher) *contro channelStore := channeldefaultconfig.NewStore(logging.FromContext(ctx).Named("channel-config-store")) channelStore.WatchConfigs(cmw) + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store")) + featureStore.WatchConfigs(cmw) + // Decorate contexts with the current state of the config. ctxFunc := func(ctx context.Context) context.Context { - return channelStore.ToContext(store.ToContext(ctx)) + return featureStore.ToContext(channelStore.ToContext(store.ToContext(ctx))) } var ( diff --git a/config/core/configmaps/features.yaml b/config/core/configmaps/features.yaml index 36fc042ffd7..6856d9b3351 100644 --- a/config/core/configmaps/features.yaml +++ b/config/core/configmaps/features.yaml @@ -22,3 +22,6 @@ metadata: knative.dev/config-propagation: original knative.dev/config-category: eventing data: + # ALPHA feature: The kreference-group allows you to use the Group field in KReferences. + # For more details: https://github.com/knative/eventing/issues/5086 + kreference-group: "disabled" diff --git a/config/core/resources/subscription.yaml b/config/core/resources/subscription.yaml index 8e035f980e1..ce90004a2b1 100644 --- a/config/core/resources/subscription.yaml +++ b/config/core/resources/subscription.yaml @@ -127,6 +127,7 @@ spec: namespace: description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ This is optional field, it gets defaulted to the object holding it if left out.' type: string + x-kubernetes-preserve-unknown-fields: true # This is necessary to enable the experimental feature uri: description: URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref. type: string diff --git a/pkg/apis/feature/flag_names.go b/pkg/apis/feature/flag_names.go index 5d7b554ea45..fdc49d7e144 100644 --- a/pkg/apis/feature/flag_names.go +++ b/pkg/apis/feature/flag_names.go @@ -15,3 +15,7 @@ limitations under the License. */ package feature + +const ( + KReferenceGroup = "kreference-group" +) diff --git a/pkg/apis/feature/store.go b/pkg/apis/feature/store.go index cfa78baf9fb..a22f313c232 100644 --- a/pkg/apis/feature/store.go +++ b/pkg/apis/feature/store.go @@ -19,6 +19,7 @@ package feature import ( "context" + duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/configmap" ) @@ -50,7 +51,7 @@ func FromContextOrDefaults(ctx context.Context) Flags { // ToContext attaches the provided Flags to the provided context, returning the // new context with the Flags attached. func ToContext(ctx context.Context, c Flags) context.Context { - return context.WithValue(ctx, cfgKey{}, c) + return fillContextWithFeatureSpecificFlags(context.WithValue(ctx, cfgKey{}, c), c) } // Store is a typed wrapper around configmap.Untyped store to handle our configmaps. @@ -98,3 +99,10 @@ func (s *Store) Load() Flags { } return loaded.(Flags) } + +func fillContextWithFeatureSpecificFlags(ctx context.Context, flags Flags) context.Context { + if flags.IsEnabled(KReferenceGroup) { + ctx = duckv1.KReferenceGroupAllowed(ctx) + } + return ctx +} diff --git a/pkg/apis/messaging/v1/subscription_validation_test.go b/pkg/apis/messaging/v1/subscription_validation_test.go index 9e2f3e3d2c3..e2871f71a93 100644 --- a/pkg/apis/messaging/v1/subscription_validation_test.go +++ b/pkg/apis/messaging/v1/subscription_validation_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" ) @@ -243,6 +244,165 @@ func TestSubscriptionSpecValidation(t *testing.T) { } } +func TestSubscriptionSpecValidationWithKRefGroupFeatureEnabled(t *testing.T) { + tests := []struct { + name string + c *SubscriptionSpec + want *apis.FieldError + }{{ + name: "valid", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Subscriber: getValidDestination(), + }, + want: nil, + }, { + name: "valid with reply", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Subscriber: getValidDestination(), + Reply: getValidReply(), + }, + want: nil, + }, { + name: "empty Channel", + c: &SubscriptionSpec{ + Channel: duckv1.KReference{}, + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("channel") + fe.Details = "the Subscription must reference a channel" + return fe + }(), + }, { + name: "missing name in Channel", + c: &SubscriptionSpec{ + Channel: duckv1.KReference{ + Kind: channelKind, + APIVersion: channelAPIVersion, + }, + Subscriber: getValidDestination(), + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("channel.name") + return fe + }(), + }, { + name: "missing Subscriber and Reply", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("reply", "subscriber") + fe.Details = "the Subscription must reference at least one of (reply or a subscriber)" + return fe + }(), + }, { + name: "empty Subscriber and Reply", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Subscriber: &duckv1.Destination{}, + Reply: &duckv1.Destination{}, + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("reply", "subscriber") + fe.Details = "the Subscription must reference at least one of (reply or a subscriber)" + return fe + }(), + }, { + name: "missing Reply", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Subscriber: getValidDestination(), + }, + want: nil, + }, { + name: "empty Reply", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Subscriber: getValidDestination(), + Reply: &duckv1.Destination{}, + }, + want: nil, + }, { + name: "missing Subscriber", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Reply: getValidReply(), + }, + want: nil, + }, { + name: "empty Subscriber", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Subscriber: &duckv1.Destination{}, + Reply: getValidReply(), + }, + want: nil, + }, { + name: "missing name in channel, and missing subscriber, reply", + c: &SubscriptionSpec{ + Channel: duckv1.KReference{ + Kind: channelKind, + APIVersion: channelAPIVersion, + }, + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("reply", "subscriber") + fe.Details = "the Subscription must reference at least one of (reply or a subscriber)" + return apis.ErrMissingField("channel.name").Also(fe) + }(), + }, { + name: "empty", + c: &SubscriptionSpec{}, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("channel") + fe.Details = "the Subscription must reference a channel" + return fe + }(), + }, { + name: "missing name in Subscriber.Ref", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Subscriber: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Namespace: namespace, + Kind: channelKind, + APIVersion: channelAPIVersion, + }, + }, + }, + want: apis.ErrMissingField("subscriber.ref.name"), + }, { + name: "missing name in Subscriber.Ref", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Subscriber: getValidDestination(), + Reply: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Namespace: namespace, + Name: "", + Kind: channelKind, + APIVersion: channelAPIVersion, + }, + }, + }, + want: apis.ErrMissingField("reply.ref.name"), + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := feature.ToContext(context.TODO(), feature.Flags{ + feature.KReferenceGroup: feature.Allowed, + }) + got := test.c.Validate(ctx) + if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" { + t.Errorf("%s: validateChannel (-want, +got) = %v", test.name, diff) + } + }) + } +} + func TestSubscriptionImmutable(t *testing.T) { newChannel := getValidChannelRef() newChannel.Name = "newChannel" diff --git a/pkg/reconciler/subscription/controller.go b/pkg/reconciler/subscription/controller.go index 63e3931a98f..6483e4c7eee 100644 --- a/pkg/reconciler/subscription/controller.go +++ b/pkg/reconciler/subscription/controller.go @@ -19,8 +19,11 @@ package subscription import ( "context" + "knative.dev/eventing/pkg/apis/feature" + "knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1/customresourcedefinition" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" + "knative.dev/pkg/kref" "knative.dev/pkg/logging" "knative.dev/pkg/resolver" "knative.dev/pkg/tracker" @@ -44,12 +47,20 @@ func NewController( subscriptionInformer := subscription.Get(ctx) channelInformer := channel.Get(ctx) + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store")) + featureStore.WatchConfigs(cmw) + r := &Reconciler{ dynamicClientSet: dynamicclient.Get(ctx), + kreferenceResolver: kref.NewKReferenceResolver(customresourcedefinition.Get(ctx).Lister()), subscriptionLister: subscriptionInformer.Lister(), channelLister: channelInformer.Lister(), } - impl := subscriptionreconciler.NewImpl(ctx, r) + impl := subscriptionreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { + return controller.Options{ + ConfigStore: featureStore, + } + }) logging.FromContext(ctx).Info("Setting up event handlers") subscriptionInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) diff --git a/pkg/reconciler/subscription/controller_test.go b/pkg/reconciler/subscription/controller_test.go index 01d961ce950..befaed5bf14 100644 --- a/pkg/reconciler/subscription/controller_test.go +++ b/pkg/reconciler/subscription/controller_test.go @@ -19,20 +19,31 @@ package subscription import ( "testing" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/configmap" . "knative.dev/pkg/reconciler/testing" + "knative.dev/eventing/pkg/apis/feature" + // Fake injection informers _ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake" + _ "knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1/customresourcedefinition/fake" _ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake" ) func TestNew(t *testing.T) { ctx, _ := SetupFakeContext(t) - c := NewController(ctx, configmap.NewStaticWatcher()) + c := NewController(ctx, configmap.NewStaticWatcher( + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: feature.FlagsConfigName, + }, + }, + )) if c == nil { t.Fatal("Expected NewController to return a non-nil value") diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index d264cdaeedd..529047b4419 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -31,12 +31,14 @@ import ( "knative.dev/pkg/apis/duck" duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/kref" "knative.dev/pkg/logging" pkgreconciler "knative.dev/pkg/reconciler" "knative.dev/pkg/resolver" "knative.dev/pkg/tracker" eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/pkg/apis/feature" v1 "knative.dev/eventing/pkg/apis/messaging/v1" subscriptionreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription" listers "knative.dev/eventing/pkg/client/listers/messaging/v1" @@ -62,6 +64,9 @@ type Reconciler struct { // DynamicClientSet allows us to configure pluggable Build objects dynamicClientSet dynamic.Interface + // crdLister is used to resolve the ref version + kreferenceResolver *kref.KReferenceResolver + // listers index properties about resources subscriptionLister listers.SubscriptionLister channelLister listers.ChannelLister @@ -201,6 +206,21 @@ func (r *Reconciler) resolveSubscriber(ctx context.Context, subscription *v1.Sub if subscriber.Ref != nil { subscriber.Ref.Namespace = subscription.Namespace } + + // Resolve the group + if subscriber.Ref != nil && feature.FromContext(ctx).IsEnabled(feature.KReferenceGroup) { + var err error + subscriber.Ref, err = r.kreferenceResolver.ResolveGroup(subscriber.Ref) + if err != nil { + logging.FromContext(ctx).Warnw("Failed to resolve Subscriber.Ref", + zap.Error(err), + zap.Any("subscriber", subscriber)) + subscription.Status.MarkReferencesNotResolved(subscriberResolveFailed, "Failed to resolve spec.subscriber.ref: %v", err) + return pkgreconciler.NewEvent(corev1.EventTypeWarning, subscriberResolveFailed, "Failed to resolve spec.subscriber.ref: %w", err) + } + logging.FromContext(ctx).Debugw("Group resolved", zap.Any("spec.subscriber.ref", subscriber.Ref)) + } + subscriberURI, err := r.destinationResolver.URIFromDestinationV1(ctx, *subscriber, subscription) if err != nil { logging.FromContext(ctx).Warnw("Failed to resolve Subscriber", diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index 5c887763cab..7f7c0ce3b90 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -22,8 +22,11 @@ import ( "fmt" "testing" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/utils/pointer" + "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/injection/clients/dynamicclient" + "knative.dev/pkg/kref" "knative.dev/pkg/network" eventingclient "knative.dev/eventing/pkg/client/injection/client" @@ -46,6 +49,7 @@ import ( messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription" "knative.dev/eventing/pkg/duck" + eventingtesting "knative.dev/eventing/pkg/reconciler/testing" . "knative.dev/pkg/reconciler/testing" @@ -205,6 +209,85 @@ func TestAllCases(t *testing.T) { MarkSubscriptionReady, ), }}, + }, { + Name: "subscription goes ready without api version", + Ctx: feature.ToContext(context.TODO(), feature.Flags{ + feature.KReferenceGroup: feature.Enabled, + }), + Objects: []runtime.Object{ + NewSubscription(subscriptionName, testNS, + WithSubscriptionUID(subscriptionUID), + WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionSubscriberRefUsingGroup(subscriberGVK, subscriberName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), + WithInitSubscriptionConditions, + WithSubscriptionFinalizers(finalizerName), + MarkReferencesResolved, + MarkAddedToChannel, + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), + WithSubscriptionPhysicalSubscriptionReply(replyURI), + ), + // Subscriber + NewUnstructured(subscriberGVK, subscriberName, testNS, + WithUnstructuredAddressable(subscriberDNS), + ), + // Reply + NewInMemoryChannel(replyName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelAddress(replyDNS), + ), + // Channel + NewInMemoryChannel(channelName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelReady(channelDNS), + WithInMemoryChannelSubscribers([]eventingduck.SubscriberSpec{{ + UID: subscriptionUID, + Generation: 0, + SubscriberURI: subscriberURI, + ReplyURI: replyURI, + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + Generation: 1, + SubscriberURI: apis.HTTP("call2"), + ReplyURI: apis.HTTP("sink2"), + }}), + WithInMemoryChannelStatusSubscribers([]eventingduck.SubscriberStatus{{ + UID: subscriptionUID, + ObservedGeneration: 0, + Ready: "True", + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + ObservedGeneration: 1, + Ready: "True", + }}), + ), + // IMC CRD + eventingtesting.NewCustomResourceDefinition("subscribers.messaging.knative.dev", + eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{ + Name: "v1beta1", + Storage: false, + }, { + Name: "v1", + Storage: true, + }}), + ), + }, + Key: testNS + "/" + subscriptionName, + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewSubscription(subscriptionName, testNS, + WithSubscriptionUID(subscriptionUID), + WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionSubscriberRefUsingGroup(subscriberGVK, subscriberName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), + WithInitSubscriptionConditions, + WithSubscriptionFinalizers(finalizerName), + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), + WithSubscriptionPhysicalSubscriptionReply(replyURI), + // - Status Update - + MarkSubscriptionReady, + ), + }}, }, { Name: "channel does not exist", Objects: []runtime.Object{ @@ -1368,6 +1451,7 @@ func TestAllCases(t *testing.T) { channelLister: listers.GetMessagingChannelLister(), channelableTracker: duck.NewListableTracker(ctx, channelable.Get, func(types.NamespacedName) {}, 0), destinationResolver: resolver.NewURIResolver(ctx, func(types.NamespacedName) {}), + kreferenceResolver: kref.NewKReferenceResolver(listers.GetCustomResourceDefinitionLister()), tracker: &FakeTracker{}, } return subscription.NewReconciler(ctx, logger, diff --git a/pkg/reconciler/testing/v1/factory.go b/pkg/reconciler/testing/v1/factory.go index d384bfd62b4..753d0644d69 100644 --- a/pkg/reconciler/testing/v1/factory.go +++ b/pkg/reconciler/testing/v1/factory.go @@ -58,7 +58,12 @@ func MakeFactory(ctor Ctor, unstructured bool, logger *zap.SugaredLogger) Factor return func(t *testing.T, r *TableRow) (controller.Reconciler, ActionRecorderList, EventList) { ls := NewListers(r.Objects) - ctx := context.Background() + var ctx context.Context + if r.Ctx != nil { + ctx = r.Ctx + } else { + ctx = context.Background() + } ctx = logging.WithLogger(ctx, logger) ctx, kubeClient := fakekubeclient.With(ctx, ls.GetKubeObjects()...) diff --git a/pkg/reconciler/testing/v1/subscription.go b/pkg/reconciler/testing/v1/subscription.go index b4dd4083a35..2d665b254f9 100644 --- a/pkg/reconciler/testing/v1/subscription.go +++ b/pkg/reconciler/testing/v1/subscription.go @@ -138,6 +138,19 @@ func WithSubscriptionSubscriberRef(gvk metav1.GroupVersionKind, name, namespace } } +func WithSubscriptionSubscriberRefUsingGroup(gvk metav1.GroupVersionKind, name, namespace string) SubscriptionOption { + return func(s *v1.Subscription) { + s.Spec.Subscriber = &duckv1.Destination{ + Ref: &duckv1.KReference{ + Group: gvk.Group, + Kind: gvk.Kind, + Name: name, + Namespace: namespace, + }, + } + } +} + func WithSubscriptionDeliveryRef(gvk metav1.GroupVersionKind, name, namespace string) SubscriptionOption { return func(s *v1.Subscription) { s.Spec.Delivery = &eventingduckv1.DeliverySpec{ diff --git a/test/experimental/config/features.yaml b/test/experimental/config/features.yaml index 36fc042ffd7..1138d90b544 100644 --- a/test/experimental/config/features.yaml +++ b/test/experimental/config/features.yaml @@ -22,3 +22,4 @@ metadata: knative.dev/config-propagation: original knative.dev/config-category: eventing data: + kreference-group: "enabled" diff --git a/test/experimental/features/kreference_group/channel_to_channel.go b/test/experimental/features/kreference_group/channel_to_channel.go new file mode 100644 index 00000000000..31668571d6e --- /dev/null +++ b/test/experimental/features/kreference_group/channel_to_channel.go @@ -0,0 +1,128 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kreference_group + +import ( + "context" + + cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + eventingclient "knative.dev/eventing/pkg/client/injection/client" + "knative.dev/eventing/test/rekt/resources/channel" + "knative.dev/eventing/test/rekt/resources/subscription" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/reconciler-test/pkg/feature" +) + +// ChannelToChannel tests a scenario where the flow is source -> A -> B -> sink +// and A -> B subscription uses the KReference.Group field. +func ChannelToChannel() *feature.Feature { + f := feature.NewFeature() + + channelGVK := channel.GVK() + channelGVR := channel.GVR() + channelGroup := channelGVK.GroupKind().Group + channelAPIVersion, imcKind := channelGVK.ToAPIVersionAndKind() + + channelAName := feature.MakeRandomK8sName("channel-a") + channelBName := feature.MakeRandomK8sName("channel-b") + subAToBName := feature.MakeRandomK8sName("sub-a-b") + subBToSinkName := feature.MakeRandomK8sName("sub-b-sink") + sinkName := feature.MakeRandomK8sName("sink") + sourceName := feature.MakeRandomK8sName("source") + + ev := cetest.FullEvent() + + f.Setup("install sink", eventshub.Install( + sinkName, + eventshub.StartReceiver, + )) + + f.Setup("Install channel A", channel.Install(channelAName)) + f.Setup("Install channel B", channel.Install(channelBName)) + f.Setup("channel A is ready", channel.IsReady(channelAName)) + f.Setup("channel B is ready", channel.IsReady(channelBName)) + + f.Setup("Install channel A -> channel B subscription", func(ctx context.Context, t feature.T) { + namespace := environment.FromContext(ctx).Namespace() + _, err := eventingclient.Get(ctx).MessagingV1().Subscriptions(namespace).Create(ctx, + &messagingv1.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: subAToBName, + Namespace: namespace, + }, + Spec: messagingv1.SubscriptionSpec{ + Channel: duckv1.KReference{ + APIVersion: channelAPIVersion, + Kind: imcKind, + Name: channelAName, + }, + Subscriber: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Group: channelGroup, + Kind: imcKind, + Name: channelBName, + }, + }, + }, + }, metav1.CreateOptions{}) + require.NoError(t, err) + }) + f.Setup("Install channel B -> sink subscription", func(ctx context.Context, t feature.T) { + namespace := environment.FromContext(ctx).Namespace() + _, err := eventingclient.Get(ctx).MessagingV1().Subscriptions(namespace).Create(ctx, + &messagingv1.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: subBToSinkName, + Namespace: namespace, + }, + Spec: messagingv1.SubscriptionSpec{ + Channel: duckv1.KReference{ + APIVersion: channelAPIVersion, + Kind: imcKind, + Name: channelBName, + }, + Subscriber: &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "v1", + Kind: "Service", + Name: sinkName, + }, + }, + }, + }, metav1.CreateOptions{}) + require.NoError(t, err) + }) + + f.Setup("subscription A -> B is ready", subscription.IsReady(subAToBName)) + f.Setup("subscription B -> Sink is ready", subscription.IsReady(subBToSinkName)) + + f.Setup("install source", eventshub.Install( + sourceName, + eventshub.StartSenderToResource(channelGVR, channelAName), + eventshub.InputEvent(ev), + )) + + f.Assert("receive event", assert.OnStore(sinkName).MatchEvent(cetest.HasId(ev.ID())).Exact(1)) + + return f +} diff --git a/test/experimental/kreference_group_test.go b/test/experimental/kreference_group_test.go new file mode 100644 index 00000000000..99254616243 --- /dev/null +++ b/test/experimental/kreference_group_test.go @@ -0,0 +1,43 @@ +// +build e2e + +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package experimental + +import ( + "testing" + + "knative.dev/eventing/test/experimental/features/kreference_group" + "knative.dev/pkg/system" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/knative" +) + +func TestChannelToChannel(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + env.Test(ctx, t, kreference_group.ChannelToChannel()) +} diff --git a/vendor/knative.dev/pkg/kref/knative_reference.go b/vendor/knative.dev/pkg/kref/knative_reference.go new file mode 100644 index 00000000000..14f99c4e67f --- /dev/null +++ b/vendor/knative.dev/pkg/kref/knative_reference.go @@ -0,0 +1,77 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kref + +import ( + "fmt" + + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensionsv1lister "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +// KReferenceResolver is an object that resolves the KReference.Group field +// Note: This API is EXPERIMENTAL and might break anytime. For more details: https://github.com/knative/eventing/issues/5086 +type KReferenceResolver struct { + crdLister apiextensionsv1lister.CustomResourceDefinitionLister +} + +// NewKReferenceResolver creates a new KReferenceResolver from a crdLister +// Note: This API is EXPERIMENTAL and might break anytime. For more details: https://github.com/knative/eventing/issues/5086 +func NewKReferenceResolver(crdLister apiextensionsv1lister.CustomResourceDefinitionLister) *KReferenceResolver { + return &KReferenceResolver{crdLister: crdLister} +} + +// ResolveGroup resolves the APIVersion of a KReference starting from the Group. +// In order to execute this method, you need RBAC to read the CRD of the Resource referred in this KReference. +// Note: This API is EXPERIMENTAL and might break anytime. For more details: https://github.com/knative/eventing/issues/5086 +func (resolver *KReferenceResolver) ResolveGroup(kr *duckv1.KReference) (*duckv1.KReference, error) { + if kr.Group == "" { + // Nothing to do here + return kr, nil + } + + kr = kr.DeepCopy() + + actualGvk := schema.GroupVersionKind{Group: kr.Group, Kind: kr.Kind} + pluralGvk, _ := meta.UnsafeGuessKindToResource(actualGvk) + crd, err := resolver.crdLister.Get(pluralGvk.GroupResource().String()) + if err != nil { + return nil, err + } + + actualGvk.Version, err = findCRDStorageVersion(crd) + if err != nil { + return nil, err + } + + kr.APIVersion, kr.Kind = actualGvk.ToAPIVersionAndKind() + + return kr, nil +} + +// This function runs under the assumption that there must be exactly one "storage" version +func findCRDStorageVersion(crd *apiextensionsv1.CustomResourceDefinition) (string, error) { + for _, version := range crd.Spec.Versions { + if version.Storage { + return version.Name, nil + } + } + return "", fmt.Errorf("this CRD %s doesn't have a storage version! Kubernetes, you're drunk, go home", crd.Name) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index c6c7e511408..e613d8a491d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1074,6 +1074,7 @@ knative.dev/pkg/injection/sharedmain knative.dev/pkg/kflag knative.dev/pkg/kmeta knative.dev/pkg/kmp +knative.dev/pkg/kref knative.dev/pkg/leaderelection knative.dev/pkg/leaderelection/chaosduck knative.dev/pkg/logging