From 32f1214179040c2eceeb9002070541bb4832807d Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 23 Mar 2021 13:54:33 +0100 Subject: [PATCH 1/3] Subscription ref without api version prototype Signed-off-by: Francesco Guardiani --- pkg/reconciler/subscription/controller.go | 2 + pkg/reconciler/subscription/subscription.go | 51 +++++++++- .../subscription/subscription_test.go | 92 ++++++++++++++++++- pkg/reconciler/testing/v1/unstructured.go | 3 + 4 files changed, 143 insertions(+), 5 deletions(-) diff --git a/pkg/reconciler/subscription/controller.go b/pkg/reconciler/subscription/controller.go index 63e3931a98f..fddc1338464 100644 --- a/pkg/reconciler/subscription/controller.go +++ b/pkg/reconciler/subscription/controller.go @@ -19,6 +19,7 @@ package subscription import ( "context" + "knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1/customresourcedefinition" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/logging" @@ -46,6 +47,7 @@ func NewController( r := &Reconciler{ dynamicClientSet: dynamicclient.Get(ctx), + crdLister: customresourcedefinition.Get(ctx).Lister(), subscriptionLister: subscriptionInformer.Lister(), channelLister: channelInformer.Lister(), } diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index 3c2a8c018f3..090b3105cd3 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -19,13 +19,18 @@ package subscription import ( "context" "fmt" + "strings" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensionsv1lister "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" @@ -66,6 +71,9 @@ type Reconciler struct { // DynamicClientSet allows us to configure pluggable Build objects dynamicClientSet dynamic.Interface + // crdLister is used to resolve the ref version + crdLister apiextensionsv1lister.CustomResourceDefinitionLister + // listers index properties about resources subscriptionLister listers.SubscriptionLister channelLister listers.ChannelLister @@ -321,11 +329,18 @@ func (r *Reconciler) trackAndFetchChannel(ctx context.Context, sub *v1.Subscript func (r *Reconciler) getChannel(ctx context.Context, sub *v1.Subscription) (*eventingduckv1.Channelable, pkgreconciler.Event) { logging.FromContext(ctx).Infow("Getting channel", zap.Any("channel", sub.Spec.Channel)) + if err := r.resolveRefAPIVersion(&sub.Spec.Channel); err != nil { + logging.FromContext(ctx).Warnw("failed to resolve the ref", zap.Any("channel", sub.Spec.Channel), zap.Error(err)) + return nil, err + } + + logging.FromContext(ctx).Infow("Resolved channel ref", zap.Any("channel", sub.Spec.Channel)) + // 1. Track the channel pointed by subscription. // a. If channel is a Channel.messaging.knative.dev obj, err := r.trackAndFetchChannel(ctx, sub, sub.Spec.Channel) if err != nil { - logging.FromContext(ctx).Warnw("failed", zap.Any("channel", sub.Spec.Channel), zap.Error(err)) + logging.FromContext(ctx).Warnw("failed to track and fetch channel", zap.Any("channel", sub.Spec.Channel), zap.Error(err)) return nil, err } @@ -512,3 +527,37 @@ func deliverySpec(sub *v1.Subscription, channel *eventingduckv1.Channelable) (de } return } + +// TODO(slinkydeveloper) this can be refactored in a separate object that takes care of "ref resolution" +func (r *Reconciler) resolveRefAPIVersion(objRef *corev1.ObjectReference) error { + if objRef.APIVersion == "" || strings.Contains(objRef.APIVersion, "/") { + // Either it's Core v1 or the version is specified manually + return nil + } + + actualGvk := schema.GroupVersionKind{Group: objRef.APIVersion, Kind: objRef.Kind} + pluralGvk, _ := meta.UnsafeGuessKindToResource(actualGvk) + crd, err := r.crdLister.Get(pluralGvk.GroupResource().String()) + if err != nil { + return err + } + + actualGvk.Version, err = findCRDStorageVersion(crd) + if err != nil { + return err + } + + objRef.SetGroupVersionKind(actualGvk) + + return nil +} + +// This function runs under the assumption that there must be exactly one "storage" version +func findCRDStorageVersion(crd *apiextensionsv1.CustomResourceDefinition) (string, error) { + for _, version := range crd.Spec.Versions { + if version.Storage { + return version.Name, nil + } + } + return "", fmt.Errorf("this CRD %s doesn't have a storage version! Kubernetes, you're drunk, go home", crd.Name) +} diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index b1b96425b54..e18b459f2d3 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -26,10 +26,6 @@ import ( "knative.dev/pkg/injection/clients/dynamicclient" "knative.dev/pkg/network" - eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" - eventingclient "knative.dev/eventing/pkg/client/injection/client" - "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable" - corev1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -46,6 +42,12 @@ import ( logtesting "knative.dev/pkg/logging/testing" "knative.dev/pkg/resolver" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + "knative.dev/eventing/pkg/apis/messaging" + eventingclient "knative.dev/eventing/pkg/client/injection/client" + "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable" + pkgtesting "knative.dev/eventing/pkg/reconciler/testing" + eventingduck "knative.dev/eventing/pkg/apis/duck/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription" @@ -117,6 +119,11 @@ var ( Kind: "InMemoryChannel", } + imcGVK = metav1.GroupVersionKind{ + Group: "messaging.knative.dev", + Kind: "InMemoryChannel", + } + channelV1GVK = metav1.GroupVersionKind{ Group: "messaging.knative.dev", Version: "v1", @@ -217,6 +224,82 @@ func TestAllCases(t *testing.T) { MarkSubscriptionReady, ), }}, + }, { + Name: "subscription goes ready without api version", + Objects: []runtime.Object{ + NewSubscription(subscriptionName, testNS, + WithSubscriptionUID(subscriptionUID), + WithSubscriptionChannel(imcGVK, channelName), + WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), + WithInitSubscriptionConditions, + WithSubscriptionFinalizers(finalizerName), + MarkReferencesResolved, + MarkAddedToChannel, + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), + WithSubscriptionPhysicalSubscriptionReply(replyURI), + ), + // Subscriber + NewUnstructured(subscriberGVK, subscriberName, testNS, + WithUnstructuredAddressable(subscriberDNS), + ), + // Reply + NewInMemoryChannel(replyName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelAddress(replyDNS), + ), + // Channel + NewInMemoryChannel(channelName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelReady(channelDNS), + WithInMemoryChannelSubscribers([]eventingduck.SubscriberSpec{{ + UID: subscriptionUID, + Generation: 0, + SubscriberURI: subscriberURI, + ReplyURI: replyURI, + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + Generation: 1, + SubscriberURI: apis.HTTP("call2"), + ReplyURI: apis.HTTP("sink2"), + }}), + WithInMemoryChannelStatusSubscribers([]eventingduck.SubscriberStatus{{ + UID: subscriptionUID, + ObservedGeneration: 0, + Ready: "True", + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + ObservedGeneration: 1, + Ready: "True", + }}), + ), + // IMC CRD + pkgtesting.NewCustomResourceDefinition(messaging.InMemoryChannelsResource.String(), + pkgtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{ + Name: "v1beta1", + Storage: false, + }, { + Name: "v1", + Storage: true, + }}), + ), + }, + Key: testNS + "/" + subscriptionName, + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewSubscription(subscriptionName, testNS, + WithSubscriptionUID(subscriptionUID), + WithSubscriptionChannel(imcGVK, channelName), + WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), + WithInitSubscriptionConditions, + WithSubscriptionFinalizers(finalizerName), + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), + WithSubscriptionPhysicalSubscriptionReply(replyURI), + // - Status Update - + MarkSubscriptionReady, + ), + }}, }, { Name: "channel does not exist", Objects: []runtime.Object{ @@ -1374,6 +1457,7 @@ func TestAllCases(t *testing.T) { ctx = addressable.WithDuck(ctx) r := &Reconciler{ dynamicClientSet: dynamicclient.Get(ctx), + crdLister: listers.GetCustomResourceDefinitionLister(), subscriptionLister: listers.GetSubscriptionLister(), channelLister: listers.GetMessagingChannelLister(), channelableTracker: duck.NewListableTracker(ctx, channelable.Get, func(types.NamespacedName) {}, 0), diff --git a/pkg/reconciler/testing/v1/unstructured.go b/pkg/reconciler/testing/v1/unstructured.go index 60b3f6d48d5..bd6bcaf8283 100644 --- a/pkg/reconciler/testing/v1/unstructured.go +++ b/pkg/reconciler/testing/v1/unstructured.go @@ -60,6 +60,9 @@ func WithUnstructuredAddressable(hostname string) UnstructuredOption { } func apiVersion(gvk metav1.GroupVersionKind) string { + if gvk.Version == "" { + return gvk.Group + } groupVersion := gvk.Version if gvk.Group != "" { groupVersion = gvk.Group + "/" + gvk.Version From c07e16abae4bdc16e692189f48ce6dba1003d2aa Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 23 Mar 2021 14:00:42 +0100 Subject: [PATCH 2/3] Style Signed-off-by: Francesco Guardiani --- pkg/reconciler/subscription/controller.go | 2 +- pkg/reconciler/subscription/subscription_test.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/reconciler/subscription/controller.go b/pkg/reconciler/subscription/controller.go index fddc1338464..65737db8939 100644 --- a/pkg/reconciler/subscription/controller.go +++ b/pkg/reconciler/subscription/controller.go @@ -47,7 +47,7 @@ func NewController( r := &Reconciler{ dynamicClientSet: dynamicclient.Get(ctx), - crdLister: customresourcedefinition.Get(ctx).Lister(), + crdLister: customresourcedefinition.Get(ctx).Lister(), subscriptionLister: subscriptionInformer.Lister(), channelLister: channelInformer.Lister(), } diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index e18b459f2d3..99188278297 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -120,8 +120,8 @@ var ( } imcGVK = metav1.GroupVersionKind{ - Group: "messaging.knative.dev", - Kind: "InMemoryChannel", + Group: "messaging.knative.dev", + Kind: "InMemoryChannel", } channelV1GVK = metav1.GroupVersionKind{ @@ -276,10 +276,10 @@ func TestAllCases(t *testing.T) { // IMC CRD pkgtesting.NewCustomResourceDefinition(messaging.InMemoryChannelsResource.String(), pkgtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{ - Name: "v1beta1", + Name: "v1beta1", Storage: false, }, { - Name: "v1", + Name: "v1", Storage: true, }}), ), From 34dd07482a1efd34259001f4066af7a3feb48cc5 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 23 Mar 2021 14:45:50 +0100 Subject: [PATCH 3/3] Add fake injection Signed-off-by: Francesco Guardiani --- pkg/reconciler/subscription/controller_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/reconciler/subscription/controller_test.go b/pkg/reconciler/subscription/controller_test.go index 01d961ce950..f1e357b2016 100644 --- a/pkg/reconciler/subscription/controller_test.go +++ b/pkg/reconciler/subscription/controller_test.go @@ -23,10 +23,12 @@ import ( . "knative.dev/pkg/reconciler/testing" // Fake injection informers + _ "knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1/customresourcedefinition/fake" + _ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake" + _ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake" - _ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake" ) func TestNew(t *testing.T) {